From 64a19e4b6ea82b5a847486f7ca12f96266fe0a29 Mon Sep 17 00:00:00 2001
From: Lucas Rodriguez
Date: Mon, 9 Sep 2024 12:50:35 -0500
Subject: [PATCH] Backport TestLessorRenewExtendPileup race condition fix for
 release-3.5

Signed-off-by: Lucas Rodriguez
---
 server/lease/lessor.go      | 20 +++++++++++++++-----
 server/lease/lessor_test.go | 18 ++++++++----------
 2 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index e0c2273dfc0..c21a3e2be88 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -44,8 +44,8 @@ var v3_6 = semver.Version{Major: 3, Minor: 6}
 var (
 	forever = time.Time{}
 
-	// maximum number of leases to revoke per second; configurable for tests
-	leaseRevokeRate = 1000
+	// default number of leases to revoke per second; configurable for tests
+	defaultLeaseRevokeRate = 1000
 
 	// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
 	leaseCheckpointRate = 1000
@@ -172,6 +172,9 @@ type lessor struct {
 	// requests for shorter TTLs are extended to the minimum TTL.
 	minLeaseTTL int64
 
+	// maximum number of leases to revoke per second
+	leaseRevokeRate int
+
 	expiredC chan []*Lease
 	// stopC is a channel whose closure indicates that the lessor should be stopped.
 	stopC chan struct{}
@@ -200,6 +203,8 @@ type LessorConfig struct {
 	CheckpointInterval         time.Duration
 	ExpiredLeasesRetryInterval time.Duration
 	CheckpointPersist          bool
+
+	leaseRevokeRate int
 }
 
 func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
@@ -209,12 +214,16 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon
 func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
 	checkpointInterval := cfg.CheckpointInterval
 	expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
+	leaseRevokeRate := cfg.leaseRevokeRate
 	if checkpointInterval == 0 {
 		checkpointInterval = defaultLeaseCheckpointInterval
 	}
 	if expiredLeaseRetryInterval == 0 {
 		expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
 	}
+	if leaseRevokeRate == 0 {
+		leaseRevokeRate = defaultLeaseRevokeRate
+	}
 	l := &lessor{
 		leaseMap:                  make(map[LeaseID]*Lease),
 		itemMap:                   make(map[LeaseItem]LeaseID),
@@ -222,6 +231,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon
 		leaseCheckpointHeap:       make(LeaseQueue, 0),
 		b:                         b,
 		minLeaseTTL:               cfg.MinLeaseTTL,
+		leaseRevokeRate:           leaseRevokeRate,
 		checkpointInterval:        checkpointInterval,
 		expiredLeaseRetryInterval: expiredLeaseRetryInterval,
 		checkpointPersist:         cfg.CheckpointPersist,
@@ -474,7 +484,7 @@ func (le *lessor) Promote(extend time.Duration) {
 		le.scheduleCheckpointIfNeeded(l)
 	}
 
-	if len(le.leaseMap) < leaseRevokeRate {
+	if len(le.leaseMap) < le.leaseRevokeRate {
 		// no possibility of lease pile-up
 		return
 	}
@@ -488,7 +498,7 @@ func (le *lessor) Promote(extend time.Duration) {
 	expires := 0
 	// have fewer expires than the total revoke rate so piled up leases
 	// don't consume the entire revoke limit
-	targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
+	targetExpiresPerSecond := (3 * le.leaseRevokeRate) / 4
 	for _, l := range leases {
 		remaining := l.Remaining()
 		if remaining > nextWindow {
@@ -627,7 +637,7 @@ func (le *lessor) revokeExpiredLeases() {
 	var ls []*Lease
 
 	// rate limit
-	revokeLimit := leaseRevokeRate / 2
+	revokeLimit := le.leaseRevokeRate / 2
 
 	le.mu.RLock()
 	if le.isPrimary() {
diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go
index 06c2a8a9664..46e20757f23 100644
--- a/server/lease/lessor_test.go
+++ b/server/lease/lessor_test.go
@@ -291,17 +291,15 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
 // expire at the same time.
 func TestLessorRenewExtendPileup(t *testing.T) {
-	oldRevokeRate := leaseRevokeRate
-	defer func() { leaseRevokeRate = oldRevokeRate }()
+	leaseRevokeRate := 10
 	lg := zap.NewNop()
-	leaseRevokeRate = 10
 	dir, be := NewTestBackend(t)
 	defer os.RemoveAll(dir)
 
-	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
 	ttl := int64(10)
-	for i := 1; i <= leaseRevokeRate*10; i++ {
+	for i := 1; i <= le.leaseRevokeRate*10; i++ {
 		if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -318,7 +316,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 	bcfg.Path = filepath.Join(dir, "be")
 	be = backend.New(bcfg)
 	defer be.Close()
-	le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
 	defer le.Stop()
 
 	// extend after recovery should extend expiration on lease pile-up
@@ -333,11 +331,11 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 
 	for i := ttl; i < ttl+20; i++ {
 		c := windowCounts[i]
-		if c > leaseRevokeRate {
-			t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
+		if c > le.leaseRevokeRate {
+			t.Errorf("expected at most %d expiring at %ds, got %d", le.leaseRevokeRate, i, c)
 		}
-		if c < leaseRevokeRate/2 {
-			t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
+		if c < le.leaseRevokeRate/2 {
+			t.Errorf("expected at least %d expiring at %ds, got %d", le.leaseRevokeRate/2, i, c)
 		}
 	}
 }
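
The race this patch removes: TestLessorRenewExtendPileup wrote to the package-level
leaseRevokeRate variable while lessor goroutines could still read it concurrently, which
the race detector flags. The fix threads the rate through LessorConfig into a per-lessor
field that is written once at construction and only read afterwards, with a zero value
falling back to defaultLeaseRevokeRate. Below is a minimal, self-contained Go sketch of
that general "package var -> instance field with default" pattern; the names (config,
limiter, defaultRate, newLimiter) are illustrative stand-ins, not etcd's actual API.

// revoke_rate_sketch.go - hypothetical sketch; names do not exist in etcd.
package main

import "fmt"

// defaultRate is the fallback when the config leaves the knob unset.
const defaultRate = 1000

// config carries an unexported override so only tests in the same
// package can set it (mirroring LessorConfig.leaseRevokeRate).
type config struct {
	rate int
}

// limiter stores a per-instance copy of the rate. It is written once in
// the constructor and never mutated afterwards, so concurrent readers
// need no locking and parallel tests cannot race on a shared global.
type limiter struct {
	rate int
}

func newLimiter(cfg config) *limiter {
	rate := cfg.rate
	if rate == 0 {
		// zero value means "not set": fall back to the default,
		// as newLessor does for cfg.leaseRevokeRate.
		rate = defaultRate
	}
	return &limiter{rate: rate}
}

func main() {
	fmt.Println(newLimiter(config{}).rate)         // 1000 (default)
	fmt.Println(newLimiter(config{rate: 10}).rate) // 10 (test override)
}

Keeping the override field unexported is the same design choice the patch makes: the
public LessorConfig surface is unchanged, so only in-package tests can lower the rate.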