Skip to content

Commit

Permalink
Skip replaying incomplete workflow tasks (temporalio#1670)
Browse files Browse the repository at this point in the history
  • Loading branch information
RamyElkest committed Nov 6, 2024
1 parent 9d74a90 commit 6720811
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
16 changes: 8 additions & 8 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ type (
}

finishedTask struct {
isFailed bool
isCompleted bool
binaryChecksum string
flags []sdkFlag
sdkVersion string
Expand Down Expand Up @@ -267,9 +267,9 @@ func (eh *history) IsReplayEvent(event *historypb.HistoryEvent) bool {
return event.GetEventId() <= eh.workflowTask.task.GetPreviousStartedEventId() || isCommandEvent(event.GetEventType())
}

// isNextWorkflowTaskFailed checks if the workflow task failed or completed. If it did complete returns some information
// isNextWorkflowTaskCompleted checks if the workflow task completed or failed. If it did complete returns some information
// on the completed workflow task.
func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
func (eh *history) isNextWorkflowTaskCompleted() (task finishedTask, err error) {
nextIndex := eh.currentIndex + 1
// Server can return an empty page so if we need the next event we must keep checking until we either get it
// or know we have no more pages to check
Expand All @@ -283,10 +283,10 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
if nextIndex < len(eh.loadedEvents) {
nextEvent := eh.loadedEvents[nextIndex]
nextEventType := nextEvent.GetEventType()
isFailed := nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT || nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED
isCompleted := nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED
var binaryChecksum string
var flags []sdkFlag
if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
if isCompleted {
completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes()
binaryChecksum = completedAttrs.BinaryChecksum
for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() {
Expand All @@ -301,7 +301,7 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
}
}
return finishedTask{
isFailed: isFailed,
isCompleted: isCompleted,
binaryChecksum: binaryChecksum,
flags: flags,
sdkName: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName(),
Expand Down Expand Up @@ -469,12 +469,12 @@ OrderEvents:

switch event.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
finishedTask, err1 := eh.isNextWorkflowTaskFailed()
finishedTask, err1 := eh.isNextWorkflowTaskCompleted()
if err1 != nil {
err := err1
return nil, err
}
if !finishedTask.isFailed {
if finishedTask.isCompleted {
eh.binaryChecksum = finishedTask.binaryChecksum
eh.currentIndex++
taskEvents.events = append(taskEvents.events, event)
Expand Down
21 changes: 21 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,27 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() {
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowTask() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflow"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
}

history := &historypb.History{Events: testEvents}
logger := getLogger()
replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
require.NoError(s.T(), err)
replayer.RegisterWorkflow(testReplayWorkflow)
err = replayer.ReplayWorkflowHistory(logger, history)
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
Expand Down

0 comments on commit 6720811

Please sign in to comment.