From 7881da0ac3631dadbf68ac1f37b8b876d4e6eca3 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 28 Oct 2024 16:33:49 -0700 Subject: [PATCH 01/12] PR feedback --- internal/internal_update.go | 2 + internal/internal_workflow_test.go | 7 +++ internal/internal_workflow_testsuite.go | 8 ++++ internal/workflow_testsuite_test.go | 64 +++++++++++++++++++++++++ 4 files changed, 81 insertions(+) diff --git a/internal/internal_update.go b/internal/internal_update.go index 19f9d2ff3..b34f80345 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -326,7 +326,9 @@ func defaultUpdateHandler( } } callbacks.Accept() + fmt.Println("[defaultUpdateHandler] calling ExecuteUpdate") success, err := envInterceptor.inboundInterceptor.ExecuteUpdate(ctx, &input) + fmt.Println("[defaultUpdateHandler] ExecuteUpdate completed") callbacks.Complete(success, err) } diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index efa6912d2..8f50c8406 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1406,6 +1406,7 @@ type updateCallback struct { accept func() reject func(error) complete func(interface{}, error) + env *TestWorkflowEnvironment } func (uc *updateCallback) Accept() { @@ -1417,6 +1418,12 @@ func (uc *updateCallback) Reject(err error) { } func (uc *updateCallback) Complete(success interface{}, err error) { + // Should the update be here? How do we know the ID corresponding to the update here? + // Can I somehow create a custom callback built into the framework that this function + // can write to with the result of success, and that can add to the map in env? + fmt.Println("[updateCallback] Complete() called") + // debug.PrintStack() + // uc.env.impl.updateMap["TODO"] = success uc.complete(success, err) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 44c36ea38..755e2808f 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -208,6 +208,7 @@ type ( signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error) updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) + updateMap map[string]interface{} startedHandler func(r WorkflowExecution, e error) isWorkflowCompleted bool @@ -2178,7 +2179,10 @@ func (env *testWorkflowEnvironmentImpl) RegisterSignalHandler( func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { + // debug.PrintStack() + fmt.Println("RegisterUpdateHandler is not implemented") env.updateHandler = handler + env.updateMap = make(map[string]interface{}) } func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler( @@ -2717,14 +2721,18 @@ func (env *testWorkflowEnvironmentImpl) queryWorkflow(queryType string, args ... } func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, uc UpdateCallbacks, args ...interface{}) { + fmt.Println("updateWorkflow") data, err := encodeArgs(env.GetDataConverter(), args) if err != nil { panic(err) } + // TODO: Somehow map uc to id env.postCallback(func() { // Do not send any headers on test invocations env.updateHandler(name, id, data, nil, uc) + fmt.Println("after updateHandler") }, true) + fmt.Println("after postCallback") } func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id string, uc UpdateCallbacks, args ...interface{}) error { diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index b9f779845..f44a3150e 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -491,6 +491,70 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { require.Equal(t, "unknown update bad update. KnownUpdates=[update]", updateRejectionErr.Error()) } +func TestWorkflowDuplicateIDDedup(t *testing.T) { + var suite WorkflowTestSuite + // Test dev server rejects UpdateWorkflow with same ID + env := suite.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() { + fmt.Println("[first] accepted") + }, + complete: func(result interface{}, err error) { + fmt.Println("[first] completed") + fmt.Println("[first] result", result) + intResult, ok := result.(int) + fmt.Println("[first] first update result", intResult) + if !ok { + require.Fail(t, "result should be int") + } else { + require.Equal(t, 0, intResult) + } + }, + }, 0) + fmt.Println("This should print first.") + }, 0) + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(result interface{}, err error) { + fmt.Println("[second] workflow completed") + fmt.Println("[second] result", result) + intResult, ok := result.(int) + fmt.Println("[second] update result", intResult) + if !ok { + require.Fail(t, "result should be int") + } else { + require.Equal(t, 0, intResult) + } + }, + env: env, + }, 1) // if dedup, this be okay, even if we pass in 1 as arg, since it's deduping + fmt.Println("This should print second.") + + }, 1*time.Millisecond) + + env.ExecuteWorkflow(func(ctx Context) error { + err := SetUpdateHandler(ctx, "update", func(ctx Context, i int) (int, error) { + fmt.Println("[SetUpdateHandler] i", i) + return i, nil + }, UpdateHandlerOptions{}) + if err != nil { + return err + } + + return Sleep(ctx, time.Hour) + }) + require.NoError(t, env.GetWorkflowError()) +} + func TestAllHandlersFinished(t *testing.T) { var suite WorkflowTestSuite env := suite.NewTestWorkflowEnvironment() From 2526b8bf6541e9aad5b30bc253124321ddb10cd7 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 29 Oct 2024 09:27:50 -0700 Subject: [PATCH 02/12] Push to ask slack question --- internal/workflow_testsuite_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index f44a3150e..7edf24e00 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -523,7 +523,9 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { reject: func(err error) { require.Fail(t, "update should not be rejected") }, - accept: func() {}, + accept: func() { + fmt.Println("[second] accepted") + }, complete: func(result interface{}, err error) { fmt.Println("[second] workflow completed") fmt.Println("[second] result", result) @@ -532,11 +534,12 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { if !ok { require.Fail(t, "result should be int") } else { + // if dedup, this be okay, even if we pass in 1 as arg, since it's deduping, + // the result should match the first update's result, 0 require.Equal(t, 0, intResult) } }, - env: env, - }, 1) // if dedup, this be okay, even if we pass in 1 as arg, since it's deduping + }, 1) fmt.Println("This should print second.") }, 1*time.Millisecond) @@ -552,7 +555,9 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { return Sleep(ctx, time.Hour) }) - require.NoError(t, env.GetWorkflowError()) + var result int + require.NoError(t, env.GetWorkflowResult(&result)) + // require.NoError(t, env.GetWorkflowError()) } func TestAllHandlersFinished(t *testing.T) { From d70caf8fd20770a32aa89ec1c1c8c181c3da76f6 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 29 Oct 2024 12:17:45 -0700 Subject: [PATCH 03/12] test passes --- internal/internal_update.go | 11 +++++-- internal/internal_workflow_test.go | 13 ++++++-- internal/internal_workflow_testsuite.go | 40 ++++++++++++++++++------- internal/workflow_testsuite.go | 1 + internal/workflow_testsuite_test.go | 17 +++++++---- 5 files changed, 62 insertions(+), 20 deletions(-) diff --git a/internal/internal_update.go b/internal/internal_update.go index b34f80345..8442548c4 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -77,7 +77,10 @@ type ( // Complete is called for an update with the result of executing the // update function. If the provided error is non-nil then the overall // outcome is understood to be a failure. - Complete(success interface{}, err error) + Complete(success interface{}, err error) // not sure i should change the API here + // TODO: add an ID identity? + // no, it doesn't make sense to add it here + } // UpdateScheduler allows an update state machine to spawn coroutines and @@ -326,9 +329,11 @@ func defaultUpdateHandler( } } callbacks.Accept() - fmt.Println("[defaultUpdateHandler] calling ExecuteUpdate") + fmt.Println("[defaultUpdateHandler] ExecuteUpdate") success, err := envInterceptor.inboundInterceptor.ExecuteUpdate(ctx, &input) - fmt.Println("[defaultUpdateHandler] ExecuteUpdate completed") + fmt.Println("[defaultUpdateHandler] callbacks.Complete()") + // check stack here + // debug.PrintStack() callbacks.Complete(success, err) } diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 8f50c8406..33370b0b7 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1406,7 +1406,9 @@ type updateCallback struct { accept func() reject func(error) complete func(interface{}, error) - env *TestWorkflowEnvironment + // TODO: How do I capture env from this struct? + // it won't work to require env to be passed in + env *TestWorkflowEnvironment } func (uc *updateCallback) Accept() { @@ -1423,7 +1425,14 @@ func (uc *updateCallback) Complete(success interface{}, err error) { // can write to with the result of success, and that can add to the map in env? fmt.Println("[updateCallback] Complete() called") // debug.PrintStack() - // uc.env.impl.updateMap["TODO"] = success + fmt.Println("[updateCallback] currentUpdateId:", uc.env.impl.currentUpdateId) + if uc.env.impl.updateMap != nil { + uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = success + } else { + fmt.Println("[updateCallback] UPDATEMAP IS NIL") + } + fmt.Println("[updateCallback] updateMap:", uc.env.impl.updateMap) + uc.complete(success, err) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 755e2808f..6edb6e5ad 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -230,6 +230,8 @@ type ( workflowFunctionExecuting bool bufferedUpdateRequests map[string][]func() + + currentUpdateId string } testSessionEnvironmentImpl struct { @@ -2180,9 +2182,9 @@ func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { // debug.PrintStack() - fmt.Println("RegisterUpdateHandler is not implemented") + fmt.Println("[RegisterUpdateHandler] registering handler") env.updateHandler = handler - env.updateMap = make(map[string]interface{}) + } func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler( @@ -2721,18 +2723,36 @@ func (env *testWorkflowEnvironmentImpl) queryWorkflow(queryType string, args ... } func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, uc UpdateCallbacks, args ...interface{}) { - fmt.Println("updateWorkflow") + fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow]") data, err := encodeArgs(env.GetDataConverter(), args) if err != nil { panic(err) } - // TODO: Somehow map uc to id - env.postCallback(func() { - // Do not send any headers on test invocations - env.updateHandler(name, id, data, nil, uc) - fmt.Println("after updateHandler") - }, true) - fmt.Println("after postCallback") + // Some way to capture state of the update + if env.updateMap == nil { + env.updateMap = make(map[string]interface{}) + } + + // check if id in map + if _, ok := env.updateMap[id]; ok { + // TODO: manually override success to complete? + // How do I do this? + env.postCallback(func() { + uc.Accept() + uc.Complete(env.updateMap[id], nil) + }, false) + } else { + env.currentUpdateId = id + // TODO: how do I return cached state? + env.postCallback(func() { + fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow] callback starting") + // Do not send any headers on test invocations + env.updateHandler(name, id, data, nil, uc) + fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow] after updateHandler") + }, true) + fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow] after postCallback") + } + } func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id string, uc UpdateCallbacks, args ...interface{}) error { diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 1b4c2ae9a..38947abcf 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -909,6 +909,7 @@ func (e *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interf // UpdateWorkflow sends an update to the currently running workflow. func (e *TestWorkflowEnvironment) UpdateWorkflow(updateName, updateID string, uc UpdateCallbacks, args ...interface{}) { + // here e.impl.updateWorkflow(updateName, updateID, uc, args...) } diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 7edf24e00..502c40f80 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -31,6 +31,7 @@ import ( "errors" "fmt" "log/slog" + "runtime/debug" "strings" "sync/atomic" "testing" @@ -498,12 +499,14 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { env.RegisterDelayedCallback(func() { env.UpdateWorkflow("update", "id", &updateCallback{ reject: func(err error) { - require.Fail(t, "update should not be rejected") + require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err)) }, accept: func() { + debug.PrintStack() fmt.Println("[first] accepted") }, complete: func(result interface{}, err error) { + // debug.PrintStack() fmt.Println("[first] completed") fmt.Println("[first] result", result) intResult, ok := result.(int) @@ -514,6 +517,7 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { require.Equal(t, 0, intResult) } }, + env: env, }, 0) fmt.Println("This should print first.") }, 0) @@ -539,6 +543,7 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { require.Equal(t, 0, intResult) } }, + env: env, }, 1) fmt.Println("This should print second.") @@ -550,14 +555,16 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { return i, nil }, UpdateHandlerOptions{}) if err != nil { + fmt.Println("ERROR") return err } - + fmt.Println("[ExecuteWorkflow] before sleep") return Sleep(ctx, time.Hour) }) - var result int - require.NoError(t, env.GetWorkflowResult(&result)) - // require.NoError(t, env.GetWorkflowError()) + // var result int + // require.NoError(t, env.GetWorkflowResult(&result)) + require.NoError(t, env.GetWorkflowError()) + require.True(t, false) } func TestAllHandlersFinished(t *testing.T) { From 47057fe5a6237cdd02e2c666fa9ebb2139d1b57b Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 29 Oct 2024 12:21:13 -0700 Subject: [PATCH 04/12] cleanup --- internal/internal_update.go | 4 ---- internal/internal_workflow_test.go | 11 +---------- internal/internal_workflow_testsuite.go | 12 ++---------- internal/workflow_testsuite.go | 1 - internal/workflow_testsuite_test.go | 16 ---------------- 5 files changed, 3 insertions(+), 41 deletions(-) diff --git a/internal/internal_update.go b/internal/internal_update.go index 8442548c4..87441a845 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -329,11 +329,7 @@ func defaultUpdateHandler( } } callbacks.Accept() - fmt.Println("[defaultUpdateHandler] ExecuteUpdate") success, err := envInterceptor.inboundInterceptor.ExecuteUpdate(ctx, &input) - fmt.Println("[defaultUpdateHandler] callbacks.Complete()") - // check stack here - // debug.PrintStack() callbacks.Complete(success, err) } diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 33370b0b7..ecfa9fdfc 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1420,19 +1420,10 @@ func (uc *updateCallback) Reject(err error) { } func (uc *updateCallback) Complete(success interface{}, err error) { - // Should the update be here? How do we know the ID corresponding to the update here? - // Can I somehow create a custom callback built into the framework that this function - // can write to with the result of success, and that can add to the map in env? - fmt.Println("[updateCallback] Complete() called") - // debug.PrintStack() - fmt.Println("[updateCallback] currentUpdateId:", uc.env.impl.currentUpdateId) + // cache update result so we can dedup duplicate update IDs if uc.env.impl.updateMap != nil { uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = success - } else { - fmt.Println("[updateCallback] UPDATEMAP IS NIL") } - fmt.Println("[updateCallback] updateMap:", uc.env.impl.updateMap) - uc.complete(success, err) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 6edb6e5ad..0ba5045b2 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2181,8 +2181,6 @@ func (env *testWorkflowEnvironmentImpl) RegisterSignalHandler( func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { - // debug.PrintStack() - fmt.Println("[RegisterUpdateHandler] registering handler") env.updateHandler = handler } @@ -2723,7 +2721,6 @@ func (env *testWorkflowEnvironmentImpl) queryWorkflow(queryType string, args ... } func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, uc UpdateCallbacks, args ...interface{}) { - fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow]") data, err := encodeArgs(env.GetDataConverter(), args) if err != nil { panic(err) @@ -2733,24 +2730,19 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u env.updateMap = make(map[string]interface{}) } - // check if id in map + // check for duplicate update ID if _, ok := env.updateMap[id]; ok { - // TODO: manually override success to complete? - // How do I do this? env.postCallback(func() { uc.Accept() uc.Complete(env.updateMap[id], nil) }, false) } else { env.currentUpdateId = id - // TODO: how do I return cached state? + // return cached state env.postCallback(func() { - fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow] callback starting") // Do not send any headers on test invocations env.updateHandler(name, id, data, nil, uc) - fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow] after updateHandler") }, true) - fmt.Println("[testWorkflowEnvironmentImpl:updateWorkflow] after postCallback") } } diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 38947abcf..1b4c2ae9a 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -909,7 +909,6 @@ func (e *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interf // UpdateWorkflow sends an update to the currently running workflow. func (e *TestWorkflowEnvironment) UpdateWorkflow(updateName, updateID string, uc UpdateCallbacks, args ...interface{}) { - // here e.impl.updateWorkflow(updateName, updateID, uc, args...) } diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 502c40f80..0da2d696d 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -503,14 +503,9 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { }, accept: func() { debug.PrintStack() - fmt.Println("[first] accepted") }, complete: func(result interface{}, err error) { - // debug.PrintStack() - fmt.Println("[first] completed") - fmt.Println("[first] result", result) intResult, ok := result.(int) - fmt.Println("[first] first update result", intResult) if !ok { require.Fail(t, "result should be int") } else { @@ -519,7 +514,6 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { }, env: env, }, 0) - fmt.Println("This should print first.") }, 0) env.RegisterDelayedCallback(func() { @@ -528,13 +522,9 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { require.Fail(t, "update should not be rejected") }, accept: func() { - fmt.Println("[second] accepted") }, complete: func(result interface{}, err error) { - fmt.Println("[second] workflow completed") - fmt.Println("[second] result", result) intResult, ok := result.(int) - fmt.Println("[second] update result", intResult) if !ok { require.Fail(t, "result should be int") } else { @@ -545,24 +535,18 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { }, env: env, }, 1) - fmt.Println("This should print second.") }, 1*time.Millisecond) env.ExecuteWorkflow(func(ctx Context) error { err := SetUpdateHandler(ctx, "update", func(ctx Context, i int) (int, error) { - fmt.Println("[SetUpdateHandler] i", i) return i, nil }, UpdateHandlerOptions{}) if err != nil { - fmt.Println("ERROR") return err } - fmt.Println("[ExecuteWorkflow] before sleep") return Sleep(ctx, time.Hour) }) - // var result int - // require.NoError(t, env.GetWorkflowResult(&result)) require.NoError(t, env.GetWorkflowError()) require.True(t, false) } From 56398598bcadf07de52648c4339dfc119bb189d6 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 29 Oct 2024 12:24:04 -0700 Subject: [PATCH 05/12] more cleanup --- internal/internal_update.go | 5 +---- internal/internal_workflow_test.go | 4 +--- internal/internal_workflow_testsuite.go | 4 +--- internal/workflow_testsuite_test.go | 5 +---- 4 files changed, 4 insertions(+), 14 deletions(-) diff --git a/internal/internal_update.go b/internal/internal_update.go index 87441a845..19f9d2ff3 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -77,10 +77,7 @@ type ( // Complete is called for an update with the result of executing the // update function. If the provided error is non-nil then the overall // outcome is understood to be a failure. - Complete(success interface{}, err error) // not sure i should change the API here - // TODO: add an ID identity? - // no, it doesn't make sense to add it here - + Complete(success interface{}, err error) } // UpdateScheduler allows an update state machine to spawn coroutines and diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index ecfa9fdfc..0602415f0 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1406,9 +1406,7 @@ type updateCallback struct { accept func() reject func(error) complete func(interface{}, error) - // TODO: How do I capture env from this struct? - // it won't work to require env to be passed in - env *TestWorkflowEnvironment + env *TestWorkflowEnvironment } func (uc *updateCallback) Accept() { diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 0ba5045b2..f0b7a14cb 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2182,7 +2182,6 @@ func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { env.updateHandler = handler - } func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler( @@ -2725,20 +2724,19 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u if err != nil { panic(err) } - // Some way to capture state of the update if env.updateMap == nil { env.updateMap = make(map[string]interface{}) } // check for duplicate update ID if _, ok := env.updateMap[id]; ok { + // return cached result env.postCallback(func() { uc.Accept() uc.Complete(env.updateMap[id], nil) }, false) } else { env.currentUpdateId = id - // return cached state env.postCallback(func() { // Do not send any headers on test invocations env.updateHandler(name, id, data, nil, uc) diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 0da2d696d..153f0acab 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -31,7 +31,6 @@ import ( "errors" "fmt" "log/slog" - "runtime/debug" "strings" "sync/atomic" "testing" @@ -502,7 +501,6 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err)) }, accept: func() { - debug.PrintStack() }, complete: func(result interface{}, err error) { intResult, ok := result.(int) @@ -519,7 +517,7 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { env.RegisterDelayedCallback(func() { env.UpdateWorkflow("update", "id", &updateCallback{ reject: func(err error) { - require.Fail(t, "update should not be rejected") + require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err)) }, accept: func() { }, @@ -548,7 +546,6 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { return Sleep(ctx, time.Hour) }) require.NoError(t, env.GetWorkflowError()) - require.True(t, false) } func TestAllHandlersFinished(t *testing.T) { From d03b73ad8974e9e03c6a055cbef48f28db4b616a Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 4 Nov 2024 10:39:03 -0800 Subject: [PATCH 06/12] handle error properly, better guard rails for passing env --- internal/internal_workflow_test.go | 5 +++-- internal/internal_workflow_testsuite.go | 13 ++++++++----- internal/workflow_testsuite_test.go | 12 ++++++++++++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 0602415f0..2b4fc96eb 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1419,9 +1419,10 @@ func (uc *updateCallback) Reject(err error) { func (uc *updateCallback) Complete(success interface{}, err error) { // cache update result so we can dedup duplicate update IDs - if uc.env.impl.updateMap != nil { - uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = success + if uc.env == nil { + panic("env is needed in updateCallback to cache update results for deduping purposes") } + uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = updateResult{success, err} uc.complete(success, err) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index f0b7a14cb..72f50dc76 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -143,6 +143,11 @@ type ( taskQueues map[string]struct{} } + updateResult struct { + success interface{} + err error + } + // testWorkflowEnvironmentShared is the shared data between parent workflow and child workflow test environments testWorkflowEnvironmentShared struct { locker sync.Mutex @@ -208,7 +213,7 @@ type ( signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error) updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) - updateMap map[string]interface{} + updateMap map[string]updateResult startedHandler func(r WorkflowExecution, e error) isWorkflowCompleted bool @@ -2182,6 +2187,7 @@ func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { env.updateHandler = handler + env.updateMap = make(map[string]updateResult) } func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler( @@ -2724,16 +2730,13 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u if err != nil { panic(err) } - if env.updateMap == nil { - env.updateMap = make(map[string]interface{}) - } // check for duplicate update ID if _, ok := env.updateMap[id]; ok { // return cached result env.postCallback(func() { uc.Accept() - uc.Complete(env.updateMap[id], nil) + uc.Complete(env.updateMap[id].success, env.updateMap[id].err) }, false) } else { env.currentUpdateId = id diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 153f0acab..6de3ee689 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -280,6 +280,7 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }, "input") require.NoError(t, err) }, time.Second) @@ -322,6 +323,7 @@ func TestChildWorkflowUpdate(t *testing.T) { require.Fail(t, "update failed", err) } }, + env: env, }, nil) assert.NoError(t, err) }, time.Second*5) @@ -375,6 +377,7 @@ func TestWorkflowUpdateOrder(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, 0) @@ -415,6 +418,7 @@ func TestWorkflowNotRegisteredRejected(t *testing.T) { require.Fail(t, "update should not be accepted") }, complete: func(interface{}, error) {}, + env: env, }) }, 0) @@ -439,6 +443,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, 0) @@ -452,6 +457,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { require.Fail(t, "update should not be rejected") }, complete: func(interface{}, error) {}, + env: env, }) }, 0) @@ -462,6 +468,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, 0) @@ -559,6 +566,7 @@ func TestAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, 0) @@ -569,6 +577,7 @@ func TestAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, time.Minute) @@ -617,6 +626,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, 0) @@ -627,6 +637,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, time.Minute) @@ -637,6 +648,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, 2*time.Minute) From 16b9ce3c3ac83e29e5efa655da07860d7d2d77a3 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 4 Nov 2024 12:03:50 -0800 Subject: [PATCH 07/12] potential solution to multiple async updates --- internal/internal_workflow_test.go | 12 ++++++++++++ internal/internal_workflow_testsuite.go | 3 +++ internal/workflow_testsuite_test.go | 21 +++++++++++++-------- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 2b4fc96eb..94d40bd46 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1407,6 +1407,18 @@ type updateCallback struct { reject func(error) complete func(interface{}, error) env *TestWorkflowEnvironment + updateID string +} + +// env and updateID are needed to cache update results for deduping purposes +func newUpdateCallback(env *TestWorkflowEnvironment, updateID string, accept func(), reject func(error), complete func(interface{}, error)) *updateCallback { + return &updateCallback{ + accept: accept, + reject: reject, + complete: complete, + env: env, + updateID: updateID, + } } func (uc *updateCallback) Accept() { diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 9044f9967..d1bb780ad 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2750,6 +2750,8 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u uc.Complete(env.updateMap[id].success, env.updateMap[id].err) }, false) } else { + // TODO: This doesn't account for multiple async updates + // would a UC -> ID map work? Would I have to use pointers? env.currentUpdateId = id env.postCallback(func() { // Do not send any headers on test invocations @@ -2768,6 +2770,7 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id if err != nil { panic(err) } + // TODO: handle dedup workflowHandle.env.postCallback(func() { workflowHandle.env.updateHandler(name, id, data, nil, uc) }, true) diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 4a5724db3..482f506f9 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -273,15 +273,17 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) { var suite WorkflowTestSuite // Test UpdateWorkflowByID works with custom ID env := suite.NewTestWorkflowEnvironment() + updateID := "id" env.RegisterDelayedCallback(func() { - err := env.UpdateWorkflowByID("my-workflow-id", "update", "id", &updateCallback{ - reject: func(err error) { + err := env.UpdateWorkflowByID("my-workflow-id", "update", updateID, newUpdateCallback( + env, + updateID, + func() {}, + func(err error) { require.Fail(t, "update should not be rejected") }, - accept: func() {}, - complete: func(interface{}, error) {}, - env: env, - }, "input") + func(interface{}, error) {}, + ), "input") require.NoError(t, err) }, time.Second) @@ -500,7 +502,7 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { func TestWorkflowDuplicateIDDedup(t *testing.T) { var suite WorkflowTestSuite - // Test dev server rejects UpdateWorkflow with same ID + // Test dev server dedups UpdateWorkflow with same ID env := suite.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { env.UpdateWorkflow("update", "id", &updateCallback{ @@ -517,7 +519,8 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { require.Equal(t, 0, intResult) } }, - env: env, + env: env, + updateID: "id", }, 0) }, 0) @@ -553,6 +556,7 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { return Sleep(ctx, time.Hour) }) require.NoError(t, env.GetWorkflowError()) + require.True(t, false) } func TestAllHandlersFinished(t *testing.T) { @@ -802,6 +806,7 @@ func TestWorkflowUpdateLogger(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, + env: env, }) }, 0) From 223677f45a4fc162e4fd3f39b4d55317d6c20743 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 12 Nov 2024 09:55:41 -0800 Subject: [PATCH 08/12] add wrapper the user can't see --- internal/internal_workflow_test.go | 18 -------- internal/internal_workflow_testsuite.go | 58 +++++++++++++++++++------ internal/workflow_testsuite_test.go | 28 +++--------- 3 files changed, 50 insertions(+), 54 deletions(-) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 94d40bd46..efa6912d2 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1406,19 +1406,6 @@ type updateCallback struct { accept func() reject func(error) complete func(interface{}, error) - env *TestWorkflowEnvironment - updateID string -} - -// env and updateID are needed to cache update results for deduping purposes -func newUpdateCallback(env *TestWorkflowEnvironment, updateID string, accept func(), reject func(error), complete func(interface{}, error)) *updateCallback { - return &updateCallback{ - accept: accept, - reject: reject, - complete: complete, - env: env, - updateID: updateID, - } } func (uc *updateCallback) Accept() { @@ -1430,11 +1417,6 @@ func (uc *updateCallback) Reject(err error) { } func (uc *updateCallback) Complete(success interface{}, err error) { - // cache update result so we can dedup duplicate update IDs - if uc.env == nil { - panic("env is needed in updateCallback to cache update results for deduping purposes") - } - uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = updateResult{success, err} uc.complete(success, err) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index d1bb780ad..07b008ebd 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -235,14 +235,19 @@ type ( workflowFunctionExecuting bool bufferedUpdateRequests map[string][]func() - - currentUpdateId string } testSessionEnvironmentImpl struct { *sessionEnvironmentImpl testWorkflowEnvironment *testWorkflowEnvironmentImpl } + + // UpdateCallbacksWrapper is a wrapper to UpdateCallbacks. It allows us to dedup duplicate update IDs in the test environment. + updateCallbacksWrapper struct { + uc UpdateCallbacks + env *testWorkflowEnvironmentImpl + updateID string + } ) func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *registry) *testWorkflowEnvironmentImpl { @@ -2742,20 +2747,19 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u panic(err) } + var ucWrapper = updateCallbacksWrapper{uc: uc, env: env, updateID: id} + // check for duplicate update ID - if _, ok := env.updateMap[id]; ok { + if result, ok := env.updateMap[id]; ok { // return cached result env.postCallback(func() { - uc.Accept() - uc.Complete(env.updateMap[id].success, env.updateMap[id].err) + ucWrapper.uc.Accept() + ucWrapper.uc.Complete(result.success, result.err) }, false) } else { - // TODO: This doesn't account for multiple async updates - // would a UC -> ID map work? Would I have to use pointers? - env.currentUpdateId = id env.postCallback(func() { // Do not send any headers on test invocations - env.updateHandler(name, id, data, nil, uc) + env.updateHandler(name, id, data, nil, ucWrapper) }, true) } @@ -2770,10 +2774,21 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id if err != nil { panic(err) } - // TODO: handle dedup - workflowHandle.env.postCallback(func() { - workflowHandle.env.updateHandler(name, id, data, nil, uc) - }, true) + + var ucWrapper = updateCallbacksWrapper{uc: uc, env: env, updateID: id} + + // Check for duplicate update ID + if result, ok := env.updateMap[id]; ok { + workflowHandle.env.postCallback(func() { + ucWrapper.uc.Accept() + ucWrapper.uc.Complete(result.success, result.err) + }, false) + } else { + workflowHandle.env.postCallback(func() { + workflowHandle.env.updateHandler(name, id, data, nil, ucWrapper) + }, true) + } + return nil } @@ -2994,3 +3009,20 @@ func (h *testNexusOperationHandle) cancel() { }, false) }() } + +func (uc updateCallbacksWrapper) Accept() { + uc.uc.Accept() +} + +func (uc updateCallbacksWrapper) Reject(err error) { + uc.uc.Reject(err) +} + +func (uc updateCallbacksWrapper) Complete(success interface{}, err error) { + // cache update result so we can dedup duplicate update IDs + if uc.env == nil { + panic("env is needed in updateCallback to cache update results for deduping purposes") + } + uc.env.updateMap[uc.updateID] = updateResult{success, err} + uc.uc.Complete(success, err) +} diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 482f506f9..b9344e8b5 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -273,17 +273,14 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) { var suite WorkflowTestSuite // Test UpdateWorkflowByID works with custom ID env := suite.NewTestWorkflowEnvironment() - updateID := "id" env.RegisterDelayedCallback(func() { - err := env.UpdateWorkflowByID("my-workflow-id", "update", updateID, newUpdateCallback( - env, - updateID, - func() {}, - func(err error) { + err := env.UpdateWorkflowByID("my-workflow-id", "update", "id", &updateCallback{ + reject: func(err error) { require.Fail(t, "update should not be rejected") }, - func(interface{}, error) {}, - ), "input") + accept: func() {}, + complete: func(interface{}, error) {}, + }, "input") require.NoError(t, err) }, time.Second) @@ -325,7 +322,6 @@ func TestChildWorkflowUpdate(t *testing.T) { require.Fail(t, "update failed", err) } }, - env: env, }, nil) assert.NoError(t, err) }, time.Second*5) @@ -379,7 +375,6 @@ func TestWorkflowUpdateOrder(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, 0) @@ -420,7 +415,6 @@ func TestWorkflowNotRegisteredRejected(t *testing.T) { require.Fail(t, "update should not be accepted") }, complete: func(interface{}, error) {}, - env: env, }) }, 0) @@ -445,7 +439,6 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, 0) @@ -459,7 +452,6 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { require.Fail(t, "update should not be rejected") }, complete: func(interface{}, error) {}, - env: env, }) }, 0) @@ -470,7 +462,6 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, 0) @@ -519,8 +510,6 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { require.Equal(t, 0, intResult) } }, - env: env, - updateID: "id", }, 0) }, 0) @@ -541,7 +530,6 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { require.Equal(t, 0, intResult) } }, - env: env, }, 1) }, 1*time.Millisecond) @@ -570,7 +558,6 @@ func TestAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, 0) @@ -581,7 +568,6 @@ func TestAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, time.Minute) @@ -646,7 +632,6 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, 0) @@ -657,7 +642,6 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, time.Minute) @@ -668,7 +652,6 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, 2*time.Minute) @@ -806,7 +789,6 @@ func TestWorkflowUpdateLogger(t *testing.T) { }, accept: func() {}, complete: func(interface{}, error) {}, - env: env, }) }, 0) From 809c82e71d174b8f7ff5ff92e9381f86c97b3bfd Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 12 Nov 2024 10:11:58 -0800 Subject: [PATCH 09/12] remove fail line used for debugging --- internal/workflow_testsuite_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index b9344e8b5..1cbb08cc3 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -544,7 +544,6 @@ func TestWorkflowDuplicateIDDedup(t *testing.T) { return Sleep(ctx, time.Hour) }) require.NoError(t, env.GetWorkflowError()) - require.True(t, false) } func TestAllHandlersFinished(t *testing.T) { From 41c27d9cd99722aea064239014a19046044ec514 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 14 Nov 2024 16:15:37 -0800 Subject: [PATCH 10/12] add mutex to ensure multiple updates are properly processed --- internal/internal_workflow_testsuite.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 87bf91379..bc00ae314 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -160,6 +160,7 @@ type ( updateResult struct { success interface{} err error + mu *sync.Mutex } // testWorkflowEnvironmentShared is the shared data between parent workflow and child workflow test environments @@ -235,7 +236,8 @@ type ( queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error) updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) updateMap map[string]updateResult - startedHandler func(r WorkflowExecution, e error) + // updateMapLock map[string]sync.Mutex + startedHandler func(r WorkflowExecution, e error) isWorkflowCompleted bool testResult converter.EncodedValue @@ -2236,7 +2238,6 @@ func (env *testWorkflowEnvironmentImpl) RegisterUpdateHandler( handler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks), ) { env.updateHandler = handler - env.updateMap = make(map[string]updateResult) } func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler( @@ -2925,19 +2926,28 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u panic(err) } + if env.updateMap == nil { + env.updateMap = make(map[string]updateResult) + } + var ucWrapper = updateCallbacksWrapper{uc: uc, env: env, updateID: id} // check for duplicate update ID if result, ok := env.updateMap[id]; ok { + result.mu.Lock() // return cached result env.postCallback(func() { ucWrapper.uc.Accept() ucWrapper.uc.Complete(result.success, result.err) + defer result.mu.Unlock() }, false) } else { + env.updateMap[id] = updateResult{nil, nil, &sync.Mutex{}} + env.updateMap[id].mu.Lock() env.postCallback(func() { // Do not send any headers on test invocations env.updateHandler(name, id, data, nil, ucWrapper) + defer env.updateMap[id].mu.Unlock() }, true) } @@ -3120,7 +3130,13 @@ func (uc updateCallbacksWrapper) Complete(success interface{}, err error) { if uc.env == nil { panic("env is needed in updateCallback to cache update results for deduping purposes") } - uc.env.updateMap[uc.updateID] = updateResult{success, err} + if result, ok := uc.env.updateMap[uc.updateID]; ok { + result.success = success + result.err = err + uc.env.updateMap[uc.updateID] = result + } else { + panic("updateMap[updateID] should already be created from updateWorkflow()") + } uc.uc.Complete(success, err) } From d49647ca57d3a22ff52e0dbbc865d8d1f0b985e9 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 14 Nov 2024 16:17:55 -0800 Subject: [PATCH 11/12] forgot to remove stale code --- internal/internal_workflow_testsuite.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index bc00ae314..c70bb44f8 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -236,8 +236,7 @@ type ( queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error) updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks) updateMap map[string]updateResult - // updateMapLock map[string]sync.Mutex - startedHandler func(r WorkflowExecution, e error) + startedHandler func(r WorkflowExecution, e error) isWorkflowCompleted bool testResult converter.EncodedValue From 330189d2638ac996939562da8cb109ee532e881a Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 14 Nov 2024 16:25:58 -0800 Subject: [PATCH 12/12] forgot updateworkflowbyid --- internal/internal_workflow_testsuite.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index c70bb44f8..b0a5b5b35 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2962,17 +2962,26 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id panic(err) } + if env.updateMap == nil { + env.updateMap = make(map[string]updateResult) + } + var ucWrapper = updateCallbacksWrapper{uc: uc, env: env, updateID: id} // Check for duplicate update ID if result, ok := env.updateMap[id]; ok { + result.mu.Lock() workflowHandle.env.postCallback(func() { ucWrapper.uc.Accept() ucWrapper.uc.Complete(result.success, result.err) + defer result.mu.Unlock() }, false) } else { + env.updateMap[id] = updateResult{nil, nil, &sync.Mutex{}} + env.updateMap[id].mu.Lock() workflowHandle.env.postCallback(func() { workflowHandle.env.updateHandler(name, id, data, nil, ucWrapper) + defer env.updateMap[id].mu.Unlock() }, true) }