diff --git a/mongo/change_stream.go b/mongo/change_stream.go index 773cbb0e5d..4160cd3f9c 100644 --- a/mongo/change_stream.go +++ b/mongo/change_stream.go @@ -133,17 +133,26 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in return nil, cs.Err() } - cs.aggregate = operation.NewAggregate(nil). - ReadPreference(config.readPreference).ReadConcern(config.readConcern). - Deployment(cs.client.deployment).ClusterClock(cs.client.clock). - CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone). - ServerAPI(cs.client.serverAPI).Crypt(config.crypt).Timeout(cs.client.timeout) + retry := driver.RetryNone + cs.aggregate = &operation.Aggregate{ + ReadPreference: config.readPreference, + ReadConcern: config.readConcern, + Deployment: cs.client.deployment, + Clock: cs.client.clock, + Monitor: cs.client.monitor, + Session: cs.sess, + Selector: cs.selector, + Retry: &retry, + ServerAPI: cs.client.serverAPI, + Crypt: config.crypt, + Timeout: cs.client.timeout, + } if cs.options.Collation != nil { - cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument())) + cs.aggregate.Collation = bsoncore.Document(cs.options.Collation.ToDocument()) } if comment := cs.options.Comment; comment != nil { - cs.aggregate.Comment(*comment) + cs.aggregate.Comment = comment commentVal, err := marshalValue(comment, cs.bsonOpts, cs.registry) if err != nil { @@ -152,7 +161,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in cs.cursorOptions.Comment = commentVal } if cs.options.BatchSize != nil { - cs.aggregate.BatchSize(*cs.options.BatchSize) + cs.aggregate.BatchSize = cs.options.BatchSize cs.cursorOptions.BatchSize = *cs.options.BatchSize } if cs.options.MaxAwaitTime != nil { @@ -172,7 +181,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData} customOptions[optionName] = optionValueBSON } - cs.aggregate.CustomOptions(customOptions) + cs.aggregate.CustomOptions = customOptions } if cs.options.CustomPipeline != nil { // Marshal all custom pipeline options before building pipeline slice. Return @@ -192,11 +201,12 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in switch cs.streamType { case ClientStream: - cs.aggregate.Database("admin") + cs.aggregate.Database = "admin" case DatabaseStream: - cs.aggregate.Database(config.databaseName) + cs.aggregate.Database = config.databaseName case CollectionStream: - cs.aggregate.Collection(config.collectionName).Database(config.databaseName) + cs.aggregate.Collection = config.collectionName + cs.aggregate.Database = config.databaseName default: closeImplicitSession(cs.sess) return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType) @@ -223,7 +233,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in } var pipelineArr bsoncore.Document pipelineArr, cs.err = cs.pipelineToBSON() - cs.aggregate.Pipeline(pipelineArr) + cs.aggregate.Pipeline = pipelineArr if cs.err = cs.executeOperation(ctx, false); cs.err != nil { closeImplicitSession(cs.sess) @@ -254,7 +264,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err defer conn.Close() cs.wireVersion = conn.Description().WireVersion - cs.aggregate.Deployment(cs.createOperationDeployment(server, conn)) + cs.aggregate.Deployment = cs.createOperationDeployment(server, conn) if resuming { cs.replaceOptions(cs.wireVersion) @@ -274,7 +284,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil { return cs.Err() } - cs.aggregate.Pipeline(plArr) + cs.aggregate.Pipeline = plArr } // If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already @@ -333,7 +343,7 @@ AggregateExecuteLoop: cs.wireVersion = conn.Description().WireVersion // Reset deployment. - cs.aggregate.Deployment(cs.createOperationDeployment(server, conn)) + cs.aggregate.Deployment = cs.createOperationDeployment(server, conn) default: // Do not retry if error is not a driver error. break AggregateExecuteLoop diff --git a/mongo/collection.go b/mongo/collection.go index 6abbea9792..86bc4d5613 100644 --- a/mongo/collection.go +++ b/mongo/collection.go @@ -846,42 +846,43 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) { cursorOpts.MarshalValueEncoderFn = newEncoderFn(a.bsonOpts, a.registry) - op := operation.NewAggregate(pipelineArr). - Session(sess). - WriteConcern(wc). - ReadConcern(rc). - ReadPreference(a.readPreference). - CommandMonitor(a.client.monitor). - ServerSelector(selector). - ClusterClock(a.client.clock). - Database(a.db). - Collection(a.col). - Deployment(a.client.deployment). - Crypt(a.client.cryptFLE). - ServerAPI(a.client.serverAPI). - HasOutputStage(hasOutputStage). - Timeout(a.client.timeout). - MaxTime(ao.MaxTime) - - if ao.AllowDiskUse != nil { - op.AllowDiskUse(*ao.AllowDiskUse) + op := &operation.Aggregate{ + Pipeline: pipelineArr, + Session: sess, + WriteConcern: wc, + ReadConcern: rc, + ReadPreference: a.readPreference, + Monitor: a.client.monitor, + Selector: selector, + Clock: a.client.clock, + Database: a.db, + Collection: a.col, + Deployment: a.client.deployment, + Crypt: a.client.cryptFLE, + ServerAPI: a.client.serverAPI, + HasOutputStage: hasOutputStage, + Timeout: a.client.timeout, + MaxTime: ao.MaxTime, + AllowDiskUse: ao.AllowDiskUse, + Comment: ao.Comment, } + // ignore batchSize of 0 with $out if ao.BatchSize != nil && !(*ao.BatchSize == 0 && hasOutputStage) { - op.BatchSize(*ao.BatchSize) + op.BatchSize = ao.BatchSize cursorOpts.BatchSize = *ao.BatchSize } if ao.BypassDocumentValidation != nil && *ao.BypassDocumentValidation { - op.BypassDocumentValidation(*ao.BypassDocumentValidation) + op.BypassDocumentValidation = ao.BypassDocumentValidation } if ao.Collation != nil { - op.Collation(bsoncore.Document(ao.Collation.ToDocument())) + op.Collation = bsoncore.Document(ao.Collation.ToDocument()) } if ao.MaxAwaitTime != nil { cursorOpts.MaxTimeMS = int64(*ao.MaxAwaitTime / time.Millisecond) } if ao.Comment != nil { - op.Comment(*ao.Comment) + op.Comment = ao.Comment commentVal, err := marshalValue(ao.Comment, a.bsonOpts, a.registry) if err != nil { @@ -897,14 +898,14 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) { if err != nil { return nil, err } - op.Hint(hintVal) + op.Hint = hintVal } if ao.Let != nil { let, err := marshal(ao.Let, a.bsonOpts, a.registry) if err != nil { return nil, err } - op.Let(let) + op.Let = let } if ao.Custom != nil { // Marshal all custom options before passing to the aggregate operation. Return @@ -918,14 +919,14 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) { optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData} customOptions[optionName] = optionValueBSON } - op.CustomOptions(customOptions) + op.CustomOptions = customOptions } retry := driver.RetryNone if a.retryRead && !hasOutputStage { retry = driver.RetryOncePerCommand } - op = op.Retry(retry) + op.Retry = &retry err = op.Execute(a.ctx) if err != nil { @@ -980,15 +981,26 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{}, } selector := makeReadPrefSelector(sess, coll.readSelector, coll.client.localThreshold) - op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference). - CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name). - Collection(coll.name).Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI). - Timeout(coll.client.timeout).MaxTime(countOpts.MaxTime) - if countOpts.Collation != nil { - op.Collation(bsoncore.Document(countOpts.Collation.ToDocument())) + op := &operation.Aggregate{ + Pipeline: pipelineArr, + Session: sess, + ReadConcern: rc, + ReadPreference: coll.readPreference, + Monitor: coll.client.monitor, + Selector: selector, + Clock: coll.client.clock, + Database: coll.db.name, + Collection: coll.name, + Deployment: coll.client.deployment, + Crypt: coll.client.cryptFLE, + ServerAPI: coll.client.serverAPI, + Timeout: coll.client.timeout, + MaxTime: countOpts.MaxTime, + Comment: countOpts.Comment, } - if countOpts.Comment != nil { - op.Comment(*countOpts.Comment) + + if countOpts.Collation != nil { + op.Collation = bsoncore.Document(countOpts.Collation.ToDocument()) } if countOpts.Hint != nil { if isUnorderedMap(countOpts.Hint) { @@ -998,13 +1010,14 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{}, if err != nil { return 0, err } - op.Hint(hintVal) + op.Hint = hintVal } + retry := driver.RetryNone if coll.client.retryReads { retry = driver.RetryOncePerCommand } - op = op.Retry(retry) + op.Retry = &retry err = op.Execute(ctx) if err != nil { diff --git a/x/mongo/driver/integration/aggregate_test.go b/x/mongo/driver/integration/aggregate_test.go index 95a9b821ae..fe42c8a21b 100644 --- a/x/mongo/driver/integration/aggregate_test.go +++ b/x/mongo/driver/integration/aggregate_test.go @@ -81,9 +81,17 @@ func TestAggregate(t *testing.T) { noerr(t, err) clearChannels(started, succeeded, failed) - op := operation.NewAggregate(bsoncore.BuildDocumentFromElements(nil)). - Collection(collName).Database(dbName).Deployment(top).ServerSelector(description.WriteSelector()). - CommandMonitor(monitor).BatchSize(2) + + var batchSize int32 = 2 + op := &operation.Aggregate{ + Pipeline: bsoncore.BuildDocumentFromElements(nil), + Collection: collName, + Database: dbName, + Deployment: top, + Selector: description.WriteSelector(), + Monitor: monitor, + BatchSize: &batchSize, + } err = op.Execute(context.Background()) noerr(t, err) batchCursor, err := op.Result(driver.CursorOptions{MaxTimeMS: 10, BatchSize: 2, CommandMonitor: monitor}) @@ -124,21 +132,28 @@ func TestAggregate(t *testing.T) { wc := writeconcern.New(writeconcern.WMajority()) autoInsertDocs(t, wc, ds...) - op := operation.NewAggregate(bsoncore.BuildArray(nil, - bsoncore.BuildDocumentValue( - bsoncore.BuildDocumentElement(nil, - "$match", bsoncore.BuildDocumentElement(nil, - "_id", bsoncore.AppendInt32Element(nil, "$gt", 2), + var batchSize int32 = 2 + op := &operation.Aggregate{ + Pipeline: bsoncore.BuildArray(nil, + bsoncore.BuildDocumentValue( + bsoncore.BuildDocumentElement(nil, + "$match", bsoncore.BuildDocumentElement(nil, + "_id", bsoncore.AppendInt32Element(nil, "$gt", 2), + ), ), ), - ), - bsoncore.BuildDocumentValue( - bsoncore.BuildDocumentElement(nil, - "$sort", bsoncore.AppendInt32Element(nil, "_id", 1), + bsoncore.BuildDocumentValue( + bsoncore.BuildDocumentElement(nil, + "$sort", bsoncore.AppendInt32Element(nil, "_id", 1), + ), ), ), - )).Collection(integtest.ColName(t)).Database(dbName).Deployment(integtest.Topology(t)). - ServerSelector(description.WriteSelector()).BatchSize(2) + Collection: integtest.ColName(t), + Database: dbName, + Deployment: integtest.Topology(t), + Selector: description.WriteSelector(), + BatchSize: &batchSize, + } err := op.Execute(context.Background()) noerr(t, err) cursor, err := op.Result(driver.CursorOptions{BatchSize: 2}) @@ -172,8 +187,15 @@ func TestAggregate(t *testing.T) { wc := writeconcern.New(writeconcern.WMajority()) autoInsertDocs(t, wc, ds...) - op := operation.NewAggregate(bsoncore.BuildArray(nil)).Collection(integtest.ColName(t)).Database(dbName). - Deployment(integtest.Topology(t)).ServerSelector(description.WriteSelector()).AllowDiskUse(true) + allowDiskUse := true + op := &operation.Aggregate{ + Pipeline: bsoncore.BuildArray(nil), + Collection: integtest.ColName(t), + Database: dbName, + Deployment: integtest.Topology(t), + Selector: description.WriteSelector(), + AllowDiskUse: &allowDiskUse, + } err := op.Execute(context.Background()) if err != nil { t.Errorf("Expected no error from allowing disk use, but got %v", err) diff --git a/x/mongo/driver/operation/aggregate.go b/x/mongo/driver/operation/aggregate.go index ca0e796523..5d8e6dab5c 100644 --- a/x/mongo/driver/operation/aggregate.go +++ b/x/mongo/driver/operation/aggregate.go @@ -25,49 +25,102 @@ import ( // Aggregate represents an aggregate operation. type Aggregate struct { - allowDiskUse *bool - batchSize *int32 - bypassDocumentValidation *bool - collation bsoncore.Document - comment *string - hint bsoncore.Value - maxTime *time.Duration - pipeline bsoncore.Document - session *session.Client - clock *session.ClusterClock - collection string - monitor *event.CommandMonitor - database string - deployment driver.Deployment - readConcern *readconcern.ReadConcern - readPreference *readpref.ReadPref - retry *driver.RetryMode - selector description.ServerSelector - writeConcern *writeconcern.WriteConcern - crypt driver.Crypt - serverAPI *driver.ServerAPIOptions - let bsoncore.Document - hasOutputStage bool - customOptions map[string]bsoncore.Value - timeout *time.Duration + // AllowDiskUse enables writing to temporary files. When true, aggregation + // stages can write to the dbPath/_tmp directory. + AllowDiskUse *bool - result driver.CursorResponse -} + // BatchSize specifies the number of documents to return in every batch. + BatchSize *int32 -// NewAggregate constructs and returns a new Aggregate. -func NewAggregate(pipeline bsoncore.Document) *Aggregate { - return &Aggregate{ - pipeline: pipeline, - } + // BypassDocumentValidation allows the write to opt-out of document level + // validation. This only applies when the $out stage is specified. + BypassDocumentValidation *bool + + // Collation specifies a collation. This option is only valid for server + // versions 3.4 and above. + Collation bsoncore.Document + + // Comment specifies an arbitrary string to help trace the operation through + // the database profiler, currentOp, and logs. + Comment *string + + // Hint specifies the index to use. + Hint bsoncore.Value + + // MaxTime specifies the maximum amount of time to allow the query to run on + // the server. + MaxTime *time.Duration + + // Pipeline determines how data is transformed for an aggregation. + Pipeline bsoncore.Document + + // Session is the session for this operation. + Session *session.Client + + // Clock is the cluster clock for this operation. + Clock *session.ClusterClock + + // Collection is the collection that this command will run against. + Collection string + + // Monitor is the monitor to use for APM events. + Monitor *event.CommandMonitor + + // Database is the database to run this operation against. + Database string + + // Deployment is the deployment to use for this operation. + Deployment driver.Deployment + + // ReadConcern is the read concern for this operation. + ReadConcern *readconcern.ReadConcern + + // ReadPreference is the read preference used with this operation. + ReadPreference *readpref.ReadPref + + // Retry enables retryable writes for this operation. Retries are not + // handled automatically, instead a boolean is returned from Execute and + // SelectAndExecute that indicates if the operation can be retried. Retrying + // is handled by calling RetryExecute. + Retry *driver.RetryMode + + // Selector is the selector used to retrieve a server. + Selector description.ServerSelector + + // WriteConcern is the write concern for this operation. + WriteConcern *writeconcern.WriteConcern + + // Crypt is the Crypt object to use for automatic encryption and decryption. + Crypt driver.Crypt + + // ServerAPI is the server API version for this operation. + ServerAPI *driver.ServerAPIOptions + + // Let specifies the let document to use. This option is only valid for + // server versions 5.0 and above. + Let bsoncore.Document + + // HasOutputStage specifies whether the aggregate contains an output stage. + // Used in determining when to append read preference at the operation + // level. + HasOutputStage bool + + // CustomOptions specifies extra options to use in the aggregate command. + CustomOptions map[string]bsoncore.Value + + // Timeout is the timeout for this operation. + Timeout *time.Duration + + result driver.CursorResponse } // Result returns the result of executing this operation. func (a *Aggregate) Result(opts driver.CursorOptions) (*driver.BatchCursor, error) { - clientSession := a.session + clientSession := a.Session - clock := a.clock - opts.ServerAPI = a.serverAPI + clock := a.Clock + opts.ServerAPI = a.ServerAPI return driver.NewBatchCursor(a.result, clientSession, clock, opts) } @@ -87,7 +140,7 @@ func (a *Aggregate) processResponse(info driver.ResponseInfo) error { // Execute runs this operations and returns an error if the operation did not execute successfully. func (a *Aggregate) Execute(ctx context.Context) error { - if a.deployment == nil { + if a.Deployment == nil { return errors.New("the Aggregate operation must have a Deployment set before Execute can be called") } @@ -95,70 +148,70 @@ func (a *Aggregate) Execute(ctx context.Context) error { CommandFn: a.command, ProcessResponseFn: a.processResponse, - Client: a.session, - Clock: a.clock, - CommandMonitor: a.monitor, - Database: a.database, - Deployment: a.deployment, - ReadConcern: a.readConcern, - ReadPreference: a.readPreference, + Client: a.Session, + Clock: a.Clock, + CommandMonitor: a.Monitor, + Database: a.Database, + Deployment: a.Deployment, + ReadConcern: a.ReadConcern, + ReadPreference: a.ReadPreference, Type: driver.Read, - RetryMode: a.retry, - Selector: a.selector, - WriteConcern: a.writeConcern, - Crypt: a.crypt, + RetryMode: a.Retry, + Selector: a.Selector, + WriteConcern: a.WriteConcern, + Crypt: a.Crypt, MinimumWriteConcernWireVersion: 5, - ServerAPI: a.serverAPI, - IsOutputAggregate: a.hasOutputStage, - MaxTime: a.maxTime, - Timeout: a.timeout, + ServerAPI: a.ServerAPI, + IsOutputAggregate: a.HasOutputStage, + MaxTime: a.MaxTime, + Timeout: a.Timeout, Name: driverutil.AggregateOp, }.Execute(ctx) } func (a *Aggregate) command(dst []byte, desc description.SelectedServer) ([]byte, error) { - header := bsoncore.Value{Type: bsontype.String, Data: bsoncore.AppendString(nil, a.collection)} - if a.collection == "" { + header := bsoncore.Value{Type: bsontype.String, Data: bsoncore.AppendString(nil, a.Collection)} + if a.Collection == "" { header = bsoncore.Value{Type: bsontype.Int32, Data: []byte{0x01, 0x00, 0x00, 0x00}} } dst = bsoncore.AppendValueElement(dst, "aggregate", header) cursorIdx, cursorDoc := bsoncore.AppendDocumentStart(nil) - if a.allowDiskUse != nil { + if a.AllowDiskUse != nil { - dst = bsoncore.AppendBooleanElement(dst, "allowDiskUse", *a.allowDiskUse) + dst = bsoncore.AppendBooleanElement(dst, "allowDiskUse", *a.AllowDiskUse) } - if a.batchSize != nil { - cursorDoc = bsoncore.AppendInt32Element(cursorDoc, "batchSize", *a.batchSize) + if a.BatchSize != nil { + cursorDoc = bsoncore.AppendInt32Element(cursorDoc, "batchSize", *a.BatchSize) } - if a.bypassDocumentValidation != nil { + if a.BypassDocumentValidation != nil { - dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *a.bypassDocumentValidation) + dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *a.BypassDocumentValidation) } - if a.collation != nil { + if a.Collation != nil { if desc.WireVersion == nil || !desc.WireVersion.Includes(5) { return nil, errors.New("the 'collation' command parameter requires a minimum server wire version of 5") } - dst = bsoncore.AppendDocumentElement(dst, "collation", a.collation) + dst = bsoncore.AppendDocumentElement(dst, "collation", a.Collation) } - if a.comment != nil { + if a.Comment != nil { - dst = bsoncore.AppendStringElement(dst, "comment", *a.comment) + dst = bsoncore.AppendStringElement(dst, "comment", *a.Comment) } - if a.hint.Type != bsontype.Type(0) { + if a.Hint.Type != bsontype.Type(0) { - dst = bsoncore.AppendValueElement(dst, "hint", a.hint) + dst = bsoncore.AppendValueElement(dst, "hint", a.Hint) } - if a.pipeline != nil { + if a.Pipeline != nil { - dst = bsoncore.AppendArrayElement(dst, "pipeline", a.pipeline) + dst = bsoncore.AppendArrayElement(dst, "pipeline", a.Pipeline) } - if a.let != nil { - dst = bsoncore.AppendDocumentElement(dst, "let", a.let) + if a.Let != nil { + dst = bsoncore.AppendDocumentElement(dst, "let", a.Let) } - for optionName, optionValue := range a.customOptions { + for optionName, optionValue := range a.CustomOptions { dst = bsoncore.AppendValueElement(dst, optionName, optionValue) } cursorDoc, _ = bsoncore.AppendDocumentEnd(cursorDoc, cursorIdx) @@ -166,256 +219,3 @@ func (a *Aggregate) command(dst []byte, desc description.SelectedServer) ([]byte return dst, nil } - -// AllowDiskUse enables writing to temporary files. When true, aggregation stages can write to the dbPath/_tmp directory. -func (a *Aggregate) AllowDiskUse(allowDiskUse bool) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.allowDiskUse = &allowDiskUse - return a -} - -// BatchSize specifies the number of documents to return in every batch. -func (a *Aggregate) BatchSize(batchSize int32) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.batchSize = &batchSize - return a -} - -// BypassDocumentValidation allows the write to opt-out of document level validation. This only applies when the $out stage is specified. -func (a *Aggregate) BypassDocumentValidation(bypassDocumentValidation bool) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.bypassDocumentValidation = &bypassDocumentValidation - return a -} - -// Collation specifies a collation. This option is only valid for server versions 3.4 and above. -func (a *Aggregate) Collation(collation bsoncore.Document) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.collation = collation - return a -} - -// Comment specifies an arbitrary string to help trace the operation through the database profiler, currentOp, and logs. -func (a *Aggregate) Comment(comment string) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.comment = &comment - return a -} - -// Hint specifies the index to use. -func (a *Aggregate) Hint(hint bsoncore.Value) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.hint = hint - return a -} - -// MaxTime specifies the maximum amount of time to allow the query to run on the server. -func (a *Aggregate) MaxTime(maxTime *time.Duration) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.maxTime = maxTime - return a -} - -// Pipeline determines how data is transformed for an aggregation. -func (a *Aggregate) Pipeline(pipeline bsoncore.Document) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.pipeline = pipeline - return a -} - -// Session sets the session for this operation. -func (a *Aggregate) Session(session *session.Client) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.session = session - return a -} - -// ClusterClock sets the cluster clock for this operation. -func (a *Aggregate) ClusterClock(clock *session.ClusterClock) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.clock = clock - return a -} - -// Collection sets the collection that this command will run against. -func (a *Aggregate) Collection(collection string) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.collection = collection - return a -} - -// CommandMonitor sets the monitor to use for APM events. -func (a *Aggregate) CommandMonitor(monitor *event.CommandMonitor) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.monitor = monitor - return a -} - -// Database sets the database to run this operation against. -func (a *Aggregate) Database(database string) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.database = database - return a -} - -// Deployment sets the deployment to use for this operation. -func (a *Aggregate) Deployment(deployment driver.Deployment) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.deployment = deployment - return a -} - -// ReadConcern specifies the read concern for this operation. -func (a *Aggregate) ReadConcern(readConcern *readconcern.ReadConcern) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.readConcern = readConcern - return a -} - -// ReadPreference set the read preference used with this operation. -func (a *Aggregate) ReadPreference(readPreference *readpref.ReadPref) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.readPreference = readPreference - return a -} - -// ServerSelector sets the selector used to retrieve a server. -func (a *Aggregate) ServerSelector(selector description.ServerSelector) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.selector = selector - return a -} - -// WriteConcern sets the write concern for this operation. -func (a *Aggregate) WriteConcern(writeConcern *writeconcern.WriteConcern) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.writeConcern = writeConcern - return a -} - -// Retry enables retryable writes for this operation. Retries are not handled automatically, -// instead a boolean is returned from Execute and SelectAndExecute that indicates if the -// operation can be retried. Retrying is handled by calling RetryExecute. -func (a *Aggregate) Retry(retry driver.RetryMode) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.retry = &retry - return a -} - -// Crypt sets the Crypt object to use for automatic encryption and decryption. -func (a *Aggregate) Crypt(crypt driver.Crypt) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.crypt = crypt - return a -} - -// ServerAPI sets the server API version for this operation. -func (a *Aggregate) ServerAPI(serverAPI *driver.ServerAPIOptions) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.serverAPI = serverAPI - return a -} - -// Let specifies the let document to use. This option is only valid for server versions 5.0 and above. -func (a *Aggregate) Let(let bsoncore.Document) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.let = let - return a -} - -// HasOutputStage specifies whether the aggregate contains an output stage. Used in determining when to -// append read preference at the operation level. -func (a *Aggregate) HasOutputStage(hos bool) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.hasOutputStage = hos - return a -} - -// CustomOptions specifies extra options to use in the aggregate command. -func (a *Aggregate) CustomOptions(co map[string]bsoncore.Value) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.customOptions = co - return a -} - -// Timeout sets the timeout for this operation. -func (a *Aggregate) Timeout(timeout *time.Duration) *Aggregate { - if a == nil { - a = new(Aggregate) - } - - a.timeout = timeout - return a -}