Skip to content

Commit

Permalink
Handle HTTP errors for leader/primary change
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed May 30, 2024
1 parent d952e06 commit 75da7c0
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 27 deletions.
11 changes: 0 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,17 +1431,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == errs.ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
Expand Down
4 changes: 4 additions & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
NotServedErr = "is not served"
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the primary is changed.
NotPrimaryErr = "is not primary"
)

// client errors
Expand Down
14 changes: 14 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,25 @@
package errs

import (
"strings"

"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, NotLeaderErr) ||
strings.Contains(errMsg, MismatchLeaderErr) ||
strings.Contains(errMsg, NotServedErr) ||
strings.Contains(errMsg, NotPrimaryErr)
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
11 changes: 6 additions & 5 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func (ci *clientInner) requestWithRetry(
)
execFunc := func() error {
defer func() {
// Handle some special status codes to increase the success rate of the following requests.
ci.handleHTTPStatusCode(statusCode)
// Handle some special status codes and errors to increase the success rate of the following requests.
ci.handleHTTPStatusCodeAndErr(statusCode, err)
log.Debug("[pd] http request finished", logFields...)
}()
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
Expand Down Expand Up @@ -174,9 +174,10 @@ func (ci *clientInner) requestWithRetry(
return bo.Exec(ctx, execFunc)
}

func (ci *clientInner) handleHTTPStatusCode(code int) {
// If the status code is 503, it indicates that there may be PD leader/follower changes.
if code == http.StatusServiceUnavailable {
func (ci *clientInner) handleHTTPStatusCodeAndErr(code int, err error) {
// - If the status code is 503, it indicates that there may be PD leader/follower changes.
// - If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
if code == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
ci.sd.ScheduleCheckMemberChanged()
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ tsoBatchLoop:
cancel()
stream = nil
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if errs.IsLeaderChange(err) {
if err := bo.Exec(ctx, svcDiscovery.CheckMemberChanged); err != nil {
select {
case <-ctx.Done():
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ error = '''
redirect to not leader
'''

["PD:apiutil:ErrRedirectToNotPrimary"]
error = '''
redirect to not primary
'''

["PD:autoscaling:ErrEmptyMetricsResponse"]
error = '''
metrics response from Prometheus is empty
Expand Down
8 changes: 4 additions & 4 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ var (

// apiutil errors
var (
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirectToNotPrimary = errors.Normalize("redirect to not primary", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotPrimary"))
)

// grpcutil errors
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/multiservicesapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func ServiceRedirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotPrimary))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotPrimary.FastGenByArgs().Error())
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/API servers.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/middlewares/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func Redirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotLeader.FastGenByArgs().Error())
return
}

Expand Down
3 changes: 2 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
clierrs "github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())
_, _, err = cli.GetTS(ctx)
re.Error(err)
re.True(pd.IsLeaderChange(err))
re.True(clierrs.IsLeaderChange(err))
_, _, err = cli.GetTS(ctx)
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember"))
Expand Down

0 comments on commit 75da7c0

Please sign in to comment.