Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Oct 12, 2024
1 parent ceab570 commit 9782fe1
Show file tree
Hide file tree
Showing 30 changed files with 855 additions and 709 deletions.
698 changes: 389 additions & 309 deletions mongo/client_bulk_write.go

Large diffs are not rendered by default.

36 changes: 21 additions & 15 deletions x/mongo/driver/batch_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,29 @@ type CursorResponse struct {
postBatchResumeToken bsoncore.Document
}

// NewCursorResponse constructs a cursor response from the given response and
// server. If the provided database response does not contain a cursor, it
// returns ErrNoCursor.
//
// NewCursorResponse can be used within the ProcessResponse method for an operation.
func NewCursorResponse(info ResponseInfo) (CursorResponse, error) {
response := info.ServerResponse
// ExtractCursorDocument retrieves the cursor document from a database response. If the
// provided response does not contain a cursor, it returns ErrNoCursor.
func ExtractCursorDocument(response bsoncore.Document) (bsoncore.Document, error) {
cur, err := response.LookupErr("cursor")
if errors.Is(err, bsoncore.ErrElementNotFound) {
return CursorResponse{}, ErrNoCursor
return nil, ErrNoCursor
}
if err != nil {
return CursorResponse{}, fmt.Errorf("error getting cursor from database response: %w", err)
return nil, fmt.Errorf("error getting cursor from database response: %w", err)
}
curDoc, ok := cur.DocumentOK()
if !ok {
return CursorResponse{}, fmt.Errorf("cursor should be an embedded document but is BSON type %s", cur.Type)
return nil, fmt.Errorf("cursor should be an embedded document but is BSON type %s", cur.Type)
}
elems, err := curDoc.Elements()
return curDoc, nil
}

// NewCursorResponse constructs a cursor response from the given cursor document
// extracted from a database response.
//
// NewCursorResponse can be used within the ProcessResponse method for an operation.
func NewCursorResponse(response bsoncore.Document, info ResponseInfo) (CursorResponse, error) {
elems, err := response.Elements()
if err != nil {
return CursorResponse{}, fmt.Errorf("error getting elements from cursor: %w", err)
}
Expand All @@ -115,15 +119,17 @@ func NewCursorResponse(info ResponseInfo) (CursorResponse, error) {
curresp.Database = database
curresp.Collection = collection
case "id":
curresp.ID, ok = elem.Value().Int64OK()
id, ok := elem.Value().Int64OK()
if !ok {
return CursorResponse{}, fmt.Errorf("id should be an int64 but it is a BSON %s", elem.Value().Type)
}
curresp.ID = id
case "postBatchResumeToken":
curresp.postBatchResumeToken, ok = elem.Value().DocumentOK()
token, ok := elem.Value().DocumentOK()
if !ok {
return CursorResponse{}, fmt.Errorf("post batch resume token should be a document but it is a BSON %s", elem.Value().Type)
}
curresp.postBatchResumeToken = token
}
}

Expand Down Expand Up @@ -393,8 +399,8 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
},
Database: bc.database,
Deployment: bc.getOperationDeployment(),
ProcessResponseFn: func(_ context.Context, info ResponseInfo) error {
response := info.ServerResponse
ProcessResponseFn: func(_ context.Context, response bsoncore.Document, info ResponseInfo) error {
// response := info.ServerResponse
id, ok := response.Lookup("cursor", "id").Int64OK()
if !ok {
return fmt.Errorf("cursor.id should be an int64 but is a BSON %s", response.Lookup("cursor", "id").Type)
Expand Down
109 changes: 71 additions & 38 deletions x/mongo/driver/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,99 @@
package driver

import (
"errors"
"io"
"strconv"

"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
)

// ErrDocumentTooLarge occurs when a document that is larger than the maximum size accepted by a
// server is passed to an insert command.
var ErrDocumentTooLarge = errors.New("an inserted document is too large")

// Batches contains the necessary information to batch split an operation. This is only used for write
// operations.
type Batches struct {
Identifier string
Documents []bsoncore.Document
Current []bsoncore.Document
Ordered *bool
}

// ClearBatch clears the Current batch. This must be called before AdvanceBatch will advance to the
// next batch.
func (b *Batches) ClearBatch() { b.Current = b.Current[:0] }

// AdvanceBatch splits the next batch using maxCount and targetBatchSize. This method will do nothing if
// the current batch has not been cleared. We do this so that when this is called during execute we
// can call it without first needing to check if we already have a batch, which makes the code
// simpler and makes retrying easier.
// The maxDocSize parameter is used to check that any one document is not too large. If the first document is bigger
// than targetBatchSize but smaller than maxDocSize, a batch of size 1 containing that document will be created.
func (b *Batches) AdvanceBatch(maxCount, targetBatchSize, maxDocSize int) error {
if len(b.Current) > 0 {
return nil
}
offset int
}

if maxCount <= 0 {
maxCount = 1
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
if b.End() {
return 0, dst, io.EOF
}

splitAfter := 0
size := 0
for i, doc := range b.Documents {
if i == maxCount {
l := len(dst)
var idx int32
dst = wiremessage.AppendMsgSectionType(dst, wiremessage.DocumentSequence)
idx, dst = bsoncore.ReserveLength(dst)
dst = append(dst, b.Identifier...)
dst = append(dst, 0x00)
size := len(dst) - l
var n int
for i := b.offset; i < len(b.Documents); i++ {
if n == maxCount {
break
}
doc := b.Documents[i]
if len(doc) > maxDocSize {
return ErrDocumentTooLarge
break
}
if size+len(doc) > targetBatchSize {
size += len(doc)
if size >= totalSize {
break
}
dst = append(dst, doc...)
n++
}
if n == 0 {
return 0, dst[:l], nil
}
dst = bsoncore.UpdateLength(dst, idx, int32(len(dst[idx:])))
return n, dst, nil
}

func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
if b.End() {
return 0, dst, io.EOF
}
l := len(dst)
aidx, dst := bsoncore.AppendArrayElementStart(dst, b.Identifier)
size := len(dst) - l
var n int
for i := b.offset; i < len(b.Documents); i++ {
if n == maxCount {
break
}
doc := b.Documents[i]
if len(doc) > maxDocSize {
break
}
size += len(doc)
splitAfter++
if size >= totalSize {
break
}
dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(n), doc)
n++
}

// if there are no documents, take the first one.
// this can happen if there is a document that is smaller than maxDocSize but greater than targetBatchSize.
if splitAfter == 0 {
splitAfter = 1
if n == 0 {
return 0, dst[:l], nil
}
var err error
dst, err = bsoncore.AppendArrayEnd(dst, aidx)
if err != nil {
return 0, nil, err
}
return n, dst, nil
}

// IsOrdered returns the Ordered field of b as-is. Because the field is a
// *bool, the result is nil when Ordered was never set; callers must handle
// that case themselves.
func (b *Batches) IsOrdered() *bool {
return b.Ordered
}

// AdvanceBatches moves the internal offset forward by n. The offset is the
// index in Documents at which AppendBatchSequence and AppendBatchArray start
// reading, so n is presumably the document count returned by one of those
// calls (verify against callers). No bounds checking is performed on n.
func (b *Batches) AdvanceBatches(n int) {
b.offset += n
}

b.Current, b.Documents = b.Documents[:splitAfter], b.Documents[splitAfter:]
return nil
// End reports whether the internal offset has reached or passed the end of
// Documents, i.e. whether no documents remain to be appended. It also
// returns true when Documents is empty.
func (b *Batches) End() bool {
return len(b.Documents) <= b.offset
}
Loading

0 comments on commit 9782fe1

Please sign in to comment.