Skip to content

Commit

Permalink
Handle empty page when CosmosDB Iterator uses dynamic batch size (#8066)
Browse files Browse the repository at this point in the history
* Handle empty page when CosmosDB Iterator uses dynamic batch size

* Add Empty Batch test
  • Loading branch information
itaigilo authored Aug 8, 2024
1 parent 65e9086 commit 1a09ab9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
14 changes: 8 additions & 6 deletions pkg/kv/cosmosdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func (e *EntriesIterator) Next() bool {
if !e.queryPager.More() {
return false
}

if e.batchSize != dynamicPageSize {
if err := e.handleBatchSizeChange(); err != nil {
e.err = convertError(err)
Expand All @@ -431,13 +432,14 @@ func (e *EntriesIterator) Next() bool {
e.err = fmt.Errorf("getting next page: %w", convertError(err))
return false
}
if len(e.currPage.Items) == 0 {
// returned page is empty, no more items
return false
}
e.currPageSeekedKey = nil
e.currEntryIdx = -1
}

if len(e.currPage.Items) == 0 {
// returned page is empty, no more items
return false
}
e.currPageSeekedKey = nil
e.currEntryIdx = -1
}
e.currEntryIdx++
key, value := e.getKeyValue(e.currEntryIdx)
Expand Down
38 changes: 38 additions & 0 deletions pkg/kv/kvtest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,44 @@ func testStoreScan(t *testing.T, ms MakeStore) {
testCompareEntries(t, entries, sampleData[:len(sampleData)-1])
}
})

t.Run("empty_batch", func(t *testing.T) {
// this is meant to test cosmosdb's "Iterator.Next()" behavior when it gets to an empty page,
// and since after the first batch it checks if "queryPager.More()",
// we "Scan" all but the last in order to have more items remaining to scan,
// and delete the last one, so that the next batch will be empty.
scan, err := store.Scan(ctx, []byte(testPartitionKey), kv.ScanOptions{KeyStart: samplePrefix, BatchSize: sampleItems - 1})
if err != nil {
t.Fatal("failed to scan", err)
}
defer scan.Close()
require.NoError(t, store.Delete(ctx, []byte(testPartitionKey), sampleData[len(sampleData)-1].Key), "failed to delete last key")

var entries []kv.Entry
for scan.Next() {
ent := scan.Entry()
switch {
case ent == nil:
t.Fatal("scan got nil entry")
case ent.Key == nil:
t.Fatal("Key is nil while scan item", len(entries))
case ent.Value == nil:
t.Fatal("Value is nil while scan item", len(entries))
}
if !bytes.HasPrefix(ent.Key, samplePrefix) {
break
}
entries = append(entries, *ent)
}
require.NoError(t, scan.Err(), "scan ended with an error")

// you can either get the first 99 or the whole 100 since delete happened after the scan started
if len(entries) == len(sampleData) {
testCompareEntries(t, entries, sampleData)
} else {
testCompareEntries(t, entries, sampleData[:len(sampleData)-1])
}
})
}

func testStoreMissingArgument(t *testing.T, ms MakeStore) {
Expand Down

0 comments on commit 1a09ab9

Please sign in to comment.