Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http: handle 503/leader/primary change to schedule member check #8223

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,17 +1431,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

// IsLeaderChange reports whether err signals that the PD leader may have changed.
//
// It returns true when err is errs.ErrClientTSOStreamClosed, or when the error
// message contains one of the errs.NotLeaderErr, errs.MismatchLeaderErr or
// errs.NotServedErr markers. A nil err is never treated as a leader change.
func IsLeaderChange(err error) bool {
	// Calling Error() on a nil error would panic, so bail out early.
	if err == nil {
		return false
	}
	if err == errs.ErrClientTSOStreamClosed {
		return true
	}
	errMsg := err.Error()
	return strings.Contains(errMsg, errs.NotLeaderErr) ||
		strings.Contains(errMsg, errs.MismatchLeaderErr) ||
		strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
Expand Down
4 changes: 4 additions & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
NotServedErr = "is not served"
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the primary is changed.
NotPrimaryErr = "is not primary"
)

// client errors
Expand Down
14 changes: 14 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,25 @@
package errs

import (
"strings"

"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// IsLeaderChange reports whether err signals that the leader/primary may have changed.
//
// It returns true when err is ErrClientTSOStreamClosed, or when the error message
// contains one of the NotLeaderErr, MismatchLeaderErr, NotServedErr or NotPrimaryErr
// markers. A nil err is never treated as a leader change.
func IsLeaderChange(err error) bool {
	// Callers may pass the error of a successful request; calling Error() on a
	// nil error would panic, so bail out early.
	if err == nil {
		return false
	}
	if err == ErrClientTSOStreamClosed {
		return true
	}
	errMsg := err.Error()
	return strings.Contains(errMsg, NotLeaderErr) ||
		strings.Contains(errMsg, MismatchLeaderErr) ||
		strings.Contains(errMsg, NotServedErr) ||
		strings.Contains(errMsg, NotPrimaryErr)
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
51 changes: 34 additions & 17 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,41 @@ func (ci *clientInner) requestWithRetry(
headerOpts ...HeaderOption,
) error {
var (
serverURL string
isLeader bool
statusCode int
err error
logFields = append(reqInfo.logFields(),
zap.String("source", ci.source),
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
zap.Int("status-code", statusCode),
zap.Error(err))
)
execFunc := func() error {
defer func() {
// Handle some special status codes and errors to increase the success rate of the following requests.
ci.handleHTTPStatusCodeAndErr(statusCode, err)
log.Debug("[pd] http request finished", logFields...)
}()
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
clients := ci.sd.GetAllServiceClients()
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
serverURL = cli.GetURL()
isLeader = cli.IsConnectedToLeader()
if len(reqInfo.targetURL) > 0 && reqInfo.targetURL != serverURL {
skipNum++
continue
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
statusCode, err = ci.doRequest(ctx, serverURL, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
log.Debug("[pd] http request url failed", logFields...)
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
Expand All @@ -153,13 +166,22 @@ func (ci *clientInner) requestWithRetry(
}
// Copy a new backoffer for each request.
bo := *reqInfo.bo
// Backoffer also needs to check the status code to determine whether to retry.
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
// Backoffer also needs to check the status code to determine whether to retry.
return err != nil && !noNeedRetry(statusCode)
})
}, false)
return bo.Exec(ctx, execFunc)
}

// handleHTTPStatusCodeAndErr inspects the outcome of an HTTP request and schedules a
// member check on the service discovery when the outcome hints at a membership change.
//
//   - If the status code is 503, it indicates that there may be PD leader/follower changes.
//   - If the error message contains the leader/primary change information, it indicates
//     that there may be PD leader/primary change.
func (ci *clientInner) handleHTTPStatusCodeAndErr(code int, err error) {
	// err may be nil (e.g. after a successful request), so the error is only
	// inspected when it is present.
	if code == http.StatusServiceUnavailable || (err != nil && errs.IsLeaderChange(err)) {
		ci.sd.ScheduleCheckMemberChanged()
	}
}

func noNeedRetry(statusCode int) bool {
return statusCode == http.StatusNotFound ||
statusCode == http.StatusForbidden ||
Expand All @@ -168,26 +190,21 @@ func noNeedRetry(statusCode int) bool {

func (ci *clientInner) doRequest(
ctx context.Context,
url string, reqInfo *requestInfo,
serverURL string, reqInfo *requestInfo,
headerOpts ...HeaderOption,
) (int, error) {
var (
source = ci.source
callerID = reqInfo.callerID
name = reqInfo.name
method = reqInfo.method
body = reqInfo.body
res = reqInfo.res
respHandler = reqInfo.respHandler
url = reqInfo.getURL(serverURL)
logFields = append(reqInfo.logFields(),
zap.String("source", ci.source),
zap.String("url", url))
)
url = reqInfo.getURL(url)
logFields := []zap.Field{
zap.String("source", source),
zap.String("name", name),
zap.String("url", url),
zap.String("method", method),
zap.String("caller-id", callerID),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body))
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)

// The following constants are the names of the requests.
Expand Down Expand Up @@ -157,3 +158,13 @@ func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
// getURL returns the full request URL: the given server address immediately
// followed by the request's URI.
func (ri *requestInfo) getURL(addr string) string {
	// Both operands are strings, so Sprint joins them without a separator.
	fullURL := fmt.Sprint(addr, ri.uri)
	return fullURL
}

func (ri *requestInfo) logFields() []zap.Field {
return []zap.Field{
zap.String("callerID", ri.callerID),
zap.String("name", ri.name),
zap.String("uri", ri.uri),
zap.String("method", ri.method),
zap.String("targetURL", ri.targetURL),
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}
}
24 changes: 14 additions & 10 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Backoffer struct {
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
// If it's not set, it will use `defaultRetryableChecker` to retry on all non-nil errors.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
Expand Down Expand Up @@ -132,12 +132,9 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer
total = base
}
bo := &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
base: base,
max: max,
total: total,
next: base,
currentTotal: 0,
attempt: 0,
Expand All @@ -148,18 +145,25 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer
return bo
}

// SetRetryableChecker sets the retryable checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) {
if !overwrite && bo.retryableChecker != nil {
return
}
bo.retryableChecker = checker
}

func (bo *Backoffer) isRetryable(err error) bool {
if bo.retryableChecker == nil {
return true
return defaultRetryableChecker(err)
}
return bo.retryableChecker(err)
}

// defaultRetryableChecker treats every non-nil error as retryable and a nil
// error as not retryable.
func defaultRetryableChecker(err error) bool {
	if err == nil {
		return false
	}
	return true
}

// nextInterval for now use the `exponentialInterval`.
func (bo *Backoffer) nextInterval() time.Duration {
return bo.exponentialInterval()
Expand Down
26 changes: 22 additions & 4 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,34 @@ func TestBackoffer(t *testing.T) {
// Test the retryable checker.
execCount = 0
bo = InitialBackoffer(base, max, total)
bo.SetRetryableChecker(func(error) bool {
retryableChecker := func(error) bool {
return execCount < 2
})
err = bo.Exec(ctx, func() error {
}
bo.SetRetryableChecker(retryableChecker, false)
execFunc := func() error {
execCount++
return nil
})
}
err = bo.Exec(ctx, execFunc)
re.NoError(err)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
// Test the retryable checker with overwrite.
execCount = 0
retryableChecker = func(error) bool {
return execCount < 4
}
bo.SetRetryableChecker(retryableChecker, false)
err = bo.Exec(ctx, execFunc)
re.NoError(err)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
execCount = 0
bo.SetRetryableChecker(retryableChecker, true)
err = bo.Exec(ctx, execFunc)
re.NoError(err)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
}

func isBackofferReset(bo *Backoffer) bool {
Expand Down
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ tsoBatchLoop:
cancel()
stream = nil
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if errs.IsLeaderChange(err) {
if err := bo.Exec(ctx, svcDiscovery.CheckMemberChanged); err != nil {
select {
case <-ctx.Done():
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ error = '''
redirect to not leader
'''

["PD:apiutil:ErrRedirectToNotPrimary"]
error = '''
redirect to not primary
'''

["PD:autoscaling:ErrEmptyMetricsResponse"]
error = '''
metrics response from Prometheus is empty
Expand Down
8 changes: 4 additions & 4 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ var (

// apiutil errors
var (
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirectToNotPrimary = errors.Normalize("redirect to not primary", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotPrimary"))
)

// grpcutil errors
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/multiservicesapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func ServiceRedirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotPrimary))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotPrimary.FastGenByArgs().Error())
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/API servers.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/middlewares/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func Redirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotLeader.FastGenByArgs().Error())
return
}

Expand Down
3 changes: 2 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
clierrs "github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())
_, _, err = cli.GetTS(ctx)
re.Error(err)
re.True(pd.IsLeaderChange(err))
re.True(clierrs.IsLeaderChange(err))
_, _, err = cli.GetTS(ctx)
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember"))
Expand Down
Loading