Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-3049 Replace bsoncore.DocumentSequence with a bson type-agnostic analogue #1492

Merged
merged 11 commits into from
Jan 13, 2024
30 changes: 14 additions & 16 deletions internal/integration/mtest/sent_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type SentMessage struct {
// The documents sent for an insert, update, or delete command. This is separated into its own field because it's
// sent as part of the command document in OP_QUERY and as a document sequence outside the command document in
// OP_MSG.
DocumentSequence *bsoncore.DocumentSequence
Batch *bsoncore.Iterator
}

type sentMsgParseFn func([]byte) (*SentMessage, error)
Expand Down Expand Up @@ -87,26 +87,25 @@ func parseOpQuery(wm []byte) (*SentMessage, error) {

// For OP_QUERY, inserts, updates, and deletes are sent as a BSON array of documents inside the main command
// document. Pull that array out into a bsoncore.Iterator.
var docSequence *bsoncore.DocumentSequence
var batch *bsoncore.Iterator
cmdElems, _ := commandDoc.Elements()
for _, elem := range cmdElems {
switch elem.Key() {
case "documents", "updates", "deletes":
docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.ArrayStyle,
Data: elem.Value().Array(),
batch = &bsoncore.Iterator{
List: elem.Value().Array(),
}
}
if docSequence != nil {
if batch != nil {
// There can only be one of these arrays in a well-formed command, so we exit the loop once one is found.
break
}
}

sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
Command: commandDoc,
ReadPreference: rpDoc,
Batch: batch,
}
return sm, nil
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func parseSentOpMsg(wm []byte) (*SentMessage, error) {
rpDoc = rpVal.Document()
}

var docSequence *bsoncore.DocumentSequence
var batch *bsoncore.Iterator
if len(wm) != 0 {
// If there are bytes remaining in the wire message, they must correspond to a DocumentSequence section.
if wm, err = assertMsgSectionType(wm, wiremessage.DocumentSequence); err != nil {
Expand All @@ -169,16 +168,15 @@ func parseSentOpMsg(wm []byte) (*SentMessage, error) {
return nil, errors.New("failed to read document sequence")
}

docSequence = &bsoncore.DocumentSequence{
Style: bsoncore.SequenceStyle,
Data: data,
batch = &bsoncore.Iterator{
List: data,
}
}

sm := &SentMessage{
Command: commandDoc,
ReadPreference: rpDoc,
DocumentSequence: docSequence,
Command: commandDoc,
ReadPreference: rpDoc,
Batch: batch,
}
return sm, nil
}
Expand Down
2 changes: 1 addition & 1 deletion mongo/batch_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type batchCursor interface {

// Batch will return an Iterator over the current batch of documents. The returned
// Iterator is only valid until the next call to Next or Close.
Batch() *bsoncore.DocumentSequence
Batch() *bsoncore.Iterator

// Server returns a pointer to the cursor's server.
Server() driver.Server
Expand Down
47 changes: 31 additions & 16 deletions mongo/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Cursor struct {
Current bson.Raw

bc batchCursor
batch *bsoncore.DocumentSequence
batch *bsoncore.Iterator
batchLength int
bsonOpts *options.BSONOptions
registry *bsoncodec.Registry
Expand Down Expand Up @@ -72,9 +72,10 @@ func newCursorWithSession(
c.closeImplicitSession()
}

// Initialize just the batchLength here so RemainingBatchLength will return an accurate result. The actual batch
// will be pulled up by the first Next/TryNext call.
c.batchLength = c.bc.Batch().DocumentCount()
// Initialize just the batchLength here so RemainingBatchLength will return an
// accurate result. The actual batch will be pulled up by the first
// Next/TryNext call.
c.batchLength = c.bc.Batch().Count()
return c, nil
}

Expand All @@ -91,39 +92,53 @@ func NewCursorFromDocuments(documents []interface{}, err error, registry *bsonco
registry = bson.DefaultRegistry
}

// Convert documents slice to a sequence-style byte array.
buf := new(bytes.Buffer)
enc := new(bson.Encoder)
for _, doc := range documents {

values := make([]bsoncore.Value, len(documents))
for i, doc := range documents {
switch t := doc.(type) {
case nil:
return nil, ErrNilDocument
case []byte:
// Slight optimization so we'll just use MarshalBSON and not go through the codec machinery.
doc = bson.Raw(t)
}

vw, err := bsonrw.NewBSONValueWriter(buf)
if err != nil {
return nil, err
}

enc.Reset(vw)
enc.SetRegistry(registry)
err = enc.Encode(doc)
if err != nil {

if err = enc.Encode(doc); err != nil {
return nil, err
}

dup := make([]byte, len(buf.Bytes()))
copy(dup, buf.Bytes())

values[i] = bsoncore.Value{
Type: bson.TypeEmbeddedDocument,
Data: dup,
}

buf.Reset()
}

c := &Cursor{
bc: driver.NewBatchCursorFromDocuments(buf.Bytes()),
bc: driver.NewBatchCursorFromList(bsoncore.BuildArray(nil, values...)),
registry: registry,
err: err,
}

// Initialize batch and batchLength here. The underlying batch cursor will be preloaded with the
// provided contents, and thus already has a batch before calls to Next/TryNext.
c.batch = c.bc.Batch()
c.batchLength = c.bc.Batch().DocumentCount()
c.batchLength = c.bc.Batch().Count()

return c, nil
}

Expand Down Expand Up @@ -166,12 +181,12 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
if ctx == nil {
ctx = context.Background()
}
doc, err := c.batch.Next()
val, err := c.batch.Next()
switch {
case err == nil:
// Consume the next document in the current batch.
c.batchLength--
c.Current = bson.Raw(doc)
c.Current = bson.Raw(val.Data)
return true
case errors.Is(err, io.EOF): // Need to do a getMore
default:
Expand Down Expand Up @@ -209,12 +224,12 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {

// Use the new batch to update the batch and batchLength fields. Consume the first document in the batch.
c.batch = c.bc.Batch()
c.batchLength = c.batch.DocumentCount()
doc, err = c.batch.Next()
c.batchLength = c.batch.Count()
val, err = c.batch.Next()
switch {
case err == nil:
c.batchLength--
c.Current = bson.Raw(doc)
c.Current = bson.Raw(val.Data)
return true
case errors.Is(err, io.EOF): // Empty batch so we continue
default:
Expand Down Expand Up @@ -348,7 +363,7 @@ func (c *Cursor) RemainingBatchLength() int {

// addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value,
// the next empty index in the slice, and an error if one occurs.
func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence,
func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.Iterator,
index int) (reflect.Value, int, error) {

docs, err := batch.Documents()
Expand Down
44 changes: 35 additions & 9 deletions mongo/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsontype"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/mongo/options"
Expand All @@ -21,17 +22,17 @@ import (
)

type testBatchCursor struct {
batches []*bsoncore.DocumentSequence
batch *bsoncore.DocumentSequence
batches []*bsoncore.Iterator
batch *bsoncore.Iterator
closed bool
}

func newTestBatchCursor(numBatches, batchSize int) *testBatchCursor {
batches := make([]*bsoncore.DocumentSequence, 0, numBatches)
batches := make([]*bsoncore.Iterator, 0, numBatches)

counter := 0
for batch := 0; batch < numBatches; batch++ {
var docSequence []byte
var values []bsoncore.Value

for doc := 0; doc < batchSize; doc++ {
var elem []byte
Expand All @@ -40,12 +41,18 @@ func newTestBatchCursor(numBatches, batchSize int) *testBatchCursor {

var doc []byte
doc = bsoncore.BuildDocumentFromElements(doc, elem)
docSequence = append(docSequence, doc...)
val := bsoncore.Value{
Type: bsontype.EmbeddedDocument,
Data: doc,
}

values = append(values, val)
}

batches = append(batches, &bsoncore.DocumentSequence{
Style: bsoncore.SequenceStyle,
Data: docSequence,
arr := bsoncore.BuildArray(nil, values...)

batches = append(batches, &bsoncore.Iterator{
List: arr,
})
}

Expand All @@ -72,7 +79,7 @@ func (tbc *testBatchCursor) Next(context.Context) bool {
return true
}

func (tbc *testBatchCursor) Batch() *bsoncore.DocumentSequence {
func (tbc *testBatchCursor) Batch() *bsoncore.Iterator {
return tbc.batch
}

Expand Down Expand Up @@ -262,3 +269,22 @@ func TestNewCursorFromDocuments(t *testing.T) {
mockErr, cur.Err())
})
}

// BenchmarkNewCursorFromDocuments measures the cost, including allocations, of
// building a cursor from a small fixed set of bson.D documents. The error and
// registry arguments are both passed as nil.
func BenchmarkNewCursorFromDocuments(b *testing.B) {
	// Fixed input shared by every iteration; built before the timer is reset
	// so its construction is not part of the measurement.
	docs := []interface{}{
		bson.D{
			bson.E{Key: "_id", Value: 0},
			bson.E{Key: "foo", Value: "bar"},
		},
		bson.D{
			bson.E{Key: "_id", Value: 1},
			bson.E{Key: "baz", Value: "qux"},
		},
		bson.D{
			bson.E{Key: "_id", Value: 2},
			bson.E{Key: "quux", Value: "quuz"},
		},
	}

	b.ReportAllocs()
	b.ResetTimer()

	for iter := 0; iter < b.N; iter++ {
		// The cursor itself is discarded; only construction is being timed.
		if _, err := NewCursorFromDocuments(docs, nil, nil); err != nil {
			b.Fatalf("Error creating cursor: %v", err)
		}
	}
}
Loading
Loading