diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4617714eb..df631e415 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -102,7 +102,7 @@ jobs: e2e: runs-on: ubuntu-latest - timeout-minutes: 5 + timeout-minutes: 30 env: DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet @@ -164,7 +164,7 @@ jobs: run: | export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)" - go test -tags e2e ./... -p 1 -v -failfast + go test -tags e2e ./... -race -p 1 -v -failfast - name: Teardown run: docker compose down diff --git a/examples/loadtest/cli/cli_e2e_test.go b/examples/loadtest/cli/cli_e2e_test.go new file mode 100644 index 000000000..9dc88d09d --- /dev/null +++ b/examples/loadtest/cli/cli_e2e_test.go @@ -0,0 +1,66 @@ +//go:build e2e + +package main + +import ( + "testing" + "time" + + "go.uber.org/goleak" + + "github.com/hatchet-dev/hatchet/internal/testutils" +) + +func TestLoadCLI(t *testing.T) { + testutils.Prepare(t) + + type args struct { + duration time.Duration + eventsPerSecond int + delay time.Duration + wait time.Duration + concurrency int + } + tests := []struct { + name string + args args + wantErr bool + }{{ + name: "test simple with unlimited concurrency", + args: args{ + duration: 10 * time.Second, + eventsPerSecond: 10, + delay: 0 * time.Second, + wait: 20 * time.Second, + concurrency: 0, + }, + }, { + name: "test with high step delay", + args: args{ + duration: 10 * time.Second, + eventsPerSecond: 10, + delay: 10 * time.Second, + wait: 30 * time.Second, + concurrency: 0, + }, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer func() { + time.Sleep(1 * time.Second) + + goleak.VerifyNone( + t, + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"), + ) + }() + + if err := do(tt.args.duration, tt.args.eventsPerSecond, tt.args.delay, tt.args.wait, tt.args.concurrency); (err != nil) != tt.wantErr { + t.Errorf("do() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/examples/loadtest/cli/do.go b/examples/loadtest/cli/do.go new file mode 100644 index 000000000..fdcb2edd4 --- /dev/null +++ b/examples/loadtest/cli/do.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" +) + +func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait time.Duration, concurrency int) error { + log.Printf("testing with duration=%s, eventsPerSecond=%d, delay=%s, wait=%s, concurrency=%d", duration, eventsPerSecond, delay, wait, concurrency) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + after := 10 * time.Second + + go func() { + time.Sleep(duration + after + wait + 5*time.Second) + cancel() + }() + + ch := make(chan int64, 2) + durations := make(chan time.Duration, eventsPerSecond*int(duration.Seconds())*3) + go func() { + count, uniques := run(ctx, delay, durations, concurrency) + ch <- count + ch <- uniques + }() + + time.Sleep(after) + + scheduled := make(chan time.Duration, eventsPerSecond*int(duration.Seconds())*2) + emitted := emit(ctx, eventsPerSecond, duration, scheduled) + executed := <-ch + uniques := <-ch + + log.Printf("ℹ️ emitted %d, executed %d, uniques %d, using %d events/s", emitted, executed, uniques, eventsPerSecond) + + if executed == 0 { + return fmt.Errorf("❌ no events executed") + } + + var totalDurationExecuted time.Duration + for i := 0; i < int(executed); i++ { + totalDurationExecuted += <-durations + } + durationPerEventExecuted := totalDurationExecuted / time.Duration(executed) + log.Printf("ℹ️ average duration per executed event: %s", durationPerEventExecuted) + + var totalDurationScheduled time.Duration + for i := 0; i < int(emitted); i++ { + totalDurationScheduled += <-scheduled + } + scheduleTimePerEvent := totalDurationScheduled / time.Duration(emitted) + log.Printf("ℹ️ average scheduling time per event: %s", scheduleTimePerEvent) + + if emitted != executed { + log.Printf("⚠️ warning: emitted and executed counts do not match: %d != %d", emitted, executed) + } + + if emitted != uniques { + return fmt.Errorf("❌ emitted and unique executed counts do not match: %d != %d", emitted, uniques) + } + + log.Printf("✅ success") + + return nil +} diff --git a/examples/loadtest/cli/emit.go b/examples/loadtest/cli/emit.go new file mode 100644 index 000000000..07da45836 --- /dev/null +++ b/examples/loadtest/cli/emit.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/hatchet-dev/hatchet/pkg/client" +) + +type Event struct { + ID int64 `json:"id"` + CreatedAt time.Time `json:"created_at"` +} + +func emit(ctx context.Context, amountPerSecond int, duration time.Duration, scheduled chan<- time.Duration) int64 { + c, err := client.New() + + if err != nil { + panic(err) + } + + var id int64 + mx := sync.Mutex{} + go func() { + ticker := time.NewTicker(time.Second / time.Duration(amountPerSecond)) + defer ticker.Stop() + + timer := time.After(duration) + + for { + select { + case <-ticker.C: + mx.Lock() + id += 1 + mx.Unlock() + + go func(id int64) { + ev := Event{CreatedAt: time.Now(), ID: id} + fmt.Println("pushed event", ev.ID) + err = c.Event().Push(context.Background(), "load-test:event", ev) + if err != nil { + panic(fmt.Errorf("error pushing event: %w", err)) + } + took := time.Since(ev.CreatedAt) + fmt.Println("pushed event", ev.ID, "took", took) + scheduled <- took + }(id) + case <-timer: + log.Println("done emitting events due to timer at", id) + return + case <-ctx.Done(): + log.Println("done emitting events due to interruption at", id) + return + } + } + }() + + for { + select { + case <-ctx.Done(): + mx.Lock() + defer mx.Unlock() + return id + default: + time.Sleep(time.Second) + } + } +} diff --git a/examples/loadtest/cli/main.go b/examples/loadtest/cli/main.go new file mode 100644 index 000000000..af805d2aa --- /dev/null +++ b/examples/loadtest/cli/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "log" + "time" + + "github.com/joho/godotenv" + "github.com/spf13/cobra" +) + +func main() { + var events int + var concurrency int + var duration time.Duration + var wait time.Duration + var delay time.Duration + + var loadtest = &cobra.Command{ + Use: "loadtest", + Run: func(cmd *cobra.Command, args []string) { + err := godotenv.Load() + if err != nil { + panic(err) + } + + if err := do(duration, events, delay, wait, concurrency); err != nil { + log.Println(err) + panic("load test failed") + } + }, + } + + loadtest.Flags().IntVarP(&events, "events", "e", 10, "events per second") + loadtest.Flags().IntVarP(&concurrency, "concurrency", "c", 0, "concurrency specifies the maximum events to run at the same time") + loadtest.Flags().DurationVarP(&duration, "duration", "d", 10*time.Second, "duration specifies the total time to run the load test") + loadtest.Flags().DurationVarP(&delay, "delay", "D", 0, "delay specifies the time to wait in each event to simulate slow tasks") + loadtest.Flags().DurationVarP(&wait, "wait", "w", 10*time.Second, "wait specifies the total time to wait until events complete") + + cmd := &cobra.Command{Use: "app"} + cmd.AddCommand(loadtest) + if err := cmd.Execute(); err != nil { + panic(err) + } +} diff --git a/examples/loadtest/cli/run.go b/examples/loadtest/cli/run.go new file mode 100644 index 000000000..d148d3118 --- /dev/null +++ b/examples/loadtest/cli/run.go @@ -0,0 +1,114 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/hatchet-dev/hatchet/pkg/worker" +) + +type stepOneOutput struct { + Message string `json:"message"` +} + +func getConcurrencyKey(ctx worker.HatchetContext) (string, error) { + return "my-key", nil +} + +func run(ctx context.Context, delay time.Duration, executions chan<- time.Duration, concurrency int) (int64, int64) { + c, err := client.New() + + if err != nil { + panic(err) + } + + w, err := worker.NewWorker( + worker.WithClient( + c, + ), + ) + + if err != nil { + panic(err) + } + + mx := sync.Mutex{} + var count int64 + var uniques int64 + var executed []int64 + + var concurrencyOpts *worker.WorkflowConcurrency + if concurrency > 0 { + concurrencyOpts = worker.Concurrency(getConcurrencyKey).MaxRuns(int32(concurrency)) + } + + err = w.On( + worker.Event("load-test:event"), + &worker.WorkflowJob{ + Name: "load-test", + Description: "Load testing", + Concurrency: concurrencyOpts, + Steps: []*worker.WorkflowStep{ + worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) { + var input Event + err = ctx.WorkflowInput(&input) + if err != nil { + return nil, err + } + + took := time.Since(input.CreatedAt) + fmt.Println("executing", input.ID, "took", took) + + mx.Lock() + executions <- took + // detect duplicate in executed slice + var duplicate bool + for i := 0; i < len(executed)-1; i++ { + if executed[i] == input.ID { + duplicate = true + fmt.Println("DUPLICATE:", input.ID) + } + } + if !duplicate { + uniques += 1 + } + count += 1 + executed = append(executed, input.ID) + mx.Unlock() + + time.Sleep(delay) + + return &stepOneOutput{ + Message: "This ran at: " + time.Now().Format(time.RFC3339Nano), + }, nil + }).SetName("step-one"), + }, + }, + ) + + if err != nil { + panic(err) + } + + go func() { + err = w.Start(ctx) + + if err != nil { + panic(err) + } + }() + + for { + select { + case <-ctx.Done(): + mx.Lock() + defer mx.Unlock() + return count, uniques + default: + time.Sleep(time.Second) + } + } +} diff --git a/examples/middleware/main.go b/examples/middleware/main.go index c33a45189..7cee75ebd 100644 --- a/examples/middleware/main.go +++ b/examples/middleware/main.go @@ -6,10 +6,11 @@ import ( "log" "time" + "github.com/joho/godotenv" + "github.com/hatchet-dev/hatchet/pkg/client" "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/hatchet-dev/hatchet/pkg/worker" - "github.com/joho/godotenv" ) type userCreateEvent struct { @@ -29,12 +30,13 @@ func main() { } events := make(chan string, 50) - if err := run(cmdutils.InterruptChan(), events); err != nil { + ctx, _ := cmdutils.NewInterruptContext() + if err := run(ctx, events); err != nil { panic(err) } } -func run(ch <-chan interface{}, events chan<- string) error { +func run(interruptCtx context.Context, events chan<- string) error { c, err := client.New() if err != nil { return fmt.Errorf("error creating client: %w", err) @@ -128,37 +130,32 @@ func run(ch <-chan interface{}, events chan<- string) error { return fmt.Errorf("error registering workflow: %w", err) } - interruptCtx, cancel := cmdutils.InterruptContextFromChan(ch) - defer cancel() - go func() { - err = w.Start(interruptCtx) + log.Printf("pushing event user:create:middleware") - if err != nil { - panic(err) + testEvent := userCreateEvent{ + Username: "echo-test", + UserID: "1234", + Data: map[string]string{ + "test": "test", + }, } - cancel() + // push an event + err = c.Event().Push( + context.Background(), + "user:create:middleware", + testEvent, + ) + if err != nil { + panic(fmt.Errorf("error pushing event: %w", err)) + } }() - testEvent := userCreateEvent{ - Username: "echo-test", - UserID: "1234", - Data: map[string]string{ - "test": "test", - }, - } - - log.Printf("pushing event user:create:middleware") + err = w.Start(interruptCtx) - // push an event - err = c.Event().Push( - context.Background(), - "user:create:middleware", - testEvent, - ) if err != nil { - return fmt.Errorf("error pushing event: %w", err) + panic(err) } for { diff --git a/examples/middleware/main_e2e_test.go b/examples/middleware/main_e2e_test.go index 900f20f7d..7fdbc27d2 100644 --- a/examples/middleware/main_e2e_test.go +++ b/examples/middleware/main_e2e_test.go @@ -3,35 +3,41 @@ package main import ( - "log" + "context" + "fmt" "testing" "time" - "github.com/hatchet-dev/hatchet/internal/testutils" "github.com/stretchr/testify/assert" + + "github.com/hatchet-dev/hatchet/internal/testutils" ) func TestMiddleware(t *testing.T) { + t.Skip() testutils.Prepare(t) - ch := make(chan interface{}, 1) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() events := make(chan string, 50) go func() { - time.Sleep(20 * time.Second) - ch <- struct{}{} - close(events) - log.Printf("sent interrupt") + if err := run(ctx, events); err != nil { + panic(fmt.Errorf("run() error = %v", err)) + } }() - if err := run(ch, events); err != nil { - t.Fatalf("run() error = %v", err) - } - var items []string - for item := range events { - items = append(items, item) + +outer: + for { + select { + case item := <-events: + items = append(items, item) + case <-ctx.Done(): + break outer + } } assert.Equal(t, []string{ diff --git a/examples/simple/main.go b/examples/simple/main.go index 6d8f7cd7c..5e9b6992e 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -30,7 +30,8 @@ func main() { } events := make(chan string, 50) - if err := run(cmdutils.InterruptChan(), events); err != nil { + ctx, _ := cmdutils.NewInterruptContext() + if err := run(ctx, events); err != nil { panic(err) } } @@ -39,7 +40,7 @@ func getConcurrencyKey(ctx worker.HatchetContext) (string, error) { return "user-create", nil } -func run(ch <-chan interface{}, events chan<- string) error { +func run(interruptCtx context.Context, events chan<- string) error { c, err := client.New() if err != nil { @@ -105,38 +106,31 @@ func run(ch <-chan interface{}, events chan<- string) error { return fmt.Errorf("error registering workflow: %w", err) } - interruptCtx, cancel := cmdutils.InterruptContextFromChan(ch) - defer cancel() - go func() { - err = w.Start(interruptCtx) + testEvent := userCreateEvent{ + Username: "echo-test", + UserID: "1234", + Data: map[string]string{ + "test": "test", + }, + } + log.Printf("pushing event user:create:simple") + // push an event + err := c.Event().Push( + context.Background(), + "user:create:simple", + testEvent, + ) if err != nil { - panic(err) + panic(fmt.Errorf("error pushing event: %w", err)) } - - cancel() }() - testEvent := userCreateEvent{ - Username: "echo-test", - UserID: "1234", - Data: map[string]string{ - "test": "test", - }, - } - - log.Printf("pushing event user:create:simple") - - // push an event - err = c.Event().Push( - context.Background(), - "user:create:simple", - testEvent, - ) + err = w.Start(interruptCtx) if err != nil { - return fmt.Errorf("error pushing event: %w", err) + panic(err) } for { diff --git a/examples/simple/main_e2e_test.go b/examples/simple/main_e2e_test.go index fd51c6f25..d3da07c33 100644 --- a/examples/simple/main_e2e_test.go +++ b/examples/simple/main_e2e_test.go @@ -3,35 +3,40 @@ package main import ( - "log" + "context" + "fmt" "testing" "time" - "github.com/hatchet-dev/hatchet/internal/testutils" "github.com/stretchr/testify/assert" + + "github.com/hatchet-dev/hatchet/internal/testutils" ) func TestSimple(t *testing.T) { testutils.Prepare(t) - ch := make(chan interface{}, 1) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() events := make(chan string, 50) go func() { - time.Sleep(20 * time.Second) - ch <- struct{}{} - close(events) - log.Printf("sent interrupt") + if err := run(ctx, events); err != nil { + panic(fmt.Errorf("run() error = %v", err)) + } }() - if err := run(ch, events); err != nil { - t.Fatalf("run() error = %v", err) - } - var items []string - for item := range events { - items = append(items, item) + +outer: + for { + select { + case item := <-events: + items = append(items, item) + case <-ctx.Done(): + break outer + } } assert.Equal(t, []string{ diff --git a/go.mod b/go.mod index 3944e8ed2..06f98fda0 100644 --- a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect + go.uber.org/goleak v1.3.0 // indirect golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/internal/testutils/env.go b/internal/testutils/env.go index 998537e7c..35b1c582b 100644 --- a/internal/testutils/env.go +++ b/internal/testutils/env.go @@ -2,14 +2,20 @@ package testutils import ( "os" + "path" + "path/filepath" + "runtime" "testing" ) func Prepare(t *testing.T) { t.Helper() + _, b, _, _ := runtime.Caller(0) + testPath := filepath.Dir(b) + _ = os.Setenv("HATCHET_CLIENT_TENANT_ID", "707d0855-80ab-4e1f-a156-f1c4546cbf52") _ = os.Setenv("DATABASE_URL", "postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet") - _ = os.Setenv("HATCHET_CLIENT_TLS_ROOT_CA_FILE", "../../hack/dev/certs/ca.cert") + _ = os.Setenv("HATCHET_CLIENT_TLS_ROOT_CA_FILE", path.Join(testPath, "../..", "hack/dev/certs/ca.cert")) _ = os.Setenv("HATCHET_CLIENT_TLS_SERVER_NAME", "cluster") } diff --git a/pkg/worker/middleware.go b/pkg/worker/middleware.go index 9bafcdfd5..7a7c19aeb 100644 --- a/pkg/worker/middleware.go +++ b/pkg/worker/middleware.go @@ -26,9 +26,6 @@ func (m *middlewares) add(mws ...MiddlewareFunc) { } func (m *middlewares) runAll(ctx HatchetContext, next func(HatchetContext) error) error { - m.mu.Lock() - defer m.mu.Unlock() - return run(ctx, m.middlewares, next) }