diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index f68509342..2cc34f4ac 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -2689,7 +2689,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) { }) } -func TestHistoryIterator(t *testing.T) { +func TestHistoryIteratorMaxEventID(t *testing.T) { testEvents := []*historypb.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue}}), createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}), @@ -2725,6 +2725,7 @@ func TestHistoryIterator(t *testing.T) { WorkflowId: "test-workflow-id", RunId: "test-run-id", }, + 3, metrics.NopHandler, "test-task-queue", ), @@ -2733,6 +2734,6 @@ func TestHistoryIterator(t *testing.T) { _, err := historyIterator.GetNextPage() require.NoError(t, err) _, err = historyIterator.GetNextPage() - require.NoError(t, err) + require.Error(t, err) } diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 71c447c77..a59600638 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -138,11 +138,14 @@ type ( } historyIteratorImpl struct { - iteratorFunc func(nextPageToken []byte) (*historypb.History, []byte, error) - execution *commonpb.WorkflowExecution - nextPageToken []byte - namespace string - service workflowservice.WorkflowServiceClient + iteratorFunc func(nextPageToken []byte) (*historypb.History, []byte, error) + execution *commonpb.WorkflowExecution + nextPageToken []byte + namespace string + service workflowservice.WorkflowServiceClient + // maxEventID is the maximum eventID that the history iterator is expected to return. + // 0 means that the iterator will return all history events. 
+ maxEventID int64 metricsHandler metrics.Handler taskQueue string } @@ -869,6 +872,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *workflowservice.PollWork nextPageToken: response.NextPageToken, namespace: wtp.namespace, service: wtp.service, + maxEventID: response.GetStartedEventId(), metricsHandler: wtp.metricsHandler, taskQueue: wtp.taskQueueName, } @@ -886,6 +890,7 @@ func (h *historyIteratorImpl) GetNextPage() (*historypb.History, error) { h.service, h.namespace, h.execution, + h.maxEventID, h.metricsHandler, h.taskQueue, ) @@ -912,6 +917,7 @@ func newGetHistoryPageFunc( service workflowservice.WorkflowServiceClient, namespace string, execution *commonpb.WorkflowExecution, + lastEventID int64, metricsHandler metrics.Handler, taskQueue string, ) func(nextPageToken []byte) (*historypb.History, []byte, error) { @@ -941,6 +947,19 @@ func newGetHistoryPageFunc( } else { h = resp.History } + + size := len(h.Events) + // While the SDK is processing a workflow task, the workflow task could time out and the server would start + // a new workflow task or the server loses the workflow task if it is a speculative workflow task. In either + // case, the new workflow task could have events that are beyond the last event ID that the SDK expects to process. + // In such cases, the SDK should return an error indicating that the workflow task is stale since the result will not be used. + if size > 0 && lastEventID > 0 && + h.Events[size-1].GetEventId() > lastEventID { + return nil, nil, fmt.Errorf("history contains events past expected last event ID (%v) "+ + "likely this means the current workflow task is no longer valid", lastEventID) + + } + return h, resp.NextPageToken, nil } }