Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move key_path.go #8596

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -230,8 +231,8 @@
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
idPath := endpoint.KeyspaceIDPath(request.Name)
metaPath := endpoint.KeyspaceMetaPath(newID)
idPath := keypath.KeyspaceIDPath(request.Name)
metaPath := keypath.KeyspaceMetaPath(newID)

Check warning on line 235 in pkg/keyspace/keyspace.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/keyspace.go#L234-L235

Added lines #L234 - L235 were not covered by tests
e := txn.Remove(idPath)
if e != nil {
return e
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -1153,8 +1154,8 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := endpoint.TSOSvcRootPath(m.clusterID)
primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id)
rootPath := keypath.TSOSvcRootPath(m.clusterID)
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand Down Expand Up @@ -310,7 +310,7 @@ func (s *Server) startServer() (err error) {
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")
s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")

s.service = &Service{
ctx: s.Context(),
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -85,9 +85,9 @@ func NewWatcher(
cw := &Watcher{
ctx: ctx,
cancel: cancel,
configPath: endpoint.ConfigPath(clusterID),
configPath: keypath.ConfigPath(clusterID),
ttlConfigPrefix: sc.TTLConfigPrefix,
schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID),
schedulerConfigPathPrefix: keypath.SchedulerConfigPathPrefix(clusterID),
etcdClient: etcdClient,
PersistConfig: persistConfig,
storage: storage,
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -59,7 +59,7 @@ func NewWatcher(
ctx: ctx,
cancel: cancel,
clusterID: clusterID,
storePathPrefix: endpoint.StorePathPrefix(clusterID),
storePathPrefix: keypath.StorePathPrefix(clusterID),
etcdClient: etcdClient,
basicCluster: basicCluster,
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func (w *Watcher) initializeStoreWatcher() error {
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
storeID, err := endpoint.ExtractStoreIDFromPath(w.clusterID, key)
storeID, err := keypath.ExtractStoreIDFromPath(w.clusterID, key)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -85,10 +86,10 @@ func NewWatcher(
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
rulesPathPrefix: keypath.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: keypath.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
Expand Down
11 changes: 6 additions & 5 deletions pkg/mcs/scheduling/server/rule/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
)
Expand Down Expand Up @@ -64,10 +65,10 @@ func runWatcherLoadLabelRule(ctx context.Context, re *require.Assertions, client
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
rulesPathPrefix: keypath.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: keypath.RegionLabelPathPrefix(clusterID),
etcdClient: client,
ruleStorage: storage,
regionLabeler: labelerManager,
Expand Down Expand Up @@ -99,7 +100,7 @@ func prepare(t require.TestingT) (context.Context, *clientv3.Client, func()) {
}
value, err := json.Marshal(rule)
re.NoError(err)
key := endpoint.RegionLabelPathPrefix(clusterID) + "/" + rule.ID
key := keypath.RegionLabelPathPrefix(clusterID) + "/" + rule.ID
_, err = clientv3.NewKV(client).Put(ctx, key, string(value))
re.NoError(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand Down Expand Up @@ -458,7 +459,7 @@ func (s *Server) startServer() (err error) {
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -363,8 +363,8 @@ func (s *Server) startServer() (err error) {

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID)
tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID)
legacySvcRootPath := keypath.LegacyRootPath(s.clusterID)
tsoSvcRootPath := keypath.TSOSvcRootPath(s.clusterID)
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr,
s.clusterID, legacySvcRootPath, tsoSvcRootPath, s.cfg)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/utils/expected_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -173,10 +173,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName
var primaryPath string
switch serviceName {
case constant.SchedulingServiceName:
primaryPath = endpoint.SchedulingPrimaryPath(clusterID)
primaryPath = keypath.SchedulingPrimaryPath(clusterID)
case constant.TSOServiceName:
tsoRootPath := endpoint.TSOSvcRootPath(clusterID)
primaryPath = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
tsoRootPath := keypath.TSOSvcRootPath(clusterID)
primaryPath = keypath.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
}
_, err = markExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID)
if err != nil {
Expand Down
17 changes: 9 additions & 8 deletions pkg/storage/endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -36,9 +37,9 @@ type ConfigStorage interface {

var _ ConfigStorage = (*StorageEndpoint)(nil)

// LoadConfig loads config from configPath then unmarshal it to cfg.
// LoadConfig loads config from keypath.Config then unmarshal it to cfg.
func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) {
value, err := se.Load(configPath)
value, err := se.Load(keypath.Config)
if err != nil || value == "" {
return false, err
}
Expand All @@ -49,14 +50,14 @@ func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) {
return true, nil
}

// SaveConfig stores marshallable cfg to the configPath.
// SaveConfig stores marshallable cfg to the keypath.Config.
func (se *StorageEndpoint) SaveConfig(cfg any) error {
return se.saveJSON(configPath, cfg)
return se.saveJSON(keypath.Config, cfg)
}

// LoadAllSchedulerConfigs loads all schedulers' config.
func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) {
prefix := customSchedulerConfigPath + "/"
prefix := keypath.CustomSchedulerConfigPath + "/"
keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit)
for i, key := range keys {
keys[i] = strings.TrimPrefix(key, prefix)
Expand All @@ -66,15 +67,15 @@ func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error)

// LoadSchedulerConfig loads the config of the given scheduler.
func (se *StorageEndpoint) LoadSchedulerConfig(schedulerName string) (string, error) {
return se.Load(schedulerConfigPath(schedulerName))
return se.Load(keypath.SchedulerConfigPath(schedulerName))
}

// SaveSchedulerConfig saves the config of the given scheduler.
func (se *StorageEndpoint) SaveSchedulerConfig(schedulerName string, data []byte) error {
return se.Save(schedulerConfigPath(schedulerName), string(data))
return se.Save(keypath.SchedulerConfigPath(schedulerName), string(data))
}

// RemoveSchedulerConfig removes the config of the given scheduler.
func (se *StorageEndpoint) RemoveSchedulerConfig(schedulerName string) error {
return se.Remove(schedulerConfigPath(schedulerName))
return se.Remove(keypath.SchedulerConfigPath(schedulerName))
}
5 changes: 3 additions & 2 deletions pkg/storage/endpoint/external_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
)

// ExternalTimestamp is the external timestamp.
Expand All @@ -36,7 +37,7 @@ var _ ExternalTSStorage = (*StorageEndpoint)(nil)

// LoadExternalTS loads the external timestamp from storage.
func (se *StorageEndpoint) LoadExternalTS() (uint64, error) {
value, err := se.Load(ExternalTimestampPath())
value, err := se.Load(keypath.ExternalTimestampPath())
if err != nil || value == "" {
return 0, err
}
Expand All @@ -50,5 +51,5 @@ func (se *StorageEndpoint) LoadExternalTS() (uint64, error) {
// SaveExternalTS saves the external timestamp.
func (se *StorageEndpoint) SaveExternalTS(timestamp uint64) error {
value := strconv.FormatUint(timestamp, 16)
return se.Save(ExternalTimestampPath(), value)
return se.Save(keypath.ExternalTimestampPath(), value)
}
21 changes: 11 additions & 10 deletions pkg/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/etcdutil/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -49,7 +50,7 @@ var _ GCSafePointStorage = (*StorageEndpoint)(nil)

// LoadGCSafePoint loads current GC safe point from storage.
func (se *StorageEndpoint) LoadGCSafePoint() (uint64, error) {
value, err := se.Load(gcSafePointPath())
value, err := se.Load(keypath.GCSafePointPath())
if err != nil || value == "" {
return 0, err
}
Expand All @@ -63,12 +64,12 @@ func (se *StorageEndpoint) LoadGCSafePoint() (uint64, error) {
// SaveGCSafePoint saves new GC safe point to storage.
func (se *StorageEndpoint) SaveGCSafePoint(safePoint uint64) error {
value := strconv.FormatUint(safePoint, 16)
return se.Save(gcSafePointPath(), value)
return se.Save(keypath.GCSafePointPath(), value)
}

// LoadMinServiceGCSafePoint returns the minimum safepoint across all services
func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) {
prefix := GCSafePointServicePrefixPath()
prefix := keypath.GCSafePointServicePrefixPath()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
Expand All @@ -88,7 +89,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.ServiceID == GCWorkerServiceSafePointID {
if ssp.ServiceID == keypath.GCWorkerServiceSafePointID {
hasGCWorker = true
// If gc_worker's expire time is incorrectly set, fix it.
if ssp.ExpiredAt != math.MaxInt64 {
Expand Down Expand Up @@ -128,7 +129,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf

func (se *StorageEndpoint) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) {
ssp := &ServiceSafePoint{
ServiceID: GCWorkerServiceSafePointID,
ServiceID: keypath.GCWorkerServiceSafePointID,
SafePoint: initialValue,
ExpiredAt: math.MaxInt64,
}
Expand All @@ -140,7 +141,7 @@ func (se *StorageEndpoint) initServiceGCSafePointForGCWorker(initialValue uint64

// LoadAllServiceGCSafePoints returns all services GC safepoints
func (se *StorageEndpoint) LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error) {
prefix := GCSafePointServicePrefixPath()
prefix := keypath.GCSafePointServicePrefixPath()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
Expand Down Expand Up @@ -168,18 +169,18 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error {
return errors.New("service id of service safepoint cannot be empty")
}

if ssp.ServiceID == GCWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 {
if ssp.ServiceID == keypath.GCWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 {
return errors.New("TTL of gc_worker's service safe point must be infinity")
}

return se.saveJSON(gcSafePointServicePath(ssp.ServiceID), ssp)
return se.saveJSON(keypath.GCSafePointServicePath(ssp.ServiceID), ssp)
}

// RemoveServiceGCSafePoint removes a GC safepoint for the service
func (se *StorageEndpoint) RemoveServiceGCSafePoint(serviceID string) error {
if serviceID == GCWorkerServiceSafePointID {
if serviceID == keypath.GCWorkerServiceSafePointID {
return errors.New("cannot remove service safe point of gc_worker")
}
key := gcSafePointServicePath(serviceID)
key := keypath.GCSafePointServicePath(serviceID)
return se.Remove(key)
}
Loading