From 24ab39f768d64f6d67e773cca99ba30de0b908ae Mon Sep 17 00:00:00 2001 From: Lucas Rodriguez Date: Sun, 8 Sep 2024 20:53:51 -0500 Subject: [PATCH] Fix races in TestLessorRenewExtendPileup Signed-off-by: Lucas Rodriguez --- server/lease/lessor.go | 19 ++++++++++++++----- server/lease/lessor_test.go | 18 ++++++++---------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/server/lease/lessor.go b/server/lease/lessor.go index fc29b9b6663c..4b1b5b58094e 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -42,8 +42,8 @@ const MaxLeaseTTL = 9000000000 var ( forever = time.Time{} - // maximum number of leases to revoke per second; configurable for tests - leaseRevokeRate = 1000 + // default number of leases to revoke per second; configurable for tests + defaultLeaseRevokeRate = 1000 // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests leaseCheckpointRate = 1000 @@ -170,6 +170,9 @@ type lessor struct { // requests for shorter TTLs are extended to the minimum TTL. minLeaseTTL int64 + // maximum number of leases to revoke per second + leaseRevokeRate int + expiredC chan []*Lease // stopC is a channel whose closure indicates that the lessor should be stopped. stopC chan struct{} @@ -198,6 +201,7 @@ type LessorConfig struct { CheckpointInterval time.Duration ExpiredLeasesRetryInterval time.Duration CheckpointPersist bool + LeaseRevokeRate int } func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor { @@ -207,12 +211,16 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval + leaseRevokeRate := cfg.LeaseRevokeRate if checkpointInterval == 0 { checkpointInterval = defaultLeaseCheckpointInterval } if expiredLeaseRetryInterval == 0 { expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval } + if leaseRevokeRate == 0 { + leaseRevokeRate = defaultLeaseRevokeRate + } l := &lessor{ leaseMap: make(map[LeaseID]*Lease), itemMap: make(map[LeaseItem]LeaseID), @@ -220,6 +228,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon leaseCheckpointHeap: make(LeaseQueue, 0), b: b, minLeaseTTL: cfg.MinLeaseTTL, + leaseRevokeRate: leaseRevokeRate, checkpointInterval: checkpointInterval, expiredLeaseRetryInterval: expiredLeaseRetryInterval, checkpointPersist: cfg.CheckpointPersist, @@ -473,7 +482,7 @@ func (le *lessor) Promote(extend time.Duration) { le.scheduleCheckpointIfNeeded(l) } - if len(le.leaseMap) < leaseRevokeRate { + if len(le.leaseMap) < le.leaseRevokeRate { // no possibility of lease pile-up return } @@ -487,7 +496,7 @@ func (le *lessor) Promote(extend time.Duration) { expires := 0 // have fewer expires than the total revoke rate so piled up leases // don't consume the entire revoke limit - targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 + targetExpiresPerSecond := (3 * le.leaseRevokeRate) / 4 for _, l := range leases { remaining := l.Remaining() if remaining > nextWindow { @@ -623,7 +632,7 @@ func (le *lessor) revokeExpiredLeases() { var ls []*Lease // rate limit - revokeLimit := leaseRevokeRate / 2 + revokeLimit := le.leaseRevokeRate / 2 le.mu.RLock() if le.isPrimary() { diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 736909169dad..72a62b258c19 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -307,17 +307,15 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many // expire at the same time. func TestLessorRenewExtendPileup(t *testing.T) { - oldRevokeRate := leaseRevokeRate - defer func() { leaseRevokeRate = oldRevokeRate }() + leaseRevokeRate := 10 lg := zap.NewNop() - leaseRevokeRate = 10 dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL, LeaseRevokeRate: leaseRevokeRate}) ttl := int64(10) - for i := 1; i <= leaseRevokeRate*10; i++ { + for i := 1; i <= le.leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { t.Fatal(err) } @@ -334,7 +332,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL}) + le = newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL, LeaseRevokeRate: leaseRevokeRate}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -349,11 +347,11 @@ func TestLessorRenewExtendPileup(t *testing.T) { for i := ttl; i < ttl+20; i++ { c := windowCounts[i] - if c > leaseRevokeRate { - t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c) + if c > le.leaseRevokeRate { + t.Errorf("expected at most %d expiring at %ds, got %d", le.leaseRevokeRate, i, c) } - if c < leaseRevokeRate/2 { - t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c) + if c < le.leaseRevokeRate/2 { + t.Errorf("expected at least %d expiring at %ds, got %d", le.leaseRevokeRate/2, i, c) } } }