From 989a7df30158300e2777f90f3b4110d74fbb1b02 Mon Sep 17 00:00:00 2001
From: Xiaofan Hu
Date: Fri, 22 Oct 2021 13:17:54 +0200
Subject: [PATCH 1/2] rebalance: introduce go-carbon health check with better sync rate control

The original sync mechanism is a bit too simple, as it is controlled only by the worker count, which makes it hard to balance efficiency and reliability. Set the worker count too low and rebalancing becomes too slow; set it too high and buckyd might take away too many resources. To meet both criteria, this change introduces a go-carbon health check and automatic sync rate adjustment based on metrics per second per node.

bucky rebalance -f -offload -ignore404 \
  -graphite-ip-to-hostname \
  -graphite-metrics-prefix carbon.minutely.buckytool.rebalance.$cluster.dst.$to_location.src.$from_location \
  -graphite-endpoint 127.0.0.1:3002 \
  -go-carbon-health-check=$enable_go_carbon_health_check \
  -go-carbon-health-check-interval 5 \
  -go-carbon-port 8080 \
  -go-carbon-protocol http \
  -go-carbon-cache-threshold 0.5 \
  -sync-speed-up-interval $sync_speed_up_interval \
  -metrics-per-second $sync_metrics_per_second \
  -h ${seed_node}:4242 \
  -workers $workers \
  -timeout $timeout

Using the above example for explanation:

-go-carbon-health-check=true: asks buckytools to check go-carbon cache usage, which indicates whether there are capacity issues.
-sync-speed-up-interval: increases the sync rate (metrics per second) every specified number of seconds.
-metrics-per-second: the initial sync rate. It should be set to a lower value like 5 - 10 initially when using -sync-speed-up-interval.

What's more, with the go-carbon health check enabled, buckytools also does automatic sync rate easing. It can be disabled with `-no-random-easing=true`. The full list of flags can be found with `bucky rebalance -help`.

Besides the go-carbon health check, this commit also introduces exporting some internal sync rate metrics to graphite for monitoring. This is enabled via -graphite-endpoint, with -graphite-metrics-prefix controlling the metric prefix.

By default -go-carbon-health-check is disabled, which means buckytools falls back to the original sync behaviour.
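For reference, the rate control introduced here amounts to a per-node token bucket: tokens are produced at -metrics-per-second, the rate is nudged upward every -sync-speed-up-interval seconds, and it collapses back to a minimal rate whenever the health check reports an overloaded go-carbon cache. The following standalone sketch illustrates the idea only; the type and function names are illustrative, and error handling and the random easing behaviour are omitted:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// nodeLimiter is a minimal sketch of the per-node rate control described
// above: one token is produced every 1/metricsPerSecond seconds, and a sync
// job must take a token for a node before copying a metric to or from it.
type nodeLimiter struct {
    metricsPerSecond int64         // current sync rate for this node
    tokens           chan struct{} // one token == permission to sync one metric
    overloaded       chan struct{} // signalled by the go-carbon health check
}

func newNodeLimiter(initialRate int64, speedUpEvery time.Duration) *nodeLimiter {
    l := &nodeLimiter{
        metricsPerSecond: initialRate,
        tokens:           make(chan struct{}, 120),
        overloaded:       make(chan struct{}, 1),
    }
    go func() {
        speedUp := time.NewTicker(speedUpEvery)
        produce := time.NewTicker(time.Second / time.Duration(initialRate))
        for {
            select {
            case <-l.overloaded:
                // Cache above the threshold: fall back to 1 metric/s.
                atomic.StoreInt64(&l.metricsPerSecond, 1)
                produce.Reset(time.Second)
            case <-speedUp.C:
                // Healthy node: slowly ramp the rate up.
                atomic.AddInt64(&l.metricsPerSecond, 1)
                produce.Reset(time.Second / time.Duration(atomic.LoadInt64(&l.metricsPerSecond)))
            case <-produce.C:
                select {
                case l.tokens <- struct{}{}:
                default: // bucket full, drop the token
                }
            }
        }
    }()
    return l
}

// acquire blocks until the node may receive one more metric.
func (l *nodeLimiter) acquire() { <-l.tokens }

func main() {
    l := newNodeLimiter(5, 10*time.Second) // 5 metrics/s initially, +1 every 10s
    start := time.Now()
    for i := 0; i < 10; i++ {
        l.acquire()
        fmt.Printf("synced metric %d after %v\n", i, time.Since(start).Round(time.Millisecond))
    }
}

In the actual patch each buckyd node gets one such state, and every sync job takes a token for both its source and destination node before copying a metric.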
Relevant go-carbon PR: https://github.com/go-graphite/go-carbon/pull/433 --- Makefile | 9 + cmd/bucky/common.go | 4 +- cmd/bucky/common_sync.go | 412 +++++++++++++++++++++++++++--- cmd/bucky/copy.go | 12 +- cmd/bucky/list.go | 1 + cmd/bucky/rebalance.go | 22 +- testing/rebalance_health_check.go | 321 +++++++++++++++++++++++ 7 files changed, 715 insertions(+), 66 deletions(-) create mode 100644 testing/rebalance_health_check.go diff --git a/Makefile b/Makefile index d3185f0f..eb4c3a69 100644 --- a/Makefile +++ b/Makefile @@ -37,6 +37,15 @@ test: clean bucky buckyd go run -mod vendor testing/rebalance.go $(REBALANCE_FLAGS) go run -mod vendor testing/copy.go $(COPY_FLAGS) +# only works on linux +test_rebalance_health_check_setup: + sudo ip addr add 10.0.1.7 dev lo + sudo ip addr add 10.0.1.8 dev lo + sudo ip addr add 10.0.1.9 dev lo + +test_rebalance_health_check: clean bucky buckyd +# go run -mod vendor testing/rebalance_health_check.go $(REBALANCE_FLAGS) + clean_test: rm -rf testdata_rebalance_* rm -rf testdata_copy_* diff --git a/cmd/bucky/common.go b/cmd/bucky/common.go index 8a110606..6e75905e 100644 --- a/cmd/bucky/common.go +++ b/cmd/bucky/common.go @@ -157,7 +157,9 @@ func DeleteMetric(server, metric string) error { switch resp.StatusCode { case 200: - log.Printf("DELETED: %s", metric) + if msFlags.printDeletedMetrics { + log.Printf("DELETED: %s", metric) + } case 404: log.Printf("Not found / Not deleted: %s", metric) return fmt.Errorf("Metric not found.") diff --git a/cmd/bucky/common_sync.go b/cmd/bucky/common_sync.go index b46e4146..0d8867f7 100644 --- a/cmd/bucky/common_sync.go +++ b/cmd/bucky/common_sync.go @@ -1,31 +1,99 @@ package main import ( + "context" + "encoding/json" "errors" + "flag" "fmt" + "io/ioutil" "log" + "math/rand" + "net" "os" "os/signal" + "runtime/debug" + "sort" + "strings" "sync" "sync/atomic" "syscall" "time" ) -type metricSyncer struct { - flags struct { - workers int - delete bool - noop bool - offloadFetch bool - ignore404 bool - verbose bool +func init() { + rand.Seed(time.Now().Unix()) +} + +type metricSyncerFlags struct { + workers int + delete bool + noop bool + offloadFetch bool + ignore404 bool + verbose bool + + printDeletedMetrics bool + + goCarbonHealthCheck bool + goCarbonProtocol string + goCarbonPort string + goCarbonCacheThreshold float64 + goCarbonHealthCheckInterval int + syncSpeedUpInterval int + noRandomEasing bool + metricsPerSecond int + + graphiteEndpoint string + graphiteMetricsPrefix string + graphiteIPToHostname bool + graphiteStatInterval int + + testingHelper struct { + workerSleepSeconds int } +} + +var msFlags = &metricSyncerFlags{} + +func (msf *metricSyncerFlags) registerFlags(fs *flag.FlagSet) { + fs.BoolVar(&msf.delete, "delete", false, "Delete metrics after moving them.") + fs.BoolVar(&msf.noop, "no-op", false, "Do not alter metrics and print report.") + fs.IntVar(&msf.workers, "w", 5, "Downloader threads.") + fs.IntVar(&msf.workers, "workers", 5, "Downloader threads.") + fs.BoolVar(&msf.offloadFetch, "offload", false, "Offload metric data fetching to data nodes.") + fs.BoolVar(&msf.ignore404, "ignore404", false, "Do not treat 404 as errors.") + fs.BoolVar(&msf.printDeletedMetrics, "print-deletion", false, "Print deleted metric names.") + + fs.BoolVar(&msf.goCarbonHealthCheck, "go-carbon-health-check", false, "Throttle rebalancing/backfilling if go-carbon cache size hits threshold specified -go-carbon-cache-threshold") + fs.IntVar(&msf.goCarbonHealthCheckInterval, "go-carbon-health-check-interval", 10, "Go-Carbon health 
check interval (seconds). Should be less than 15 minutes.") + fs.StringVar(&msf.goCarbonProtocol, "go-carbon-protocol", "http", "Use http or https to retrieve go-carbon /admin/info") + fs.StringVar(&msf.goCarbonPort, "go-carbon-port", "8080", "Set carbon-server port to retrieve go-carbon /admin/info") + fs.Float64Var(&msf.goCarbonCacheThreshold, "go-carbon-cache-threshold", 0.75, "Go-Carbon cache threshold") + + fs.IntVar(&msf.syncSpeedUpInterval, "sync-speed-up-interval", 600, "Increase sync speed metrics-per-second by 1 at the specified interval (seconds). Requires -go-carbon-health-check. To disable, set it to 0 or -1.") + fs.IntVar(&msf.metricsPerSecond, "metrics-per-second", 1, "Sync rate (metrics per second) on each graphite storage (go-carbon) node. Requires -go-carbon-health-check.") + fs.BoolVar(&msf.noRandomEasing, "no-random-easing", false, "Disable randomly slowing down metricsPerSecond. Requires -go-carbon-health-check.") + + fs.StringVar(&msf.graphiteEndpoint, "graphite-endpoint", "", "Send internal stats to graphite ingestion endpoint") + fs.StringVar(&msf.graphiteMetricsPrefix, "graphite-metrics-prefix", "carbon.buckytools", "Internal graphite metric prefix") + fs.BoolVar(&msf.graphiteIPToHostname, "graphite-ip-to-hostname", false, "Convert buckyd ip address to hostname in graphite metric prefix.") + fs.IntVar(&msf.graphiteStatInterval, "graphite-stat-interval", 60, "How frequently the bucky tool should generate graphite metrics (seconds)") + + fs.IntVar(&msf.testingHelper.workerSleepSeconds, "testing.worker-sleep-seconds", 0, "Testing helper flag: make worker sleep.") +} + +type metricSyncer struct { + flags *metricSyncerFlags stat struct { - notFound int - copyError int - deleteError int + totalJobs int64 + finishedJobs int64 + notFound int64 + copyError int64 + deleteError int64 + + nodes map[string]*syncPerNodeStat // key: buckyd address time struct { count int64 @@ -37,17 +105,20 @@ type metricSyncer struct { } } } + + // goCarbonStates is nil if goCarbonHealthCheck is not enabled + goCarbonStates map[string]*goCarbonState // key: buckyd address } -func newMetricSyncer(delete, noop, offloadFetch, ignore404, verbose bool, workers int) *metricSyncer { +type syncPerNodeStat struct { + finishedJobs int64 +} + +func newMetricSyncer(flags *metricSyncerFlags) *metricSyncer { var ms metricSyncer - ms.flags.workers = workers - ms.flags.delete = delete - ms.flags.noop = noop - ms.flags.offloadFetch = offloadFetch - ms.flags.ignore404 = ignore404 - ms.flags.verbose = verbose + ms.flags = flags + ms.stat.nodes = map[string]*syncPerNodeStat{} return &ms } @@ -71,43 +142,67 @@ func (ms *metricSyncer) run(jobsd map[string]map[string][]*syncJob) error { return errors.New("Halting.
No-op mode enganged.") } - jobcs := map[string]chan *syncJob{} - srcThrottling := map[string]chan struct{}{} + jobcs := map[string]map[string]chan *syncJob{} + srcThrottling := map[string]chan struct{}{} // TODO: not really needed anymore workerWg := new(sync.WaitGroup) var totalWorkers int for dst, srcJobs := range jobsd { - jobc := make(chan *syncJob, ms.flags.workers) - jobcs[dst] = jobc + // jobc := make(chan *syncJob, ms.flags.workers) + jobcs[dst] = map[string]chan *syncJob{} + + ms.stat.nodes[dst] = &syncPerNodeStat{} + + for src, jobs := range srcJobs { + jobc := make(chan *syncJob, ms.flags.workers) + jobcs[dst][src] = jobc + + ms.stat.nodes[src] = &syncPerNodeStat{} + ms.stat.totalJobs += int64(len(jobs)) - for src := range srcJobs { // Why src nodes have 1.5x workers: eading data is less // expensive than writing data, so it should be ok for // source node to receive some more reading requests. if _, ok := srcThrottling[src]; !ok { srcThrottling[src] = make(chan struct{}, ms.flags.workers+ms.flags.workers/2) } - } - for i := 0; i < ms.flags.workers; i++ { - totalWorkers++ + for i := 0; i < ms.flags.workers; i++ { + totalWorkers++ - workerWg.Add(1) - go ms.sync(jobc, srcThrottling, workerWg) + workerWg.Add(1) + go ms.sync(jobc, srcThrottling, workerWg) + } } } // num of old servers * num of new servers * metric workers log.Printf("Total workers: %d", totalWorkers) + if ms.flags.goCarbonHealthCheck { + // de-duplicate server ips/servers + servers := map[string]*goCarbonState{} + for s := range jobsd { + servers[s] = ms.newGoCarbonState(fmt.Sprintf("%s:%s", strings.Split(s, ":")[0], ms.flags.goCarbonPort), ms.flags.syncSpeedUpInterval) + } + for s := range srcThrottling { + servers[s] = ms.newGoCarbonState(fmt.Sprintf("%s:%s", strings.Split(s, ":")[0], ms.flags.goCarbonPort), ms.flags.syncSpeedUpInterval) + } + + go ms.monitorGoCarbonHealth(time.Second*time.Duration(ms.flags.goCarbonHealthCheckInterval), servers) + } + // Queue up and process work var serverWg sync.WaitGroup var progress int64 + // var servers = map[string]struct{}{} for dst, jobss := range jobsd { + // servers[dst] = struct{}{} for src, jobs := range jobss { serverWg.Add(1) + // servers[src] = struct{}{} go func(dst, src string, jobs []*syncJob) { - jobc := jobcs[dst] + jobc := jobcs[dst][src] for _, job := range jobs { job.srcServer = src @@ -123,10 +218,15 @@ func (ms *metricSyncer) run(jobsd map[string]map[string][]*syncJob) error { progressDone := make(chan struct{}) go ms.displayProgressOrExit(totalJobs, &progress, progressDone) + if ms.flags.graphiteEndpoint != "" { + go ms.reportToGraphite(time.Second*time.Duration(ms.flags.graphiteStatInterval), progressDone) + } serverWg.Wait() - for _, c := range jobcs { - close(c) + for _, srcJobcs := range jobcs { + for _, c := range srcJobcs { + close(c) + } } workerWg.Wait() @@ -157,7 +257,7 @@ func (ms *metricSyncer) run(jobsd map[string]map[string][]*syncJob) error { logbreakdownStat("Delete", &ms.stat.time.delete) log.Printf(" Rest: %s (%.2f%%)", time.Duration(ms.restTime()), float64(100*ms.restTime())/float64(atomic.LoadInt64(&ms.stat.time.total))) - if (!ignore404 && ms.stat.notFound > 0) || ms.stat.copyError > 0 || ms.stat.deleteError > 0 { + if (!ms.flags.ignore404 && ms.stat.notFound > 0) || ms.stat.copyError > 0 || ms.stat.deleteError > 0 { log.Println("Errors are present in sync.") return fmt.Errorf("Errors present.") } @@ -181,9 +281,22 @@ func (ms *metricSyncer) sync(jobc chan *syncJob, srcThrottling map[string]chan s func() { src, dst := job.srcServer, 
job.dstServer + // Make sure that src servers doesn't receive too many + // read requests If this interfers with sync speed-up + // with go-cabron health check, we can bypass the issue + // by increase the worker count. srcThrottling[src] <- struct{}{} + // go-carbon health check + if ms.flags.goCarbonHealthCheck { + ms.requestAccess(dst) + ms.requestAccess(src) + } + syncStart := time.Now() + atomic.AddInt64(&ms.stat.finishedJobs, 1) + atomic.AddInt64(&ms.stat.nodes[src].finishedJobs, 1) + atomic.AddInt64(&ms.stat.nodes[dst].finishedJobs, 1) defer func() { atomic.AddInt64(&ms.stat.time.count, 1) atomic.AddInt64(&ms.stat.time.total, int64(time.Since(syncStart))) @@ -197,6 +310,10 @@ func (ms *metricSyncer) sync(jobc chan *syncJob, srcThrottling map[string]chan s dst, job.newName, ms.flags.delete) } + if ms.flags.testingHelper.workerSleepSeconds > 0 { + time.Sleep(time.Second * time.Duration(ms.flags.testingHelper.workerSleepSeconds)) + } + var mhstats *metricHealStats if ms.flags.offloadFetch { var err error @@ -204,9 +321,9 @@ func (ms *metricSyncer) sync(jobc chan *syncJob, srcThrottling map[string]chan s if err != nil { // errors already loggged in the func if errors.Is(err, errNotFound) { - ms.stat.notFound++ + atomic.AddInt64(&ms.stat.notFound, 1) } else { - ms.stat.copyError++ + atomic.AddInt64(&ms.stat.copyError, 1) } return @@ -216,9 +333,9 @@ func (ms *metricSyncer) sync(jobc chan *syncJob, srcThrottling map[string]chan s if err != nil { // errors already loggged in the func if errors.Is(err, errNotFound) { - ms.stat.notFound++ + atomic.AddInt64(&ms.stat.notFound, 1) } else { - ms.stat.copyError++ + atomic.AddInt64(&ms.stat.copyError, 1) } return @@ -228,9 +345,9 @@ func (ms *metricSyncer) sync(jobc chan *syncJob, srcThrottling map[string]chan s if err != nil { // errors already loggged in the func if errors.Is(err, errNotFound) { - ms.stat.notFound++ + atomic.AddInt64(&ms.stat.notFound, 1) } else { - ms.stat.copyError++ + atomic.AddInt64(&ms.stat.copyError, 1) } return @@ -267,7 +384,7 @@ func (ms *metricSyncer) sync(jobc chan *syncJob, srcThrottling map[string]chan s err := DeleteMetric(src, job.oldName) if err != nil { // errors already loggged in the func - ms.stat.deleteError++ + atomic.AddInt64(&ms.stat.deleteError, 1) } atomic.AddInt64(&ms.stat.time.delete.count, 1) @@ -342,3 +459,220 @@ func (ms *metricSyncer) displayProgressOrExit(total int, progress *int64, progre } } } + +type goCarbonState struct { + addr string + cacheSize int64 + cacheLimit int64 + + refreshedAt atomic.Value + + metricsPerSecond int64 // metric per second + syncTokens chan struct{} // generated by metricsPerSecond + fastReset chan struct{} // quick go-carbon overload notification channel +} + +func (ms *metricSyncer) newGoCarbonState(addr string, speedUpInterval int) *goCarbonState { + if speedUpInterval <= 0 { + // which means disable speed up + speedUpInterval = int(time.Hour * 24 * 365 * 10) // in practice, it's impossible to have bucky rebalance running for 10 years. 
+ } + + mps := int64(ms.flags.metricsPerSecond) + state := &goCarbonState{ + addr: addr, + metricsPerSecond: mps, + syncTokens: make(chan struct{}, 120), + fastReset: make(chan struct{}, 1), + } + + go func() { + speedUp := time.NewTicker(time.Second * time.Duration(speedUpInterval)) + speed := time.NewTicker(time.Second / 1) + for { + select { + case <-state.fastReset: + atomic.StoreInt64(&state.metricsPerSecond, 1) + speed.Reset(time.Second / time.Duration(state.metricsPerSecond)) + case <-speedUp.C: + if ms.isGoCarbonOverload(state) { // overload check + atomic.StoreInt64(&state.metricsPerSecond, 1) + } else if !ms.flags.noRandomEasing && + atomic.LoadInt64(&state.metricsPerSecond) >= 30 && + rand.Intn(10) <= 2 { // random easing + atomic.StoreInt64(&state.metricsPerSecond, rand.Int63n(atomic.LoadInt64(&state.metricsPerSecond))+1) + } else { // randomly increased sync rate by 1-5 metrics per second + atomic.AddInt64(&state.metricsPerSecond, rand.Int63n(5)+1) + } + + // TODO: might need to add some proetections if bucky failed to talk to go-carbon at start-up. + if refreshedAt := state.refreshedAt.Load(); refreshedAt != nil && + time.Since(refreshedAt.(time.Time)) >= time.Minute*15 { + atomic.StoreInt64(&state.metricsPerSecond, mps) + } + + speed.Reset(time.Second / time.Duration(atomic.LoadInt64(&state.metricsPerSecond))) + case <-speed.C: + select { + case state.syncTokens <- struct{}{}: + default: + } + } + } + }() + + return state +} + +func (ms *metricSyncer) isGoCarbonOverload(state *goCarbonState) bool { + return float64(atomic.LoadInt64(&state.cacheSize)) >= float64(atomic.LoadInt64(&state.cacheLimit))*ms.flags.goCarbonCacheThreshold +} + +func (ms *metricSyncer) requestAccess(server string) { + if !ms.flags.goCarbonHealthCheck || ms.goCarbonStates == nil { + return + } + + state := ms.goCarbonStates[server] + if state == nil { + return + } + + <-state.syncTokens + + return +} + +func (ms *metricSyncer) monitorGoCarbonHealth(interval time.Duration, servers map[string]*goCarbonState) { + ms.goCarbonStates = servers + + ticker := time.NewTicker(interval) + for { + <-ticker.C + + var wg sync.WaitGroup + for _, state := range ms.goCarbonStates { + wg.Add(1) + go func(state *goCarbonState) { + defer wg.Done() + + resp, err := httpClient.Get(fmt.Sprintf("%s://%s/admin/info", ms.flags.goCarbonProtocol, state.addr)) + if err != nil { + log.Printf("Failed to retrieve health info from %s: %s", state.addr, err) + return + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read response from %s: %s", state.addr, body) + } + if resp.StatusCode != 200 { + log.Printf("Non-200 result returned from %s: %s", state.addr, body) + return + } + + var info struct { + Cache struct { + Size int64 `json:"size"` + Limit int64 `json:"limit"` + } `json:"cache"` + } + if err := json.Unmarshal(body, &info); err != nil { + log.Printf("Failed to decode health info from %s: err: %s body: %s", state.addr, err, body) + return + } + + state.refreshedAt.Store(time.Now()) + atomic.StoreInt64(&state.cacheLimit, info.Cache.Limit) + atomic.StoreInt64(&state.cacheSize, info.Cache.Size) + + if ms.isGoCarbonOverload(state) { + state.fastReset <- struct{}{} + } + }(state) + } + + wg.Wait() + } +} + +func (ms *metricSyncer) reportToGraphite(interval time.Duration, progressDone chan struct{}) { + defer func() { + if r := recover(); r != nil { + log.Printf("reportToGraphite panics: %v\n%s", r, debug.Stack()) + } + }() + ticker := time.NewTicker(interval) + 
nodeNameCache := map[string]string{} + + var exit bool + for { + select { + case <-ticker.C: + case <-progressDone: + exit = true + } + + tcpAddr, err := net.ResolveTCPAddr("tcp4", ms.flags.graphiteEndpoint) + if err != nil { + log.Printf("reportToGraphite: Failed to resolve graphite endpoint %s: %s", ms.flags.graphiteEndpoint, err) + continue + } + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + log.Printf("reportToGraphite: Failed to connect to graphite %s: %s", ms.flags.graphiteEndpoint, err) + continue + } + + ts := time.Now().Unix() + fmt.Fprintf(conn, "%s.main.total_jobs.last %d %d\n", ms.flags.graphiteMetricsPrefix, ms.stat.totalJobs, ts) + fmt.Fprintf(conn, "%s.main.finished_jobs.last %d %d\n", ms.flags.graphiteMetricsPrefix, ms.stat.finishedJobs, ts) + fmt.Fprintf(conn, "%s.main.not_found.last %d %d\n", ms.flags.graphiteMetricsPrefix, ms.stat.notFound, ts) + fmt.Fprintf(conn, "%s.main.copy_error.last %d %d\n", ms.flags.graphiteMetricsPrefix, ms.stat.copyError, ts) + fmt.Fprintf(conn, "%s.main.delete_error.last %d %d\n", ms.flags.graphiteMetricsPrefix, ms.stat.deleteError, ts) + + for node, stat := range ms.stat.nodes { + nodeName, ok := nodeNameCache[node] + if !ok { + nodeNameCache[node] = ms.getNodeName(node) + nodeName = nodeNameCache[node] + } + + fmt.Fprintf(conn, "%s.node.%s.finished_jobs.last %d %d\n", ms.flags.graphiteMetricsPrefix, nodeName, stat.finishedJobs, ts) + + if ms.flags.goCarbonHealthCheck { + state := ms.goCarbonStates[node] + fmt.Fprintf(conn, "%s.node.%s.metrics_per_second.last %d %d\n", ms.flags.graphiteMetricsPrefix, nodeName, atomic.LoadInt64(&state.metricsPerSecond), ts) + fmt.Fprintf(conn, "%s.node.%s.cache_limit.last %d %d\n", ms.flags.graphiteMetricsPrefix, nodeName, state.cacheLimit, ts) + fmt.Fprintf(conn, "%s.node.%s.cache_size.last %d %d\n", ms.flags.graphiteMetricsPrefix, nodeName, state.cacheSize, ts) + } + } + + conn.Close() + + if exit { + break + } + } +} + +func (ms *metricSyncer) getNodeName(src string) string { + ip := strings.Split(src, ":")[0] + if !ms.flags.graphiteIPToHostname { + return strings.ReplaceAll(ip, ".", "_") + } + + var resolver net.Resolver + names, err := resolver.LookupAddr(context.Background(), ip) + if err != nil || len(names) == 0 { + if err != nil { + log.Printf("Failed to resolve hostname for %s: %s", ip, err) + } + + return strings.ReplaceAll(ip, ".", "_") + } + + sort.Strings(names) + return strings.ReplaceAll(names[0], ".", "_") +} diff --git a/cmd/bucky/copy.go b/cmd/bucky/copy.go index 084afb70..6aba39da 100644 --- a/cmd/bucky/copy.go +++ b/cmd/bucky/copy.go @@ -5,7 +5,7 @@ import "log" type copyCommand struct { src, dst string listForce bool - metricSyncer + *metricSyncer } func init() { @@ -20,15 +20,13 @@ func init() { SetupHostname(c) SetupSingle(c) - // c.Flag.BoolVar(&doDelete, "delete", false, "Delete metrics after moving them.") c.Flag.StringVar(©Cmd.src, "src", "", "Source host to copy metrics from.") c.Flag.StringVar(©Cmd.dst, "dst", "", "Destination host to copy metrics to.") - c.Flag.BoolVar(©Cmd.flags.noop, "no-op", false, "Do not alter metrics and print report.") - c.Flag.IntVar(©Cmd.flags.workers, "w", 5, "Downloader threads.") - c.Flag.IntVar(©Cmd.flags.workers, "workers", 5, "Downloader threads.") c.Flag.BoolVar(©Cmd.listForce, "f", false, "Force the remote daemons to rebuild their cache.") - c.Flag.BoolVar(©Cmd.flags.offloadFetch, "offload", false, "Offload metric data fetching to data nodes.") - c.Flag.BoolVar(©Cmd.flags.ignore404, "ignore404", false, "Do not treat 404 as 
errors.") + + msFlags.registerFlags(c.Flag) + + copyCmd.metricSyncer = newMetricSyncer(msFlags) } // rebalanceCommand runs this subcommand. diff --git a/cmd/bucky/list.go b/cmd/bucky/list.go index 4abd1890..394ac541 100644 --- a/cmd/bucky/list.go +++ b/cmd/bucky/list.go @@ -183,6 +183,7 @@ func multiplexListRequests(r []metricListRequest) (map[string][]string, error) { // Errors reported by getMetricsCache comms <- metrics } else { + log.Printf("Failed to retrieve metric list from %s: %s", req.url, err) errors = true } wg.Done() diff --git a/cmd/bucky/rebalance.go b/cmd/bucky/rebalance.go index 316a6f1a..2aef0be8 100644 --- a/cmd/bucky/rebalance.go +++ b/cmd/bucky/rebalance.go @@ -6,11 +6,6 @@ import ( "sort" ) -var doDelete bool -var noOp bool -var offloadFetch bool -var ignore404 bool - func init() { usage := "[options] [additional buckyd servers...]" short := "Rebalance a server or the entire cluster." @@ -47,20 +42,9 @@ Set -offload=true to speed up rebalance.` SetupHostname(c) SetupSingle(c) - c.Flag.BoolVar(&doDelete, "delete", false, - "Delete metrics after moving them.") - c.Flag.BoolVar(&noOp, "no-op", false, - "Do not alter metrics and print report.") - c.Flag.IntVar(&metricWorkers, "w", 5, - "Downloader threads.") - c.Flag.IntVar(&metricWorkers, "workers", 5, - "Downloader threads.") + msFlags.registerFlags(c.Flag) c.Flag.BoolVar(&listForce, "f", false, "Force the remote daemons to rebuild their cache.") - c.Flag.BoolVar(&offloadFetch, "offload", false, - "Offload metric data fetching to data nodes.") - c.Flag.BoolVar(&ignore404, "ignore404", false, - "Do not treat 404 as errors.") } // countMap returns the number of metrics in a server -> metrics mapping @@ -119,7 +103,7 @@ func RebalanceMetrics(extraHostPorts []string) error { } jobs[dst][src] = append(jobs[dst][src], job) - if noOp { + if msFlags.noop { log.Printf("[%s] %s => %s", src, m, dst) } } @@ -130,7 +114,7 @@ func RebalanceMetrics(extraHostPorts []string) error { log.Printf("%d metrics on %s must be relocated", moves[server], server) } - ms := newMetricSyncer(doDelete, noOp, offloadFetch, ignore404, Verbose, metricWorkers) + ms := newMetricSyncer(msFlags) ms.run(jobs) diff --git a/testing/rebalance_health_check.go b/testing/rebalance_health_check.go new file mode 100644 index 00000000..e21b1b0f --- /dev/null +++ b/testing/rebalance_health_check.go @@ -0,0 +1,321 @@ +package main + +import ( + "crypto/rand" + "flag" + "fmt" + "io/ioutil" + "log" + "math" + mrand "math/rand" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "reflect" + "strings" + "sync" + "time" + + "github.com/go-graphite/buckytools/hashing" + "github.com/go-graphite/go-whisper" +) + +func main() { + // 1. populate metrics + // 2. start three buckyd instances + // 3. 
run rebalance + + keepTestData := flag.Bool("keep-testdata", false, "keep test data after test") + flag.Parse() + + log.SetFlags(log.Lshortfile) + + var failed bool + defer func() { + if failed { + os.Exit(1) + } + }() + + testDir, err := os.MkdirTemp("./", "testdata_rebalance_health_check_*") + if err != nil { + panic(err) + } + defer func() { + if !*keepTestData { + os.RemoveAll(testDir) + } + }() + + log0, err := os.Create(filepath.Join(testDir, "server0.log")) + if err != nil { + panic(err) + } + log1, err := os.Create(filepath.Join(testDir, "server1.log")) + if err != nil { + panic(err) + } + log2, err := os.Create(filepath.Join(testDir, "server2.log")) + if err != nil { + panic(err) + } + rebalanceLog, err := os.Create(filepath.Join(testDir, "rebalance.log")) + if err != nil { + panic(err) + } + + // sudo ip addr add 10.0.1.7 dev lo + // sudo ip addr add 10.0.1.8 dev lo + // sudo ip addr add 10.0.1.9 dev lo + var ( + server0 = hashing.Node{Server: "10.0.1.7", Port: 40000, Instance: "server0"} + server1 = hashing.Node{Server: "10.0.1.8", Port: 40001, Instance: "server1"} + server2 = hashing.Node{Server: "10.0.1.9", Port: 40002, Instance: "server2"} + ) + + if err := os.MkdirAll(filepath.Join(testDir, "server0"), 0755); err != nil { + panic(err) + } + if err := os.MkdirAll(filepath.Join(testDir, "server1"), 0755); err != nil { + panic(err) + } + if err := os.MkdirAll(filepath.Join(testDir, "server2"), 0755); err != nil { + panic(err) + } + + go func() { + tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:33002") + if err != nil { + log.Printf("Failed to resolve 127.0.0.1:33002: %s", err) + os.Exit(1) + } + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + log.Printf("Failed to listen 127.0.0.1:33002: %s", err) + os.Exit(1) + } + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Failed to accept graphite data: %s", err) + continue + } + data, err := ioutil.ReadAll(conn) + if err != nil { + continue + } + log.Printf("metrics:\n%s\n", data) + conn.Close() + } + }() + + for _, node := range []hashing.Node{server0, server1, server2} { + go func(node hashing.Node) { + mux := http.NewServeMux() + start := time.Now() + mux.HandleFunc("/admin/info", func(w http.ResponseWriter, r *http.Request) { + if time.Since(start) >= time.Second*10 && time.Since(start) <= time.Second*20 { + fmt.Fprintf(w, `{"cache": {"size": 1000, "limit": 100000}}`) + } else { + fmt.Fprintf(w, `{"cache": {"size": 1000, "limit": 1000}}`) + } + }) + + s := &http.Server{ + Addr: fmt.Sprintf("%s:8080", node.Server), + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + log.Fatal(s.ListenAndServe()) + }(node) + } + + cmd0 := exec.Command("./buckyd", "-hash", "jump_fnv1a", "-b", nodeStr(server0), "-node", server0.Server, "-prefix", filepath.Join(testDir, "server0"), "-replicas", "1", "-sparse", nodeStr(server0), nodeStr(server1), nodeStr(server2)) + cmd0.Stdout = log0 + cmd0.Stderr = log0 + if err := cmd0.Start(); err != nil { + panic(err) + } + + cmd1 := exec.Command("./buckyd", "-hash", "jump_fnv1a", "-b", nodeStr(server1), "-node", server1.Server, "-prefix", filepath.Join(testDir, "server1"), "-replicas", "1", "-sparse", nodeStr(server0), nodeStr(server1), nodeStr(server2)) + cmd1.Stdout = log1 + cmd1.Stderr = log1 + if err := cmd1.Start(); err != nil { + panic(err) + } + + cmd2 := exec.Command("./buckyd", "-hash", "jump_fnv1a", "-b", nodeStr(server2), "-node", server2.Server, "-prefix", filepath.Join(testDir, "server2"), 
"-replicas", "1", "-sparse", nodeStr(server0), nodeStr(server1), nodeStr(server2)) + cmd2.Stdout = log2 + cmd2.Stderr = log2 + if err := cmd2.Start(); err != nil { + panic(err) + } + defer func() { + cmd0.Process.Kill() + cmd1.Process.Kill() + cmd2.Process.Kill() + }() + + mrand.Seed(time.Now().Unix()) + + var wg sync.WaitGroup + var totalFiles = 100 + wg.Add(totalFiles) + + var metrics = map[string]hashing.Node{} + + var ring = hashing.NewJumpHashRing(3) + ring.AddNode(server0) + ring.AddNode(server1) + + filesStart := time.Now() + for i := 0; i < totalFiles; i++ { + b := make([]byte, 32) + _, err := rand.Read(b) + if err != nil { + panic(fmt.Errorf("failed to read random bytes: %s", err)) + } + rets, err := whisper.ParseRetentionDefs("1s:3h,10s:3d,1m:31d") + if err != nil { + panic(err) + } + metric := fmt.Sprintf("metric_%x", b) + node := ring.GetNode(metric) + metrics[metric] = node + file, err := whisper.Create(filepath.Join(testDir, node.Instance, metric+".wsp"), rets, whisper.Sum, 0) + if err != nil { + panic(err) + } + go func() { + var points []*whisper.TimeSeriesPoint + var now = int(time.Now().Unix()) - 1800 + for i := 0; i < 1800; i++ { + points = append(points, &whisper.TimeSeriesPoint{Time: now + i, Value: mrand.Float64()}) + } + if err := file.UpdateMany(points); err != nil { + panic(err) + } + wg.Done() + }() + } + wg.Wait() + log.Printf("finished creating whisper files. took %s\n", time.Since(filesStart)) + + time.Sleep(time.Second * 3) + rebalanceStart := time.Now() + // rebalanceCmd := exec.Command("./bucky", "rebalance", "-f", "-h", nodeStr(server0), "-offload", "-w", "3", "-ignore404") + rebalanceCmd := exec.Command( + "./bucky", "rebalance", "-f", "-h", nodeStr(server0), + "-offload", "-ignore404", + "-testing.worker-sleep-seconds", "10", + "-graphite-ip-to-hostname", + "-graphite-metrics-prefix", "carbon.minutely.buckytool.rebalance.misc.dst.ams4a.src.ams4b", + "-graphite-endpoint", "127.0.0.1:33002", + "-graphite-stat-interval", "3", + "-go-carbon-health-check=true", + "-go-carbon-health-check-interval", "3", + "-go-carbon-port", "8080", + "-go-carbon-protocol", "http", + "-go-carbon-cache-threshold", "0.75", + // "-min-workers", "2", + "-sync-speed-up-interval", "2", + "-workers", "10", + ) + + rebalanceCmd.Stdout = rebalanceLog + rebalanceCmd.Stderr = rebalanceLog + + log.Printf("rebalanceCmd.String() = %+v\n", rebalanceCmd.String()) + if err := rebalanceCmd.Run(); err != nil { + log.Printf("failed to run rebalance command: %s", err) + failed = true + return + } + + log.Printf("finished rebalancing. 
took %s\n", time.Since(rebalanceStart)) + + files, err := os.ReadDir(filepath.Join(testDir, "server2")) + if err != nil { + panic(err) + } + if len(files) == 0 { + log.Printf("failed to rebalance cluster: 0 files are relocated.") + failed = true + return + } + + log.Printf("%d files relocated.", len(files)) + + var inconsistentMetrics []string + for _, m := range files { + newf, err := whisper.Open(filepath.Join(testDir, "server2", m.Name())) + if err != nil { + panic(err) + } + oldf, err := whisper.Open(filepath.Join(testDir, metrics[strings.TrimSuffix(m.Name(), ".wsp")].Instance, m.Name())) + if err != nil { + panic(err) + } + nrets := newf.Retentions() + orets := oldf.Retentions() + if !reflect.DeepEqual(nrets, orets) { + log.Printf("retention policy not equal:\n new: %#v\n old: %#v\n", nrets, orets) + } + now := int(time.Now().Unix()) + for _, ret := range nrets { + ndata, err := newf.Fetch(now-ret.MaxRetention(), now) + if err != nil { + panic(err) + } + odata, err := oldf.Fetch(now-ret.MaxRetention(), now) + if err != nil { + panic(err) + } + if ndata == nil { + log.Printf("failed to retrieve data from file %s\n", newf.File().Name()) + continue + } + if odata == nil { + log.Printf("failed to retrieve data from file %s\n", oldf.File().Name()) + continue + } + + var count int + var npoints = ndata.Points() + var opoints = odata.Points() + for i, opoint := range opoints { + if !math.IsNaN(opoint.Value) && !math.IsNaN(npoints[i].Value) && opoint != npoints[i] { + count++ + log.Printf("opoints = %+v\n", opoints[i]) + log.Printf("npoints = %+v\n", npoints[i]) + + if len(inconsistentMetrics) == 0 || inconsistentMetrics[len(inconsistentMetrics)-1] != m.Name() { + inconsistentMetrics = append(inconsistentMetrics, m.Name()) + } + } + } + + if count > 0 { + log.Printf("metric %s %s: %d points not equal", m.Name(), ret, count) + } + } + + newf.Close() + oldf.Close() + } + + if len(inconsistentMetrics) > 0 { + log.Printf("%d rebalanced metrics not matching original metrics: %s", len(inconsistentMetrics), strings.Join(inconsistentMetrics, ",")) + failed = true + return + } else { + log.Printf("metrics are rebalanced properly.") + } +} + +func nodeStr(n hashing.Node) string { return fmt.Sprintf("%s:%d", n.Server, n.Port) }

From 2ec000eb3dc2650b316a7c4da0386481219bc0ea Mon Sep 17 00:00:00 2001
From: Xiaofan Hu
Date: Fri, 22 Oct 2021 16:55:23 +0200
Subject: [PATCH 2/2] rebalance: adds -allowed-dsts to support backfilling nodes in imbalanced replicas

Sometimes we use the rebalance command to backfill a storage node that replaces a broken node:

bucky rebalance -h node-in-replica-1:4242 node-in-replica2:4242

This only works if replica 1 and replica 2 have the same number of nodes (assuming jump hash is used). With -allowed-dsts, we can backfill a node from a replica that has more or fewer storage nodes, as sketched below.
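Conceptually, the new flag just prunes the destination -> source -> jobs mapping before the syncer starts. Here is a self-contained sketch of that filter, with simplified names and a stub job type, not the exact code added to cmd/bucky/rebalance.go:

package main

import (
    "fmt"
    "strings"
)

// syncJob is a stand-in for the real job type; only the mapping shape matters here.
type syncJob struct{ oldName, newName string }

// filterAllowedDsts keeps only the destinations listed in allowedDsts
// (comma-separated host:port values); an empty flag allows every destination.
func filterAllowedDsts(jobs map[string]map[string][]*syncJob, allowedDsts string) map[string]map[string][]*syncJob {
    if allowedDsts == "" {
        return jobs
    }
    allowed := map[string]bool{}
    for _, hostport := range strings.Split(allowedDsts, ",") {
        allowed[strings.TrimSpace(hostport)] = true
    }
    filtered := map[string]map[string][]*syncJob{}
    for dst, srcJobs := range jobs {
        if allowed[dst] {
            filtered[dst] = srcJobs
        }
    }
    return filtered
}

func main() {
    jobs := map[string]map[string][]*syncJob{
        "10.0.1.7:4242": {"10.0.1.8:4242": {{oldName: "a.b", newName: "a.b"}}},
        "10.0.1.9:4242": {"10.0.1.8:4242": {{oldName: "c.d", newName: "c.d"}}},
    }
    filtered := filterAllowedDsts(jobs, "10.0.1.7:4242")
    fmt.Println(len(filtered), "destination(s) kept") // 1 destination(s) kept
}

For example: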
bucky rebalance -h node-in-replica-1:4242 -allowed-dsts node-in-replica-1:4242 node1-in-replica2:4242,node2-in-replica2:4242,...,nodeN-in-replica2:4242

--- Makefile | 3 ++- cmd/bucky/rebalance.go | 26 ++++++++++++++++++++++++-- testing/rebalance.go | 8 +++++++- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index eb4c3a69..ea3b83e9 100644 --- a/Makefile +++ b/Makefile @@ -44,9 +44,10 @@ test_rebalance_health_check_setup: sudo ip addr add 10.0.1.9 dev lo test_rebalance_health_check: clean bucky buckyd -# go run -mod vendor testing/rebalance_health_check.go $(REBALANCE_FLAGS) + go run -mod vendor testing/rebalance_health_check.go $(REBALANCE_FLAGS) clean_test: + rm -rf bucky buckyd rm -rf testdata_rebalance_* rm -rf testdata_copy_* diff --git a/cmd/bucky/rebalance.go b/cmd/bucky/rebalance.go index 2aef0be8..978ab798 100644 --- a/cmd/bucky/rebalance.go +++ b/cmd/bucky/rebalance.go @@ -4,8 +4,13 @@ import ( "fmt" "log" "sort" + "strings" ) +var rebalanceConfig struct { + allowedDsts string +} + func init() { usage := "[options] [additional buckyd servers...]" short := "Rebalance a server or the entire cluster." @@ -43,8 +48,8 @@ Set -offload=true to speed up rebalance.` SetupSingle(c) msFlags.registerFlags(c.Flag) - c.Flag.BoolVar(&listForce, "f", false, - "Force the remote daemons to rebuild their cache.") + c.Flag.BoolVar(&listForce, "f", false, "Force the remote daemons to rebuild their cache.") + c.Flag.StringVar(&rebalanceConfig.allowedDsts, "allowed-dsts", "", "Only copy/rebalance metrics to the allowed destinations (ip1:port,ip2:port). By default (i.e. empty), all dsts are allowed.") } // countMap returns the number of metrics in a server -> metrics mapping @@ -109,6 +114,23 @@ func RebalanceMetrics(extraHostPorts []string) error { } } + if rebalanceConfig.allowedDsts != "" { + allowm := map[string]bool{} + for _, hostport := range strings.Split(rebalanceConfig.allowedDsts, ",") { + allowm[strings.TrimSpace(hostport)] = true + } + + newJobs := map[string]map[string][]*syncJob{} + for dst, srcm := range jobs { + if allowm[dst] { + newJobs[dst] = srcm + } + + } + + jobs = newJobs + } + sort.Strings(servers) for _, server := range servers { log.Printf("%d metrics on %s must be relocated", moves[server], server) diff --git a/testing/rebalance.go b/testing/rebalance.go index 4d769ce5..771c819f 100644 --- a/testing/rebalance.go +++ b/testing/rebalance.go @@ -152,7 +152,13 @@ func main() { time.Sleep(time.Second * 3) rebalanceStart := time.Now() - rebalanceCmd := exec.Command("./bucky", "rebalance", "-f", "-h", nodeStr(server0), "-offload", "-w", "3", "-ignore404") + rebalanceCmd := exec.Command( + "./bucky", "rebalance", "-f", + "-h", nodeStr(server0), "-offload", + "-w", "3", "-ignore404", + // "-allowed-dsts", "localhost:40002", + // "-allowed-dsts", "xxx:xxx", + ) rebalanceCmd.Stdout = rebalanceLog rebalanceCmd.Stderr = rebalanceLog
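For completeness, the go-carbon health check that the first patch relies on boils down to polling the /admin/info endpoint and comparing the reported cache size against its limit multiplied by -go-carbon-cache-threshold. Below is a standalone sketch of that check with simplified error handling; it is an illustration, not the code in cmd/bucky/common_sync.go, which additionally resets the per-node sync rate when the node is overloaded:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "time"
)

// checkGoCarbon polls go-carbon's /admin/info endpoint and reports whether the
// write cache is above the configured threshold.
func checkGoCarbon(protocol, addr string, threshold float64) (overloaded bool, err error) {
    client := &http.Client{Timeout: 5 * time.Second}
    resp, err := client.Get(fmt.Sprintf("%s://%s/admin/info", protocol, addr))
    if err != nil {
        return false, err
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return false, err
    }
    if resp.StatusCode != http.StatusOK {
        return false, fmt.Errorf("non-200 response from %s: %s", addr, body)
    }
    var info struct {
        Cache struct {
            Size  int64 `json:"size"`
            Limit int64 `json:"limit"`
        } `json:"cache"`
    }
    if err := json.Unmarshal(body, &info); err != nil {
        return false, err
    }
    return float64(info.Cache.Size) >= float64(info.Cache.Limit)*threshold, nil
}

func main() {
    // Example: the address and threshold used by the test harness above.
    overloaded, err := checkGoCarbon("http", "10.0.1.7:8080", 0.75)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("overloaded:", overloaded)
}

The mock servers in testing/rebalance_health_check.go serve exactly this JSON shape, flipping between a healthy and an overloaded cache to exercise the rate-reset path.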