Skip to content

Commit

Permalink
playground: add dm support for playground (#2465)
Browse files Browse the repository at this point in the history
* add dm support for playground

Signed-off-by: Siddon Tang <[email protected]>

* support command-line args, scale in, alive check

* fix exported

* fix cognitive-complexity

* fix name

* fix command-line args

* remove debug comment

---------

Signed-off-by: Siddon Tang <[email protected]>
Co-authored-by: Siddon Tang <[email protected]>
  • Loading branch information
GMHDBJD and siddontang authored Oct 29, 2024
1 parent 964b40b commit 57e845b
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 5 deletions.
8 changes: 8 additions & 0 deletions components/playground/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) {
{"ticdc", opt.TiCDC},
{"tikv-cdc", opt.TiKVCDC},
{"drainer", opt.Drainer},
{"dm-master", opt.DMMaster},
{"dm-worker", opt.DMWorker},
}

for _, cmd := range commands {
Expand Down Expand Up @@ -113,6 +115,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TSO.Host, "tso.host", "", opt.TSO.Host, "Playground TSO host. If not provided, TSO will still use `host` flag as its host")
cmd.Flags().StringVarP(&opt.Scheduling.Host, "scheduling.host", "", opt.Scheduling.Host, "Playground Scheduling host. If not provided, Scheduling will still use `host` flag as its host")
cmd.Flags().StringVarP(&opt.TiProxy.Host, "tiproxy.host", "", opt.PD.Host, "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host")
cmd.Flags().IntVarP(&opt.DMMaster.Num, "dm-master", "", opt.DMMaster.Num, "DM-master instance number")
cmd.Flags().IntVarP(&opt.DMWorker.Num, "dm-worker", "", opt.DMWorker.Num, "DM-worker instance number")

cmd.Flags().StringVarP(&opt.TiDB.ConfigPath, "db.config", "", opt.TiDB.ConfigPath, "TiDB instance configuration file")
cmd.Flags().StringVarP(&opt.TiKV.ConfigPath, "kv.config", "", opt.TiKV.ConfigPath, "TiKV instance configuration file")
Expand All @@ -123,6 +127,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TiProxy.ConfigPath, "tiproxy.config", "", opt.TiProxy.ConfigPath, "TiProxy instance configuration file")
cmd.Flags().StringVarP(&opt.Pump.ConfigPath, "pump.config", "", opt.Pump.ConfigPath, "Pump instance configuration file")
cmd.Flags().StringVarP(&opt.Drainer.ConfigPath, "drainer.config", "", opt.Drainer.ConfigPath, "Drainer instance configuration file")
cmd.Flags().StringVarP(&opt.DMMaster.ConfigPath, "dm-master.config", "", opt.DMMaster.ConfigPath, "DM-master instance configuration file")
cmd.Flags().StringVarP(&opt.DMWorker.ConfigPath, "dm-worker.config", "", opt.DMWorker.ConfigPath, "DM-worker instance configuration file")

cmd.Flags().StringVarP(&opt.TiDB.BinPath, "db.binpath", "", opt.TiDB.BinPath, "TiDB instance binary path")
cmd.Flags().StringVarP(&opt.TiKV.BinPath, "kv.binpath", "", opt.TiKV.BinPath, "TiKV instance binary path")
Expand All @@ -135,6 +141,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path")
cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path")
cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path")
cmd.Flags().StringVarP(&opt.DMMaster.BinPath, "dm-master.binpath", "", opt.DMMaster.BinPath, "DM-master instance binary path")
cmd.Flags().StringVarP(&opt.DMWorker.BinPath, "dm-worker.binpath", "", opt.DMWorker.BinPath, "DM-worker instance binary path")

return cmd
}
Expand Down
90 changes: 90 additions & 0 deletions components/playground/instance/dm_master.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/pingcap/tiup/pkg/utils"
)

// DMMaster represent a DM master instance.
type DMMaster struct {
instance
Process
initEndpoints []*DMMaster
}

var _ Instance = &DMMaster{}

// NewDMMaster create a new DMMaster instance.
func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, id int, port int) *DMMaster {
if port <= 0 {
port = 8261
}
return &DMMaster{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8291, portOffset),
// Similar like PD's client port, here use StatusPort for Master Port.
StatusPort: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
}
}

// Name return the name of the instance.
func (m *DMMaster) Name() string {
return fmt.Sprintf("dm-master-%d", m.ID)
}

// Start starts the instance.
func (m *DMMaster) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", m.Name()),
fmt.Sprintf("--master-addr=http://%s", utils.JoinHostPort(m.Host, m.StatusPort)),
fmt.Sprintf("--advertise-addr=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.StatusPort)),
fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(m.Host, m.Port)),
fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.Port)),
fmt.Sprintf("--log-file=%s", m.LogFile()),
}

endpoints := make([]string, 0)
for _, master := range m.initEndpoints {
endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", master.Name(), utils.JoinHostPort(master.Host, master.Port)))
}
args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ",")))

if m.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", m.ConfigPath))
}

m.Process = &process{cmd: PrepareCommand(ctx, m.BinPath, args, nil, m.Dir)}

logIfErr(m.Process.SetOutputFile(m.LogFile()))
return m.Process.Start()
}

// SetInitEndpoints set the initial endpoints for the DM master.
func (m *DMMaster) SetInitEndpoints(endpoints []*DMMaster) {
m.initEndpoints = endpoints
}

// Component return the component of the instance.
func (m *DMMaster) Component() string {
return "dm-master"
}

// LogFile return the log file path of the instance.
func (m *DMMaster) LogFile() string {
return filepath.Join(m.Dir, "dm-master.log")
}

// Addr return the address of the instance.
func (m *DMMaster) Addr() string {
return utils.JoinHostPort(m.Host, m.StatusPort)
}
83 changes: 83 additions & 0 deletions components/playground/instance/dm_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/pingcap/tiup/pkg/utils"
)

// DMWorker represent a DM worker instance.
type DMWorker struct {
instance
Process

masters []*DMMaster
}

var _ Instance = &DMWorker{}

// NewDMWorker create a DMWorker instance.
func NewDMWorker(binPath string, dir, host, configPath string, portOffset int, id int, port int, masters []*DMMaster) *DMWorker {
if port <= 0 {
port = 8262
}
return &DMWorker{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
masters: masters,
}
}

// MasterAddrs return the master addresses.
func (w *DMWorker) MasterAddrs() []string {
var addrs []string
for _, master := range w.masters {
addrs = append(addrs, utils.JoinHostPort(AdvertiseHost(master.Host), master.StatusPort))
}
return addrs
}

// Name return the name of the instance.
func (w *DMWorker) Name() string {
return fmt.Sprintf("dm-worker-%d", w.ID)
}

// Start starts the instance.
func (w *DMWorker) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", w.Name()),
fmt.Sprintf("--worker-addr=%s", utils.JoinHostPort(w.Host, w.Port)),
fmt.Sprintf("--advertise-addr=%s", utils.JoinHostPort(AdvertiseHost(w.Host), w.Port)),
fmt.Sprintf("--join=%s", strings.Join(w.MasterAddrs(), ",")),
fmt.Sprintf("--log-file=%s", w.LogFile()),
}

if w.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", w.ConfigPath))
}

w.Process = &process{cmd: PrepareCommand(ctx, w.BinPath, args, nil, w.Dir)}

logIfErr(w.Process.SetOutputFile(w.LogFile()))

return w.Process.Start()
}

// Component return the component of the instance.
func (w *DMWorker) Component() string {
return "dm-worker"
}

// LogFile return the log file of the instance.
func (w *DMWorker) LogFile() string {
return filepath.Join(w.Dir, "dm-worker.log")
}
30 changes: 30 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type BootOptions struct {
CSEOpts instance.CSEOptions `yaml:"cse"` // Only available when mode == tidb-cse
GrafanaPort int `yaml:"grafana_port"`
PortOffset int `yaml:"port_offset"`
DMMaster instance.Config `yaml:"dm_master"`
DMWorker instance.Config `yaml:"dm_worker"`
}

var (
Expand Down Expand Up @@ -298,6 +300,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().IntVar(&options.TiKVCDC.Num, "kvcdc", 0, "TiKV-CDC instance number")
rootCmd.Flags().IntVar(&options.Pump.Num, "pump", 0, "Pump instance number")
rootCmd.Flags().IntVar(&options.Drainer.Num, "drainer", 0, "Drainer instance number")
rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number")
rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number")

rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit")
rootCmd.Flags().IntVar(&options.TiFlash.UpTimeout, "tiflash.timeout", 120, "TiFlash max wait time in seconds for starting, 0 means no limit")
Expand All @@ -314,6 +318,10 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().IntVar(&options.TiCDC.Port, "ticdc.port", 0, "Playground TiCDC port. If not provided, TiCDC will use 8300 as its port")
rootCmd.Flags().StringVar(&options.TiProxy.Host, "tiproxy.host", "", "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host")
rootCmd.Flags().IntVar(&options.TiProxy.Port, "tiproxy.port", 0, "Playground TiProxy port. If not provided, TiProxy will use 6000 as its port")
rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host")
rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port")
rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host")
rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port")

rootCmd.Flags().StringVar(&options.TiDB.ConfigPath, "db.config", "", "TiDB instance configuration file")
rootCmd.Flags().StringVar(&options.TiKV.ConfigPath, "kv.config", "", "TiKV instance configuration file")
Expand All @@ -328,6 +336,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.Drainer.ConfigPath, "drainer.config", "", "Drainer instance configuration file")
rootCmd.Flags().StringVar(&options.TiCDC.ConfigPath, "ticdc.config", "", "TiCDC instance configuration file")
rootCmd.Flags().StringVar(&options.TiKVCDC.ConfigPath, "kvcdc.config", "", "TiKV-CDC instance configuration file")
rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file")
rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file")

rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path")
rootCmd.Flags().StringVar(&options.TiKV.BinPath, "kv.binpath", "", "TiKV instance binary path")
Expand All @@ -343,6 +353,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.TiKVCDC.BinPath, "kvcdc.binpath", "", "TiKV-CDC instance binary path")
rootCmd.Flags().StringVar(&options.Pump.BinPath, "pump.binpath", "", "Pump instance binary path")
rootCmd.Flags().StringVar(&options.Drainer.BinPath, "drainer.binpath", "", "Drainer instance binary path")
rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path")
rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path")

rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version")

Expand Down Expand Up @@ -466,6 +478,24 @@ func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) boo
}
}

func checkDMMasterStatus(dmMasterClient *api.DMMasterClient, dmMasterAddr string, timeout int) bool {
if timeout > 0 {
for i := 0; i < timeout; i++ {
if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive {
return true
}
time.Sleep(time.Second)
}
return false
}
for {
if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive {
return true
}
time.Sleep(time.Second)
}
}

func hasDashboard(pdAddr string) bool {
resp, err := http.Get(fmt.Sprintf("http://%s/dashboard", pdAddr))
if err != nil {
Expand Down
Loading

0 comments on commit 57e845b

Please sign in to comment.