Skip to content

Commit

Permalink
add integration tests, add debug API to enable SDK flag for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew committed Nov 13, 2024
1 parent ceadefd commit 7ad8169
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 1 deletion.
8 changes: 8 additions & 0 deletions internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
SDKFlagUnknown = math.MaxUint32
)

var unblockSelectorSignal bool

func sdkFlagFromUint(value uint32) sdkFlag {
switch value {
case uint32(SDKFlagUnset):
Expand Down Expand Up @@ -141,3 +143,9 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag {
}
return flags
}

// SetUnblockSelectorSignal sets the flag to unblock the selector signal.
// For test use only,
func SetUnblockSelectorSignal() {
unblockSelectorSignal = true
}
9 changes: 8 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,14 @@ func (s *selectorImpl) Select(ctx Context) {
}
// readyBranch is not executed when AddDefault is specified,
// setting the value here prevents the signal from being dropped
dropSignalFlag := getWorkflowEnvironment(ctx).GetFlag(SDKFlagBlockedSelectorSignalReceive)
env := getWorkflowEnvironment(ctx)
var dropSignalFlag bool
if unblockSelectorSignal {
dropSignalFlag = env.TryUse(SDKFlagBlockedSelectorSignalReceive)
} else {
dropSignalFlag = env.GetFlag(SDKFlagBlockedSelectorSignalReceive)
}

if dropSignalFlag {
c.recValue = &v
}
Expand Down
22 changes: 22 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6602,6 +6602,28 @@ func (ts *IntegrationTestSuite) getReportedOperationCount(metricName string, ope
return count
}

func (ts *IntegrationTestSuite) TestSelectorBlock() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-selector-block")
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal, false)
ts.NoError(err)
var result string
ts.NoError(run.Get(ctx, &result))
ts.Equal("hello", result)
}

func (ts *IntegrationTestSuite) TestSelectorNoBlock() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-selector-block")
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal, true)
ts.NoError(err)
var result string
ts.NoError(run.Get(ctx, &result))
ts.Equal("HELLO", result)
}

type coroutineCountingInterceptor struct {
interceptor.WorkerInterceptorBase
// Access via count()
Expand Down
10 changes: 10 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,16 @@ func (s *replayTestSuite) TestSelectorBlockingDefault() {
require.NoError(s.T(), err)
}

func (s *replayTestSuite) TestSelectorNonBlocking() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow)
// Verify we can replay the new workflow that has the
// SDKFlagBlockedSelectorSignalReceive flag
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-non-blocking.json")
s.NoError(err)
require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
Expand Down
211 changes: 211 additions & 0 deletions test/replaytests/selector-non-blocking.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-11-13T17:54:47.478632Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048626",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "SelectorBlockingDefaultWorkflow"
},
"taskQueue": {
"name": "hello-world",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"workflowExecutionTimeout": "2s",
"workflowRunTimeout": "2s",
"workflowTaskTimeout": "2s",
"originalExecutionRunId": "a73567ce-1a8e-4c86-9286-65e9039663a3",
"identity": "[email protected]@",
"firstExecutionRunId": "a73567ce-1a8e-4c86-9286-65e9039663a3",
"attempt": 1,
"workflowExecutionExpirationTime": "2024-11-13T17:54:49.478Z",
"firstWorkflowTaskBackoff": "0s",
"header": {},
"workflowId": "hello_world_workflowID"
}
},
{
"eventId": "2",
"eventTime": "2024-11-13T17:54:47.478680Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048627",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "hello-world",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "2s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-11-13T17:54:47.480740Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048636",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]@",
"requestId": "9fae9b1e-4182-4f47-a675-ba4facd08273",
"historySizeBytes": "602",
"workerVersion": {
"buildId": "7e5be6238aa91ebec5dcc5b6859e87c6"
}
}
},
{
"eventId": "4",
"eventTime": "2024-11-13T17:54:47.485146Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048640",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "[email protected]@",
"workerVersion": {
"buildId": "7e5be6238aa91ebec5dcc5b6859e87c6"
},
"sdkMetadata": {
"langUsedFlags": [
3,
5
],
"sdkName": "temporal-go",
"sdkVersion": "1.29.1"
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-11-13T17:54:47.485222Z",
"eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
"taskId": "1048641",
"activityTaskScheduledEventAttributes": {
"activityId": "5",
"activityType": {
"name": "SelectorBlockingDefaultActivity"
},
"taskQueue": {
"name": "hello-world",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"header": {},
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "IlNpZ25hbCBub3QgbG9zdCI="
}
]
},
"scheduleToCloseTimeout": "2s",
"scheduleToStartTimeout": "2s",
"startToCloseTimeout": "2s",
"heartbeatTimeout": "0s",
"workflowTaskCompletedEventId": "4",
"retryPolicy": {
"initialInterval": "1s",
"backoffCoefficient": 2,
"maximumInterval": "100s"
},
"useWorkflowBuildId": true
}
},
{
"eventId": "6",
"eventTime": "2024-11-13T17:54:47.486704Z",
"eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED",
"taskId": "1048646",
"activityTaskStartedEventAttributes": {
"scheduledEventId": "5",
"identity": "[email protected]@",
"requestId": "31f676df-39a4-4ef7-ad2e-fd2166139abd",
"attempt": 1,
"workerVersion": {
"buildId": "7e5be6238aa91ebec5dcc5b6859e87c6"
}
}
},
{
"eventId": "7",
"eventTime": "2024-11-13T17:54:47.488853Z",
"eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED",
"taskId": "1048647",
"activityTaskCompletedEventAttributes": {
"result": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "IlNpZ25hbCBub3QgbG9zdCB3YXMgbG9nZ2VkISI="
}
]
},
"scheduledEventId": "5",
"startedEventId": "6",
"identity": "[email protected]@"
}
},
{
"eventId": "8",
"eventTime": "2024-11-13T17:54:47.488857Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048648",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "Andrews-MacBook-Pro.local:ffbf63d9-bf89-41ab-8431-2f3d60c085c7",
"kind": "TASK_QUEUE_KIND_STICKY",
"normalName": "hello-world"
},
"startToCloseTimeout": "2s",
"attempt": 1
}
},
{
"eventId": "9",
"eventTime": "2024-11-13T17:54:47.489773Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048652",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "8",
"identity": "[email protected]@",
"requestId": "fc1ab01a-627d-49db-a0c0-0829e9938212",
"historySizeBytes": "1417",
"workerVersion": {
"buildId": "7e5be6238aa91ebec5dcc5b6859e87c6"
}
}
},
{
"eventId": "10",
"eventTime": "2024-11-13T17:54:47.491177Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048656",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "8",
"startedEventId": "9",
"identity": "[email protected]@",
"workerVersion": {
"buildId": "7e5be6238aa91ebec5dcc5b6859e87c6"
},
"sdkMetadata": {},
"meteringMetadata": {}
}
},
{
"eventId": "11",
"eventTime": "2024-11-13T17:54:47.491192Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
"taskId": "1048657",
"workflowExecutionCompletedEventAttributes": {
"workflowTaskCompletedEventId": "10"
}
}
]
}
44 changes: 44 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3174,6 +3174,49 @@ func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, nu
return nil
}

func (w *Workflows) SelectorBlockSignal(ctx workflow.Context, enableFlag bool) (string, error) {
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
var logger = workflow.GetLogger(ctx)
logger.Info("calling ExecuteActivity")
ch1 := workflow.NewChannel(ctx)
ch2 := workflow.NewChannel(ctx)

if enableFlag {
internal.SetUnblockSelectorSignal()
}

workflow.Go(ctx, func(ctx workflow.Context) {
ch1.Send(ctx, "one")

})

workflow.Go(ctx, func(ctx workflow.Context) {
ch2.Send(ctx, "two")
})

selector := workflow.NewSelector(ctx)
var s string
selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &s)
})
selector.AddDefault(func() {
ch2.Receive(ctx, &s)
})
selector.Select(ctx)

var hello = "hello"
if selector.HasPending() {
var result string
activity := workflow.ExecuteActivity(ctx, "Prefix_ToUpper", hello)
activity.Get(ctx, &result)
logger.Info("Result", result)
return result, nil
} else {
logger.Info("Signal in ch1 lost")
}
return hello, nil
}

func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityCancelRepro)
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
Expand Down Expand Up @@ -3310,6 +3353,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.UpdateSetHandlerOnly)
worker.RegisterWorkflow(w.Echo)
worker.RegisterWorkflow(w.RunsLocalAndNonlocalActsWithRetries)
worker.RegisterWorkflow(w.SelectorBlockSignal)
}

func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {
Expand Down

0 comments on commit 7ad8169

Please sign in to comment.