Skip to content

Commit

Permalink
Backport TestLessorRenewExtendPileup race condition fix for release-3.4
Browse files Browse the repository at this point in the history
Signed-off-by: Lucas Rodriguez <[email protected]>
  • Loading branch information
lucasrod16 committed Sep 9, 2024
1 parent 728e3c2 commit 0621622
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
20 changes: 15 additions & 5 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ var (

leaseBucketName = []byte("lease")

// maximum number of leases to revoke per second; configurable for tests
leaseRevokeRate = 1000
// default number of leases to revoke per second; configurable for tests
defaultLeaseRevokeRate = 1000

// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
leaseCheckpointRate = 1000
Expand Down Expand Up @@ -173,6 +173,9 @@ type lessor struct {
// requests for shorter TTLs are extended to the minimum TTL.
minLeaseTTL int64

// maximum number of leases to revoke per second
leaseRevokeRate int

expiredC chan []*Lease
// stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct{}
Expand Down Expand Up @@ -201,6 +204,8 @@ type LessorConfig struct {
CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration
CheckpointPersist bool

leaseRevokeRate int
}

func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
Expand All @@ -210,19 +215,24 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
leaseRevokeRate := cfg.leaseRevokeRate
if checkpointInterval == 0 {
checkpointInterval = defaultLeaseCheckpointInterval
}
if expiredLeaseRetryInterval == 0 {
expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
}
if leaseRevokeRate == 0 {
leaseRevokeRate = defaultLeaseRevokeRate
}
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseExpiredNotifier: newLeaseExpiredNotifier(),
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
leaseRevokeRate: leaseRevokeRate,
checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
checkpointPersist: cfg.CheckpointPersist,
Expand Down Expand Up @@ -475,7 +485,7 @@ func (le *lessor) Promote(extend time.Duration) {
le.scheduleCheckpointIfNeeded(l)
}

if len(le.leaseMap) < leaseRevokeRate {
if len(le.leaseMap) < le.leaseRevokeRate {
// no possibility of lease pile-up
return
}
Expand All @@ -489,7 +499,7 @@ func (le *lessor) Promote(extend time.Duration) {
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
targetExpiresPerSecond := (3 * le.leaseRevokeRate) / 4
for _, l := range leases {
remaining := l.Remaining()
if remaining > nextWindow {
Expand Down Expand Up @@ -628,7 +638,7 @@ func (le *lessor) revokeExpiredLeases() {
var ls []*Lease

// rate limit
revokeLimit := leaseRevokeRate / 2
revokeLimit := le.leaseRevokeRate / 2

le.mu.RLock()
if le.isPrimary() {
Expand Down
18 changes: 8 additions & 10 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,15 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
// expire at the same time.
func TestLessorRenewExtendPileup(t *testing.T) {
oldRevokeRate := leaseRevokeRate
defer func() { leaseRevokeRate = oldRevokeRate }()
leaseRevokeRate := 10
lg := zap.NewNop()
leaseRevokeRate = 10

dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)

le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
for i := 1; i <= le.leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
t.Fatal(err)
}
Expand All @@ -316,7 +314,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
defer le.Stop()

// extend after recovery should extend expiration on lease pile-up
Expand All @@ -331,11 +329,11 @@ func TestLessorRenewExtendPileup(t *testing.T) {

for i := ttl; i < ttl+20; i++ {
c := windowCounts[i]
if c > leaseRevokeRate {
t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
if c > le.leaseRevokeRate {
t.Errorf("expected at most %d expiring at %ds, got %d", le.leaseRevokeRate, i, c)
}
if c < leaseRevokeRate/2 {
t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
if c < le.leaseRevokeRate/2 {
t.Errorf("expected at least %d expiring at %ds, got %d", le.leaseRevokeRate/2, i, c)
}
}
}
Expand Down

0 comments on commit 0621622

Please sign in to comment.