Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2388 Improved Bulk Write API. #1805

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/driverutil/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ const (
ListIndexesOp = "listIndexes" // ListIndexesOp is the name for listing indexes
ListDatabasesOp = "listDatabases" // ListDatabasesOp is the name for listing databases
UpdateOp = "update" // UpdateOp is the name for updating
BulkWriteOp = "bulkWrite" // BulkWriteOp is the name for client-level bulk write
)
4 changes: 1 addition & 3 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWr

func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (operation.InsertResult, error) {
docs := make([]bsoncore.Document, len(batch.models))
var i int
for _, model := range batch.models {
for i, model := range batch.models {
converted := model.(*InsertOneModel)
doc, err := marshal(converted.Document, bw.collection.bsonOpts, bw.collection.registry)
if err != nil {
Expand All @@ -177,7 +176,6 @@ func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (opera
}

docs[i] = doc
i++
}

op := operation.NewInsert(docs...).
Expand Down
95 changes: 85 additions & 10 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ type Client struct {
logger *logger.Logger

// client-side encryption fields
keyVaultClientFLE *Client
keyVaultCollFLE *Collection
mongocryptdFLE *mongocryptdClient
cryptFLE driver.Crypt
metadataClientFLE *Client
internalClientFLE *Client
encryptedFieldsMap map[string]interface{}
authenticator driver.Authenticator
isAutoEncryptionSet bool
keyVaultClientFLE *Client
keyVaultCollFLE *Collection
mongocryptdFLE *mongocryptdClient
cryptFLE driver.Crypt
metadataClientFLE *Client
internalClientFLE *Client
encryptedFieldsMap map[string]interface{}
authenticator driver.Authenticator
}

// Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling
Expand Down Expand Up @@ -194,6 +195,7 @@ func NewClient(opts ...*options.ClientOptions) (*Client, error) {
}
// AutoEncryptionOptions
if clientOpt.AutoEncryptionOptions != nil {
client.isAutoEncryptionSet = true
if err := client.configureAutoEncryption(clientOpt); err != nil {
return nil, err
}
Expand Down Expand Up @@ -424,8 +426,6 @@ func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error)
return nil, replaceErrors(err)
}

// Writes are not retryable on standalones, so let operation determine whether to retry
sess.RetryWrite = false
sess.RetryRead = c.retryReads

return &sessionImpl{
Expand Down Expand Up @@ -851,6 +851,81 @@ func (c *Client) createBaseCursorOptions() driver.CursorOptions {
}
}

// BulkWrite performs a client-level bulk write operation.
func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
opts ...*options.ClientBulkWriteOptions) (*ClientBulkWriteResult, error) {
// TODO: Remove once DRIVERS-2888 is implemented.
if c.isAutoEncryptionSet {
return nil, errors.New("bulkWrite does not currently support automatic encryption")
}
bwo := options.MergeClientBulkWriteOptions(opts...)

if ctx == nil {
ctx = context.Background()
}

sess := sessionFromContext(ctx)
if sess == nil && c.sessionPool != nil {
sess = session.NewImplicitClientSession(c.sessionPool, c.id)
defer sess.EndSession()
}

err := c.validSession(sess)
if err != nil {
return nil, err
}

transactionRunning := sess.TransactionRunning()
wc := c.writeConcern
if transactionRunning {
wc = nil
}
if bwo.WriteConcern != nil {
if transactionRunning {
return nil, errors.New("cannot set write concern after starting a transaction")
}
wc = bwo.WriteConcern
}
acknowledged := writeconcern.AckWrite(wc)
if !acknowledged {
if bwo.Ordered == nil || *bwo.Ordered {
return nil, errors.New("cannot request unacknowledged write concern and ordered writes")
}
sess = nil
}

writeSelector := description.CompositeSelector([]description.ServerSelector{
description.WriteSelector(),
description.LatencySelector(c.localThreshold),
})
selector := makePinnedSelector(sess, writeSelector)

op := clientBulkWrite{
models: models.models,
ordered: bwo.Ordered,
bypassDocumentValidation: bwo.BypassDocumentValidation,
comment: bwo.Comment,
let: bwo.Let,
session: sess,
client: c,
selector: selector,
writeConcern: wc,
}
if bwo.VerboseResults == nil || !(*bwo.VerboseResults) {
op.errorsOnly = true
} else if !acknowledged {
return nil, errors.New("cannot request unacknowledged write concern and verbose results")
}
if err = op.execute(ctx); err != nil {
return nil, replaceErrors(err)
}
var results *ClientBulkWriteResult
if acknowledged {
results = &op.result
}
return results, nil
}

// newLogger will use the LoggerOptions to create an internal logger and publish
// messages using a LogSink.
func newLogger(opts *options.LoggerOptions) (*logger.Logger, error) {
Expand Down
Loading
Loading