From 6c9bdd37561555f76c43e72169794f7fcb37c9cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 9 Oct 2024 15:38:30 +0200 Subject: [PATCH 1/3] Initial support for deleting keys in memberlist KV store. --- kv/memberlist/kv.pb.go | 125 +++++++++++++++++++--- kv/memberlist/kv.proto | 5 + kv/memberlist/memberlist_client.go | 134 +++++++++++++++++++----- kv/memberlist/memberlist_client_test.go | 10 +- kv/memberlist/status.gohtml | 6 +- 5 files changed, 229 insertions(+), 51 deletions(-) diff --git a/kv/memberlist/kv.pb.go b/kv/memberlist/kv.pb.go index 4c2eb9265..2080e9789 100644 --- a/kv/memberlist/kv.pb.go +++ b/kv/memberlist/kv.pb.go @@ -76,6 +76,10 @@ type KeyValuePair struct { Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // ID of the codec used to write the value Codec string `protobuf:"bytes,3,opt,name=codec,proto3" json:"codec,omitempty"` + // Is this Key marked for deletion? + Deleted bool `protobuf:"varint,4,opt,name=deleted,proto3" json:"deleted,omitempty"` + // When was the key last updated? + UpdateTimeMillis int64 `protobuf:"varint,5,opt,name=update_time_millis,json=updateTimeMillis,proto3" json:"update_time_millis,omitempty"` } func (m *KeyValuePair) Reset() { *m = KeyValuePair{} } @@ -131,6 +135,20 @@ func (m *KeyValuePair) GetCodec() string { return "" } +func (m *KeyValuePair) GetDeleted() bool { + if m != nil { + return m.Deleted + } + return false +} + +func (m *KeyValuePair) GetUpdateTimeMillis() int64 { + if m != nil { + return m.UpdateTimeMillis + } + return 0 +} + func init() { proto.RegisterType((*KeyValueStore)(nil), "memberlist.KeyValueStore") proto.RegisterType((*KeyValuePair)(nil), "memberlist.KeyValuePair") @@ -139,22 +157,25 @@ func init() { func init() { proto.RegisterFile("kv.proto", fileDescriptor_2216fe83c9c12408) } var fileDescriptor_2216fe83c9c12408 = []byte{ - // 236 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xc8, 0x2e, 0xd3, 0x2b, - 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x4d, 0xcd, 0x4d, 0x4a, 0x2d, 0xca, 0xc9, 0x2c, 0x2e, - 0x91, 0xd2, 0x4d, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xcf, 0x4f, - 0xcf, 0xd7, 0x07, 0x2b, 0x49, 0x2a, 0x4d, 0x03, 0xf3, 0xc0, 0x1c, 0x30, 0x0b, 0xa2, 0x55, 0xc9, - 0x9e, 0x8b, 0xd7, 0x3b, 0xb5, 0x32, 0x2c, 0x31, 0xa7, 0x34, 0x35, 0xb8, 0x24, 0xbf, 0x28, 0x55, - 0x48, 0x8f, 0x8b, 0xb5, 0x20, 0x31, 0xb3, 0xa8, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x48, - 0x42, 0x0f, 0x61, 0xb6, 0x1e, 0x4c, 0x65, 0x40, 0x62, 0x66, 0x51, 0x10, 0x44, 0x99, 0x92, 0x0f, - 0x17, 0x0f, 0xb2, 0xb0, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, - 0x67, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5, 0x5a, 0x06, 0x92, 0x96, 0x60, 0x52, 0x60, 0xd4, 0xe0, - 0x09, 0x82, 0x70, 0x40, 0xa2, 0xc9, 0xf9, 0x29, 0xa9, 0xc9, 0x12, 0xcc, 0x60, 0x95, 0x10, 0x8e, - 0x93, 0xc9, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, - 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, - 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, - 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x24, 0x36, 0xb0, 0x5f, 0x8c, 0x01, 0x01, - 0x00, 0x00, 0xff, 0xff, 0x7a, 0x22, 0xdf, 0xec, 0x12, 0x01, 0x00, 0x00, + // 288 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0x31, 0x4b, 0xc3, 0x40, + 0x18, 0x86, 0xef, 0x33, 0x56, 0xeb, 0x59, 0xa1, 0x1c, 0x0e, 0x87, 0xc3, 0x47, 0xe8, 0x94, 0x41, + 0x53, 0x50, 0x77, 0xc1, 0x55, 0x04, 0x89, 0xe2, 0x5a, 0x92, 0xe6, 0xb3, 0x1e, 0xcd, 0x71, 0x25, + 0xbd, 0x14, 0xba, 0xf9, 0x13, 0x1c, 0xfc, 0x11, 0xfe, 0x14, 0xc7, 0x8e, 0x1d, 0xed, 0x65, 0x71, + 0xec, 0x4f, 0x90, 0x5c, 0x28, 0xba, 0xbd, 0xcf, 0xfb, 0x3e, 0x77, 0xc3, 0xc7, 0xbb, 0xd3, 0x45, + 0x3c, 0x2b, 0x8d, 0x35, 0x82, 0x6b, 0xd2, 0x19, 0x95, 0x85, 0x9a, 0xdb, 0xb3, 0x8b, 0x89, 0xb2, + 0xaf, 0x55, 0x16, 0x8f, 0x8d, 0x1e, 0x4e, 0xcc, 0xc4, 0x0c, 0xbd, 0x92, 0x55, 0x2f, 0x9e, 0x3c, + 0xf8, 0xd4, 0x3e, 0x1d, 0xdc, 0xf0, 0x93, 0x3b, 0x5a, 0x3e, 0xa7, 0x45, 0x45, 0x8f, 0xd6, 0x94, + 0x24, 0x62, 0xde, 0x99, 0xa5, 0xaa, 0x9c, 0x4b, 0x08, 0x83, 0xe8, 0xf8, 0x52, 0xc6, 0x7f, 0x7f, + 0xc7, 0x3b, 0xf3, 0x21, 0x55, 0x65, 0xd2, 0x6a, 0x83, 0x0f, 0xe0, 0xbd, 0xff, 0xbd, 0xe8, 0xf3, + 0x60, 0x4a, 0x4b, 0x09, 0x21, 0x44, 0x47, 0x49, 0x13, 0xc5, 0x29, 0xef, 0x2c, 0x9a, 0x59, 0xee, + 0x85, 0x10, 0xf5, 0x92, 0x16, 0x9a, 0x76, 0x6c, 0x72, 0x1a, 0xcb, 0xc0, 0x9b, 0x2d, 0x08, 0xc9, + 0x0f, 0x73, 0x2a, 0xc8, 0x52, 0x2e, 0xf7, 0x43, 0x88, 0xba, 0xc9, 0x0e, 0xc5, 0x39, 0x17, 0xd5, + 0x2c, 0x4f, 0x2d, 0x8d, 0xac, 0xd2, 0x34, 0xd2, 0xaa, 0x28, 0xd4, 0x5c, 0x76, 0x42, 0x88, 0x82, + 0xa4, 0xdf, 0x2e, 0x4f, 0x4a, 0xd3, 0xbd, 0xef, 0x6f, 0xaf, 0x57, 0x1b, 0x64, 0xeb, 0x0d, 0xb2, + 0xed, 0x06, 0xe1, 0xcd, 0x21, 0x7c, 0x3a, 0x84, 0x2f, 0x87, 0xb0, 0x72, 0x08, 0xdf, 0x0e, 0xe1, + 0xc7, 0x21, 0xdb, 0x3a, 0x84, 0xf7, 0x1a, 0xd9, 0xaa, 0x46, 0xb6, 0xae, 0x91, 0x65, 0x07, 0xfe, + 0x28, 0x57, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe0, 0x1f, 0xee, 0xce, 0x5b, 0x01, 0x00, 0x00, } func (this *KeyValueStore) Equal(that interface{}) bool { @@ -214,6 +235,12 @@ func (this *KeyValuePair) Equal(that interface{}) bool { if this.Codec != that1.Codec { return false } + if this.Deleted != that1.Deleted { + return false + } + if this.UpdateTimeMillis != that1.UpdateTimeMillis { + return false + } return true } func (this *KeyValueStore) GoString() string { @@ -232,11 +259,13 @@ func (this *KeyValuePair) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 9) s = append(s, "&memberlist.KeyValuePair{") s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") s = append(s, "Codec: "+fmt.Sprintf("%#v", this.Codec)+",\n") + s = append(s, "Deleted: "+fmt.Sprintf("%#v", this.Deleted)+",\n") + s = append(s, "UpdateTimeMillis: "+fmt.Sprintf("%#v", this.UpdateTimeMillis)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -305,6 +334,21 @@ func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.UpdateTimeMillis != 0 { + i = encodeVarintKv(dAtA, i, uint64(m.UpdateTimeMillis)) + i-- + dAtA[i] = 0x28 + } + if m.Deleted { + i-- + if m.Deleted { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + } if len(m.Codec) > 0 { i -= len(m.Codec) copy(dAtA[i:], m.Codec) @@ -373,6 +417,12 @@ func (m *KeyValuePair) Size() (n int) { if l > 0 { n += 1 + l + sovKv(uint64(l)) } + if m.Deleted { + n += 2 + } + if m.UpdateTimeMillis != 0 { + n += 1 + sovKv(uint64(m.UpdateTimeMillis)) + } return n } @@ -405,6 +455,8 @@ func (this *KeyValuePair) String() string { `Key:` + fmt.Sprintf("%v", this.Key) + `,`, `Value:` + fmt.Sprintf("%v", this.Value) + `,`, `Codec:` + fmt.Sprintf("%v", this.Codec) + `,`, + `Deleted:` + fmt.Sprintf("%v", this.Deleted) + `,`, + `UpdateTimeMillis:` + fmt.Sprintf("%v", this.UpdateTimeMillis) + `,`, `}`, }, "") return s @@ -631,6 +683,45 @@ func (m *KeyValuePair) Unmarshal(dAtA []byte) error { } m.Codec = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Deleted", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Deleted = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UpdateTimeMillis", wireType) + } + m.UpdateTimeMillis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UpdateTimeMillis |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipKv(dAtA[iNdEx:]) diff --git a/kv/memberlist/kv.proto b/kv/memberlist/kv.proto index cc5f12463..b2e513b07 100644 --- a/kv/memberlist/kv.proto +++ b/kv/memberlist/kv.proto @@ -19,4 +19,9 @@ message KeyValuePair { // ID of the codec used to write the value string codec = 3; + + // Is this Key marked for deletion? + bool deleted = 4; + // When was the key last updated? + int64 update_time_millis = 5; } diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 1d96363fe..b4d1e3040 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -72,8 +72,14 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) { } // Delete is part of kv.Client interface. -func (c *Client) Delete(_ context.Context, _ string) error { - return errors.New("memberlist does not support Delete") +func (c *Client) Delete(ctx context.Context, key string) error { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return err + } + + c.kv.Delete(key) + return nil } // CAS is part of kv.Client interface @@ -154,7 +160,8 @@ type KVConfig struct { RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"` // Remove LEFT ingesters from ring after this timeout. - LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` + LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` + ObsoleteEntriesTimeout time.Duration `yaml:"obsolete_entries_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` @@ -324,6 +331,9 @@ type ValueDesc struct { // ID of codec used to write this value. Only used when sending full state. CodecID string + + Deleted bool + UpdateTime time.Time } func (v ValueDesc) Clone() (result ValueDesc) { @@ -338,6 +348,8 @@ type valueUpdate struct { value []byte codec codec.Codec messageSize int + deleted bool + updateTime time.Time } func (v ValueDesc) String() string { @@ -494,6 +506,9 @@ func (m *KV) running(ctx context.Context) error { tickerChan = t.C } + obsoleteEntriesTicker := time.NewTicker(m.cfg.PushPullInterval) + defer obsoleteEntriesTicker.Stop() + logger := log.With(m.logger, "phase", "periodic_rejoin") for { select { @@ -507,6 +522,11 @@ func (m *KV) running(ctx context.Context) error { level.Warn(logger).Log("msg", "re-joining memberlist cluster failed", "err", err, "next_try_in", m.cfg.RejoinInterval) } + case <-obsoleteEntriesTicker.C: + // cleanupObsoleteEntries is normally called during push/pull, but if there are no other + // nodes to push/pull with, we can call it periodically to make sure we remove unused entries from memory. + m.cleanupObsoleteEntries() + case <-ctx.Done(): return nil } @@ -939,6 +959,20 @@ func (m *KV) notifyWatchers(key string) { } } +func (m *KV) Delete(key string) { + m.storeMu.Lock() + defer m.storeMu.Unlock() + + val, ok := m.store[key] + if !ok || val.Deleted { + return + } + + val.Deleted = true + val.UpdateTime = time.Now() + m.store[key] = val +} + // CAS implements Compare-And-Set/Swap operation. // // CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately. @@ -969,7 +1003,7 @@ outer: } } - change, newver, retry, err := m.trySingleCas(key, codec, f) + change, newver, retry, updateTime, err := m.trySingleCas(key, codec, f) if err != nil { level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry) @@ -984,7 +1018,7 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - m.broadcastNewValue(key, change, newver, codec, true) + m.broadcastNewValue(key, change, newver, codec, true, false, updateTime) } return nil @@ -1001,50 +1035,51 @@ outer: // returns change, error (or nil, if CAS succeeded), and whether to retry or not. // returns errNoChangeDetected if merge failed to detect change in f's output. -func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, error) { +func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, time.Time, error) { val, ver, err := m.get(key, codec) if err != nil { - return nil, 0, false, fmt.Errorf("failed to get value: %v", err) + return nil, 0, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) } out, retry, err := f(val) if err != nil { - return nil, 0, retry, fmt.Errorf("fn returned error: %v", err) + return nil, 0, retry, time.Time{}, fmt.Errorf("fn returned error: %v", err) } if out == nil { // no change to be done - return nil, 0, false, nil + return nil, 0, false, time.Time{}, nil } // Don't even try incomingValue, ok := out.(Mergeable) if !ok || incomingValue == nil { - return nil, 0, retry, fmt.Errorf("invalid type: %T, expected Mergeable", out) + return nil, 0, retry, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) } // To support detection of removed items from value, we will only allow CAS operation to // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. // Supplied function may have kept a reference to the returned "incoming value". // If KV store will keep this value as well, it needs to make a clone. - change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec) + ut := time.Now() + change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec, false, ut) if err == errVersionMismatch { - return nil, 0, retry, err + return nil, 0, retry, time.Time{}, err } if err != nil { - return nil, 0, retry, fmt.Errorf("merge failed: %v", err) + return nil, 0, retry, time.Time{}, fmt.Errorf("merge failed: %v", err) } if newver == 0 { // CAS method reacts on this error - return nil, 0, retry, errNoChangeDetected + return nil, 0, retry, time.Time{}, errNoChangeDetected } - return change, newver, retry, nil + return change, newver, retry, ut, nil } -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) { +func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) { if locallyGenerated && m.State() != services.Running { level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) return @@ -1057,7 +1092,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID()} + kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID(), Deleted: deleted, UpdateTimeMillis: updateTimeMillis(updateTime)} pairData, err := kvPair.Marshal() if err != nil { level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err) @@ -1134,7 +1169,7 @@ func (m *KV) NotifyMsg(msg []byte) { ch := m.getKeyWorkerChannel(kvPair.Key) select { - case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}: + case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg), deleted: kvPair.Deleted, updateTime: updateTime(kvPair.UpdateTimeMillis)}: default: m.numberOfDroppedMessages.Inc() level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key) @@ -1161,7 +1196,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { select { case update := <-workerCh: // we have a value update! Let's merge it with our current version for given key - mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec) + mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) changes := []string(nil) if mod != nil { @@ -1186,7 +1221,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { m.notifyWatchers(key) // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec, false) + m.broadcastNewValue(key, mod, version, update.codec, false, update.deleted, update.updateTime) } case <-m.shutdown: @@ -1229,6 +1264,8 @@ func (m *KV) LocalState(_ bool) []byte { m.numberOfPulls.Inc() + m.cleanupObsoleteEntries() + m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1260,6 +1297,8 @@ func (m *KV) LocalState(_ bool) []byte { kvPair.Key = key kvPair.Value = encoded kvPair.Codec = val.CodecID + kvPair.Deleted = val.Deleted + kvPair.UpdateTimeMillis = updateTimeMillis(val.UpdateTime) ser, err := kvPair.Marshal() if err != nil { @@ -1341,8 +1380,13 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { continue } + updateTime := updateTime(kvPair.UpdateTimeMillis) + if updateTime.IsZero() { + updateTime = time.Now() + } + // we have both key and value, try to merge it with our state - change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime) changes := []string(nil) if change != nil { @@ -1361,7 +1405,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec, false) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false, kvPair.Deleted, updateTime) } } @@ -1370,7 +1414,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { } } -func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { decodedValue, err := codec.Decode(incomingData) if err != nil { return nil, 0, fmt.Errorf("failed to decode value: %v", err) @@ -1382,14 +1426,14 @@ func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec. } // No need to clone this "incomingValue", since we have just decoded it from bytes, and won't be using it. - return m.mergeValueForKey(key, incomingValue, false, 0, codec) + return m.mergeValueForKey(key, incomingValue, false, 0, codec, deleted, updateTime) } // Merges incoming value with value we have in our store. Returns "a change" that can be sent to other // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. -func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1430,10 +1474,19 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue } newVersion := curr.Version + 1 + newUpdateTime := curr.UpdateTime + newDeleted := curr.Deleted + + if !updateTime.IsZero() && updateTime.After(newUpdateTime) { + newUpdateTime = updateTime + newDeleted = deleted + } m.store[key] = ValueDesc{ - value: result, - Version: newVersion, - CodecID: codec.CodecID(), + value: result, + Version: newVersion, + CodecID: codec.CodecID(), + Deleted: newDeleted, + UpdateTime: newUpdateTime, } // The "changes" returned by Merge() can contain references to the "result" @@ -1518,6 +1571,17 @@ func (m *KV) deleteSentReceivedMessages() { m.receivedMessagesSize = 0 } +func (m *KV) cleanupObsoleteEntries() { + m.storeMu.Lock() + defer m.storeMu.Lock() + + for k, v := range m.store { + if v.Deleted && time.Since(v.UpdateTime) > m.cfg.ObsoleteEntriesTimeout { + delete(m.store, k) + } + } +} + func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Message, int) { msgs = append(msgs, msg) size += msg.Size @@ -1529,3 +1593,17 @@ func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Mes return msgs, size } + +func updateTime(val int64) time.Time { + if val == 0 { + return time.Time{} + } + return time.UnixMilli(val) +} + +func updateTimeMillis(ts time.Time) int64 { + if ts.IsZero() { + return 0 + } + return ts.UnixMilli() +} diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 0d14c5f76..76c469f96 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1669,11 +1669,11 @@ func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) // Check that locally-generated broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue. - kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false) - kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false) - kv.broadcastNewValue("local", smallUpdate, 1, codec, true) - kv.broadcastNewValue("local", bigUpdate, 2, codec, true) - kv.broadcastNewValue("local", mediumUpdate, 3, codec, true) + kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false, time.Now()) + kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false, time.Now()) + kv.broadcastNewValue("local", smallUpdate, 1, codec, true, time.Now()) + kv.broadcastNewValue("local", bigUpdate, 2, codec, true, time.Now()) + kv.broadcastNewValue("local", mediumUpdate, 3, codec, true, time.Now()) err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP memberlist_client_messages_in_broadcast_queue Number of user messages in the broadcast queue diff --git a/kv/memberlist/status.gohtml b/kv/memberlist/status.gohtml index 6f845b6e0..becf3652c 100644 --- a/kv/memberlist/status.gohtml +++ b/kv/memberlist/status.gohtml @@ -22,6 +22,8 @@ Key Codec Version + Deleted + Update Time Actions @@ -32,6 +34,8 @@ {{ $k }} {{ $v.CodecID }} {{ $v.Version }} + {{ $v.Deleted }} + {{ $v.UpdateTime }} json | json-pretty @@ -149,4 +153,4 @@

Message history buffer is disabled, refer to the configuration to enable it in order to troubleshoot the message history.

{{ end }} - \ No newline at end of file + From caa0691b8c52e9e60772fc0078c6394b14f10c7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 9 Oct 2024 15:57:23 +0200 Subject: [PATCH 2/3] Initial support for deleting keys in memberlist KV store. --- kv/memberlist/memberlist_client.go | 104 +++++++++++++++-------------- 1 file changed, 54 insertions(+), 50 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index b4d1e3040..c78ed6b16 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -959,18 +959,23 @@ func (m *KV) notifyWatchers(key string) { } } -func (m *KV) Delete(key string) { +func (m *KV) Delete(key string) error { m.storeMu.Lock() defer m.storeMu.Unlock() val, ok := m.store[key] - if !ok || val.Deleted { - return + if !ok { + return nil + } + + m, v, deleted, updated, err := m.mergeValueForKey(key, nil, false, 0, val.CodecID, true, time.Now()) + if err != nil { + return err } - val.Deleted = true - val.UpdateTime = time.Now() - m.store[key] = val + // We don't broadcast Delete immediately, and rely on Push/Pull sync instead. As long as there are no + // updates to this key in the cluster, that will work just fine, even if propagation is very slow. + // TODO: add broadcasting. } // CAS implements Compare-And-Set/Swap operation. @@ -1003,7 +1008,7 @@ outer: } } - change, newver, retry, updateTime, err := m.trySingleCas(key, codec, f) + change, newver, retry, deleted, updated, err := m.trySingleCas(key, codec, f) if err != nil { level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry) @@ -1018,13 +1023,13 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - m.broadcastNewValue(key, change, newver, codec, true, false, updateTime) + m.broadcastNewValue(key, change, newver, codec, true, deleted, updated) } return nil } - if lastError == errVersionMismatch { + if errors.Is(lastError, errVersionMismatch) { // this is more likely error than version mismatch. lastError = errTooManyRetries } @@ -1035,48 +1040,47 @@ outer: // returns change, error (or nil, if CAS succeeded), and whether to retry or not. // returns errNoChangeDetected if merge failed to detect change in f's output. -func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, time.Time, error) { +func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, bool, time.Time, error) { val, ver, err := m.get(key, codec) if err != nil { - return nil, 0, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) + return nil, 0, false, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) } out, retry, err := f(val) if err != nil { - return nil, 0, retry, time.Time{}, fmt.Errorf("fn returned error: %v", err) + return nil, 0, retry, false, time.Time{}, fmt.Errorf("fn returned error: %v", err) } if out == nil { // no change to be done - return nil, 0, false, time.Time{}, nil + return nil, 0, false, false, time.Time{}, nil } // Don't even try incomingValue, ok := out.(Mergeable) if !ok || incomingValue == nil { - return nil, 0, retry, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) + return nil, 0, retry, false, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) } // To support detection of removed items from value, we will only allow CAS operation to // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. // Supplied function may have kept a reference to the returned "incoming value". // If KV store will keep this value as well, it needs to make a clone. - ut := time.Now() - change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec, false, ut) + change, newver, deleted, updated, err := m.mergeValueForKey(key, incomingValue, true, ver, codec.CodecID(), false, time.Now()) if err == errVersionMismatch { - return nil, 0, retry, time.Time{}, err + return nil, 0, retry, false, time.Time{}, err } if err != nil { - return nil, 0, retry, time.Time{}, fmt.Errorf("merge failed: %v", err) + return nil, 0, retry, false, time.Time{}, fmt.Errorf("merge failed: %v", err) } if newver == 0 { // CAS method reacts on this error - return nil, 0, retry, time.Time{}, errNoChangeDetected + return nil, 0, retry, deleted, updated, errNoChangeDetected } - return change, newver, retry, ut, nil + return change, newver, retry, deleted, updated, nil } func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) { @@ -1196,7 +1200,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { select { case update := <-workerCh: // we have a value update! Let's merge it with our current version for given key - mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) + mod, version, deleted, updated, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) changes := []string(nil) if mod != nil { @@ -1220,8 +1224,8 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { } else if version > 0 { m.notifyWatchers(key) - // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec, false, update.deleted, update.updateTime) + // Don't resend original message, but only changes, if any. + m.broadcastNewValue(key, mod, version, update.codec, false, deleted, updated) } case <-m.shutdown: @@ -1380,13 +1384,8 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { continue } - updateTime := updateTime(kvPair.UpdateTimeMillis) - if updateTime.IsZero() { - updateTime = time.Now() - } - // we have both key and value, try to merge it with our state - change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime) + change, newver, deleted, updated, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime(kvPair.UpdateTimeMillis)) changes := []string(nil) if change != nil { @@ -1405,7 +1404,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec, false, kvPair.Deleted, updateTime) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false, deleted, updated) } } @@ -1414,26 +1413,26 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { } } -func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { +func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, bool, time.Time, error) { decodedValue, err := codec.Decode(incomingData) if err != nil { - return nil, 0, fmt.Errorf("failed to decode value: %v", err) + return nil, 0, false, time.Time{}, fmt.Errorf("failed to decode value: %v", err) } incomingValue, ok := decodedValue.(Mergeable) if !ok { - return nil, 0, fmt.Errorf("expected Mergeable, got: %T", decodedValue) + return nil, 0, false, time.Time{}, fmt.Errorf("expected Mergeable, got: %T", decodedValue) } // No need to clone this "incomingValue", since we have just decoded it from bytes, and won't be using it. - return m.mergeValueForKey(key, incomingValue, false, 0, codec, deleted, updateTime) + return m.mergeValueForKey(key, incomingValue, false, 0, codec.CodecID(), deleted, updateTime) } // Merges incoming value with value we have in our store. Returns "a change" that can be sent to other // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. -func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { +func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (Mergeable, uint, bool, time.Time, error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1443,16 +1442,25 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue curr := m.store[key] // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. if casVersion > 0 && curr.Version != casVersion { - return nil, 0, errVersionMismatch + return nil, 0, false, time.Time{}, errVersionMismatch } result, change, err := computeNewValue(incomingValue, incomingValueRequiresClone, curr.value, casVersion > 0) if err != nil { - return nil, 0, err + return nil, 0, false, time.Time{}, err + } + + newVersion := curr.Version + 1 + newUpdateTime := curr.UpdateTime + newDeleted := curr.Deleted + + if !updateTime.IsZero() && updateTime.After(newUpdateTime) { + newUpdateTime = updateTime + newDeleted = deleted } // No change, don't store it. - if change == nil || len(change.MergeContent()) == 0 { - return nil, 0, nil + if (change == nil || len(change.MergeContent()) == 0) && curr.Deleted == newDeleted { + return nil, 0, curr.Deleted, curr.UpdateTime, nil } if m.cfg.LeftIngestersTimeout > 0 { @@ -1469,22 +1477,14 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue // RemoveTombstones twice with same limit should be noop. change.RemoveTombstones(limit) if len(change.MergeContent()) == 0 { - return nil, 0, nil + return nil, 0, curr.Deleted, curr.UpdateTime, nil } } - newVersion := curr.Version + 1 - newUpdateTime := curr.UpdateTime - newDeleted := curr.Deleted - - if !updateTime.IsZero() && updateTime.After(newUpdateTime) { - newUpdateTime = updateTime - newDeleted = deleted - } m.store[key] = ValueDesc{ value: result, Version: newVersion, - CodecID: codec.CodecID(), + CodecID: codecID, Deleted: newDeleted, UpdateTime: newUpdateTime, } @@ -1493,7 +1493,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue // state. Therefore, make sure we clone it before releasing the lock. change = change.Clone() - return change, newVersion, nil + return change, newVersion, newDeleted, newUpdateTime, nil } // returns [result, change, error] @@ -1510,6 +1510,10 @@ func computeNewValue(incoming Mergeable, incomingValueRequiresClone bool, oldVal return incoming, incoming, nil } + if incoming == nil { + return oldVal, nil, nil + } + // otherwise we have two mergeables, so merge them change, err := oldVal.Merge(incoming, cas) return oldVal, change, err From 006b0247bacc25dde7c22a5b3df5ebe67eb679b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 9 Oct 2024 16:29:40 +0200 Subject: [PATCH 3/3] Broadcast Delete change. --- kv/memberlist/memberlist_client.go | 33 ++++++++++++++++++------------ 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index c78ed6b16..83c5a1272 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -964,18 +964,25 @@ func (m *KV) Delete(key string) error { defer m.storeMu.Unlock() val, ok := m.store[key] - if !ok { + if !ok || val.Deleted { return nil } - m, v, deleted, updated, err := m.mergeValueForKey(key, nil, false, 0, val.CodecID, true, time.Now()) + c := m.GetCodec(val.CodecID) + if c == nil { + return fmt.Errorf("invalid codec: %s", val.CodecID) + } + + change, newver, deleted, updated, err := m.mergeValueForKey(key, nil, false, 0, val.CodecID, true, time.Now()) if err != nil { return err } - // We don't broadcast Delete immediately, and rely on Push/Pull sync instead. As long as there are no - // updates to this key in the cluster, that will work just fine, even if propagation is very slow. - // TODO: add broadcasting. + if newver > 0 { + m.notifyWatchers(key) + m.broadcastNewValue(key, change, newver, c, false, deleted, updated) + } + return nil } // CAS implements Compare-And-Set/Swap operation. @@ -1432,7 +1439,7 @@ func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec. // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. -func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (Mergeable, uint, bool, time.Time, error) { +func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (change Mergeable, newVersion uint, newDeleted bool, newUpdated time.Time, err error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1449,12 +1456,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue return nil, 0, false, time.Time{}, err } - newVersion := curr.Version + 1 - newUpdateTime := curr.UpdateTime - newDeleted := curr.Deleted + newVersion = curr.Version + 1 + newUpdated = curr.UpdateTime + newDeleted = curr.Deleted - if !updateTime.IsZero() && updateTime.After(newUpdateTime) { - newUpdateTime = updateTime + if !updateTime.IsZero() && updateTime.After(newUpdated) { + newUpdated = updateTime newDeleted = deleted } @@ -1486,14 +1493,14 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue Version: newVersion, CodecID: codecID, Deleted: newDeleted, - UpdateTime: newUpdateTime, + UpdateTime: newUpdated, } // The "changes" returned by Merge() can contain references to the "result" // state. Therefore, make sure we clone it before releasing the lock. change = change.Clone() - return change, newVersion, newDeleted, newUpdateTime, nil + return change, newVersion, newDeleted, newUpdated, nil } // returns [result, change, error]