Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedup duplicate update IDs for test environment #1695

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,7 @@ type updateCallback struct {
accept func()
reject func(error)
complete func(interface{}, error)
env *TestWorkflowEnvironment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm so this class is not for users, this whole file is for testing since it is post-fixed with *_test.go. The logic can't be in the updateCallback as we take an impl. from user. The logic needs to be in (env *testWorkflowEnvironmentImpl) updateWorkflow. We could potentially add a wrapper around the user interface. Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is user in this context referring to the user of the test suite?

Copy link
Contributor Author

@yuandrew yuandrew Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this whole file is for testing since it is post-fixed with *_test.go. The logic can't be in the updateCallback as we take an impl. from user

And would it be accurate to rephrase this as "we need to implement this within the test suite, not the individual test logic. updateCallback is a test specific implementation"?

Copy link
Contributor Author

@yuandrew yuandrew Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially add a wrapper around the user interface.

What interface are you referring to here? updateWorkflow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is user in this context referring to the user of the test suite?

Yeah

And would it be accurate to rephrase this as "we need to implement this within the test suite, not the individual test logic. updateCallback is a test specific implementation"?

yeah, exactly

What interface are you referring to here?

UpdateCallbacks

}

func (uc *updateCallback) Accept() {
Expand All @@ -1417,6 +1418,10 @@ func (uc *updateCallback) Reject(err error) {
}

func (uc *updateCallback) Complete(success interface{}, err error) {
// cache update result so we can dedup duplicate update IDs
if uc.env.impl.updateMap != nil {
uc.env.impl.updateMap[uc.env.impl.currentUpdateId] = success
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
}
uc.complete(success, err)
}

Expand Down
26 changes: 22 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ type (
signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error
queryHandler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error)
updateHandler func(name string, id string, input *commonpb.Payloads, header *commonpb.Header, resp UpdateCallbacks)
updateMap map[string]interface{}
startedHandler func(r WorkflowExecution, e error)

isWorkflowCompleted bool
Expand All @@ -229,6 +230,8 @@ type (

workflowFunctionExecuting bool
bufferedUpdateRequests map[string][]func()

currentUpdateId string
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
}

testSessionEnvironmentImpl struct {
Expand Down Expand Up @@ -2721,10 +2724,25 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u
if err != nil {
panic(err)
}
env.postCallback(func() {
// Do not send any headers on test invocations
env.updateHandler(name, id, data, nil, uc)
}, true)
if env.updateMap == nil {
env.updateMap = make(map[string]interface{})
}

// check for duplicate update ID
if _, ok := env.updateMap[id]; ok {
// return cached result
env.postCallback(func() {
uc.Accept()
uc.Complete(env.updateMap[id], nil)
}, false)
} else {
env.currentUpdateId = id
env.postCallback(func() {
// Do not send any headers on test invocations
env.updateHandler(name, id, data, nil, uc)
}, true)
}

}

func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id string, uc UpdateCallbacks, args ...interface{}) error {
Expand Down
57 changes: 57 additions & 0 deletions internal/workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,63 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) {
require.Equal(t, "unknown update bad update. KnownUpdates=[update]", updateRejectionErr.Error())
}

func TestWorkflowDuplicateIDDedup(t *testing.T) {
var suite WorkflowTestSuite
// Test dev server rejects UpdateWorkflow with same ID
env := suite.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow("update", "id", &updateCallback{
reject: func(err error) {
require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err))
},
accept: func() {
},
complete: func(result interface{}, err error) {
intResult, ok := result.(int)
if !ok {
require.Fail(t, "result should be int")
} else {
require.Equal(t, 0, intResult)
}
},
env: env,
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
}, 0)
}, 0)

env.RegisterDelayedCallback(func() {
env.UpdateWorkflow("update", "id", &updateCallback{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if this update is sent before the first update completes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, added a mutex to the map to ensure the 2nd update must wait for the 1st update to complete

reject: func(err error) {
require.Fail(t, fmt.Sprintf("update should not be rejected, err: %v", err))
},
accept: func() {
},
complete: func(result interface{}, err error) {
intResult, ok := result.(int)
if !ok {
require.Fail(t, "result should be int")
} else {
// if dedup, this be okay, even if we pass in 1 as arg, since it's deduping,
// the result should match the first update's result, 0
require.Equal(t, 0, intResult)
}
},
env: env,
}, 1)

}, 1*time.Millisecond)

env.ExecuteWorkflow(func(ctx Context) error {
err := SetUpdateHandler(ctx, "update", func(ctx Context, i int) (int, error) {
return i, nil
}, UpdateHandlerOptions{})
if err != nil {
return err
}
return Sleep(ctx, time.Hour)
})
require.NoError(t, env.GetWorkflowError())
}

func TestAllHandlersFinished(t *testing.T) {
var suite WorkflowTestSuite
env := suite.NewTestWorkflowEnvironment()
Expand Down
Loading