From 8e7752e59bab3299908ea0b278ba9db2a0800428 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 11 Oct 2024 13:22:49 -0700 Subject: [PATCH] Memberlist: support for debouncing notifications (#592) * Adds a layer of buffering to Memberlist's notification handling so that notifications are fired at most once per NotifyInterval, at which point it will deliver notifications using the most recently-observed data. * Adds a new config flag to control this interval: -memberlist.notify-interval which defaults to 0 (off). Motivation for this change: In clusters where the memberlist KVStore watched by Ring has many replicas, redeploying those replicas can cause WatchKey and updateRingState to be called hundreds of times per second. When there are many concurrent goroutines calling ring.ShuffleShard, the high rate of updateRingState calls (which take locks and clear caches) can create heavy lock contention and latency as ShuffleShard attempts to take locks in order to repopulate those caches. --- CHANGELOG.md | 3 +- kv/memberlist/memberlist_client.go | 88 +++++++++++++++--- kv/memberlist/memberlist_client_test.go | 118 +++++++++++++++++++++++- ring/ring.go | 4 +- 4 files changed, 198 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7857018e5..f9cb11448 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ * [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `()` suffix to the error. #514 * [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520 * [CHANGE] Updated the minimum required Go version to 1.21. #540 +* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 * [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539 * [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585 * [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564 @@ -234,7 +235,7 @@ * [ENHANCEMENT] Cache: Add `.Add()` and `.Set()` methods to cache clients. #591 * [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. #601 * [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525 -* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 +* [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. 
#109 diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 1d96363fe..452798e04 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -137,6 +137,7 @@ type KVConfig struct { GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` EnableCompression bool `yaml:"compression_enabled" category:"advanced"` + NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -195,6 +196,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") + f.DurationVar(&cfg.NotifyInterval, prefix+"memberlist.notify-interval", 0, "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay.") f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") @@ -251,6 +253,10 @@ type KV struct { watchers map[string][]chan string prefixWatchers map[string][]chan string + // Delayed notifications for watchers + notifMu sync.Mutex + keyNotifications map[string]struct{} + // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. 
messagesMu sync.Mutex @@ -359,17 +365,18 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - store: make(map[string]ValueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - workersChannels: make(map[string]chan valueUpdate), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + keyNotifications: make(map[string]struct{}), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -486,6 +493,13 @@ func (m *KV) running(ctx context.Context) error { return errFailedToJoinCluster } + if m.cfg.NotifyInterval > 0 { + // Start delayed key notifications. + notifTicker := time.NewTicker(m.cfg.NotifyInterval) + defer notifTicker.Stop() + go m.monitorKeyNotifications(ctx, notifTicker.C) + } + var tickerChan <-chan time.Time if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 { t := time.NewTicker(m.cfg.RejoinInterval) @@ -905,7 +919,59 @@ func removeWatcherChannel(k string, w chan string, watchers map[string][]chan st } } +// notifyWatchers sends notification to all watchers of given key. If delay is +// enabled, it accumulates them for later sending. func (m *KV) notifyWatchers(key string) { + if m.cfg.NotifyInterval <= 0 { + m.notifyWatchersSync(key) + return + } + + m.notifMu.Lock() + defer m.notifMu.Unlock() + m.keyNotifications[key] = struct{}{} +} + +// monitorKeyNotifications sends accumulated notifications to all watchers of +// respective keys when the given channel ticks. +func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.Time) { + if m.cfg.NotifyInterval <= 0 { + panic("monitorKeyNotifications called with NotifyInterval <= 0") + } + + for { + select { + case <-tickChan: + m.sendKeyNotifications() + case <-ctx.Done(): + return + } + } +} + +// sendKeyNotifications sends accumulated notifications to watchers of respective keys. +func (m *KV) sendKeyNotifications() { + newNotifs := func() map[string]struct{} { + // Grab and clear accumulated notifications. + m.notifMu.Lock() + defer m.notifMu.Unlock() + + if len(m.keyNotifications) == 0 { + return nil + } + newMap := make(map[string]struct{}) + notifs := m.keyNotifications + m.keyNotifications = newMap + return notifs + } + + for key := range newNotifs() { + m.notifyWatchersSync(key) + } +} + +// notifyWatchersSync immediately sends notification to all watchers of given key.
+func (m *KV) notifyWatchersSync(key string) { m.watchersMu.Lock() defer m.watchersMu.Unlock() diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 54930c875..47e8b3a8f 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -255,7 +255,6 @@ func getLocalhostAddrs() []string { func TestBasicGetAndCas(t *testing.T) { c := dataCodec{} - name := "Ing 1" var cfg KVConfig flagext.DefaultValues(&cfg) cfg.TCPTransport = TCPTransportConfig{ @@ -278,6 +277,7 @@ func TestBasicGetAndCas(t *testing.T) { } // Create member in PENDING state, with some tokens + name := "Ing 1" err = cas(kv, key, updateFn(name)) require.NoError(t, err) @@ -1803,3 +1803,119 @@ func marshalState(t *testing.T, kvps ...*KeyValuePair) []byte { return buf.Bytes() } + +func TestNotificationDelay(t *testing.T) { + cfg := KVConfig{} + // We're going to trigger sends manually, so effectively disable the automatic send interval. + const hundredYears = 100 * 365 * 24 * time.Hour + cfg.NotifyInterval = hundredYears + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + + watchChan := make(chan string, 16) + + // Add ourselves as a watcher. + kv.watchersMu.Lock() + kv.watchers["foo_123"] = append(kv.watchers["foo_123"], watchChan) + kv.watchers["foo_124"] = append(kv.watchers["foo_124"], watchChan) + kv.watchersMu.Unlock() + + defer func() { + kv.watchersMu.Lock() + removeWatcherChannel("foo_123", watchChan, kv.watchers) + removeWatcherChannel("foo_124", watchChan, kv.watchers) + kv.watchersMu.Unlock() + }() + + verifyNotifs := func(expected map[string]int, comment string) { + observed := make(map[string]int, len(expected)) + for kk := range expected { + observed[kk] = 0 + } + loop: + for { + select { + case k := <-watchChan: + observed[k]++ + default: + break loop + } + } + require.Equal(t, expected, observed, comment) + } + + drainChan := func() { + for { + select { + case <-watchChan: + default: + return + } + } + } + + kv.notifyWatchers("foo_123") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 1}, "1 change 1 notification") + + // Test coalescing of updates. 
+ drainChan() + verifyNotifs(map[string]int{"foo_123": 0}, "chan drained") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.notifyWatchers("foo_123") + verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 1}, "flush should coalesce updates") + + // multiple buffered updates + drainChan() + verifyNotifs(map[string]int{"foo_123": 0}, "chan drained") + kv.notifyWatchers("foo_123") + kv.sendKeyNotifications() + kv.notifyWatchers("foo_123") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 2}, "two buffered updates") + + // multiple keys + drainChan() + kv.notifyWatchers("foo_123") + kv.notifyWatchers("foo_124") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 1, "foo_124": 1}, "2 changes 2 notifications") + kv.sendKeyNotifications() + verifyNotifs(map[string]int{"foo_123": 0, "foo_124": 0}, "no new notifications") + + // sendKeyNotifications can be called repeatedly without new updates. + kv.sendKeyNotifications() + kv.sendKeyNotifications() + kv.sendKeyNotifications() + kv.sendKeyNotifications() + + // Finally, exercise the monitor method. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tick := make(chan time.Time) + go kv.monitorKeyNotifications(ctx, tick) + kv.notifyWatchers("foo_123") + tick <- time.Now() + + require.Eventually(t, func() bool { + select { + case k := <-watchChan: + if k != "foo_123" { + panic(fmt.Sprintf("unexpected key: %s", k)) + } + return true + default: // nothing yet. + return false + } + }, 20*time.Second, 100*time.Millisecond) +} diff --git a/ring/ring.go b/ring/ring.go index c8db7da50..d47eb8fe2 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -215,13 +215,13 @@ type Ring struct { // Number of registered instances per zone. instancesCountPerZone map[string]int - // Nubmber of registered instances with tokens per zone. + // Number of registered instances with tokens per zone. instancesWithTokensCountPerZone map[string]int // Number of registered instances are writable and have tokens. writableInstancesWithTokensCount int - // Nubmber of registered instances with tokens per zone that are writable. + // Number of registered instances with tokens per zone that are writable. writableInstancesWithTokensCountPerZone map[string]int // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.
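
For illustration, a minimal sketch of how a caller could enable the debounce interval added by this patch. The NotifyInterval field, the -memberlist.notify-interval flag, and RegisterFlagsWithPrefix appear in the patch itself; the import path and the surrounding wiring are assumptions, not part of the change.

package main

import (
	"flag"
	"fmt"
	"time"

	"github.com/grafana/dskit/kv/memberlist" // assumed import path for the kv/memberlist package
)

func main() {
	var cfg memberlist.KVConfig
	// RegisterFlagsWithPrefix wires up -memberlist.notify-interval along with the
	// other memberlist flags; it defaults to 0, which keeps the previous behaviour
	// of notifying watchers without delay.
	cfg.RegisterFlagsWithPrefix(flag.CommandLine, "")
	flag.Parse()

	// Alternatively, set the debounce interval directly. With a non-zero value,
	// notifications for changed keys are accumulated and flushed at most once per
	// interval, so a storm of ring updates yields one notification per key per tick.
	if cfg.NotifyInterval == 0 {
		cfg.NotifyInterval = 5 * time.Second
	}
	fmt.Println("notify interval:", cfg.NotifyInterval)
}

The same setting is available in YAML as notify_interval within the memberlist block.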