diff --git a/README.md b/README.md index c2147522768..dcdaeb5b61a 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,6 @@ [![Check Status](https://github.com/tikv/pd/actions/workflows/check.yaml/badge.svg)](https://github.com/tikv/pd/actions/workflows/check.yaml) [![Build & Test Status](https://github.com/tikv/pd/actions/workflows/pd-tests.yaml/badge.svg?branch=master)](https://github.com/tikv/pd/actions/workflows/pd-tests.yaml) -[![TSO Consistency Test Status](https://github.com/tikv/pd/actions/workflows/tso-consistency-test.yaml/badge.svg)](https://github.com/tikv/pd/actions/workflows/tso-consistency-test.yaml) [![GitHub release](https://img.shields.io/github/release/tikv/pd.svg)](https://github.com/tikv/pd/releases) [![Go Report Card](https://goreportcard.com/badge/github.com/tikv/pd)](https://goreportcard.com/report/github.com/tikv/pd) [![codecov](https://codecov.io/gh/tikv/pd/branch/master/graph/badge.svg)](https://codecov.io/gh/tikv/pd) diff --git a/client/client.go b/client/client.go index 330e1bd4974..e3675b52419 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/tlsutil" "go.uber.org/zap" ) @@ -78,25 +79,25 @@ type RPCClient interface { // taking care of region change. // Also, it may return nil if PD finds no Region for the key temporarily, // so the client should retry later. - GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) + GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) // GetRegionFromMember gets a region from certain members. - GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) + GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error) // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. - GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) + GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) // GetRegionByID gets a region and its leader Peer from PD by id. - GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) + GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) // Deprecated: use BatchScanRegions instead. // ScanRegions gets a list of regions, starting from the region that contains the key. // Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0. // If a region has no leader, the corresponding leader will be placed by a peer // with an empty value (PeerID is 0). - ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) + ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) // BatchScanRegions gets a list of regions, starting from the region that contains the key. // Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0. // If a region has no leader, the corresponding leader will be placed by a peer // with an empty value (PeerID is 0).
// The returned regions are flattened; even if there are key ranges located in the same region, only one region will be returned. - BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) + BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) // GetStore gets a store from PD by store id. // The store may expire later. Caller is responsible for caching and taking care // of store change. @@ -104,7 +105,7 @@ type RPCClient interface { // GetAllStores gets all stores from PD. // The store may expire later. Caller is responsible for caching and taking care // of store change. - GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) + GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) // UpdateGCSafePoint updates the GC safe point; TiKV will check it and do GC itself if necessary. // If the given safePoint is less than the current one, it will not be updated. // Returns the new safePoint after updating. @@ -120,11 +121,11 @@ type RPCClient interface { ScatterRegion(ctx context.Context, regionID uint64) error // ScatterRegions scatters the specified regions. Use it for a batch of regions, // and the distribution of these regions will be dispersed. - ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) + ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) // SplitRegions splits regions by the given split keys - SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) + SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) // SplitAndScatterRegions splits regions by the given split keys and scatters the new regions - SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) + SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) // GetOperator gets the status of operator of the specified region. GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) @@ -179,7 +180,7 @@ type Client interface { GetServiceDiscovery() ServiceDiscovery // UpdateOption updates the client option. - UpdateOption(option DynamicOption, value any) error + UpdateOption(option opt.DynamicOption, value any) error // Close closes the client. Close() @@ -278,7 +279,7 @@ func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) { // NewClient creates a PD client. func NewClient( callerComponent caller.Component, - svrAddrs []string, security SecurityOption, opts ...ClientOption, + svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { return NewClientWithContext(context.Background(), callerComponent, svrAddrs, security, opts...) @@ -289,7 +290,7 @@ func NewClientWithContext( ctx context.Context, callerComponent caller.Component, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { return createClientWithKeyspace(ctx, callerComponent, nullKeyspaceID, svrAddrs, security, opts...)
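The hunks above move every client option out of the root pd package into client/opt, so call sites update their import block and their option constructors together. A minimal sketch of the new call shape, assuming a reachable PD endpoint (the URL and lookup key are illustrative, not taken from the patch):

package main

import (
	"context"
	"time"

	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/caller"
	"github.com/tikv/pd/client/opt"
)

func main() {
	// Constructors that used to accept pd.ClientOption now accept opt.ClientOption.
	cli, err := pd.NewClientWithContext(
		context.Background(),
		caller.TestComponent, // the component tag the repo's own tests use
		[]string{"http://127.0.0.1:2379"}, // illustrative endpoint
		pd.SecurityOption{},
		opt.WithCustomTimeoutOption(3*time.Second),
		opt.WithMaxErrorRetry(5),
	)
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Per-request options move the same way, e.g. opt.WithBuckets for GetRegion.
	if _, err := cli.GetRegion(context.Background(), []byte("key"), opt.WithBuckets()); err != nil {
		panic(err)
	}
}

Note the design shift the later hunks show: the injected options now mutate c.inner.option directly (opt(c.inner.option)) instead of closing over innerClient as the removed ClientOption type did.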
@@ -301,7 +302,7 @@ func NewClientWithKeyspace( ctx context.Context, callerComponent caller.Component, keyspaceID uint32, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { if keyspaceID < defaultKeyspaceID || keyspaceID > maxKeyspaceID { return nil, errors.Errorf("invalid keyspace id %d. It must be in the range of [%d, %d]", @@ -316,7 +317,7 @@ func createClientWithKeyspace( ctx context.Context, callerComponent caller.Component, keyspaceID uint32, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { tlsCfg, err := tlsutil.TLSConfig{ CAPath: security.CAPath, @@ -342,13 +343,13 @@ func createClientWithKeyspace( ctx: clientCtx, cancel: clientCancel, tlsCfg: tlsCfg, - option: newOption(), + option: opt.NewOption(), }, } // Inject the client options. for _, opt := range opts { - opt(c.inner) + opt(c.inner.option) } return c, c.inner.init(nil) @@ -416,7 +417,7 @@ func NewClientWithAPIContext( ctx context.Context, apiCtx APIContext, callerComponent caller.Component, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { apiVersion, keyspaceName := apiCtx.GetAPIVersion(), apiCtx.GetKeyspaceName() switch apiVersion { @@ -436,7 +437,7 @@ func newClientWithKeyspaceName( ctx context.Context, callerComponent caller.Component, keyspaceName string, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { tlsCfg, err := tlsutil.TLSConfig{ CAPath: security.CAPath, @@ -463,13 +464,13 @@ func newClientWithKeyspaceName( cancel: clientCancel, svrUrls: svrAddrs, tlsCfg: tlsCfg, - option: newOption(), + option: opt.NewOption(), }, } // Inject the client options. for _, opt := range opts { - opt(c.inner) + opt(c.inner.option) } updateKeyspaceIDFunc := func() error { @@ -521,17 +522,17 @@ func (c *client) GetServiceDiscovery() ServiceDiscovery { } // UpdateOption updates the client option. 
-func (c *client) UpdateOption(option DynamicOption, value any) error { +func (c *client) UpdateOption(option opt.DynamicOption, value any) error { switch option { - case MaxTSOBatchWaitInterval: + case opt.MaxTSOBatchWaitInterval: interval, ok := value.(time.Duration) if !ok { return errors.New("[pd] invalid value type for MaxTSOBatchWaitInterval option, it should be time.Duration") } - if err := c.inner.option.setMaxTSOBatchWaitInterval(interval); err != nil { + if err := c.inner.option.SetMaxTSOBatchWaitInterval(interval); err != nil { return err } - case EnableTSOFollowerProxy: + case opt.EnableTSOFollowerProxy: if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE { return errors.New("[pd] tso follower proxy is only supported in PD service mode") } @@ -539,19 +540,19 @@ func (c *client) UpdateOption(option DynamicOption, value any) error { if !ok { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") } - c.inner.option.setEnableTSOFollowerProxy(enable) - case EnableFollowerHandle: + c.inner.option.SetEnableTSOFollowerProxy(enable) + case opt.EnableFollowerHandle: enable, ok := value.(bool) if !ok { return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool") } - c.inner.option.setEnableFollowerHandle(enable) - case TSOClientRPCConcurrency: + c.inner.option.SetEnableFollowerHandle(enable) + case opt.TSOClientRPCConcurrency: value, ok := value.(int) if !ok { return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int") } - c.inner.option.setTSOClientRPCConcurrency(value) + c.inner.option.SetTSOClientRPCConcurrency(value) default: return errors.New("[pd] unsupported client option") } @@ -563,7 +564,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { start := time.Now() defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetMembersRequest{Header: c.requestHeader()} protoClient, ctx := c.getClientAndContext(ctx) @@ -634,7 +635,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e default: return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") } - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() // Call GetMinTS API to get the minimal TS from the API leader. protoClient, ctx := c.getClientAndContext(ctx) @@ -683,7 +684,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { } // GetRegionFromMember implements the RPCClient interface. -func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...GetRegionOption) (*Region, error) { +func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -722,27 +723,27 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs } // GetRegion implements the RPCClient interface. 
-func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { +func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } req := &pdpb.GetRegionRequest{ Header: c.requestHeader(), RegionKey: key, - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, } serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx, - options.allowFollowerHandle && c.inner.option.getEnableFollowerHandle()) + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -762,27 +763,27 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt } // GetPrevRegion implements the RPCClient interface. -func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { +func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } req := &pdpb.GetRegionRequest{ Header: c.requestHeader(), RegionKey: key, - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, } serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx, - options.allowFollowerHandle && c.inner.option.getEnableFollowerHandle()) + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -802,27 +803,27 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio } // GetRegionByID implements the RPCClient interface. 
-func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) { +func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } req := &pdpb.GetRegionByIDRequest{ Header: c.requestHeader(), RegionId: regionID, - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, } serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx, - options.allowFollowerHandle && c.inner.option.getEnableFollowerHandle()) + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -842,7 +843,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get } // ScanRegions implements the RPCClient interface. -func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) { +func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -853,10 +854,10 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, var cancel context.CancelFunc scanCtx := ctx if _, ok := ctx.Deadline(); !ok { - scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.timeout) + scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() } - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } @@ -867,7 +868,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, Limit: int32(limit), } serviceClient, cctx := c.inner.getRegionAPIClientAndContext(scanCtx, - options.allowFollowerHandle && c.inner.option.getEnableFollowerHandle()) + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -893,7 +894,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, } // BatchScanRegions implements the RPCClient interface. 
-func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) { +func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -904,10 +905,10 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit var cancel context.CancelFunc scanCtx := ctx if _, ok := ctx.Deadline(); !ok { - scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.timeout) + scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() } - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } @@ -917,13 +918,13 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit } req := &pdpb.BatchScanRegionsRequest{ Header: c.requestHeader(), - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, Ranges: pbRanges, Limit: int32(limit), - ContainAllKeyRange: options.outputMustContainAllKeyRange, + ContainAllKeyRange: options.OutputMustContainAllKeyRange, } serviceClient, cctx := c.inner.getRegionAPIClientAndContext(scanCtx, - options.allowFollowerHandle && c.inner.option.getEnableFollowerHandle()) + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1001,7 +1002,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e start := time.Now() defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetStoreRequest{ Header: c.requestHeader(), @@ -1031,9 +1032,9 @@ func handleStoreResponse(resp *pdpb.GetStoreResponse) (*metapb.Store, error) { } // GetAllStores implements the RPCClient interface. 
-func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) { +func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { // Applies options - options := &GetStoreOp{} + options := &opt.GetStoreOp{} for _, opt := range opts { opt(options) } @@ -1045,11 +1046,11 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m start := time.Now() defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetAllStoresRequest{ Header: c.requestHeader(), - ExcludeTombstoneStores: options.excludeTombstone, + ExcludeTombstoneStores: options.ExcludeTombstone, } protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1072,7 +1073,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 start := time.Now() defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.UpdateGCSafePointRequest{ Header: c.requestHeader(), @@ -1103,7 +1104,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, start := time.Now() defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.UpdateServiceGCSafePointRequest{ Header: c.requestHeader(), @@ -1136,7 +1137,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g start := time.Now() defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.ScatterRegionRequest{ Header: c.requestHeader(), @@ -1158,7 +1159,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g } // ScatterRegions implements the RPCClient interface. -func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) { +func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -1167,24 +1168,24 @@ func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts .. } // SplitAndScatterRegions implements the RPCClient interface. 
-func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { +func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() - options := &RegionsOp{} + options := &opt.RegionsOp{} for _, opt := range opts { opt(options) } req := &pdpb.SplitAndScatterRegionsRequest{ Header: c.requestHeader(), SplitKeys: splitKeys, - Group: options.group, - RetryLimit: options.retryLimit, + Group: options.Group, + RetryLimit: options.RetryLimit, } protoClient, ctx := c.getClientAndContext(ctx) @@ -1203,7 +1204,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe start := time.Now() defer func() { cmdDurationGetOperator.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetOperatorRequest{ Header: c.requestHeader(), @@ -1217,23 +1218,23 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe } // SplitRegions splits regions by the given split keys -func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) { +func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() - options := &RegionsOp{} + options := &opt.RegionsOp{} for _, opt := range opts { opt(options) } req := &pdpb.SplitRegionsRequest{ Header: c.requestHeader(), SplitKeys: splitKeys, - RetryLimit: options.retryLimit, + RetryLimit: options.RetryLimit, } protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1250,21 +1251,21 @@ func (c *client) requestHeader() *pdpb.RequestHeader { } } -func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) { +func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) { start := time.Now() defer func() { cmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }() - options := &RegionsOp{} + options := &opt.RegionsOp{} for _, opt := range opts { opt(options) } - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.ScatterRegionRequest{ Header: c.requestHeader(), - Group: options.group, + Group: options.Group, RegionsId: regionsID, - RetryLimit: options.retryLimit, - SkipStoreLimit: options.skipStoreLimit, + RetryLimit: options.RetryLimit, + SkipStoreLimit: options.SkipStoreLimit, } protoClient, ctx := c.getClientAndContext(ctx)
@@ -1295,7 +1296,7 @@ func trimHTTPPrefix(str string) string { // LoadGlobalConfig implements the RPCClient interface. func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) { - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1327,7 +1328,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items for i, it := range items { resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad} } - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1345,7 +1346,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis // TODO: Add retry mechanism // register watch components there globalConfigWatcherCh := make(chan []GlobalConfigItem, 16) - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1394,7 +1395,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis // GetExternalTimestamp implements the RPCClient interface. func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1415,7 +1416,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { // SetExternalTimestamp implements the RPCClient interface.
func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error { - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { diff --git a/client/client_test.go b/client/client_test.go index efc905935d3..234bb2da10a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/client/caller" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/testutil" "github.com/tikv/pd/client/utils/tsoutil" "go.uber.org/goleak" @@ -56,7 +57,7 @@ func TestUpdateURLs(t *testing.T) { } return } - cli := &pdServiceDiscovery{option: newOption()} + cli := &pdServiceDiscovery{option: opt.NewOption()} cli.urls.Store([]string{}) cli.updateURLs(members[1:]) re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) @@ -89,7 +90,7 @@ func TestClientWithRetry(t *testing.T) { re := require.New(t) start := time.Now() _, err := NewClientWithContext(context.TODO(), caller.TestComponent, - []string{testClientURL}, SecurityOption{}, WithMaxErrorRetry(5)) + []string{testClientURL}, SecurityOption{}, opt.WithMaxErrorRetry(5)) re.Error(err) re.Less(time.Since(start), time.Second*10) } @@ -104,10 +105,10 @@ func TestGRPCDialOption(t *testing.T) { ctx: ctx, cancel: cancel, tlsCfg: nil, - option: newOption(), + option: opt.NewOption(), } cli.urls.Store([]string{testClientURL}) - cli.option.gRPCDialOptions = []grpc.DialOption{grpc.WithBlock()} + cli.option.GRPCDialOptions = []grpc.DialOption{grpc.WithBlock()} err := cli.updateMember() re.Error(err) re.Greater(time.Since(start), 500*time.Millisecond) diff --git a/client/gc_client.go b/client/gc_client.go index 782f8fe2214..2b64cb91c4a 100644 --- a/client/gc_client.go +++ b/client/gc_client.go @@ -41,7 +41,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf start := time.Now() defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &pdpb.UpdateGCSafePointV2Request{ Header: c.requestHeader(), KeyspaceId: keyspaceID, @@ -70,7 +70,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32 start := time.Now() defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &pdpb.UpdateServiceSafePointV2Request{ Header: c.requestHeader(), KeyspaceId: keyspaceID, @@ -99,7 +99,7 @@ func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan [ Revision: revision, } - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil {
diff --git a/client/inner_client.go b/client/inner_client.go index 1bbbdd0eb88..47acda56e42 100644 --- a/client/inner_client.go +++ b/client/inner_client.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -35,7 +36,7 @@ type innerClient struct { cancel context.CancelFunc wg sync.WaitGroup tlsCfg *tls.Config - option *option + option *opt.Option } func (c *innerClient) init(updateKeyspaceIDCb updateKeyspaceIDFunc) error { @@ -57,7 +58,7 @@ func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) { c.Lock() defer c.Unlock() - if c.option.useTSOServerProxy { + if c.option.UseTSOServerProxy { // If we are using TSO server proxy, we always use PD_SVC_MODE. newMode = pdpb.ServiceMode_PD_SVC_MODE } @@ -158,8 +159,8 @@ func (c *innerClient) close() { func (c *innerClient) setup() error { // Init the metrics. - if c.option.initMetrics { - initAndRegisterMetrics(c.option.metricsLabels) + if c.option.InitMetrics { + initAndRegisterMetrics(c.option.MetricsLabels) } // Init the client base. diff --git a/client/keyspace_client.go b/client/keyspace_client.go index f5891817fd7..3f8cea993c0 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -52,7 +52,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key } start := time.Now() defer func() { cmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &keyspacepb.LoadKeyspaceRequest{ Header: c.requestHeader(), Name: name, @@ -96,7 +96,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp } start := time.Now() defer func() { cmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &keyspacepb.UpdateKeyspaceStateRequest{ Header: c.requestHeader(), Id: id, @@ -140,7 +140,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint } start := time.Now() defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.inner.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &keyspacepb.GetAllKeyspacesRequest{ Header: c.requestHeader(), StartId: startID, diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index c93089687fa..3e56268d547 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -118,7 +118,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...OpOpti start := time.Now() defer func() { cmdDurationPut.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &meta_storagepb.PutRequest{ Key: key, Value: value, @@ -157,7 +157,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...OpOption) (*m start := time.Now() defer func() { cmdDurationGet.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &meta_storagepb.GetRequest{ Key: key, RangeEnd: options.rangeEnd, diff --git a/client/option.go b/client/opt/option.go similarity index 65% rename from client/option.go rename to client/opt/option.go index d3cadf0c7d4..b90ff3a905c 100644 --- a/client/option.go +++ b/client/opt/option.go @@ -1,4 +1,4 @@ -// Copyright 2021 TiKV Project Authors. +// Copyright 2024 TiKV Project Authors.
// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pd +package opt import ( "sync/atomic" @@ -50,31 +50,31 @@ const ( dynamicOptionCount ) -// option is the configurable option for the PD client. +// Option is the configurable option for the PD client. // It provides the ability to change some PD client's options online from the outside. -type option struct { +type Option struct { // Static options. - gRPCDialOptions []grpc.DialOption - timeout time.Duration - maxRetryTimes int - enableForwarding bool - useTSOServerProxy bool - metricsLabels prometheus.Labels - initMetrics bool + GRPCDialOptions []grpc.DialOption + Timeout time.Duration + MaxRetryTimes int + EnableForwarding bool + UseTSOServerProxy bool + MetricsLabels prometheus.Labels + InitMetrics bool // Dynamic options. dynamicOptions [dynamicOptionCount]atomic.Value - enableTSOFollowerProxyCh chan struct{} + EnableTSOFollowerProxyCh chan struct{} } -// newOption creates a new PD client option with the default values set. -func newOption() *option { - co := &option{ - timeout: defaultPDTimeout, - maxRetryTimes: maxInitClusterRetries, - enableTSOFollowerProxyCh: make(chan struct{}, 1), - initMetrics: true, +// NewOption creates a new PD client option with the default values set. +func NewOption() *Option { + co := &Option{ + Timeout: defaultPDTimeout, + MaxRetryTimes: maxInitClusterRetries, + EnableTSOFollowerProxyCh: make(chan struct{}, 1), + InitMetrics: true, } co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval) @@ -84,68 +84,124 @@ func newOption() *option { return co } -// setMaxTSOBatchWaitInterval sets the max TSO batch wait interval option. +// SetMaxTSOBatchWaitInterval sets the max TSO batch wait interval option. // It only accepts the interval value between 0 and 10ms. -func (o *option) setMaxTSOBatchWaitInterval(interval time.Duration) error { +func (o *Option) SetMaxTSOBatchWaitInterval(interval time.Duration) error { if interval < 0 || interval > 10*time.Millisecond { return errors.New("[pd] invalid max TSO batch wait interval, should be between 0 and 10ms") } - old := o.getMaxTSOBatchWaitInterval() + old := o.GetMaxTSOBatchWaitInterval() if interval != old { o.dynamicOptions[MaxTSOBatchWaitInterval].Store(interval) } return nil } -// setEnableFollowerHandle set the Follower Handle option. -func (o *option) setEnableFollowerHandle(enable bool) { - old := o.getEnableFollowerHandle() +// SetEnableFollowerHandle sets the Follower Handle option. +func (o *Option) SetEnableFollowerHandle(enable bool) { + old := o.GetEnableFollowerHandle() if enable != old { o.dynamicOptions[EnableFollowerHandle].Store(enable) } } -// getMaxTSOBatchWaitInterval gets the Follower Handle enable option. -func (o *option) getEnableFollowerHandle() bool { +// GetEnableFollowerHandle gets the Follower Handle enable option. +func (o *Option) GetEnableFollowerHandle() bool { return o.dynamicOptions[EnableFollowerHandle].Load().(bool) } -// getMaxTSOBatchWaitInterval gets the max TSO batch wait interval option. -func (o *option) getMaxTSOBatchWaitInterval() time.Duration { +// GetMaxTSOBatchWaitInterval gets the max TSO batch wait interval option.
+func (o *Option) GetMaxTSOBatchWaitInterval() time.Duration { return o.dynamicOptions[MaxTSOBatchWaitInterval].Load().(time.Duration) } -// setEnableTSOFollowerProxy sets the TSO Follower Proxy option. -func (o *option) setEnableTSOFollowerProxy(enable bool) { - old := o.getEnableTSOFollowerProxy() +// SetEnableTSOFollowerProxy sets the TSO Follower Proxy option. +func (o *Option) SetEnableTSOFollowerProxy(enable bool) { + old := o.GetEnableTSOFollowerProxy() if enable != old { o.dynamicOptions[EnableTSOFollowerProxy].Store(enable) select { - case o.enableTSOFollowerProxyCh <- struct{}{}: + case o.EnableTSOFollowerProxyCh <- struct{}{}: default: } } } -// getEnableTSOFollowerProxy gets the TSO Follower Proxy option. -func (o *option) getEnableTSOFollowerProxy() bool { +// GetEnableTSOFollowerProxy gets the TSO Follower Proxy option. +func (o *Option) GetEnableTSOFollowerProxy() bool { return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool) } -func (o *option) setTSOClientRPCConcurrency(value int) { - old := o.getTSOClientRPCConcurrency() +// SetTSOClientRPCConcurrency sets the TSO client RPC concurrency option. +func (o *Option) SetTSOClientRPCConcurrency(value int) { + old := o.GetTSOClientRPCConcurrency() if value != old { o.dynamicOptions[TSOClientRPCConcurrency].Store(value) } } -func (o *option) getTSOClientRPCConcurrency() int { +// GetTSOClientRPCConcurrency gets the TSO client RPC concurrency option. +func (o *Option) GetTSOClientRPCConcurrency() int { return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int) } +// ClientOption configures the client. +type ClientOption func(*Option) + +// WithGRPCDialOptions configures the client with gRPC dial options. +func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { + return func(op *Option) { + op.GRPCDialOptions = append(op.GRPCDialOptions, opts...) + } +} + +// WithCustomTimeoutOption configures the client with the timeout option. +func WithCustomTimeoutOption(timeout time.Duration) ClientOption { + return func(op *Option) { + op.Timeout = timeout + } +} + +// WithForwardingOption configures the client with the forwarding option. +func WithForwardingOption(enableForwarding bool) ClientOption { + return func(op *Option) { + op.EnableForwarding = enableForwarding + } +} + +// WithTSOServerProxyOption configures the client to use TSO server proxy, +// i.e., the client will send TSO requests to the API leader (the TSO server +// proxy) which will forward the requests to the TSO servers. +func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { + return func(op *Option) { + op.UseTSOServerProxy = useTSOServerProxy + } +} + +// WithMaxErrorRetry configures the client's max retry times when a connection error occurs. +func WithMaxErrorRetry(count int) ClientOption { + return func(op *Option) { + op.MaxRetryTimes = count + } +} + +// WithMetricsLabels configures the client with metrics labels. +func WithMetricsLabels(labels prometheus.Labels) ClientOption { + return func(op *Option) { + op.MetricsLabels = labels + } +} + +// WithInitMetricsOption configures whether the client initializes the metrics. +func WithInitMetricsOption(initMetrics bool) ClientOption { + return func(op *Option) { + op.InitMetrics = initMetrics + } +} + // GetStoreOp represents available options when getting stores. type GetStoreOp struct { - excludeTombstone bool + ExcludeTombstone bool } // GetStoreOption configures GetStoreOp. @@ -153,14 +209,14 @@ type GetStoreOption func(*GetStoreOp) // WithExcludeTombstone excludes tombstone stores from the result.
func WithExcludeTombstone() GetStoreOption { - return func(op *GetStoreOp) { op.excludeTombstone = true } + return func(op *GetStoreOp) { op.ExcludeTombstone = true } } // RegionsOp represents available options when operating regions type RegionsOp struct { - group string - retryLimit uint64 - skipStoreLimit bool + Group string + RetryLimit uint64 + SkipStoreLimit bool } // RegionsOption configures RegionsOp @@ -168,24 +224,24 @@ type RegionsOption func(op *RegionsOp) // WithGroup specifies the group during Scatter/Split Regions func WithGroup(group string) RegionsOption { - return func(op *RegionsOp) { op.group = group } + return func(op *RegionsOp) { op.Group = group } } // WithRetry specifies the retry limit during Scatter/Split Regions func WithRetry(retry uint64) RegionsOption { - return func(op *RegionsOp) { op.retryLimit = retry } + return func(op *RegionsOp) { op.RetryLimit = retry } } // WithSkipStoreLimit specifies whether to skip the store limit check during Scatter/Split Regions func WithSkipStoreLimit() RegionsOption { - return func(op *RegionsOp) { op.skipStoreLimit = true } + return func(op *RegionsOp) { op.SkipStoreLimit = true } } // GetRegionOp represents available options when getting regions. type GetRegionOp struct { - needBuckets bool - allowFollowerHandle bool - outputMustContainAllKeyRange bool + NeedBuckets bool + AllowFollowerHandle bool + OutputMustContainAllKeyRange bool } // GetRegionOption configures GetRegionOp. @@ -193,69 +249,15 @@ type GetRegionOption func(op *GetRegionOp) // WithBuckets means getting the region and its buckets. func WithBuckets() GetRegionOption { - return func(op *GetRegionOp) { op.needBuckets = true } + return func(op *GetRegionOp) { op.NeedBuckets = true } } // WithAllowFollowerHandle means that the client can send the request to a follower and let it handle this request. func WithAllowFollowerHandle() GetRegionOption { - return func(op *GetRegionOp) { op.allowFollowerHandle = true } + return func(op *GetRegionOp) { op.AllowFollowerHandle = true } } // WithOutputMustContainAllKeyRange means the output must contain all key ranges. func WithOutputMustContainAllKeyRange() GetRegionOption { - return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true -} - -// ClientOption configures innerClient. -type ClientOption func(c *innerClient) - -// WithGRPCDialOptions configures the client with gRPC dial options. -func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { - return func(c *innerClient) { - c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) - } -} - -// WithCustomTimeoutOption configures the client with timeout option. -func WithCustomTimeoutOption(timeout time.Duration) ClientOption { - return func(c *innerClient) { - c.option.timeout = timeout - } -} - -// WithForwardingOption configures the client with forwarding option. -func WithForwardingOption(enableForwarding bool) ClientOption { - return func(c *innerClient) { - c.option.enableForwarding = enableForwarding - } -} - -// WithTSOServerProxyOption configures the client to use TSO server proxy, -// i.e., the client will send TSO requests to the API leader (the TSO server -// proxy) which will forward the requests to the TSO servers. -func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { - return func(c *innerClient) { - c.option.useTSOServerProxy = useTSOServerProxy - } -} - -// WithMaxErrorRetry configures the client max retry times when connect meets error.
-func WithMaxErrorRetry(count int) ClientOption { - return func(c *innerClient) { - c.option.maxRetryTimes = count - } -} - -// WithMetricsLabels configures the client with metrics labels. -func WithMetricsLabels(labels prometheus.Labels) ClientOption { - return func(c *innerClient) { - c.option.metricsLabels = labels - } -} - -// WithInitMetricsOption configures the client with metrics labels. -func WithInitMetricsOption(initMetrics bool) ClientOption { - return func(c *innerClient) { - c.option.initMetrics = initMetrics - } + return func(op *GetRegionOp) { op.OutputMustContainAllKeyRange = true } } diff --git a/client/option_test.go b/client/opt/option_test.go similarity index 55% rename from client/option_test.go rename to client/opt/option_test.go index 84e5dd3abce..4a0b8d16fcf 100644 --- a/client/option_test.go +++ b/client/opt/option_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 TiKV Project Authors. +// Copyright 2024 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pd +package opt import ( "testing" @@ -24,43 +24,43 @@ import ( func TestDynamicOptionChange(t *testing.T) { re := require.New(t) - o := newOption() + o := NewOption() // Check the default value setting. - re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval()) - re.Equal(defaultEnableTSOFollowerProxy, o.getEnableTSOFollowerProxy()) - re.Equal(defaultEnableFollowerHandle, o.getEnableFollowerHandle()) + re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval()) + re.Equal(defaultEnableTSOFollowerProxy, o.GetEnableTSOFollowerProxy()) + re.Equal(defaultEnableFollowerHandle, o.GetEnableFollowerHandle()) // Check the invalid value setting. - re.Error(o.setMaxTSOBatchWaitInterval(time.Second)) - re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval()) + re.Error(o.SetMaxTSOBatchWaitInterval(time.Second)) + re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval()) expectInterval := time.Millisecond - o.setMaxTSOBatchWaitInterval(expectInterval) - re.Equal(expectInterval, o.getMaxTSOBatchWaitInterval()) + o.SetMaxTSOBatchWaitInterval(expectInterval) + re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval()) expectInterval = time.Duration(float64(time.Millisecond) * 0.5) - o.setMaxTSOBatchWaitInterval(expectInterval) - re.Equal(expectInterval, o.getMaxTSOBatchWaitInterval()) + o.SetMaxTSOBatchWaitInterval(expectInterval) + re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval()) expectInterval = time.Duration(float64(time.Millisecond) * 1.5) - o.setMaxTSOBatchWaitInterval(expectInterval) - re.Equal(expectInterval, o.getMaxTSOBatchWaitInterval()) + o.SetMaxTSOBatchWaitInterval(expectInterval) + re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval()) expectBool := true - o.setEnableTSOFollowerProxy(expectBool) + o.SetEnableTSOFollowerProxy(expectBool) // Check the value changing notification. testutil.Eventually(re, func() bool { - <-o.enableTSOFollowerProxyCh + <-o.EnableTSOFollowerProxyCh return true }) - re.Equal(expectBool, o.getEnableTSOFollowerProxy()) + re.Equal(expectBool, o.GetEnableTSOFollowerProxy()) // Check whether any data will be sent to the channel. // It will panic if the test fails. - close(o.enableTSOFollowerProxyCh) + close(o.EnableTSOFollowerProxyCh) // Setting the same value should not notify the channel. 
- o.setEnableTSOFollowerProxy(expectBool) + o.SetEnableTSOFollowerProxy(expectBool) expectBool = true - o.setEnableFollowerHandle(expectBool) - re.Equal(expectBool, o.getEnableFollowerHandle()) + o.SetEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.GetEnableFollowerHandle()) expectBool = false - o.setEnableFollowerHandle(expectBool) - re.Equal(expectBool, o.getEnableFollowerHandle()) + o.SetEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.GetEnableFollowerHandle()) } diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index c961e9e42fd..0bdc6868c65 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/utils/grpcutil" "go.uber.org/zap" @@ -436,7 +437,7 @@ type pdServiceDiscovery struct { keyspaceID uint32 tlsCfg *tls.Config // Client option. - option *option + option *opt.Option } // NewDefaultPDServiceDiscovery returns a new default PD service discovery-based client. @@ -445,7 +446,7 @@ func NewDefaultPDServiceDiscovery( urls []string, tlsCfg *tls.Config, ) *pdServiceDiscovery { var wg sync.WaitGroup - return newPDServiceDiscovery(ctx, cancel, &wg, nil, nil, defaultKeyspaceID, urls, tlsCfg, newOption()) + return newPDServiceDiscovery(ctx, cancel, &wg, nil, nil, defaultKeyspaceID, urls, tlsCfg, opt.NewOption()) } // newPDServiceDiscovery returns a new PD service discovery-based client. @@ -455,7 +456,7 @@ func newPDServiceDiscovery( serviceModeUpdateCb func(pdpb.ServiceMode), updateKeyspaceIDFunc updateKeyspaceIDFunc, keyspaceID uint32, - urls []string, tlsCfg *tls.Config, option *option, + urls []string, tlsCfg *tls.Config, option *opt.Option, ) *pdServiceDiscovery { pdsd := &pdServiceDiscovery{ checkMembershipCh: make(chan struct{}, 1), @@ -515,7 +516,7 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error { var err error ticker := time.NewTicker(time.Second) defer ticker.Stop() - for range c.option.maxRetryTimes { + for range c.option.MaxRetryTimes { if err = f(); err == nil { return nil } @@ -607,7 +608,7 @@ func (c *pdServiceDiscovery) memberHealthCheckLoop() { } func (c *pdServiceDiscovery) checkLeaderHealth(ctx context.Context) { - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() leader := c.getLeaderServiceClient() leader.checkNetworkAvailable(ctx) @@ -673,7 +674,7 @@ func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []s case tsoService: leaderURL := c.getLeaderURL() if len(leaderURL) > 0 { - clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.timeout) + clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.Timeout) if err != nil { log.Error("[pd] failed to get cluster info", zap.String("leader-url", leaderURL), errs.ZapError(err)) @@ -744,7 +745,7 @@ func (c *pdServiceDiscovery) getServiceClientByKind(kind apiKind) ServiceClient // GetServiceClient returns the leader/primary ServiceClient if it is healthy. 
func (c *pdServiceDiscovery) GetServiceClient() ServiceClient { leaderClient := c.getLeaderServiceClient() - if c.option.enableForwarding && !leaderClient.Available() { + if c.option.EnableForwarding && !leaderClient.Available() { if followerClient := c.getServiceClientByKind(forwardAPIKind); followerClient != nil { log.Debug("[pd] use follower client", zap.String("url", followerClient.GetURL())) return followerClient @@ -823,7 +824,7 @@ func (c *pdServiceDiscovery) initClusterID() error { defer cancel() clusterID := uint64(0) for _, url := range c.GetServiceURLs() { - members, err := c.getMembers(ctx, url, c.option.timeout) + members, err := c.getMembers(ctx, url, c.option.Timeout) if err != nil || members.GetHeader() == nil { log.Warn("[pd] failed to get cluster id", zap.String("url", url), errs.ZapError(err)) continue @@ -854,7 +855,7 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error { return errors.New("no leader found") } - clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.timeout) + clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.Timeout) if err != nil { if strings.Contains(err.Error(), "Unimplemented") { // If the method is not supported, we set it to pd mode. @@ -967,7 +968,7 @@ func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) { } c.urls.Store(urls) // Update the connection contexts when member changes if TSO Follower Proxy is enabled. - if c.option.getEnableTSOFollowerProxy() { + if c.option.GetEnableTSOFollowerProxy() { // Run callbacks to reflect the membership changes in the leader and followers. for _, cb := range c.membersChangedCbs { cb() @@ -1079,7 +1080,7 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL. func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { - return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...) + return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.GRPCDialOptions...) } func addrsToURLs(addrs []string, tlsCfg *tls.Config) []string { diff --git a/client/tso_client.go b/client/tso_client.go index 584c5df6134..18e39dffd14 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/grpcutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -68,7 +69,7 @@ type tsoClient struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - option *option + option *opt.Option svcDiscovery ServiceDiscovery tsoStreamBuilderFactory @@ -83,7 +84,7 @@ type tsoClient struct { // newTSOClient returns a new TSO client. 
func newTSOClient( - ctx context.Context, option *option, + ctx context.Context, option *opt.Option, svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory, ) *tsoClient { ctx, cancel := context.WithCancel(ctx) @@ -111,7 +112,7 @@ func newTSOClient( return c } -func (c *tsoClient) getOption() *option { return c.option } +func (c *tsoClient) getOption() *opt.Option { return c.option } func (c *tsoClient) getServiceDiscovery() ServiceDiscovery { return c.svcDiscovery } @@ -207,7 +208,7 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(url); err != nil { continue } - healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.Timeout) resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { @@ -232,7 +233,7 @@ type tsoConnectionContext struct { func (c *tsoClient) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool { // Normal connection creation; it will be affected by `enableForwarding`. createTSOConnection := c.tryConnectToTSO - if c.option.getEnableTSOFollowerProxy() { + if c.option.GetEnableTSOFollowerProxy() { createTSOConnection = c.tryConnectToTSOWithProxy } if err := createTSOConnection(ctx, connectionCtxs); err != nil { @@ -285,7 +286,7 @@ func (c *tsoClient) tryConnectToTSO( } if cc != nil { cctx, cancel := context.WithCancel(ctx) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout) failpoint.Inject("unreachableNetwork", func() { stream = nil err = status.New(codes.Unavailable, "unavailable").Err() }) @@ -295,7 +296,7 @@ func (c *tsoClient) tryConnectToTSO( return nil } - if err != nil && c.option.enableForwarding { + if err != nil && c.option.EnableForwarding { // The reason we need to judge if the error code is equal to "Canceled" here is that // when we create a stream we use a goroutine to manually control the timeout of the connection. // There is no need to wait for the transport layer timeout which can reduce the time of unavailability.
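Both connection strategies above key off the relocated option struct: EnableForwarding is a static field consulted when the leader stream fails, while the TSO follower proxy is a dynamic option read through GetEnableTSOFollowerProxy. A sketch of how a caller would set each one, reusing the imports from the earlier sketch (the function name is illustrative):

func newForwardingClient(ctx context.Context, urls []string) (pd.Client, error) {
	// Static: forwarding can only be chosen at construction time, through a ClientOption.
	cli, err := pd.NewClientWithContext(ctx, caller.TestComponent, urls,
		pd.SecurityOption{}, opt.WithForwardingOption(true))
	if err != nil {
		return nil, err
	}
	// Dynamic: the follower proxy can be toggled on a live client; per the
	// UpdateOption switch in client.go above, this is rejected outside PD service mode.
	if err := cli.UpdateOption(opt.EnableTSOFollowerProxy, true); err != nil {
		cli.Close()
		return nil, err
	}
	return cli, nil
}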
@@ -329,7 +330,7 @@ func (c *tsoClient) tryConnectToTSO(
 			// create the follower stream
 			cctx, cancel := context.WithCancel(ctx)
 			cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
-			stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout)
+			stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.Timeout)
 			if err == nil {
 				forwardedHostTrim := trimHTTPPrefix(forwardedHost)
 				addr := trimHTTPPrefix(backupURL)
@@ -370,7 +371,7 @@ func (c *tsoClient) checkLeader(
 		healthCli = healthpb.NewHealthClient(cc)
 	}
 	if healthCli != nil {
-		healthCtx, healthCancel := context.WithTimeout(ctx, c.option.timeout)
+		healthCtx, healthCancel := context.WithTimeout(ctx, c.option.Timeout)
 		resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
 		failpoint.Inject("unreachableNetwork", func() {
 			resp.Status = healthpb.HealthCheckResponse_UNKNOWN
@@ -379,7 +380,7 @@ func (c *tsoClient) checkLeader(
 		if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
 			// create a stream of the original tso leader
 			cctx, cancel := context.WithCancel(ctx)
-			stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
+			stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout)
 			if err == nil && stream != nil {
 				log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("url", url))
 				updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream})
@@ -435,7 +436,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(
 			cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
 		}
 		// Create the TSO stream.
-		stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout)
+		stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.Timeout)
 		if err == nil {
 			if addr != leaderAddr {
 				forwardedHostTrim := trimHTTPPrefix(forwardedHost)
@@ -468,7 +469,7 @@ func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
 		if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
 			continue
 		}
-		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
+		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.Timeout)
 		resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
 		healthCancel()
 		if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go
index 3d77610179d..7d19a11c2d0 100644
--- a/client/tso_dispatcher.go
+++ b/client/tso_dispatcher.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/retry"
 	"github.com/tikv/pd/client/utils/timerutil"
 	"github.com/tikv/pd/client/utils/tsoutil"
@@ -67,7 +68,7 @@ type tsoInfo struct {
 }
 
 type tsoServiceProvider interface {
-	getOption() *option
+	getOption() *opt.Option
 	getServiceDiscovery() ServiceDiscovery
 	updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool
 }
@@ -223,7 +224,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
 		stream       *tsoStream
 	)
 	// Loop through each batch of TSO requests and send them for processing.
-	streamLoopTimer := time.NewTimer(option.timeout)
+	streamLoopTimer := time.NewTimer(option.Timeout)
 	defer streamLoopTimer.Stop()
 
 	// Create a not-started-timer to be used for collecting batches for concurrent RPC.
@@ -245,7 +246,7 @@ tsoBatchLoop:
 			tsoBatchController = td.batchBufferPool.Get().(*batchController[*tsoRequest])
 		}
 
-		maxBatchWaitInterval := option.getMaxTSOBatchWaitInterval()
+		maxBatchWaitInterval := option.GetMaxTSOBatchWaitInterval()
 		currentBatchStartTime := time.Now()
 
 		// Update concurrency settings if needed.
@@ -280,7 +281,7 @@ tsoBatchLoop:
 		}
 		// We need be careful here, see more details in the comments of Timer.Reset.
 		// https://pkg.go.dev/time@master#Timer.Reset
-		streamLoopTimer.Reset(option.timeout)
+		streamLoopTimer.Reset(option.Timeout)
 		// Choose a stream to send the TSO gRPC request.
 	streamChoosingLoop:
 		for {
@@ -390,7 +391,7 @@ tsoBatchLoop:
 		}
 
 		done := make(chan struct{})
-		dl := newTSDeadline(option.timeout, done, cancel)
+		dl := newTSDeadline(option.Timeout, done, cancel)
 		select {
 		case <-ctx.Done():
 			// Finish the collected requests if the context is canceled.
@@ -485,8 +486,8 @@ func (td *tsoDispatcher) connectionCtxsUpdater() {
 		case <-ctx.Done():
 			log.Info("[tso] exit tso connection contexts updater")
 			return
-		case <-option.enableTSOFollowerProxyCh:
-			enableTSOFollowerProxy := option.getEnableTSOFollowerProxy()
+		case <-option.EnableTSOFollowerProxyCh:
+			enableTSOFollowerProxy := option.GetEnableTSOFollowerProxy()
 			log.Info("[tso] tso follower proxy status changed",
 				zap.Bool("enable", enableTSOFollowerProxy))
 			if enableTSOFollowerProxy && updateTicker.C == nil {
@@ -711,8 +712,8 @@ func (td *tsoDispatcher) checkTSORPCConcurrency(ctx context.Context, maxBatchWai
 	}
 	td.lastCheckConcurrencyTime = now
 
-	newConcurrency := td.provider.getOption().getTSOClientRPCConcurrency()
-	if maxBatchWaitInterval > 0 || td.provider.getOption().getEnableTSOFollowerProxy() {
+	newConcurrency := td.provider.getOption().GetTSOClientRPCConcurrency()
+	if maxBatchWaitInterval > 0 || td.provider.getOption().GetEnableTSOFollowerProxy() {
 		newConcurrency = 1
 	}
diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go
index 2eb30066532..84bc6a4dc99 100644
--- a/client/tso_dispatcher_test.go
+++ b/client/tso_dispatcher_test.go
@@ -26,23 +26,24 @@ import (
 	"github.com/pingcap/log"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+	"github.com/tikv/pd/client/opt"
 	"go.uber.org/zap/zapcore"
 )
 
 type mockTSOServiceProvider struct {
-	option *option
+	option *opt.Option
 
 	createStream func(ctx context.Context) *tsoStream
 	updateConnMu sync.Mutex
 }
 
-func newMockTSOServiceProvider(option *option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider {
+func newMockTSOServiceProvider(option *opt.Option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider {
 	return &mockTSOServiceProvider{
 		option:       option,
 		createStream: createStream,
 	}
 }
 
-func (m *mockTSOServiceProvider) getOption() *option {
+func (m *mockTSOServiceProvider) getOption() *opt.Option {
 	return m.option
 }
 
@@ -79,15 +80,15 @@ type testTSODispatcherSuite struct {
 	stream       *tsoStream
 	dispatcher   *tsoDispatcher
 	dispatcherWg sync.WaitGroup
-	option       *option
+	option       *opt.Option
 
 	reqPool *sync.Pool
 }
 
 func (s *testTSODispatcherSuite) SetupTest() {
 	s.re = require.New(s.T())
-	s.option = newOption()
-	s.option.timeout = time.Hour
+	s.option = opt.NewOption()
+	s.option.Timeout = time.Hour
 	// As the internal logic of the tsoDispatcher allows it to create streams multiple times, but our tests needs
 	// single stable access to the inner stream, we do not allow it to create it more than once in these tests.
 	creating := new(atomic.Bool)
@@ -203,7 +204,7 @@ func (s *testTSODispatcherSuite) checkIdleTokenCount(expectedTotal int) {
 
 func (s *testTSODispatcherSuite) testStaticConcurrencyImpl(concurrency int) {
 	ctx := context.Background()
-	s.option.setTSOClientRPCConcurrency(concurrency)
+	s.option.SetTSOClientRPCConcurrency(concurrency)
 
 	// Make sure the state of the mock stream is clear. Unexpected batching may make the requests sent to the stream
 	// less than expected, causing there are more `generateNext` signals or generated results.
@@ -285,7 +286,7 @@ func (s *testTSODispatcherSuite) TestConcurrentRPC() {
 
 func (s *testTSODispatcherSuite) TestBatchDelaying() {
 	ctx := context.Background()
-	s.option.setTSOClientRPCConcurrency(2)
+	s.option.SetTSOClientRPCConcurrency(2)
 	s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return"))
 	s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`))
@@ -309,13 +310,13 @@ func (s *testTSODispatcherSuite) TestBatchDelaying() {
 	s.reqMustReady(req)
 
 	// Try other concurrency.
-	s.option.setTSOClientRPCConcurrency(3)
+	s.option.SetTSOClientRPCConcurrency(3)
 	s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`))
 	req = s.sendReq(ctx)
 	s.streamInner.generateNext()
 	s.reqMustReady(req)
 
-	s.option.setTSOClientRPCConcurrency(4)
+	s.option.SetTSOClientRPCConcurrency(4)
 	s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`))
 	req = s.sendReq(ctx)
 	s.streamInner.generateNext()
@@ -347,7 +348,7 @@ func BenchmarkTSODispatcherHandleRequests(b *testing.B) {
 		return req
 	}
 
-	dispatcher := newTSODispatcher(ctx, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption(), nil))
+	dispatcher := newTSODispatcher(ctx, defaultMaxTSOBatchSize, newMockTSOServiceProvider(opt.NewOption(), nil))
 	var wg sync.WaitGroup
 	wg.Add(1)
diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go
index b8debf05efe..d4c8d22e3fe 100644
--- a/client/tso_service_discovery.go
+++ b/client/tso_service_discovery.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/tsopb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/utils/grpcutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -149,13 +150,13 @@ type tsoServiceDiscovery struct {
 	tlsCfg *tls.Config
 
 	// Client option.
-	option *option
+	option *opt.Option
 }
 
 // newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service.
 func newTSOServiceDiscovery(
 	ctx context.Context, metacli MetaStorageClient, apiSvcDiscovery ServiceDiscovery,
-	keyspaceID uint32, tlsCfg *tls.Config, option *option,
+	keyspaceID uint32, tlsCfg *tls.Config, option *opt.Option,
 ) ServiceDiscovery {
 	ctx, cancel := context.WithCancel(ctx)
 	c := &tsoServiceDiscovery{
@@ -190,9 +191,9 @@ func newTSOServiceDiscovery(
 // Init initialize the concrete client underlying
 func (c *tsoServiceDiscovery) Init() error {
 	log.Info("initializing tso service discovery",
-		zap.Int("max-retry-times", c.option.maxRetryTimes),
+		zap.Int("max-retry-times", c.option.MaxRetryTimes),
 		zap.Duration("retry-interval", initRetryInterval))
-	if err := c.retry(c.option.maxRetryTimes, initRetryInterval, c.updateMember); err != nil {
+	if err := c.retry(c.option.MaxRetryTimes, initRetryInterval, c.updateMember); err != nil {
 		log.Error("failed to update member. initialization failed.", zap.Error(err))
 		c.cancel()
 		return err
@@ -325,7 +326,7 @@ func (c *tsoServiceDiscovery) GetBackupURLs() []string {
 
 // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL.
 func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) {
-	return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...)
+	return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.GRPCDialOptions...)
 }
 
 // ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints.
diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go
index b0e600f9c7f..a57f3f5e02b 100644
--- a/tests/integrations/client/client_test.go
+++ b/tests/integrations/client/client_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/retry"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/mcs/utils/constant"
@@ -225,7 +226,7 @@ func TestGetTSAfterTransferLeader(t *testing.T) {
 	re.NotEmpty(leader)
 	defer cluster.Destroy()
 
-	cli := setupCli(ctx, re, endpoints, pd.WithCustomTimeoutOption(10*time.Second))
+	cli := setupCli(ctx, re, endpoints, opt.WithCustomTimeoutOption(10*time.Second))
 	defer cli.Close()
 
 	var leaderSwitched atomic.Bool
@@ -261,7 +262,7 @@ func TestTSOFollowerProxy(t *testing.T) {
 	defer cli1.Close()
 	cli2 := setupCli(ctx, re, endpoints)
 	defer cli2.Close()
-	err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true)
+	err = cli2.UpdateOption(opt.EnableTSOFollowerProxy, true)
 	re.NoError(err)
 
 	var wg sync.WaitGroup
@@ -288,7 +289,7 @@ func TestTSOFollowerProxy(t *testing.T) {
 	wg.Wait()
 
 	// Disable the follower proxy and check if the stream is updated.
-	err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false)
+	err = cli2.UpdateOption(opt.EnableTSOFollowerProxy, false)
 	re.NoError(err)
 
 	wg.Add(tsoRequestConcurrencyNumber)
@@ -343,7 +344,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) {
 	re.NotNil(cli)
 	defer cli.Close()
 	// TSO service does not support the follower proxy, so enabling it should fail.
-	err = cli.UpdateOption(pd.EnableTSOFollowerProxy, true)
+	err = cli.UpdateOption(opt.EnableTSOFollowerProxy, true)
 	re.Error(err)
 	re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
 }
@@ -421,7 +422,7 @@ func TestCustomTimeout(t *testing.T) {
 	defer cluster.Destroy()
 
 	endpoints := runServer(re, cluster)
-	cli := setupCli(ctx, re, endpoints, pd.WithCustomTimeoutOption(time.Second))
+	cli := setupCli(ctx, re, endpoints, opt.WithCustomTimeoutOption(time.Second))
 	defer cli.Close()
 
 	start := time.Now()
@@ -494,7 +495,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionByFollowerForwardin
 	ctx, cancel := context.WithCancel(suite.ctx)
 	defer cancel()
 
-	cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true))
+	cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
 	defer cli.Close()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)"))
 	time.Sleep(200 * time.Millisecond)
@@ -514,7 +515,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1(
 	re := suite.Require()
 	ctx, cancel := context.WithCancel(suite.ctx)
 	defer cancel()
-	cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true))
+	cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
 	defer cli.Close()
 
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
@@ -549,7 +550,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2(
 	re := suite.Require()
 	ctx, cancel := context.WithCancel(suite.ctx)
 	defer cancel()
-	cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true))
+	cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
 	defer cli.Close()
 
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
@@ -586,7 +587,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoAndRegionByFollowerFor
 	follower := cluster.GetServer(cluster.GetFollower())
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/utils/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr())))
 
-	cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true))
+	cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
 	defer cli.Close()
 	var lastTS uint64
 	testutil.Eventually(re, func() bool {
@@ -690,7 +691,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
 	cluster := suite.cluster
 	cli := setupCli(ctx, re, suite.endpoints)
 	defer cli.Close()
-	cli.UpdateOption(pd.EnableFollowerHandle, true)
+	cli.UpdateOption(opt.EnableFollowerHandle, true)
 	re.NotEmpty(cluster.WaitLeader())
 	leader := cluster.GetLeaderServer()
 	testutil.Eventually(re, func() bool {
@@ -708,7 +709,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
 	// follower have no region
 	cnt := 0
 	for range 100 {
-		resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle())
+		resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle())
 		if err == nil && resp != nil {
 			cnt++
 		}
@@ -722,7 +723,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
 	time.Sleep(150 * time.Millisecond)
 	cnt = 0
 	for range 100 {
-		resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle())
+		resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle())
 		if err == nil && resp != nil {
 			cnt++
 		}
@@ -737,7 +738,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
 	time.Sleep(100 * time.Millisecond)
 	cnt = 0
 	for range 100 {
-		resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle())
+		resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle())
 		if err == nil && resp != nil {
 			cnt++
 		}
@@ -750,7 +751,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/followerHandleError", "return(true)"))
 	cnt = 0
 	for range 100 {
-		resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle())
+		resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle())
 		if err == nil && resp != nil {
 			cnt++
 		}
@@ -765,7 +766,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
 	time.Sleep(100 * time.Millisecond)
 	cnt = 0
 	for range 100 {
-		resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle())
+		resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle())
 		if err == nil && resp != nil {
 			cnt++
 		}
@@ -849,7 +850,7 @@ func runServer(re *require.Assertions, cluster *tests.TestCluster) []string {
 	return endpoints
 }
 
-func setupCli(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
+func setupCli(ctx context.Context, re *require.Assertions, endpoints []string, opts ...opt.ClientOption) pd.Client {
 	cli, err := pd.NewClientWithContext(ctx, caller.TestComponent,
 		endpoints, pd.SecurityOption{}, opts...)
 	re.NoError(err)
@@ -1118,7 +1119,7 @@ func (suite *clientTestSuite) TestGetRegion() {
 	}
 	re.NoError(suite.reportBucket.Send(breq))
 	testutil.Eventually(re, func() bool {
-		r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets())
+		r, err := suite.client.GetRegion(context.Background(), []byte("a"), opt.WithBuckets())
 		re.NoError(err)
 		if r == nil {
 			return false
@@ -1128,7 +1129,7 @@ func (suite *clientTestSuite) TestGetRegion() {
 
 	suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(false)
 	testutil.Eventually(re, func() bool {
-		r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets())
+		r, err := suite.client.GetRegion(context.Background(), []byte("a"), opt.WithBuckets())
 		re.NoError(err)
 		if r == nil {
 			return false
@@ -1372,7 +1373,7 @@ func (suite *clientTestSuite) TestGetStore() {
 	re.True(contains)
 
 	// Should not return tombstone stores.
-	stores, err = suite.client.GetAllStores(context.Background(), pd.WithExcludeTombstone())
+	stores, err = suite.client.GetAllStores(context.Background(), opt.WithExcludeTombstone())
 	re.NoError(err)
 	for _, store := range stores {
 		if store.GetId() == physicallyDestroyedStoreID {
@@ -1586,7 +1587,7 @@ func (suite *clientTestSuite) TestScatterRegion() {
 	regionsID := []uint64{regionID}
 	// Test interface `ScatterRegions`.
 	testutil.Eventually(re, func() bool {
-		scatterResp, err := suite.client.ScatterRegions(context.Background(), regionsID, pd.WithGroup("test"), pd.WithRetry(1))
+		scatterResp, err := suite.client.ScatterRegions(context.Background(), regionsID, opt.WithGroup("test"), opt.WithRetry(1))
 		if err != nil {
 			return false
 		}
@@ -1871,12 +1872,12 @@ func (suite *clientTestSuite) TestBatchScanRegions() {
 	check := func(ranges []pd.KeyRange, limit int, expect []*metapb.Region) {
 		for _, bucket := range []bool{false, true} {
 			for _, outputMustContainAllKeyRange := range outputMustContainAllKeyRangeOptions {
-				var opts []pd.GetRegionOption
+				var opts []opt.GetRegionOption
 				if bucket {
-					opts = append(opts, pd.WithBuckets())
+					opts = append(opts, opt.WithBuckets())
 				}
 				if outputMustContainAllKeyRange {
-					opts = append(opts, pd.WithOutputMustContainAllKeyRange())
+					opts = append(opts, opt.WithOutputMustContainAllKeyRange())
 				}
 				scanRegions, err := suite.client.BatchScanRegions(ctx, ranges, limit, opts...)
 				re.NoError(err)
@@ -1950,7 +1951,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() {
 		ctx,
 		[]pd.KeyRange{{StartKey: []byte{1}, EndKey: []byte{0}}},
 		10,
-		pd.WithOutputMustContainAllKeyRange(),
+		opt.WithOutputMustContainAllKeyRange(),
 	)
 	re.ErrorContains(err, "invalid key range, start key > end key")
 	_, err = suite.client.BatchScanRegions(ctx, []pd.KeyRange{
@@ -1962,7 +1963,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() {
 		ctx,
 		[]pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{10, 1}}},
 		10,
-		pd.WithOutputMustContainAllKeyRange(),
+		opt.WithOutputMustContainAllKeyRange(),
 	)
 	re.ErrorContains(err, "found a hole region in the last")
 	req := &pdpb.RegionHeartbeatRequest{
@@ -1987,7 +1988,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() {
 			ctx,
 			[]pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{101}}},
 			10,
-			pd.WithOutputMustContainAllKeyRange(),
+			opt.WithOutputMustContainAllKeyRange(),
 		)
 		return err != nil && strings.Contains(err.Error(), "found a hole region between")
 	})
diff --git a/tests/integrations/client/client_tls_test.go b/tests/integrations/client/client_tls_test.go
index 717a26e396d..321e3df474c 100644
--- a/tests/integrations/client/client_tls_test.go
+++ b/tests/integrations/client/client_tls_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/stretchr/testify/require"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
 	"github.com/tikv/pd/pkg/utils/netutil"
 	"github.com/tikv/pd/server/config"
@@ -168,7 +169,7 @@ func testTLSReload(
 				CAPath:   testClientTLSInfo.TrustedCAFile,
 				CertPath: testClientTLSInfo.CertFile,
 				KeyPath:  testClientTLSInfo.KeyFile,
-			}, pd.WithGRPCDialOptions(grpc.WithBlock()))
+			}, opt.WithGRPCDialOptions(grpc.WithBlock()))
 			if err != nil {
 				errc <- err
 				dcancel()
@@ -201,7 +202,7 @@ func testTLSReload(
 		CAPath:   testClientTLSInfo.TrustedCAFile,
 		CertPath: testClientTLSInfo.CertFile,
 		KeyPath:  testClientTLSInfo.KeyFile,
-	}, pd.WithGRPCDialOptions(grpc.WithBlock()))
+	}, opt.WithGRPCDialOptions(grpc.WithBlock()))
 	re.NoError(err)
 	dcancel()
 	cli.Close()
@@ -216,7 +217,7 @@ func testTLSReload(
 		SSLCABytes:   caData,
 		SSLCertBytes: certData,
 		SSLKEYBytes:  keyData,
-	}, pd.WithGRPCDialOptions(grpc.WithBlock()))
+	}, opt.WithGRPCDialOptions(grpc.WithBlock()))
 	re.NoError(err)
 	defer cli.Close()
 	cancel1()
@@ -330,7 +331,7 @@ func testAllowedCN(ctx context.Context, endpoints []string, tls transport.TLSInf
 		CAPath:   tls.TrustedCAFile,
 		CertPath: tls.CertFile,
 		KeyPath:  tls.KeyFile,
-	}, pd.WithGRPCDialOptions(grpc.WithBlock()))
+	}, opt.WithGRPCDialOptions(grpc.WithBlock()))
 	if err != nil {
 		return err
 	}
diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go
index fd2a80cbdda..f130c782363 100644
--- a/tests/integrations/mcs/testutil.go
+++ b/tests/integrations/mcs/testutil.go
@@ -21,13 +21,14 @@ import (
 	"github.com/stretchr/testify/require"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/pkg/utils/tsoutil"
 )
 
 // SetupClientWithAPIContext creates a TSO client with api context name for test.
 func SetupClientWithAPIContext(
-	ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...pd.ClientOption,
+	ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...opt.ClientOption,
 ) pd.Client {
 	cli, err := pd.NewClientWithAPIContext(ctx, apiCtx,
 		caller.TestComponent,
@@ -39,7 +40,7 @@ func SetupClientWithAPIContext(
 // SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test.
 func SetupClientWithKeyspaceID(
 	ctx context.Context, re *require.Assertions,
-	keyspaceID uint32, endpoints []string, opts ...pd.ClientOption,
+	keyspaceID uint32, endpoints []string, opts ...opt.ClientOption,
 ) pd.Client {
 	cli, err := pd.NewClientWithKeyspace(ctx,
 		caller.TestComponent,
@@ -111,7 +112,7 @@ func WaitForMultiKeyspacesTSOAvailable(
 
 	clients := make([]pd.Client, 0, len(keyspaceIDs))
 	for _, keyspaceID := range keyspaceIDs {
-		cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints, pd.WithForwardingOption(true))
+		cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints, opt.WithForwardingOption(true))
 		re.NotNil(cli)
 		clients = append(clients, cli)
diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go
index 74396d19f56..8624454aec3 100644
--- a/tests/integrations/mcs/tso/server_test.go
+++ b/tests/integrations/mcs/tso/server_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/mcs/discovery"
 	tso "github.com/tikv/pd/pkg/mcs/tso/server"
@@ -247,7 +248,7 @@ func NewAPIServerForward(re *require.Assertions) APIServerForward {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
 	suite.pdClient, err = pd.NewClientWithContext(context.Background(),
 		caller.TestComponent,
-		[]string{suite.backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1))
+		[]string{suite.backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1))
 	re.NoError(err)
 	return suite
 }
diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go
index e2a69d77b6c..518991fef97 100644
--- a/tests/integrations/realcluster/cluster_id_test.go
+++ b/tests/integrations/realcluster/cluster_id_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
+	"github.com/tikv/pd/client/opt"
 )
 
 type clusterIDSuite struct {
@@ -49,7 +50,7 @@ func (s *clusterIDSuite) TestClientClusterID() {
 	// Try to create a client with the mixed endpoints.
 	_, err := pd.NewClientWithContext(
 		ctx, caller.TestComponent, pdEndpoints,
-		pd.SecurityOption{}, pd.WithMaxErrorRetry(1),
+		pd.SecurityOption{}, opt.WithMaxErrorRetry(1),
 	)
 	re.Error(err)
 	re.Contains(err.Error(), "unmatched cluster id")
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index 65c7f0b906b..121a61b1986 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/utils/testutil"
 	bs "github.com/tikv/pd/pkg/basicserver"
 	"github.com/tikv/pd/pkg/mcs/utils/constant"
@@ -151,7 +152,7 @@ func (suite *tsoClientTestSuite) SetupTest() {
 	if suite.legacy {
 		client, err := pd.NewClientWithContext(suite.ctx,
 			caller.TestComponent,
-			suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true))
+			suite.getBackendEndpoints(), pd.SecurityOption{}, opt.WithForwardingOption(true))
 		re.NoError(err)
 		innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery })
 		re.True(ok)
@@ -552,7 +553,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
 	pdClient, err := pd.NewClientWithContext(context.Background(),
 		caller.TestComponent,
-		[]string{backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1))
+		[]string{backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1))
 	re.NoError(err)
 	defer pdClient.Close()
 
diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go
index 54c1247c208..118b8aaed5e 100644
--- a/tools/pd-api-bench/cases/cases.go
+++ b/tools/pd-api-bench/cases/cases.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/log"
 	pd "github.com/tikv/pd/client"
 	pdHttp "github.com/tikv/pd/client/http"
+	"github.com/tikv/pd/client/opt"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
 )
@@ -323,7 +324,7 @@ func newGetRegionEnableFollower() func() GRPCCase {
 
 func (*getRegionEnableFollower) unary(ctx context.Context, cli pd.Client) error {
 	id := rand.Intn(totalRegion)*4 + 1
-	_, err := cli.GetRegion(ctx, generateKeyForSimulator(id), pd.WithAllowFollowerHandle())
+	_, err := cli.GetRegion(ctx, generateKeyForSimulator(id), opt.WithAllowFollowerHandle())
 	if err != nil {
 		return err
 	}
diff --git a/tools/pd-api-bench/main.go b/tools/pd-api-bench/main.go
index 0583b24e388..fa8a9164891 100644
--- a/tools/pd-api-bench/main.go
+++ b/tools/pd-api-bench/main.go
@@ -36,6 +36,7 @@ import (
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
 	pdHttp "github.com/tikv/pd/client/http"
+	"github.com/tikv/pd/client/opt"
 	"github.com/tikv/pd/client/utils/tlsutil"
 	"github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/utils/logutil"
@@ -119,7 +120,7 @@ func main() {
 	pdClis := make([]pd.Client, cfg.Client)
 	for i := range cfg.Client {
 		pdClis[i] = newPDClient(ctx, cfg)
-		pdClis[i].UpdateOption(pd.EnableFollowerHandle, true)
+		pdClis[i].UpdateOption(opt.EnableFollowerHandle, true)
 	}
 	etcdClis := make([]*clientv3.Client, cfg.Client)
 	for i := range cfg.Client {
@@ -382,7 +383,7 @@ func newPDClient(ctx context.Context, cfg *config.Config) pd.Client {
 			CertPath: cfg.CertPath,
 			KeyPath:  cfg.KeyPath,
 		},
-		pd.WithGRPCDialOptions(
+		opt.WithGRPCDialOptions(
 			grpc.WithKeepaliveParams(keepalive.ClientParameters{
 				Time:    keepaliveTime,
 				Timeout: keepaliveTimeout,
diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go
index 62ba79a7584..944fdcd91dc 100644
--- a/tools/pd-tso-bench/main.go
+++ b/tools/pd-tso-bench/main.go
@@ -34,6 +34,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/caller"
+	"github.com/tikv/pd/client/opt"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
@@ -431,11 +432,11 @@ func createPDClient(ctx context.Context) (pd.Client, error) {
 		err   error
 	)
 
-	opts := make([]pd.ClientOption, 0)
+	opts := make([]opt.ClientOption, 0)
 	if *useTSOServerProxy {
-		opts = append(opts, pd.WithTSOServerProxyOption(true))
+		opts = append(opts, opt.WithTSOServerProxyOption(true))
 	}
-	opts = append(opts, pd.WithGRPCDialOptions(
+	opts = append(opts, opt.WithGRPCDialOptions(
 		grpc.WithKeepaliveParams(keepalive.ClientParameters{
 			Time:    keepaliveTime,
 			Timeout: keepaliveTimeout,
@@ -465,8 +466,8 @@ func createPDClient(ctx context.Context) (pd.Client, error) {
 		return nil, err
 	}
 
-	pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
-	pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
+	pdCli.UpdateOption(opt.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
+	pdCli.UpdateOption(opt.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
 
 	return pdCli, err
 }
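
Taken together, the diff relocates the client option types (ClientOption, GetRegionOption, RegionsOption, the DynamicOption keys, and so on) from the root pd package into the new github.com/tikv/pd/client/opt package, and exports the Option struct's fields and accessors (Timeout, EnableForwarding, GRPCDialOptions, GetEnableTSOFollowerProxy, ...) so that other client subpackages can reach them. Below is a minimal sketch of what calling code looks like after the move, assembled only from calls that appear in this diff; the endpoint address is a placeholder, and caller.TestComponent is the component tag the test code above happens to use.

package main

import (
	"context"
	"time"

	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/caller"
	"github.com/tikv/pd/client/opt"
)

func main() {
	ctx := context.Background()
	// Construction-time options now come from the opt package; the client
	// constructor and SecurityOption stay in the root pd package.
	cli, err := pd.NewClientWithContext(ctx, caller.TestComponent,
		[]string{"127.0.0.1:2379"}, // placeholder endpoint
		pd.SecurityOption{},
		opt.WithCustomTimeoutOption(10*time.Second),
		opt.WithForwardingOption(true),
	)
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Dynamic options keep the UpdateOption entry point; only the key
	// constants moved (opt.EnableTSOFollowerProxy instead of the pd one).
	if err := cli.UpdateOption(opt.EnableTSOFollowerProxy, true); err != nil {
		panic(err)
	}

	// Per-call options follow the same pattern.
	if _, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle()); err != nil {
		panic(err)
	}
}

Since the entry points themselves (NewClientWithContext, UpdateOption, GetRegion) do not move, a caller's migration is mechanical: rewrite pd.With*/pd.Enable*/pd.Max* option references to their opt.* counterparts and add the extra import.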