Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Oct 22, 2024
1 parent f1b81d6 commit f0cd171
Show file tree
Hide file tree
Showing 9 changed files with 7,705 additions and 14 deletions.
10 changes: 5 additions & 5 deletions mongo/integration/crud_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func TestClientBulkWrite(t *testing.T) {
})

mt.Run("bulkWrite handles individual WriteErrors across batches", func(mt *mtest.T) {
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, false)
err := coll.Drop(context.Background())
require.NoError(mt, err, "Drop error")
_, err = coll.InsertOne(context.Background(), bson.D{{"_id", 1}})
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestClientBulkWrite(t *testing.T) {
})

mt.Run("bulkWrite handles a cursor requiring a getMore", func(mt *mtest.T) {
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, false)
err := coll.Drop(context.Background())
require.NoError(mt, err, "Drop error")

Expand Down Expand Up @@ -618,7 +618,7 @@ func TestClientBulkWrite(t *testing.T) {
})

mt.Run("bulkWrite handles a cursor requiring a getMore within a transaction", func(mt *mtest.T) {
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, false)
err := coll.Drop(context.Background())
require.NoError(mt, err, "Drop error")

Expand Down Expand Up @@ -693,7 +693,7 @@ func TestClientBulkWrite(t *testing.T) {
},
})

coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, false)
err = coll.Drop(context.Background())
require.NoError(mt, err, "Drop error")

Expand Down Expand Up @@ -910,7 +910,7 @@ func TestClientBulkWrite(t *testing.T) {

mt.ResetClient(options.Client().SetMonitor(monitor))

coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, true)
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, false)
err = coll.Drop(context.Background())
require.NoError(mt, err, "Drop error")

Expand Down
53 changes: 53 additions & 0 deletions mongo/integration/csot_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/integtest"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.mongodb.org/mongo-driver/mongo/options"
Expand Down Expand Up @@ -160,4 +161,56 @@ func TestCSOTProse(t *testing.T) {
"expected ping to fail within 150ms")
})
})
mt.RunOpts("11. multi-batch bulkWrites", mtest.NewOptions().MinServerVersion("8.0").
AtlasDataLake(false).Topologies(mtest.Single), func(mt *mtest.T) {
coll := mt.CreateCollection(mtest.Collection{DB: "db", Name: "coll"}, false)
err := coll.Drop(context.Background())
require.NoError(mt, err, "Drop error: %v", err)

mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 2,
},
Data: mtest.FailPointData{
FailCommands: []string{"bulkWrite"},
BlockConnection: true,
BlockTimeMS: 1010,
},
})

var hello struct {
MaxBsonObjectSize int
MaxMessageSizeBytes int
}
err = mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello)
require.NoError(mt, err, "Hello error: %v", err)

models := &mongo.ClientWriteModels{}
n := hello.MaxMessageSizeBytes/hello.MaxBsonObjectSize + 1
for i := 0; i < n; i++ {
models.
AppendInsertOne("db", "coll", &mongo.ClientInsertOneModel{
Document: bson.D{{"a", strings.Repeat("b", hello.MaxBsonObjectSize-500)}},
})
}

var cnt int
cm := &event.CommandMonitor{
Started: func(_ context.Context, evt *event.CommandStartedEvent) {
if evt.CommandName == "bulkWrite" {
cnt++
}
},
}
cliOptions := options.Client().
SetTimeout(2 * time.Second).
SetMonitor(cm).
ApplyURI(mtest.ClusterURI())
cli, err := mongo.Connect(context.Background(), cliOptions)
require.NoError(mt, err, "Connect error: %v", err)
_, err = cli.BulkWrite(context.Background(), models)
assert.ErrorContains(mt, err, "context deadline exceeded", "expected a timeout error, got: %v", err)
assert.Equal(mt, 2, cnt, "expected bulkWrite calls: %d, got: %d", 2, cnt)
})
}
59 changes: 50 additions & 9 deletions mongo/integration/unified/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ import (
// expectedError represents an error that is expected to occur during a test. This type ignores the "isError" field in
// test files because it is always true if it is specified, so the runner can simply assert that an error occurred.
type expectedError struct {
IsClientError *bool `bson:"isClientError"`
IsTimeoutError *bool `bson:"isTimeoutError"`
ErrorSubstring *string `bson:"errorContains"`
Code *int32 `bson:"errorCode"`
CodeName *string `bson:"errorCodeName"`
IncludedLabels []string `bson:"errorLabelsContain"`
OmittedLabels []string `bson:"errorLabelsOmit"`
ExpectedResult *bson.RawValue `bson:"expectResult"`
ErrorResponse *bson.Raw `bson:"errorResponse"`
IsClientError *bool `bson:"isClientError"`
IsTimeoutError *bool `bson:"isTimeoutError"`
ErrorSubstring *string `bson:"errorContains"`
Code *int32 `bson:"errorCode"`
CodeName *string `bson:"errorCodeName"`
IncludedLabels []string `bson:"errorLabelsContain"`
OmittedLabels []string `bson:"errorLabelsOmit"`
ExpectedResult *bson.RawValue `bson:"expectResult"`
ErrorResponse *bson.Raw `bson:"errorResponse"`
WriteErrors map[int]clientBulkWriteException `bson:"writeErrors"`
WriteConcernErrors []clientBulkWriteException `bson:"writeConcernErrors"`
}

type clientBulkWriteException struct {
Code *int `bson:"code"`
Message *string `bson:"message"`
}

// verifyOperationError compares the expected error to the actual operation result. If the expected parameter is nil,
Expand Down Expand Up @@ -140,6 +147,40 @@ func verifyOperationError(ctx context.Context, expected *expectedError, result *
return fmt.Errorf("error response comparison error: %w", err)
}
}
if expected.WriteErrors != nil {
var exception mongo.ClientBulkWriteException
if !errors.As(result.Err, &exception) {
return fmt.Errorf("expected a ClientBulkWriteException, got %T", result.Err)
}
if len(expected.WriteErrors) != len(exception.WriteErrors) {
return fmt.Errorf("expected errors: %v, got: %v", expected.WriteErrors, exception.WriteErrors)
}
for k, e := range expected.WriteErrors {
if e.Code != nil && *e.Code != exception.WriteErrors[k].Code {
return fmt.Errorf("expected errors: %v, got: %v", expected.WriteConcernErrors, exception.WriteConcernErrors)
}
if e.Message != nil && *e.Message != exception.WriteErrors[k].Message {
return fmt.Errorf("expected errors: %v, got: %v", expected.WriteConcernErrors, exception.WriteConcernErrors)
}
}
}
if expected.WriteConcernErrors != nil {
var exception mongo.ClientBulkWriteException
if !errors.As(result.Err, &exception) {
return fmt.Errorf("expected a ClientBulkWriteException, got %T", result.Err)
}
if len(expected.WriteConcernErrors) != len(exception.WriteConcernErrors) {
return fmt.Errorf("expected errors: %v, got: %v", expected.WriteConcernErrors, exception.WriteConcernErrors)
}
for i, e := range expected.WriteConcernErrors {
if e.Code != nil && *e.Code != exception.WriteConcernErrors[i].Code {
return fmt.Errorf("expected errors: %v, got: %v", expected.WriteConcernErrors, exception.WriteConcernErrors)
}
if e.Message != nil && *e.Message != exception.WriteConcernErrors[i].Message {
return fmt.Errorf("expected errors: %v, got: %v", expected.WriteConcernErrors, exception.WriteConcernErrors)
}
}
}
return nil
}

Expand Down
Loading

0 comments on commit f0cd171

Please sign in to comment.