From c4268a1bbe70fb847554177c1cd6302a8880d61d Mon Sep 17 00:00:00 2001 From: Kasiske Date: Tue, 12 Nov 2024 11:52:36 +0100 Subject: [PATCH 1/5] feat: added duration test type --- Makefile | 3 +- collection_api.go | 80 +++++++++++ docs_testing_strategy.go | 196 ++++++++++++++++++++++++++ duration_testing_strategy.go | 176 +++++++++++++++++++++++ main.go | 266 ++++------------------------------- mongo_bench_test.go | 67 ++++++--- strategy.go | 14 ++ 7 files changed, 541 insertions(+), 261 deletions(-) create mode 100644 collection_api.go create mode 100644 docs_testing_strategy.go create mode 100644 duration_testing_strategy.go create mode 100644 strategy.go diff --git a/Makefile b/Makefile index 1a5e3ed..29f3631 100644 --- a/Makefile +++ b/Makefile @@ -3,13 +3,12 @@ GOBUILD=$(GOCMD) build GOCLEAN=$(GOCMD) clean GOTEST=$(GOCMD) test BINARY_NAME=mongo-bench -MAIN_FILE=main.go .PHONY: all all: build build: test - $(GOBUILD) -o $(BINARY_NAME) $(MAIN_FILE) + $(GOBUILD) -o $(BINARY_NAME) *.go @echo "Build complete: $(BINARY_NAME)" run: diff --git a/collection_api.go b/collection_api.go new file mode 100644 index 0000000..171fd80 --- /dev/null +++ b/collection_api.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "fmt" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "log" +) + +// CollectionAPI defines an interface for MongoDB operations, allowing for testing +type CollectionAPI interface { + InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error) + UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) + DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error) + CountDocuments(ctx context.Context, filter interface{}) (int64, error) + Drop(ctx context.Context) error + Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) +} + +// MongoDBCollection is a wrapper around mongo.Collection to implement CollectionAPI +type MongoDBCollection struct { + *mongo.Collection +} + +func (c *MongoDBCollection) InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error) { + return c.Collection.InsertOne(ctx, document) +} + +func (c *MongoDBCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + return c.Collection.UpdateOne(ctx, filter, update, opts...) +} + +func (c *MongoDBCollection) DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error) { + return c.Collection.DeleteOne(ctx, filter) +} + +func (c *MongoDBCollection) CountDocuments(ctx context.Context, filter interface{}) (int64, error) { + return c.Collection.CountDocuments(ctx, filter) +} + +func (c *MongoDBCollection) Drop(ctx context.Context) error { + return c.Collection.Drop(ctx) +} + +func (c *MongoDBCollection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) { + return c.Collection.Find(ctx, filter, opts...) +} + +func fetchDocumentIDs(collection CollectionAPI) ([]primitive.ObjectID, error) { + var docIDs []primitive.ObjectID + + cursor, err := collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1})) + if err != nil { + return nil, fmt.Errorf("failed to fetch document IDs: %v", err) + } + defer cursor.Close(context.Background()) + + for cursor.Next(context.Background()) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + log.Printf("Failed to decode document: %v", err) + continue + } + // Check if `_id` is of type `ObjectId` and add to `docIDs` + if id, ok := result["_id"].(primitive.ObjectID); ok { + docIDs = append(docIDs, id) + } else { + log.Printf("Skipping document with unsupported _id type: %T", result["_id"]) + } + } + + if err := cursor.Err(); err != nil { + return nil, fmt.Errorf("cursor error: %v", err) + } + + return docIDs, nil +} diff --git a/docs_testing_strategy.go b/docs_testing_strategy.go new file mode 100644 index 0000000..ca9062c --- /dev/null +++ b/docs_testing_strategy.go @@ -0,0 +1,196 @@ +package main + +import ( + "context" + "encoding/csv" + "fmt" + "github.com/rcrowley/go-metrics" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo/options" + "log" + "math/rand" + "os" + "sync" + "time" +) + +type DocCountTestingStrategy struct{} + +func (t DocCountTestingStrategy) runTestSequence(collection CollectionAPI, config TestingConfig) { + tests := []string{"insert", "update", "delete", "upsert"} + for _, test := range tests { + t.runTest(collection, test, config, fetchDocumentIDs) + } +} + +func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) { + if testType == "insert" || testType == "upsert" { + if err := collection.Drop(context.Background()); err != nil { + log.Fatalf("Failed to drop collection: %v", err) + } + log.Println("Collection dropped. Starting new rate test...") + } else { + log.Printf("Starting %s test...\n", testType) + } + + insertRate := metrics.NewMeter() + var records [][]string + records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"}) + + var partitions [][]primitive.ObjectID + + var threads = config.Threads + var docCount = config.DocCount + + // Prepare partitions based on test type + switch testType { + case "delete": + // Fetch document IDs as ObjectId and partition them + docIDs, err := fetchDocIDs(collection) + if err != nil { + log.Fatalf("Failed to fetch document IDs: %v", err) + } + partitions = make([][]primitive.ObjectID, threads) + for i, id := range docIDs { + partitions[i%threads] = append(partitions[i%threads], id) + } + + case "insert", "upsert": + partitions = make([][]primitive.ObjectID, threads) + for i := 0; i < docCount; i++ { + partitions[i%threads] = append(partitions[i%threads], primitive.NewObjectID()) + } + + case "update": + docIDs, err := fetchDocIDs(collection) + if err != nil { + log.Fatalf("Failed to fetch document IDs: %v", err) + } + + partitions = make([][]primitive.ObjectID, threads) + for i := 0; i < len(docIDs); i++ { + docID := docIDs[rand.Intn(len(docIDs))] + partitions[i%threads] = append(partitions[i%threads], docID) + } + } + + // Start the ticker just before starting the main workload goroutines + secondTicker := time.NewTicker(1 * time.Second) + defer secondTicker.Stop() + go func() { + for range secondTicker.C { + timestamp := time.Now().Unix() + count := insertRate.Count() + mean := insertRate.RateMean() + m1Rate := insertRate.Rate1() + m5Rate := insertRate.Rate5() + m15Rate := insertRate.Rate15() + + log.Printf("Timestamp: %d, Document Count: %d, Mean Rate: %.2f docs/sec, m1_rate: %.2f, m5_rate: %.2f, m15_rate: %.2f", + timestamp, count, mean, m1Rate, m5Rate, m15Rate) + + record := []string{ + fmt.Sprintf("%d", timestamp), + fmt.Sprintf("%d", count), + fmt.Sprintf("%.6f", mean), + fmt.Sprintf("%.6f", m1Rate), + fmt.Sprintf("%.6f", m5Rate), + fmt.Sprintf("%.6f", m15Rate), + } + records = append(records, record) + } + }() + + // Launch threads based on the specific workload type + var wg sync.WaitGroup + wg.Add(threads) + + for i := 0; i < threads; i++ { + go func(partition []primitive.ObjectID) { + defer wg.Done() + for _, docID := range partition { + switch testType { + case "insert": + // Let MongoDB generate the _id automatically + doc := bson.M{"threadRunCount": 1, "rnd": rand.Int63(), "v": 1} + _, err := collection.InsertOne(context.Background(), doc) + if err == nil { + insertRate.Mark(1) + } else { + log.Printf("Insert failed: %v", err) + } + + case "update": + filter := bson.M{"_id": docID} + update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}} + _, err := collection.UpdateOne(context.Background(), filter, update) + if err == nil { + insertRate.Mark(1) + } else { + log.Printf("Update failed for _id %v: %v", docID, err) + } + + case "upsert": + randomDocID := partition[rand.Intn(len(partition)/2)] + filter := bson.M{"_id": randomDocID} + update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}} + opts := options.Update().SetUpsert(true) + _, err := collection.UpdateOne(context.Background(), filter, update, opts) + if err == nil { + insertRate.Mark(1) + } else { + log.Printf("Upsert failed for _id %v: %v", docID, err) + } + + case "delete": + // Use ObjectId in the filter for delete + filter := bson.M{"_id": docID} + result, err := collection.DeleteOne(context.Background(), filter) + if err != nil { + log.Printf("Delete failed for _id %v: %v", docID, err) + continue // Move to next document without retrying + } + if result.DeletedCount > 0 { + insertRate.Mark(1) + } + } + } + }(partitions[i]) + } + + wg.Wait() + + // Final metrics recording + timestamp := time.Now().Unix() + count := insertRate.Count() + mean := insertRate.RateMean() + m1Rate := insertRate.Rate1() + m5Rate := insertRate.Rate5() + m15Rate := insertRate.Rate15() + + finalRecord := []string{ + fmt.Sprintf("%d", timestamp), + fmt.Sprintf("%d", count), + fmt.Sprintf("%.6f", mean), + fmt.Sprintf("%.6f", m1Rate), + fmt.Sprintf("%.6f", m5Rate), + fmt.Sprintf("%.6f", m15Rate), + } + records = append(records, finalRecord) + + filename := fmt.Sprintf("benchmark_results_%s.csv", testType) + file, err := os.Create(filename) + if err != nil { + log.Fatalf("Failed to create CSV file: %v", err) + } + defer file.Close() + + writer := csv.NewWriter(file) + if err := writer.WriteAll(records); err != nil { + log.Fatalf("Failed to write records to CSV: %v", err) + } + writer.Flush() + + fmt.Printf("Benchmarking completed. Results saved to %s\n", filename) +} diff --git a/duration_testing_strategy.go b/duration_testing_strategy.go new file mode 100644 index 0000000..b2f29f4 --- /dev/null +++ b/duration_testing_strategy.go @@ -0,0 +1,176 @@ +package main + +import ( + "context" + "encoding/csv" + "fmt" + "go.mongodb.org/mongo-driver/bson/primitive" + "log" + "math/rand" + "os" + "sync" + "time" + + "github.com/rcrowley/go-metrics" + "go.mongodb.org/mongo-driver/bson" +) + +type DurationTestingStrategy struct{} + +func (t DurationTestingStrategy) runTestSequence(collection CollectionAPI, config TestingConfig) { + tests := []string{"insert", "update"} + for _, test := range tests { + t.runTest(collection, test, config, fetchDocumentIDs) + } +} + +func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) { + // Set up the timer for the duration of the test + endTime := time.Now().Add(time.Duration(config.Duration) * time.Second) + + // Set up the ticker to record metrics every second + insertRate := metrics.NewMeter() + records := [][]string{ + {"timestamp", "count", "mean_rate", "m1_rate", "m5_rate", "m15_rate"}, + } + + var partitions [][]primitive.ObjectID + if testType == "insert" { + if err := collection.Drop(context.Background()); err != nil { + log.Fatalf("Failed to clear collection before test: %v", err) + } + log.Println("Collection cleared before insert test.") + } else if testType == "update" { + docIDs, err := fetchDocIDs(collection) + if err != nil { + log.Fatalf("Failed to fetch document IDs: %v", err) + } + + if len(docIDs) == 0 { + log.Fatalf("No document IDs found for update operations") + } + + // Create partitions from fetched document IDs + partitions = make([][]primitive.ObjectID, config.Threads) + for i, id := range docIDs { + partitions[i%config.Threads] = append(partitions[i%config.Threads], id) + } + } + + secondTicker := time.NewTicker(1 * time.Second) + defer secondTicker.Stop() + go func() { + for range secondTicker.C { + timestamp := time.Now().Unix() + count := insertRate.Count() + mean := insertRate.RateMean() + m1Rate := insertRate.Rate1() + m5Rate := insertRate.Rate5() + m15Rate := insertRate.Rate15() + + log.Printf("Timestamp: %d, Document Count: %d, Mean Rate: %.2f docs/sec, m1_rate: %.2f, m5_rate: %.2f, m15_rate: %.2f", + timestamp, count, mean, m1Rate, m5Rate, m15Rate) + + record := []string{ + fmt.Sprintf("%d", timestamp), + fmt.Sprintf("%d", count), + fmt.Sprintf("%.6f", mean), + fmt.Sprintf("%.6f", m1Rate), + fmt.Sprintf("%.6f", m5Rate), + fmt.Sprintf("%.6f", m15Rate), + } + records = append(records, record) + } + }() + + // Launch the workload in goroutines + var wg sync.WaitGroup + wg.Add(config.Threads) + + if testType == "insert" { + // Insert operations using generated IDs + for i := 0; i < config.Threads; i++ { + go func() { + defer wg.Done() + + for time.Now().Before(endTime) { + // Insert without specifying an ID; MongoDB will auto-generate it + doc := bson.M{"rnd": rand.Int63(), "v": 1} + _, err := collection.InsertOne(context.Background(), doc) + if err == nil { + insertRate.Mark(1) + } else { + log.Printf("Insert failed: %v", err) + } + } + }() + } + } else { + for i := 0; i < config.Threads; i++ { + // Check if the partition is non-empty for this thread + if len(partitions) <= i || len(partitions[i]) == 0 { + log.Printf("Skipping empty partition for thread %d in %s operation", i, testType) + wg.Done() + continue + } + partition := partitions[i] + + go func(partition []primitive.ObjectID) { + defer wg.Done() + + for time.Now().Before(endTime) { + docID := partition[rand.Intn(len(partition))] + + switch testType { + case "update": + filter := bson.M{"_id": docID} + update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}} + _, err := collection.UpdateOne(context.Background(), filter, update) + if err == nil { + insertRate.Mark(1) + } else { + log.Printf("Update failed for _id %v: %v", docID, err) + } + } + } + }(partition) + } + } + + // Wait for all threads to complete + wg.Wait() + + // Final metrics recording + timestamp := time.Now().Unix() + count := insertRate.Count() + mean := insertRate.RateMean() + m1Rate := insertRate.Rate1() + m5Rate := insertRate.Rate5() + m15Rate := insertRate.Rate15() + + finalRecord := []string{ + fmt.Sprintf("%d", timestamp), + fmt.Sprintf("%d", count), + fmt.Sprintf("%.6f", mean), + fmt.Sprintf("%.6f", m1Rate), + fmt.Sprintf("%.6f", m5Rate), + fmt.Sprintf("%.6f", m15Rate), + } + records = append(records, finalRecord) + + // Write metrics to CSV file + filename := fmt.Sprintf("benchmark_results_%s.csv", testType) + file, err := os.Create(filename) + if err != nil { + log.Fatalf("Failed to create CSV file: %v", err) + } + defer file.Close() + + writer := csv.NewWriter(file) + if err := writer.WriteAll(records); err != nil { + log.Fatalf("Failed to write records to CSV: %v", err) + } + writer.Flush() + + fmt.Printf("Benchmarking completed. Results saved to %s\n", filename) +} diff --git a/main.go b/main.go index 0e4c1dc..9d6782c 100644 --- a/main.go +++ b/main.go @@ -2,93 +2,18 @@ package main import ( "context" - "encoding/csv" "flag" - "fmt" - "log" - "math/rand" - "os" - "sync" - "time" - - "github.com/rcrowley/go-metrics" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "log" ) -// CollectionAPI defines an interface for MongoDB operations, allowing for testing -type CollectionAPI interface { - InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error) - UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) - DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error) - CountDocuments(ctx context.Context, filter interface{}) (int64, error) - Drop(ctx context.Context) error - Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) -} - -// MongoDBCollection is a wrapper around mongo.Collection to implement CollectionAPI -type MongoDBCollection struct { - *mongo.Collection -} - -func (c *MongoDBCollection) InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error) { - return c.Collection.InsertOne(ctx, document) -} - -func (c *MongoDBCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { - return c.Collection.UpdateOne(ctx, filter, update, opts...) -} - -func (c *MongoDBCollection) DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error) { - return c.Collection.DeleteOne(ctx, filter) -} - -func (c *MongoDBCollection) CountDocuments(ctx context.Context, filter interface{}) (int64, error) { - return c.Collection.CountDocuments(ctx, filter) -} - -func (c *MongoDBCollection) Drop(ctx context.Context) error { - return c.Collection.Drop(ctx) -} - -func (c *MongoDBCollection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) { - return c.Collection.Find(ctx, filter, opts...) -} - -// fetchDocumentIDs fetches all document IDs from the collection for delete operations -func fetchDocumentIDs(collection CollectionAPI) ([]int64, error) { - var docIDs []int64 - - cursor, err := collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1})) - if err != nil { - return nil, fmt.Errorf("failed to fetch document IDs: %v", err) - } - defer cursor.Close(context.Background()) - - for cursor.Next(context.Background()) { - var result bson.M - if err := cursor.Decode(&result); err != nil { - log.Printf("Failed to decode document: %v", err) - continue - } - if id, ok := result["_id"].(int64); ok { - docIDs = append(docIDs, id) - } - } - - if err := cursor.Err(); err != nil { - return nil, fmt.Errorf("cursor error: %v", err) - } - - return docIDs, nil -} - func main() { var threads int var docCount int var uri string var testType string + var duration int var runAll bool flag.IntVar(&threads, "threads", 10, "Number of threads for inserting, updating, upserting, or deleting documents") @@ -96,8 +21,12 @@ func main() { flag.StringVar(&uri, "uri", "mongodb://localhost:27017", "MongoDB URI") flag.StringVar(&testType, "type", "insert", "Test type: insert, update, upsert, or delete") flag.BoolVar(&runAll, "runAll", false, "Run all tests in order: insert, update, delete, upsert") + flag.IntVar(&duration, "duration", 0, "Duration in seconds to run the test") flag.Parse() + var strategy TestingStrategy + var config TestingConfig + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri)) if err != nil { log.Fatalf("Failed to connect to MongoDB: %v", err) @@ -107,175 +36,30 @@ func main() { collection := client.Database("benchmarking").Collection("testdata") mongoCollection := &MongoDBCollection{Collection: collection} - if runAll { - runTestSequence(mongoCollection, threads, docCount) - } else { - runTest(mongoCollection, testType, threads, docCount, fetchDocumentIDs) - } -} - -func runTestSequence(collection CollectionAPI, threads, docCount int) { - tests := []string{"insert", "update", "delete", "upsert"} - for _, test := range tests { - runTest(collection, test, threads, docCount, fetchDocumentIDs) - } -} - -func runTest(collection CollectionAPI, testType string, threads, docCount int, fetchDocIDs func(CollectionAPI) ([]int64, error)) { - if testType == "insert" || testType == "upsert" { - if err := collection.Drop(context.Background()); err != nil { - log.Fatalf("Failed to drop collection: %v", err) + // if duration is given by the user we neeed to initialise a strategy for duration testing + if duration > 0 { + strategy = DurationTestingStrategy{} + config = TestingConfig{ + Threads: threads, + Duration: duration, } - log.Println("Collection dropped. Starting new rate test...") - } else { - log.Printf("Starting %s test...\n", testType) - } - - insertRate := metrics.NewMeter() - var records [][]string - records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"}) - var partitions [][]int64 // To hold the document IDs or dummy IDs, partitioned for each thread - - // Prepare partitions based on test type - switch testType { - case "delete": - // Fetch document IDs and partition them - docIDs, err := fetchDocIDs(collection) - if err != nil { - log.Fatalf("Failed to fetch document IDs: %v", err) - } - partitions = make([][]int64, threads) - for i, id := range docIDs { - partitions[i%threads] = append(partitions[i%threads], id) + if runAll { + strategy.runTestSequence(mongoCollection, config) + } else { + strategy.runTest(mongoCollection, testType, config, fetchDocumentIDs) } - - case "insert", "update", "upsert": - // Generate unique or random IDs for insert/update/upsert - partitions = make([][]int64, threads) - for i := 0; i < docCount; i++ { - id := int64(i) - if testType == "update" || testType == "upsert" { - id = int64(rand.Intn(docCount)) // Random ID for update/upsert - } - partitions[i%threads] = append(partitions[i%threads], id) + } else { + strategy = DocCountTestingStrategy{} + config = TestingConfig{ + Threads: threads, + DocCount: docCount, } - } - - // Start the ticker just before starting the main workload goroutines - secondTicker := time.NewTicker(1 * time.Second) - defer secondTicker.Stop() - go func() { - for range secondTicker.C { - timestamp := time.Now().Unix() - count := insertRate.Count() - mean := insertRate.RateMean() - m1Rate := insertRate.Rate1() - m5Rate := insertRate.Rate5() - m15Rate := insertRate.Rate15() - - log.Printf("Timestamp: %d, Document Count: %d, Mean Rate: %.2f docs/sec, m1_rate: %.2f, m5_rate: %.2f, m15_rate: %.2f", - timestamp, count, mean, m1Rate, m5Rate, m15Rate) - - record := []string{ - fmt.Sprintf("%d", timestamp), - fmt.Sprintf("%d", count), - fmt.Sprintf("%.6f", mean), - fmt.Sprintf("%.6f", m1Rate), - fmt.Sprintf("%.6f", m5Rate), - fmt.Sprintf("%.6f", m15Rate), - fmt.Sprintf("%.6f", mean), - } - records = append(records, record) + if runAll { + strategy.runTestSequence(mongoCollection, config) + } else { + strategy.runTest(mongoCollection, testType, config, fetchDocumentIDs) } - }() - - // Launch threads based on the specific workload type - var wg sync.WaitGroup - wg.Add(threads) - - for i := 0; i < threads; i++ { - go func(partition []int64) { - defer wg.Done() - for _, docID := range partition { - switch testType { - case "insert": - doc := bson.M{"_id": docID, "threadRunCount": 1, "rnd": rand.Int63(), "v": 1} - _, err := collection.InsertOne(context.Background(), doc) - if err == nil { - insertRate.Mark(1) - } else { - log.Printf("Insert failed: %v", err) - } - - case "update": - filter := bson.M{"_id": docID} - update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}} - _, err := collection.UpdateOne(context.Background(), filter, update) - if err == nil { - insertRate.Mark(1) - } else { - log.Printf("Update failed: %v", err) - } - - case "upsert": - filter := bson.M{"_id": docID} - update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}} - opts := options.Update().SetUpsert(true) - _, err := collection.UpdateOne(context.Background(), filter, update, opts) - if err == nil { - insertRate.Mark(1) - } else { - log.Printf("Upsert failed: %v", err) - } - - case "delete": - filter := bson.M{"_id": docID} - result, err := collection.DeleteOne(context.Background(), filter) - if err != nil { - log.Printf("Delete failed for _id %d: %v", docID, err) - continue // Move to next document without retrying - } - if result.DeletedCount > 0 { - insertRate.Mark(1) - } - } - } - }(partitions[i]) - } - - wg.Wait() - - // Final metrics recording - timestamp := time.Now().Unix() - count := insertRate.Count() - mean := insertRate.RateMean() - m1Rate := insertRate.Rate1() - m5Rate := insertRate.Rate5() - m15Rate := insertRate.Rate15() - - finalRecord := []string{ - fmt.Sprintf("%d", timestamp), - fmt.Sprintf("%d", count), - fmt.Sprintf("%.6f", mean), - fmt.Sprintf("%.6f", m1Rate), - fmt.Sprintf("%.6f", m5Rate), - fmt.Sprintf("%.6f", m15Rate), - } - records = append(records, finalRecord) - - filename := fmt.Sprintf("benchmark_results_%s.csv", testType) - file, err := os.Create(filename) - if err != nil { - log.Fatalf("Failed to create CSV file: %v", err) - } - defer file.Close() - - writer := csv.NewWriter(file) - if err := writer.WriteAll(records); err != nil { - log.Fatalf("Failed to write records to CSV: %v", err) } - writer.Flush() - fmt.Printf("Benchmarking completed. Results saved to %s\n", filename) } diff --git a/mongo_bench_test.go b/mongo_bench_test.go index 9e2e385..9936381 100644 --- a/mongo_bench_test.go +++ b/mongo_bench_test.go @@ -3,6 +3,7 @@ package main import ( "context" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "testing" "github.com/stretchr/testify/assert" @@ -11,6 +12,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) +// MockCollection to mock MongoDB collection operations type MockCollection struct { mock.Mock } @@ -45,68 +47,97 @@ func (m *MockCollection) Find(ctx context.Context, filter interface{}, opts ...* return args.Get(0).(*mongo.Cursor), args.Error(1) } -func fetchDocumentIDsMock(_ CollectionAPI) ([]int64, error) { - return []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil +// fetchDocumentIDsMock returns a slice of mock ObjectIDs for testing +func fetchDocumentIDsMock(_ CollectionAPI) ([]primitive.ObjectID, error) { + return []primitive.ObjectID{ + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + primitive.NewObjectID(), + }, nil } +// TestInsertOperation tests the insert operation using DocCountTestingStrategy func TestInsertOperation(t *testing.T) { mockCollection := new(MockCollection) - docCount := 10 - threads := 2 + config := TestingConfig{ + Threads: 2, + DocCount: 10, + } + strategy := DocCountTestingStrategy{} testType := "insert" mockCollection.On("Drop", mock.Anything).Return(nil) mockCollection.On("InsertOne", mock.Anything, mock.Anything).Return(&mongo.InsertOneResult{}, nil) - runTest(mockCollection, testType, threads, docCount, fetchDocumentIDsMock) + strategy.runTest(mockCollection, testType, config, fetchDocumentIDsMock) mockCollection.AssertNumberOfCalls(t, "Drop", 1) - mockCollection.AssertNumberOfCalls(t, "InsertOne", docCount) + mockCollection.AssertNumberOfCalls(t, "InsertOne", config.DocCount) } +// TestUpdateOperation tests the update operation using DocCountTestingStrategy func TestUpdateOperation(t *testing.T) { mockCollection := new(MockCollection) - docCount := 10 - threads := 2 + config := TestingConfig{ + Threads: 2, + DocCount: 10, + } + strategy := DocCountTestingStrategy{} testType := "update" mockCollection.On("UpdateOne", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&mongo.UpdateResult{}, nil) - runTest(mockCollection, testType, threads, docCount, fetchDocumentIDsMock) + strategy.runTest(mockCollection, testType, config, fetchDocumentIDsMock) - expectedCalls := docCount + expectedCalls := config.DocCount mockCollection.AssertNumberOfCalls(t, "UpdateOne", expectedCalls) } +// TestUpsertOperation tests the upsert operation using DocCountTestingStrategy func TestUpsertOperation(t *testing.T) { mockCollection := new(MockCollection) - docCount := 10 - threads := 2 + config := TestingConfig{ + Threads: 2, + DocCount: 10, + } + strategy := DocCountTestingStrategy{} testType := "upsert" mockCollection.On("Drop", mock.Anything).Return(nil) mockCollection.On("UpdateOne", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&mongo.UpdateResult{UpsertedCount: 1}, nil) - runTest(mockCollection, testType, threads, docCount, fetchDocumentIDsMock) + strategy.runTest(mockCollection, testType, config, fetchDocumentIDsMock) mockCollection.AssertNumberOfCalls(t, "Drop", 1) - mockCollection.AssertNumberOfCalls(t, "UpdateOne", docCount) + mockCollection.AssertNumberOfCalls(t, "UpdateOne", config.DocCount) } +// TestDeleteOperation tests the delete operation using DocCountTestingStrategy func TestDeleteOperation(t *testing.T) { mockCollection := new(MockCollection) - docCount := 10 - threads := 2 + config := TestingConfig{ + Threads: 2, + DocCount: 10, + } + strategy := DocCountTestingStrategy{} testType := "delete" mockCollection.On("DeleteOne", mock.Anything, mock.Anything).Return(&mongo.DeleteResult{DeletedCount: 1}, nil) - runTest(mockCollection, testType, threads, docCount, fetchDocumentIDsMock) + strategy.runTest(mockCollection, testType, config, fetchDocumentIDsMock) - expectedCalls := docCount + expectedCalls := config.DocCount mockCollection.AssertNumberOfCalls(t, "DeleteOne", expectedCalls) } +// TestCountDocuments verifies the CountDocuments method in isolation func TestCountDocuments(t *testing.T) { mockCollection := new(MockCollection) diff --git a/strategy.go b/strategy.go new file mode 100644 index 0000000..072a8ff --- /dev/null +++ b/strategy.go @@ -0,0 +1,14 @@ +package main + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type TestingConfig struct { + Threads int + DocCount int + Duration int +} + +type TestingStrategy interface { + runTestSequence(collection CollectionAPI, config TestingConfig) + runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) +} From 139fcf66fdfdc71c416cfc3868d9751b594b5c27 Mon Sep 17 00:00:00 2001 From: Kasiske Date: Wed, 13 Nov 2024 11:33:40 +0100 Subject: [PATCH 2/5] feat: added a large document option for long running tests --- docs_testing_strategy.go | 2 +- duration_testing_strategy.go | 14 ++++++++++++-- main.go | 7 +++++-- strategy.go | 7 ++++--- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/docs_testing_strategy.go b/docs_testing_strategy.go index ca9062c..d78767a 100644 --- a/docs_testing_strategy.go +++ b/docs_testing_strategy.go @@ -113,7 +113,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri switch testType { case "insert": // Let MongoDB generate the _id automatically - doc := bson.M{"threadRunCount": 1, "rnd": rand.Int63(), "v": 1} + doc := bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1} _, err := collection.InsertOne(context.Background(), doc) if err == nil { insertRate.Mark(1) diff --git a/duration_testing_strategy.go b/duration_testing_strategy.go index b2f29f4..7dad833 100644 --- a/duration_testing_strategy.go +++ b/duration_testing_strategy.go @@ -57,6 +57,12 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri } } + var doc interface{} + var data = make([]byte, 1024*1024*10) + for i := 0; i < len(data); i++ { + data[i] = byte(rand.Intn(256)) + } + secondTicker := time.NewTicker(1 * time.Second) defer secondTicker.Stop() go func() { @@ -94,8 +100,12 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri defer wg.Done() for time.Now().Before(endTime) { - // Insert without specifying an ID; MongoDB will auto-generate it - doc := bson.M{"rnd": rand.Int63(), "v": 1} + if config.LargeDocs { + doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1, "data": data} + + } else { + doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1} + } _, err := collection.InsertOne(context.Background(), doc) if err == nil { insertRate.Mark(1) diff --git a/main.go b/main.go index 9d6782c..136688b 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ func main() { var testType string var duration int var runAll bool + var largeDocs bool flag.IntVar(&threads, "threads", 10, "Number of threads for inserting, updating, upserting, or deleting documents") flag.IntVar(&docCount, "docs", 1000, "Total number of documents to insert, update, upsert, or delete") @@ -22,6 +23,7 @@ func main() { flag.StringVar(&testType, "type", "insert", "Test type: insert, update, upsert, or delete") flag.BoolVar(&runAll, "runAll", false, "Run all tests in order: insert, update, delete, upsert") flag.IntVar(&duration, "duration", 0, "Duration in seconds to run the test") + flag.BoolVar(&largeDocs, "largeDocs", false, "Use large documents for testing") flag.Parse() var strategy TestingStrategy @@ -40,8 +42,9 @@ func main() { if duration > 0 { strategy = DurationTestingStrategy{} config = TestingConfig{ - Threads: threads, - Duration: duration, + Threads: threads, + Duration: duration, + LargeDocs: largeDocs, } if runAll { diff --git a/strategy.go b/strategy.go index 072a8ff..45d4737 100644 --- a/strategy.go +++ b/strategy.go @@ -3,9 +3,10 @@ package main import "go.mongodb.org/mongo-driver/bson/primitive" type TestingConfig struct { - Threads int - DocCount int - Duration int + Threads int + DocCount int + Duration int + LargeDocs bool } type TestingStrategy interface { From abdebbd2619dd2e2d9f8ab0930bab5f32c6809be Mon Sep 17 00:00:00 2001 From: Kasiske Date: Wed, 13 Nov 2024 11:41:50 +0100 Subject: [PATCH 3/5] chore: changed the large document size to 2KB --- duration_testing_strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/duration_testing_strategy.go b/duration_testing_strategy.go index 7dad833..592fc5e 100644 --- a/duration_testing_strategy.go +++ b/duration_testing_strategy.go @@ -58,7 +58,7 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri } var doc interface{} - var data = make([]byte, 1024*1024*10) + var data = make([]byte, 1024*2) for i := 0; i < len(data); i++ { data[i] = byte(rand.Intn(256)) } From c0bcc3f58cebe3726bf714c9dc5ebcf6b7e5d247 Mon Sep 17 00:00:00 2001 From: Kasiske Date: Wed, 13 Nov 2024 13:17:19 +0100 Subject: [PATCH 4/5] feat: added dropDb flag (default: true) --- README.md | 10 +++++++--- docs_testing_strategy.go | 10 +++++++--- duration_testing_strategy.go | 10 +++++++--- main.go | 3 +++ strategy.go | 1 + 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 83eb899..5b2c8e0 100644 --- a/README.md +++ b/README.md @@ -35,13 +35,17 @@ After building the tool, run it with customizable parameters: - `-threads`: Number of concurrent threads to use for inserting, updating, deleting, or upserting documents. - `-docs`: Total number of documents to process during the benchmark. +- `-duration`: Duration of the test in seconds (default: 0 seconds). +- `-largeDocs`: Use large documents (2K) (default: false). +- `-dropDb`: Drop the database before running the test (default: true). - `-uri`: MongoDB connection URI. - `-type`: Type of test to run. Accepts `insert`, `update`, `delete`, `upsert`, or `runAll`: - `insert`: The tool will insert new documents. - `update`: The tool will update existing documents (requires that documents have been inserted in a prior run). - - `delete`: The tool will delete existing documents. - - `upsert`: The tool will perform upserts, repeatedly updating a specified range. - - `runAll`: Runs the `insert`, `update`, `delete`, and `upsert` tests sequentially. + - `delete`: The tool will delete existing documents. (just if `docs` is given) + - `upsert`: The tool will perform upserts, repeatedly updating a specified range. (just if `docs` is given) + - `runAll`: Runs the `insert`, `update`, `delete`, and `upsert` tests sequentially. (just if `docs` is given) + - `runAll`: Runs the `insert`, `update` tests sequentially. (just if `duration` is given) ### Example Commands diff --git a/docs_testing_strategy.go b/docs_testing_strategy.go index d78767a..84e6e94 100644 --- a/docs_testing_strategy.go +++ b/docs_testing_strategy.go @@ -26,10 +26,14 @@ func (t DocCountTestingStrategy) runTestSequence(collection CollectionAPI, confi func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) { if testType == "insert" || testType == "upsert" { - if err := collection.Drop(context.Background()); err != nil { - log.Fatalf("Failed to drop collection: %v", err) + if config.DropDb { + if err := collection.Drop(context.Background()); err != nil { + log.Fatalf("Failed to drop collection: %v", err) + } + log.Println("Collection dropped. Starting new rate test...") + } else { + log.Println("Collection stays. Dropping disabled.") } - log.Println("Collection dropped. Starting new rate test...") } else { log.Printf("Starting %s test...\n", testType) } diff --git a/duration_testing_strategy.go b/duration_testing_strategy.go index 592fc5e..e41a5f4 100644 --- a/duration_testing_strategy.go +++ b/duration_testing_strategy.go @@ -36,10 +36,14 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri var partitions [][]primitive.ObjectID if testType == "insert" { - if err := collection.Drop(context.Background()); err != nil { - log.Fatalf("Failed to clear collection before test: %v", err) + if config.DropDb { + if err := collection.Drop(context.Background()); err != nil { + log.Fatalf("Failed to clear collection before test: %v", err) + } + log.Println("Collection cleared before insert test.") + } else { + log.Println("Collection stays. Dropping disabled.") } - log.Println("Collection cleared before insert test.") } else if testType == "update" { docIDs, err := fetchDocIDs(collection) if err != nil { diff --git a/main.go b/main.go index 136688b..2a7d9ea 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ func main() { var duration int var runAll bool var largeDocs bool + var dropDb bool flag.IntVar(&threads, "threads", 10, "Number of threads for inserting, updating, upserting, or deleting documents") flag.IntVar(&docCount, "docs", 1000, "Total number of documents to insert, update, upsert, or delete") @@ -24,6 +25,7 @@ func main() { flag.BoolVar(&runAll, "runAll", false, "Run all tests in order: insert, update, delete, upsert") flag.IntVar(&duration, "duration", 0, "Duration in seconds to run the test") flag.BoolVar(&largeDocs, "largeDocs", false, "Use large documents for testing") + flag.BoolVar(&dropDb, "dropDb", true, "Drop the database before running the test") flag.Parse() var strategy TestingStrategy @@ -45,6 +47,7 @@ func main() { Threads: threads, Duration: duration, LargeDocs: largeDocs, + DropDb: dropDb, } if runAll { diff --git a/strategy.go b/strategy.go index 45d4737..bf5bc96 100644 --- a/strategy.go +++ b/strategy.go @@ -7,6 +7,7 @@ type TestingConfig struct { DocCount int Duration int LargeDocs bool + DropDb bool } type TestingStrategy interface { From 0cd837997a4a1a076e7d0ddef876e4ed8f97d619 Mon Sep 17 00:00:00 2001 From: Kasiske Date: Wed, 13 Nov 2024 13:25:21 +0100 Subject: [PATCH 5/5] chore: fixed test --- mongo_bench_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mongo_bench_test.go b/mongo_bench_test.go index 9936381..92550c0 100644 --- a/mongo_bench_test.go +++ b/mongo_bench_test.go @@ -69,6 +69,7 @@ func TestInsertOperation(t *testing.T) { config := TestingConfig{ Threads: 2, DocCount: 10, + DropDb: true, } strategy := DocCountTestingStrategy{} testType := "insert" @@ -106,6 +107,7 @@ func TestUpsertOperation(t *testing.T) { config := TestingConfig{ Threads: 2, DocCount: 10, + DropDb: true, } strategy := DocCountTestingStrategy{} testType := "upsert"