From b8cb654be67b06b59c8e9930bdabdaa3dc5f0733 Mon Sep 17 00:00:00 2001 From: Joshua Zhang Date: Thu, 7 Mar 2024 08:58:01 +0000 Subject: [PATCH] Use strict synchronization for revision getter to minimize flaky result caused by time racing. Signed-off-by: Joshua Zhang Addressed review comments Co-authored-by: Abhishek Kr Srivastav Signed-off-by: Abhishek Kr Srivastav --- client/pkg/testutil/recorder.go | 5 ++- .../api/v3compactor/periodic_test.go | 38 +++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/client/pkg/testutil/recorder.go b/client/pkg/testutil/recorder.go index cc99914f609..0863593ef7a 100644 --- a/client/pkg/testutil/recorder.go +++ b/client/pkg/testutil/recorder.go @@ -115,7 +115,10 @@ func (r *recorderStream) Chan() <-chan Action { func (r *recorderStream) Wait(n int) ([]Action, error) { acts := make([]Action, n) - timeoutC := time.After(r.waitTimeout) + var timeoutC <-chan time.Time + if r.waitTimeout != 0 { + timeoutC = time.After(r.waitTimeout) + } for i := 0; i < n; i++ { select { case acts[i] = <-r.ch: diff --git a/server/etcdserver/api/v3compactor/periodic_test.go b/server/etcdserver/api/v3compactor/periodic_test.go index 5053482a807..c3a3cb60688 100644 --- a/server/etcdserver/api/v3compactor/periodic_test.go +++ b/server/etcdserver/api/v3compactor/periodic_test.go @@ -33,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) { fc := clockwork.NewFakeClock() // TODO: Do not depand or real time (Recorder.Wait) in unit tests. - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) @@ -43,8 +43,8 @@ func TestPeriodicHourly(t *testing.T) { initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 // compaction doesn't happen til 2 hours elapse - for i := 0; i < initialIntervals; i++ { - rg.Wait(1) + for i := 0; i < initialIntervals-1; i++ { + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } @@ -63,7 +63,7 @@ func TestPeriodicHourly(t *testing.T) { for i := 0; i < 3; i++ { // advance one hour, one revision for each interval for j := 0; j < intervalsPerPeriod; j++ { - rg.Wait(1) + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } @@ -84,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) { retentionDuration := time.Duration(retentionMinutes) * time.Minute fc := clockwork.NewFakeClock() - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) @@ -94,8 +94,8 @@ func TestPeriodicMinutes(t *testing.T) { initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 // compaction doesn't happen til 5 minutes elapse - for i := 0; i < initialIntervals; i++ { - rg.Wait(1) + for i := 0; i < initialIntervals-1; i++ { + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } @@ -113,7 +113,7 @@ func TestPeriodicMinutes(t *testing.T) { for i := 0; i < 5; i++ { // advance 5-minute, one revision for each interval for j := 0; j < intervalsPerPeriod; j++ { - rg.Wait(1) + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } @@ -132,7 +132,7 @@ func TestPeriodicMinutes(t *testing.T) { func TestPeriodicPause(t *testing.T) { fc := clockwork.NewFakeClock() retentionDuration := time.Hour - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) @@ -143,7 +143,7 @@ func TestPeriodicPause(t *testing.T) { // tb will collect 3 hours of revisions but not compact since paused for i := 0; i < n*3; i++ { - rg.Wait(1) + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } // t.revs = [21 22 23 24 25 26 27 28 29 30] @@ -156,7 +156,7 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - rg.Wait(1) + waitOneAction(t, rg) // unblock clock, will kick off a compaction at T=3h6m by retry fc.Advance(tb.getRetryInterval()) @@ -179,7 +179,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { retentionDuration := time.Duration(retentionMinutes) * time.Minute fc := clockwork.NewFakeClock() - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) @@ -189,10 +189,10 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 // first compaction happens til 5 minutes elapsed - for i := 0; i < initialIntervals; i++ { + for i := 0; i < initialIntervals-1; i++ { // every time set the same revision with 100 rg.SetRev(int64(100)) - rg.Wait(1) + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } @@ -212,7 +212,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { for i := 0; i < 5; i++ { for j := 0; j < intervalsPerPeriod; j++ { rg.SetRev(int64(100)) - rg.Wait(1) + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } @@ -224,7 +224,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { // when revision changed, compaction is normally for i := 0; i < initialIntervals; i++ { - rg.Wait(1) + waitOneAction(t, rg) fc.Advance(tb.getRetryInterval()) } @@ -238,3 +238,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } } + +func waitOneAction(t *testing.T, r testutil.Recorder) { + if actions, _ := r.Wait(1); len(actions) != 1 { + t.Errorf("expect 1 action, got %v instead", len(actions)) + } +}