diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 831258776..a93bd30bd 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -215,7 +215,7 @@ type ( } finishedTask struct { - isFailed bool + isCompleted bool binaryChecksum string flags []sdkFlag sdkVersion string @@ -267,9 +267,9 @@ func (eh *history) IsReplayEvent(event *historypb.HistoryEvent) bool { return event.GetEventId() <= eh.workflowTask.task.GetPreviousStartedEventId() || isCommandEvent(event.GetEventType()) } -// isNextWorkflowTaskFailed checks if the workflow task failed or completed. If it did complete returns some information +// isNextWorkflowTaskCompleted checks if the workflow task completed or failed. If it did complete returns some information // on the completed workflow task. -func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { +func (eh *history) isNextWorkflowTaskCompleted() (task finishedTask, err error) { nextIndex := eh.currentIndex + 1 // Server can return an empty page so if we need the next event we must keep checking until we either get it // or know we have no more pages to check @@ -283,10 +283,10 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { if nextIndex < len(eh.loadedEvents) { nextEvent := eh.loadedEvents[nextIndex] nextEventType := nextEvent.GetEventType() - isFailed := nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT || nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED + isCompleted := nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED var binaryChecksum string var flags []sdkFlag - if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + if isCompleted { completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes() binaryChecksum = completedAttrs.BinaryChecksum for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() { @@ -301,7 +301,7 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { } } return finishedTask{ - isFailed: isFailed, + isCompleted: isCompleted, binaryChecksum: binaryChecksum, flags: flags, sdkName: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName(), @@ -469,12 +469,12 @@ OrderEvents: switch event.GetEventType() { case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED: - finishedTask, err1 := eh.isNextWorkflowTaskFailed() + finishedTask, err1 := eh.isNextWorkflowTaskCompleted() if err1 != nil { err := err1 return nil, err } - if !finishedTask.isFailed { + if finishedTask.isCompleted { eh.binaryChecksum = finishedTask.binaryChecksum eh.currentIndex++ taskEvents.events = append(taskEvents.events, event) diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 6a5a34862..2859d4614 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -314,6 +314,27 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() { require.NoError(s.T(), err) } +func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowTask() { + taskQueue := "taskQueue1" + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflow"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()), + }), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(3), + } + + history := &historypb.History{Events: testEvents} + logger := getLogger() + replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{}) + require.NoError(s.T(), err) + replayer.RegisterWorkflow(testReplayWorkflow) + err = replayer.ReplayWorkflowHistory(logger, history) + require.NoError(s.T(), err) +} + func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() { taskQueue := "taskQueue1" testEvents := []*historypb.HistoryEvent{