Skip to content

Commit

Permalink
Fix the multierr bug
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed May 30, 2024
1 parent 75da7c0 commit 541981c
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 38 deletions.
5 changes: 4 additions & 1 deletion client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"go.uber.org/zap/zapcore"
)

// IsLeaderChange will determine whether there is a leader change.
// IsLeaderChange will determine whether there is a leader/primary change.
func IsLeaderChange(err error) bool {
if err == nil {
return false
}
if err == ErrClientTSOStreamClosed {
return true
}
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.62.1
Expand All @@ -34,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
3 changes: 1 addition & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
Expand Down
39 changes: 20 additions & 19 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,20 @@ func (ci *clientInner) requestWithRetry(
isLeader bool
statusCode int
err error
logFields = append(reqInfo.logFields(),
zap.String("source", ci.source),
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))
logFields = append(reqInfo.logFields(), zap.String("source", ci.source))
)
execFunc := func() error {
defer func() {
// Handle some special status codes and errors to increase the success rate of the following requests.
ci.handleHTTPStatusCodeAndErr(statusCode, err)
log.Debug("[pd] http request finished", logFields...)
// - If the status code is 503, it indicates that there may be PD leader/follower changes.
// - If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
ci.sd.ScheduleCheckMemberChanged()
}
log.Info("[pd] http request finished", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}()
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
clients := ci.sd.GetAllServiceClients()
Expand All @@ -154,7 +156,11 @@ func (ci *clientInner) requestWithRetry(
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] http request url failed", logFields...)
log.Info("[pd] http request url failed", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
Expand All @@ -174,14 +180,6 @@ func (ci *clientInner) requestWithRetry(
return bo.Exec(ctx, execFunc)
}

func (ci *clientInner) handleHTTPStatusCodeAndErr(code int, err error) {
// - If the status code is 503, it indicates that there may be PD leader/follower changes.
// - If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
if code == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
ci.sd.ScheduleCheckMemberChanged()
}
}

func noNeedRetry(statusCode int) bool {
return statusCode == http.StatusNotFound ||
statusCode == http.StatusForbidden ||
Expand Down Expand Up @@ -245,11 +243,14 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
}

log.Error("[pd] request failed with a non-200 status", logFields...)
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
return resp.StatusCode, errors.Errorf(
"request pd http api failed with status: '%s', body: '%s'", resp.Status, bs,
)
}

if res == nil {
Expand Down
4 changes: 2 additions & 2 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ func (ri *requestInfo) getURL(addr string) string {

func (ri *requestInfo) logFields() []zap.Field {
return []zap.Field{
zap.String("callerID", ri.callerID),
zap.String("caller-id", ri.callerID),
zap.String("name", ri.name),
zap.String("uri", ri.uri),
zap.String("method", ri.method),
zap.String("targetURL", ri.targetURL),
zap.String("target-url", ri.targetURL),
}
}
16 changes: 4 additions & 12 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20

// Option is used to customize the backoffer.
type Option func(*Backoffer)

Expand Down Expand Up @@ -69,18 +66,13 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
allErrors error
err error
after *time.Timer
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
break
}
Expand All @@ -100,7 +92,7 @@ func (bo *Backoffer) Exec(
select {
case <-ctx.Done():
after.Stop()
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
return errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
Expand All @@ -115,7 +107,7 @@ func (bo *Backoffer) Exec(
}
}
}
return allErrors
return err
}

// InitialBackoffer make the initial state for retrying.
Expand Down
2 changes: 1 addition & 1 deletion client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorContains(err, "test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
Expand Down
41 changes: 41 additions & 0 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -757,3 +758,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() {
re.Equal("pd2", healths[1].Name)
re.True(healths[0].Health && healths[1].Health)
}

func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
re := suite.Require()
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
client := suite.client.WithBackoffer(bo)
for {
healths, err := client.GetHealthStatus(ctx)
if err != nil && strings.Contains(err.Error(), "context canceled") {
return
}
re.NoError(err)
re.Len(healths, 2)
select {
case <-ctx.Done():
return
default:
}
}
}()

leader := suite.cluster.GetLeaderServer()
re.NotNil(leader)
for i := 0; i < 3; i++ {
leader.ResignLeader()
re.NotEmpty(suite.cluster.WaitLeader())
leader = suite.cluster.GetLeaderServer()
re.NotNil(leader)
}

// Cancel the context to stop the goroutine.
cancel()
wg.Wait()
}

0 comments on commit 541981c

Please sign in to comment.