From 0b15af946de013a7b4bb61edc256b350dfe7a665 Mon Sep 17 00:00:00 2001
From: Hu#
Date: Wed, 4 Sep 2024 10:18:59 +0800
Subject: [PATCH] pdms: optimize upgrade pdms to avoid unnecessary primary transfer (#2414)

---
 pkg/cluster/operation/upgrade.go | 36 +++++++++++++++++++++++++++++++-----
 pkg/cluster/spec/scheduling.go   | 29 ++++++++++++++++++++++++++++-
 pkg/cluster/spec/tso.go          | 29 ++++++++++++++++++++++++++++-
 3 files changed, 87 insertions(+), 7 deletions(-)

diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index 5c4c420858..3cace668ff 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -131,17 +131,19 @@ func Upgrade(
 			if instance.IgnoreMonitorAgent() {
 				noAgentHosts.Insert(instance.GetManageHost())
 			}
+
+			// PD, TSO and scheduling share the leader/primary deferral logic below.
 			switch component.Name() {
-			case spec.ComponentPD:
-				// defer PD leader to be upgraded after others
-				isLeader, err := instance.(*spec.PDInstance).IsLeader(ctx, topo, int(options.APITimeout), tlsCfg)
+			case spec.ComponentPD, spec.ComponentTSO, spec.ComponentScheduling:
+				// defer PD related leader/primary to be upgraded after others
+				isLeader, err := checkAndDeferPDLeader(ctx, topo, int(options.APITimeout), tlsCfg, instance)
 				if err != nil {
-					logger.Warnf("cannot found pd leader, ignore: %s", err)
+					logger.Warnf("cannot find pd related leader/primary: %s, instance: %s", err, instance.ID())
 					return err
 				}
 				if isLeader {
 					deferInstances = append(deferInstances, instance)
-					logger.Debugf("Deferred upgrading of PD leader %s", instance.ID())
+					logger.Debugf("Deferred upgrading of PD related leader/primary %s", instance.ID())
 					continue
 				}
 			case spec.ComponentCDC:
@@ -218,6 +220,30 @@ func Upgrade(
 	return RestartMonitored(ctx, uniqueHosts.Slice(), noAgentHosts, topo.GetMonitoredOptions(), options.OptTimeout, systemdMode)
 }
 
+// checkAndDeferPDLeader reports whether instance is currently the leader (PD)
+// or the primary (TSO / scheduling) of its service. It only reports the role;
+// deferring the upgrade is left to the caller. apiTimeout is used for PD only.
+//
+// Instances of any other component are never reported as leader. When the
+// role cannot be determined the error is returned and the bool is false.
+func checkAndDeferPDLeader(ctx context.Context, topo spec.Topology, apiTimeout int, tlsCfg *tls.Config, instance spec.Instance) (isLeader bool, err error) {
+	switch instance.ComponentName() {
+	case spec.ComponentPD:
+		isLeader, err = instance.(*spec.PDInstance).IsLeader(ctx, topo, apiTimeout, tlsCfg)
+	case spec.ComponentScheduling:
+		isLeader, err = instance.(*spec.SchedulingInstance).IsPrimary(ctx, topo, tlsCfg)
+	case spec.ComponentTSO:
+		isLeader, err = instance.(*spec.TSOInstance).IsPrimary(ctx, topo, tlsCfg)
+	default:
+		// Not a PD related component: there is no leader/primary role to check.
+		return false, nil
+	}
+	if err != nil {
+		return false, err
+	}
+	return isLeader, nil
+}
+
 func upgradeInstance(
 	ctx context.Context,
 	topo spec.Topology,
diff --git a/pkg/cluster/spec/scheduling.go b/pkg/cluster/spec/scheduling.go
index 3da3975f2d..31957570d3 100644
--- a/pkg/cluster/spec/scheduling.go
+++ b/pkg/cluster/spec/scheduling.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/pingcap/errors"
 	"github.com/pingcap/tiup/pkg/cluster/api"
 	"github.com/pingcap/tiup/pkg/cluster/ctxt"
 	"github.com/pingcap/tiup/pkg/cluster/template/scripts"
@@ -29,6 +30,9 @@ import (
 	"github.com/pingcap/tiup/pkg/utils"
 )
 
+// schedulingService is the service name passed to PD when querying the scheduling primary.
+const schedulingService = "scheduling"
+
 // SchedulingSpec represents the scheduling topology specification in topology.yaml
 type SchedulingSpec struct {
 	Host string `yaml:"host"`
@@ -66,7 +70,7 @@ func (s *SchedulingSpec) Status(ctx context.Context, timeout time.Duration, tlsC
 		return "Down"
 	}
 
-	primary, err := pc.GetServicePrimary("scheduling")
+	primary, err := pc.GetServicePrimary(schedulingService)
 	if err != nil {
 		return "ERR"
 	}
@@ -309,6 +313,29 @@ func (i *SchedulingInstance) setTLSConfig(ctx context.Context, enableTLS bool, c
 	return configs, nil
 }
 
+// IsPrimary reports whether this instance is the current primary of the
+// scheduling service. It asks PD for the service primary and compares the
+// answer with the advertise listen URL of the instance.
+func (i *SchedulingInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) {
+	tidbTopo, ok := topo.(*Specification)
+	if !ok {
+		// Report the mismatch as an error rather than panicking, so the
+		// caller can abort the operation gracefully.
+		return false, errors.Errorf("topo should be type of tidb topology, got %T", topo)
+	}
+
+	pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg)
+	primary, err := pdClient.GetServicePrimary(schedulingService)
+	if err != nil {
+		return false, errors.Annotatef(err, "failed to get Scheduling primary %s", i.GetHost())
+	}
+
+	spec := i.InstanceSpec.(*SchedulingSpec)
+	// TLS is treated as enabled whenever a TLS config is supplied.
+	enableTLS := tlsCfg != nil
+	return primary == spec.GetAdvertiseListenURL(enableTLS), nil
+}
+
 // ScaleConfig deploy temporary config on scaling
 func (i *SchedulingInstance) ScaleConfig(
 	ctx context.Context,
diff --git a/pkg/cluster/spec/tso.go b/pkg/cluster/spec/tso.go
index e309f1d652..3b02f0630e 100644
--- a/pkg/cluster/spec/tso.go
+++ b/pkg/cluster/spec/tso.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/pingcap/errors"
 	"github.com/pingcap/tiup/pkg/cluster/api"
 	"github.com/pingcap/tiup/pkg/cluster/ctxt"
 	"github.com/pingcap/tiup/pkg/cluster/template/scripts"
@@ -29,6 +30,9 @@ import (
 	"github.com/pingcap/tiup/pkg/utils"
 )
 
+// tsoService is the service name passed to PD when querying the TSO primary.
+const tsoService = "tso"
+
 // TSOSpec represents the TSO topology specification in topology.yaml
 type TSOSpec struct {
 	Host string `yaml:"host"`
@@ -66,7 +70,7 @@ func (s *TSOSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls
 		return "Down"
 	}
 
-	primary, err := pc.GetServicePrimary("tso")
+	primary, err := pc.GetServicePrimary(tsoService)
 	if err != nil {
 		return "ERR"
 	}
@@ -309,6 +313,29 @@ func (i *TSOInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs
 	return configs, nil
 }
 
+// IsPrimary reports whether this instance is the current primary of the TSO
+// service. It asks PD for the service primary and compares the answer with
+// the advertise listen URL of the instance.
+func (i *TSOInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) {
+	tidbTopo, ok := topo.(*Specification)
+	if !ok {
+		// Report the mismatch as an error rather than panicking, so the
+		// caller can abort the operation gracefully.
+		return false, errors.Errorf("topo should be type of tidb topology, got %T", topo)
+	}
+
+	pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg)
+	primary, err := pdClient.GetServicePrimary(tsoService)
+	if err != nil {
+		return false, errors.Annotatef(err, "failed to get TSO primary %s", i.GetHost())
+	}
+
+	spec := i.InstanceSpec.(*TSOSpec)
+	// TLS is treated as enabled whenever a TLS config is supplied.
+	enableTLS := tlsCfg != nil
+	return primary == spec.GetAdvertiseListenURL(enableTLS), nil
+}
+
 // ScaleConfig deploy temporary config on scaling
 func (i *TSOInstance) ScaleConfig(
 	ctx context.Context,