diff --git a/pkg/kv/cosmosdb/store.go b/pkg/kv/cosmosdb/store.go index 881534d29ce..07a8489e27d 100644 --- a/pkg/kv/cosmosdb/store.go +++ b/pkg/kv/cosmosdb/store.go @@ -419,6 +419,7 @@ func (e *EntriesIterator) Next() bool { if !e.queryPager.More() { return false } + if e.batchSize != dynamicPageSize { if err := e.handleBatchSizeChange(); err != nil { e.err = convertError(err) @@ -431,13 +432,14 @@ func (e *EntriesIterator) Next() bool { e.err = fmt.Errorf("getting next page: %w", convertError(err)) return false } - if len(e.currPage.Items) == 0 { - // returned page is empty, no more items - return false - } - e.currPageSeekedKey = nil - e.currEntryIdx = -1 } + + if len(e.currPage.Items) == 0 { + // returned page is empty, no more items + return false + } + e.currPageSeekedKey = nil + e.currEntryIdx = -1 } e.currEntryIdx++ key, value := e.getKeyValue(e.currEntryIdx) diff --git a/pkg/kv/kvtest/store.go b/pkg/kv/kvtest/store.go index 5bd5558f2d2..12a9f701a3d 100644 --- a/pkg/kv/kvtest/store.go +++ b/pkg/kv/kvtest/store.go @@ -454,6 +454,44 @@ func testStoreScan(t *testing.T, ms MakeStore) { testCompareEntries(t, entries, sampleData[:len(sampleData)-1]) } }) + + t.Run("empty_batch", func(t *testing.T) { + // this is meant to test cosmosdb's "Iterator.Next()" behavior when it gets to an empty page, + // and since after the first batch it checks if "queryPager.More()", + // we "Scan" all but the last in order to have more items remaining to scan, + // and delete the last one, so that the next batch will be empty. + scan, err := store.Scan(ctx, []byte(testPartitionKey), kv.ScanOptions{KeyStart: samplePrefix, BatchSize: sampleItems - 1}) + if err != nil { + t.Fatal("failed to scan", err) + } + defer scan.Close() + require.NoError(t, store.Delete(ctx, []byte(testPartitionKey), sampleData[len(sampleData)-1].Key), "failed to delete last key") + + var entries []kv.Entry + for scan.Next() { + ent := scan.Entry() + switch { + case ent == nil: + t.Fatal("scan got nil entry") + case ent.Key == nil: + t.Fatal("Key is nil while scan item", len(entries)) + case ent.Value == nil: + t.Fatal("Value is nil while scan item", len(entries)) + } + if !bytes.HasPrefix(ent.Key, samplePrefix) { + break + } + entries = append(entries, *ent) + } + require.NoError(t, scan.Err(), "scan ended with an error") + + // you can either get the first 99 or the whole 100 since delete happened after the scan started + if len(entries) == len(sampleData) { + testCompareEntries(t, entries, sampleData) + } else { + testCompareEntries(t, entries, sampleData[:len(sampleData)-1]) + } + }) } func testStoreMissingArgument(t *testing.T, ms MakeStore) {