Skip to content

Commit

Permalink
Broadcast Delete change.
Browse files Browse the repository at this point in the history
  • Loading branch information
pstibrany authored and aldernero committed Oct 21, 2024
1 parent 18a3348 commit 5c59354
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,18 +1030,25 @@ func (m *KV) Delete(key string) error {
defer m.storeMu.Unlock()

val, ok := m.store[key]
if !ok {
if !ok || val.Deleted {
return nil
}

m, v, deleted, updated, err := m.mergeValueForKey(key, nil, false, 0, val.CodecID, true, time.Now())
c := m.GetCodec(val.CodecID)
if c == nil {
return fmt.Errorf("invalid codec: %s", val.CodecID)
}

change, newver, deleted, updated, err := m.mergeValueForKey(key, nil, false, 0, val.CodecID, true, time.Now())
if err != nil {
return err
}

// We don't broadcast Delete immediately, and rely on Push/Pull sync instead. As long as there are no
// updates to this key in the cluster, that will work just fine, even if propagation is very slow.
// TODO: add broadcasting.
if newver > 0 {
m.notifyWatchers(key)
m.broadcastNewValue(key, change, newver, c, false, deleted, updated)
}
return nil
}

// CAS implements Compare-And-Set/Swap operation.
Expand Down Expand Up @@ -1498,7 +1505,7 @@ func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.
// cluster members to update their state, and new version of the value.
// If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported.
// If no modification occurred, new version is 0.
func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (Mergeable, uint, bool, time.Time, error) {
func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (change Mergeable, newVersion uint, newDeleted bool, newUpdated time.Time, err error) {
m.storeMu.Lock()
defer m.storeMu.Unlock()

Expand All @@ -1515,12 +1522,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
return nil, 0, false, time.Time{}, err
}

newVersion := curr.Version + 1
newUpdateTime := curr.UpdateTime
newDeleted := curr.Deleted
newVersion = curr.Version + 1
newUpdated = curr.UpdateTime
newDeleted = curr.Deleted

if !updateTime.IsZero() && updateTime.After(newUpdateTime) {
newUpdateTime = updateTime
if !updateTime.IsZero() && updateTime.After(newUpdated) {
newUpdated = updateTime
newDeleted = deleted
}

Expand Down Expand Up @@ -1552,14 +1559,14 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
Version: newVersion,
CodecID: codecID,
Deleted: newDeleted,
UpdateTime: newUpdateTime,
UpdateTime: newUpdated,
}

// The "changes" returned by Merge() can contain references to the "result"
// state. Therefore, make sure we clone it before releasing the lock.
change = change.Clone()

return change, newVersion, newDeleted, newUpdateTime, nil
return change, newVersion, newDeleted, newUpdated, nil
}

// returns [result, change, error]
Expand Down

0 comments on commit 5c59354

Please sign in to comment.