Merge pull request #16 from idealo/long_running_tests
Long running tests
manuelkasiske4idealo authored Nov 13, 2024
2 parents 8823d2d + 0cd8379 commit 646b78f
Showing 8 changed files with 576 additions and 264 deletions.
3 changes: 1 addition & 2 deletions Makefile
@@ -3,13 +3,12 @@ GOBUILD=$(GOCMD) build
GOCLEAN=$(GOCMD) clean
GOTEST=$(GOCMD) test
BINARY_NAME=mongo-bench
MAIN_FILE=main.go

.PHONY: all
all: build

build: test
$(GOBUILD) -o $(BINARY_NAME) $(MAIN_FILE)
$(GOBUILD) -o $(BINARY_NAME) *.go
@echo "Build complete: $(BINARY_NAME)"

run:
10 changes: 7 additions & 3 deletions README.md
@@ -35,13 +35,17 @@ After building the tool, run it with customizable parameters:

- `-threads`: Number of concurrent threads to use for inserting, updating, deleting, or upserting documents.
- `-docs`: Total number of documents to process during the benchmark.
- `-duration`: Duration of the test in seconds (default: 0 seconds).
- `-largeDocs`: Use large documents (2K) (default: false).
- `-dropDb`: Drop the database before running the test (default: true).
- `-uri`: MongoDB connection URI.
- `-type`: Type of test to run. Accepts `insert`, `update`, `delete`, `upsert`, or `runAll`:
- `insert`: The tool will insert new documents.
- `update`: The tool will update existing documents (requires that documents have been inserted in a prior run).
- `delete`: The tool will delete existing documents.
- `upsert`: The tool will perform upserts, repeatedly updating a specified range.
- `runAll`: Runs the `insert`, `update`, `delete`, and `upsert` tests sequentially.
- `delete`: The tool will delete existing documents (only when `-docs` is given).
- `upsert`: The tool will perform upserts, repeatedly updating a specified range (only when `-docs` is given).
- `runAll`: Runs the `insert`, `update`, `delete`, and `upsert` tests sequentially (only when `-docs` is given).
- `runAll`: Runs the `insert` and `update` tests sequentially (only when `-duration` is given).

### Example Commands

80 changes: 80 additions & 0 deletions collection_api.go
@@ -0,0 +1,80 @@
package main

import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
)

// CollectionAPI defines an interface for MongoDB operations, allowing for testing
type CollectionAPI interface {
InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error)
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error)
CountDocuments(ctx context.Context, filter interface{}) (int64, error)
Drop(ctx context.Context) error
Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error)
}

// MongoDBCollection is a wrapper around mongo.Collection to implement CollectionAPI
type MongoDBCollection struct {
*mongo.Collection
}

func (c *MongoDBCollection) InsertOne(ctx context.Context, document interface{}) (*mongo.InsertOneResult, error) {
return c.Collection.InsertOne(ctx, document)
}

func (c *MongoDBCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
return c.Collection.UpdateOne(ctx, filter, update, opts...)
}

func (c *MongoDBCollection) DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error) {
return c.Collection.DeleteOne(ctx, filter)
}

func (c *MongoDBCollection) CountDocuments(ctx context.Context, filter interface{}) (int64, error) {
return c.Collection.CountDocuments(ctx, filter)
}

func (c *MongoDBCollection) Drop(ctx context.Context) error {
return c.Collection.Drop(ctx)
}

func (c *MongoDBCollection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) {
return c.Collection.Find(ctx, filter, opts...)
}
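
// Illustrative addition (not part of the original file): a compile-time
// assertion makes the intent explicit that MongoDBCollection must satisfy
// CollectionAPI, so a missing or mismatched method fails the build instead
// of surfacing later at a call site.
var _ CollectionAPI = (*MongoDBCollection)(nil)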

func fetchDocumentIDs(collection CollectionAPI) ([]primitive.ObjectID, error) {
var docIDs []primitive.ObjectID

cursor, err := collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1}))
if err != nil {
return nil, fmt.Errorf("failed to fetch document IDs: %v", err)
}
defer cursor.Close(context.Background())

for cursor.Next(context.Background()) {
var result bson.M
if err := cursor.Decode(&result); err != nil {
log.Printf("Failed to decode document: %v", err)
continue
}
// Check if `_id` is of type `ObjectId` and add to `docIDs`
if id, ok := result["_id"].(primitive.ObjectID); ok {
docIDs = append(docIDs, id)
} else {
log.Printf("Skipping document with unsupported _id type: %T", result["_id"])
}
}

if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
}

return docIDs, nil
}
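
For orientation, a minimal usage sketch of the wrapper and `fetchDocumentIDs` (illustrative only; the connection URI, database name, and collection name are placeholders, not values from this commit):

```go
package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// exampleFetchIDs shows how a concrete *mongo.Collection can be wrapped in
// MongoDBCollection so that fetchDocumentIDs depends only on CollectionAPI.
func exampleFetchIDs() {
	client, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatalf("connect failed: %v", err)
	}
	defer client.Disconnect(context.Background())

	// Placeholder database and collection names.
	coll := &MongoDBCollection{Collection: client.Database("benchmark").Collection("docs")}

	ids, err := fetchDocumentIDs(coll)
	if err != nil {
		log.Fatalf("fetching IDs failed: %v", err)
	}
	log.Printf("fetched %d document IDs", len(ids))
}
```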
200 changes: 200 additions & 0 deletions docs_testing_strategy.go
@@ -0,0 +1,200 @@
package main

import (
"context"
"encoding/csv"
"fmt"
"github.com/rcrowley/go-metrics"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
"math/rand"
"os"
"sync"
"time"
)

type DocCountTestingStrategy struct{}

func (t DocCountTestingStrategy) runTestSequence(collection CollectionAPI, config TestingConfig) {
tests := []string{"insert", "update", "delete", "upsert"}
for _, test := range tests {
t.runTest(collection, test, config, fetchDocumentIDs)
}
}

func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) {
if testType == "insert" || testType == "upsert" {
if config.DropDb {
if err := collection.Drop(context.Background()); err != nil {
log.Fatalf("Failed to drop collection: %v", err)
}
log.Println("Collection dropped. Starting new rate test...")
} else {
log.Println("Collection stays. Dropping disabled.")
}
} else {
log.Printf("Starting %s test...\n", testType)
}

insertRate := metrics.NewMeter()
var records [][]string
records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"})

var partitions [][]primitive.ObjectID

var threads = config.Threads
var docCount = config.DocCount

// Prepare partitions based on test type
switch testType {
case "delete":
// Fetch document IDs as ObjectId and partition them
docIDs, err := fetchDocIDs(collection)
if err != nil {
log.Fatalf("Failed to fetch document IDs: %v", err)
}
partitions = make([][]primitive.ObjectID, threads)
for i, id := range docIDs {
partitions[i%threads] = append(partitions[i%threads], id)
}

case "insert", "upsert":
partitions = make([][]primitive.ObjectID, threads)
for i := 0; i < docCount; i++ {
partitions[i%threads] = append(partitions[i%threads], primitive.NewObjectID())
}

case "update":
docIDs, err := fetchDocIDs(collection)
if err != nil {
log.Fatalf("Failed to fetch document IDs: %v", err)
}

partitions = make([][]primitive.ObjectID, threads)
for i := 0; i < len(docIDs); i++ {
docID := docIDs[rand.Intn(len(docIDs))]
partitions[i%threads] = append(partitions[i%threads], docID)
}
}

// Start the ticker just before starting the main workload goroutines
secondTicker := time.NewTicker(1 * time.Second)
defer secondTicker.Stop()
go func() {
for range secondTicker.C {
timestamp := time.Now().Unix()
count := insertRate.Count()
mean := insertRate.RateMean()
m1Rate := insertRate.Rate1()
m5Rate := insertRate.Rate5()
m15Rate := insertRate.Rate15()

log.Printf("Timestamp: %d, Document Count: %d, Mean Rate: %.2f docs/sec, m1_rate: %.2f, m5_rate: %.2f, m15_rate: %.2f",
timestamp, count, mean, m1Rate, m5Rate, m15Rate)

record := []string{
fmt.Sprintf("%d", timestamp),
fmt.Sprintf("%d", count),
fmt.Sprintf("%.6f", mean),
fmt.Sprintf("%.6f", m1Rate),
fmt.Sprintf("%.6f", m5Rate),
fmt.Sprintf("%.6f", m15Rate),
}
records = append(records, record)
}
}()

// Launch threads based on the specific workload type
var wg sync.WaitGroup
wg.Add(threads)

for i := 0; i < threads; i++ {
go func(partition []primitive.ObjectID) {
defer wg.Done()
for _, docID := range partition {
switch testType {
case "insert":
// Let MongoDB generate the _id automatically
doc := bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
_, err := collection.InsertOne(context.Background(), doc)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Insert failed: %v", err)
}

case "update":
filter := bson.M{"_id": docID}
update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}}
_, err := collection.UpdateOne(context.Background(), filter, update)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Update failed for _id %v: %v", docID, err)
}

case "upsert":
randomDocID := partition[rand.Intn(len(partition)/2)]
filter := bson.M{"_id": randomDocID}
update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}}
opts := options.Update().SetUpsert(true)
_, err := collection.UpdateOne(context.Background(), filter, update, opts)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Upsert failed for _id %v: %v", docID, err)
}

case "delete":
// Use ObjectId in the filter for delete
filter := bson.M{"_id": docID}
result, err := collection.DeleteOne(context.Background(), filter)
if err != nil {
log.Printf("Delete failed for _id %v: %v", docID, err)
continue // Move to next document without retrying
}
if result.DeletedCount > 0 {
insertRate.Mark(1)
}
}
}
}(partitions[i])
}

wg.Wait()

// Final metrics recording
timestamp := time.Now().Unix()
count := insertRate.Count()
mean := insertRate.RateMean()
m1Rate := insertRate.Rate1()
m5Rate := insertRate.Rate5()
m15Rate := insertRate.Rate15()

finalRecord := []string{
fmt.Sprintf("%d", timestamp),
fmt.Sprintf("%d", count),
fmt.Sprintf("%.6f", mean),
fmt.Sprintf("%.6f", m1Rate),
fmt.Sprintf("%.6f", m5Rate),
fmt.Sprintf("%.6f", m15Rate),
}
records = append(records, finalRecord)

filename := fmt.Sprintf("benchmark_results_%s.csv", testType)
file, err := os.Create(filename)
if err != nil {
log.Fatalf("Failed to create CSV file: %v", err)
}
defer file.Close()

writer := csv.NewWriter(file)
if err := writer.WriteAll(records); err != nil {
log.Fatalf("Failed to write records to CSV: %v", err)
}
writer.Flush()

fmt.Printf("Benchmarking completed. Results saved to %s\n", filename)
}
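
A minimal sketch of how the document-count strategy might be driven (illustrative; `TestingConfig` is defined elsewhere in this change set, and only the `Threads`, `DocCount`, and `DropDb` fields referenced above are assumed):

```go
// Illustrative wiring only; values are placeholders.
func exampleRunDocCountBenchmark(collection CollectionAPI) {
	cfg := TestingConfig{
		Threads:  4,     // number of worker goroutines, one ID partition each
		DocCount: 10000, // documents to insert/upsert per run
		DropDb:   true,  // drop the collection before the insert/upsert phases
	}

	// Runs insert, update, delete, and upsert back to back; each phase writes
	// its metrics to benchmark_results_<type>.csv.
	DocCountTestingStrategy{}.runTestSequence(collection, cfg)
}
```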