Skip to content

Commit

Permalink
*: move key_path.go (#8596)
Browse files Browse the repository at this point in the history
ref #4399

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Sep 10, 2024
1 parent b5c4a58 commit b95ddda
Show file tree
Hide file tree
Showing 46 changed files with 323 additions and 261 deletions.
5 changes: 3 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -230,8 +231,8 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
idPath := endpoint.KeyspaceIDPath(request.Name)
metaPath := endpoint.KeyspaceMetaPath(newID)
idPath := keypath.KeyspaceIDPath(request.Name)
metaPath := keypath.KeyspaceMetaPath(newID)
e := txn.Remove(idPath)
if e != nil {
return e
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -1153,8 +1154,8 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := endpoint.TSOSvcRootPath(m.clusterID)
primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id)
rootPath := keypath.TSOSvcRootPath(m.clusterID)
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -310,7 +310,7 @@ func (s *Server) startServer() (err error) {
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")
s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")

s.service = &Service{
ctx: s.Context(),
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -85,9 +85,9 @@ func NewWatcher(
cw := &Watcher{
ctx: ctx,
cancel: cancel,
configPath: endpoint.ConfigPath(clusterID),
configPath: keypath.ConfigPath(clusterID),
ttlConfigPrefix: sc.TTLConfigPrefix,
schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID),
schedulerConfigPathPrefix: keypath.SchedulerConfigPathPrefix(clusterID),
etcdClient: etcdClient,
PersistConfig: persistConfig,
storage: storage,
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -59,7 +59,7 @@ func NewWatcher(
ctx: ctx,
cancel: cancel,
clusterID: clusterID,
storePathPrefix: endpoint.StorePathPrefix(clusterID),
storePathPrefix: keypath.StorePathPrefix(clusterID),
etcdClient: etcdClient,
basicCluster: basicCluster,
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func (w *Watcher) initializeStoreWatcher() error {
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
storeID, err := endpoint.ExtractStoreIDFromPath(w.clusterID, key)
storeID, err := keypath.ExtractStoreIDFromPath(w.clusterID, key)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -85,10 +86,10 @@ func NewWatcher(
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
rulesPathPrefix: keypath.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: keypath.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
Expand Down
11 changes: 6 additions & 5 deletions pkg/mcs/scheduling/server/rule/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
)
Expand Down Expand Up @@ -64,10 +65,10 @@ func runWatcherLoadLabelRule(ctx context.Context, re *require.Assertions, client
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
rulesPathPrefix: keypath.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: keypath.RegionLabelPathPrefix(clusterID),
etcdClient: client,
ruleStorage: storage,
regionLabeler: labelerManager,
Expand Down Expand Up @@ -99,7 +100,7 @@ func prepare(t require.TestingT) (context.Context, *clientv3.Client, func()) {
}
value, err := json.Marshal(rule)
re.NoError(err)
key := endpoint.RegionLabelPathPrefix(clusterID) + "/" + rule.ID
key := keypath.RegionLabelPathPrefix(clusterID) + "/" + rule.ID
_, err = clientv3.NewKV(client).Put(ctx, key, string(value))
re.NoError(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -458,7 +459,7 @@ func (s *Server) startServer() (err error) {
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election")

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
Expand Down Expand Up @@ -363,8 +363,8 @@ func (s *Server) startServer() (err error) {

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID)
tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID)
legacySvcRootPath := keypath.LegacyRootPath(s.clusterID)
tsoSvcRootPath := keypath.TSOSvcRootPath(s.clusterID)
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr,
s.clusterID, legacySvcRootPath, tsoSvcRootPath, s.cfg)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/utils/expected_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -173,10 +173,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName
var primaryPath string
switch serviceName {
case constant.SchedulingServiceName:
primaryPath = endpoint.SchedulingPrimaryPath(clusterID)
primaryPath = keypath.SchedulingPrimaryPath(clusterID)
case constant.TSOServiceName:
tsoRootPath := endpoint.TSOSvcRootPath(clusterID)
primaryPath = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
tsoRootPath := keypath.TSOSvcRootPath(clusterID)
primaryPath = keypath.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
}
_, err = markExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID)
if err != nil {
Expand Down
17 changes: 9 additions & 8 deletions pkg/storage/endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -36,9 +37,9 @@ type ConfigStorage interface {

var _ ConfigStorage = (*StorageEndpoint)(nil)

// LoadConfig loads config from configPath then unmarshal it to cfg.
// LoadConfig loads config from keypath.Config then unmarshal it to cfg.
func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) {
value, err := se.Load(configPath)
value, err := se.Load(keypath.Config)
if err != nil || value == "" {
return false, err
}
Expand All @@ -49,14 +50,14 @@ func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) {
return true, nil
}

// SaveConfig stores marshallable cfg to the configPath.
// SaveConfig stores marshallable cfg to the keypath.Config.
func (se *StorageEndpoint) SaveConfig(cfg any) error {
return se.saveJSON(configPath, cfg)
return se.saveJSON(keypath.Config, cfg)
}

// LoadAllSchedulerConfigs loads all schedulers' config.
func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) {
prefix := customSchedulerConfigPath + "/"
prefix := keypath.CustomSchedulerConfigPath + "/"
keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit)
for i, key := range keys {
keys[i] = strings.TrimPrefix(key, prefix)
Expand All @@ -66,15 +67,15 @@ func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error)

// LoadSchedulerConfig loads the config of the given scheduler.
func (se *StorageEndpoint) LoadSchedulerConfig(schedulerName string) (string, error) {
return se.Load(schedulerConfigPath(schedulerName))
return se.Load(keypath.SchedulerConfigPath(schedulerName))
}

// SaveSchedulerConfig saves the config of the given scheduler.
func (se *StorageEndpoint) SaveSchedulerConfig(schedulerName string, data []byte) error {
return se.Save(schedulerConfigPath(schedulerName), string(data))
return se.Save(keypath.SchedulerConfigPath(schedulerName), string(data))
}

// RemoveSchedulerConfig removes the config of the given scheduler.
func (se *StorageEndpoint) RemoveSchedulerConfig(schedulerName string) error {
return se.Remove(schedulerConfigPath(schedulerName))
return se.Remove(keypath.SchedulerConfigPath(schedulerName))
}
5 changes: 3 additions & 2 deletions pkg/storage/endpoint/external_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/keypath"
)

// ExternalTimestamp is the external timestamp.
Expand All @@ -36,7 +37,7 @@ var _ ExternalTSStorage = (*StorageEndpoint)(nil)

// LoadExternalTS loads the external timestamp from storage.
func (se *StorageEndpoint) LoadExternalTS() (uint64, error) {
value, err := se.Load(ExternalTimestampPath())
value, err := se.Load(keypath.ExternalTimestampPath())
if err != nil || value == "" {
return 0, err
}
Expand All @@ -50,5 +51,5 @@ func (se *StorageEndpoint) LoadExternalTS() (uint64, error) {
// SaveExternalTS saves the external timestamp.
func (se *StorageEndpoint) SaveExternalTS(timestamp uint64) error {
value := strconv.FormatUint(timestamp, 16)
return se.Save(ExternalTimestampPath(), value)
return se.Save(keypath.ExternalTimestampPath(), value)
}
21 changes: 11 additions & 10 deletions pkg/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -49,7 +50,7 @@ var _ GCSafePointStorage = (*StorageEndpoint)(nil)

// LoadGCSafePoint loads current GC safe point from storage.
func (se *StorageEndpoint) LoadGCSafePoint() (uint64, error) {
value, err := se.Load(gcSafePointPath())
value, err := se.Load(keypath.GCSafePointPath())
if err != nil || value == "" {
return 0, err
}
Expand All @@ -63,12 +64,12 @@ func (se *StorageEndpoint) LoadGCSafePoint() (uint64, error) {
// SaveGCSafePoint saves new GC safe point to storage.
func (se *StorageEndpoint) SaveGCSafePoint(safePoint uint64) error {
value := strconv.FormatUint(safePoint, 16)
return se.Save(gcSafePointPath(), value)
return se.Save(keypath.GCSafePointPath(), value)
}

// LoadMinServiceGCSafePoint returns the minimum safepoint across all services
func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) {
prefix := GCSafePointServicePrefixPath()
prefix := keypath.GCSafePointServicePrefixPath()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
Expand All @@ -88,7 +89,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.ServiceID == GCWorkerServiceSafePointID {
if ssp.ServiceID == keypath.GCWorkerServiceSafePointID {
hasGCWorker = true
// If gc_worker's expire time is incorrectly set, fix it.
if ssp.ExpiredAt != math.MaxInt64 {
Expand Down Expand Up @@ -128,7 +129,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf

func (se *StorageEndpoint) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) {
ssp := &ServiceSafePoint{
ServiceID: GCWorkerServiceSafePointID,
ServiceID: keypath.GCWorkerServiceSafePointID,
SafePoint: initialValue,
ExpiredAt: math.MaxInt64,
}
Expand All @@ -140,7 +141,7 @@ func (se *StorageEndpoint) initServiceGCSafePointForGCWorker(initialValue uint64

// LoadAllServiceGCSafePoints returns all services GC safepoints
func (se *StorageEndpoint) LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error) {
prefix := GCSafePointServicePrefixPath()
prefix := keypath.GCSafePointServicePrefixPath()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
Expand Down Expand Up @@ -168,18 +169,18 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error {
return errors.New("service id of service safepoint cannot be empty")
}

if ssp.ServiceID == GCWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 {
if ssp.ServiceID == keypath.GCWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 {
return errors.New("TTL of gc_worker's service safe point must be infinity")
}

return se.saveJSON(gcSafePointServicePath(ssp.ServiceID), ssp)
return se.saveJSON(keypath.GCSafePointServicePath(ssp.ServiceID), ssp)
}

// RemoveServiceGCSafePoint removes a GC safepoint for the service
func (se *StorageEndpoint) RemoveServiceGCSafePoint(serviceID string) error {
if serviceID == GCWorkerServiceSafePointID {
if serviceID == keypath.GCWorkerServiceSafePointID {
return errors.New("cannot remove service safe point of gc_worker")
}
key := gcSafePointServicePath(serviceID)
key := keypath.GCSafePointServicePath(serviceID)
return se.Remove(key)
}
Loading

0 comments on commit b95ddda

Please sign in to comment.