Skip to content

Commit

Permalink
Error if history contains unexpected events after the StartedEventId (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Oct 8, 2024
1 parent 75bd94b commit c82a8ac
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
5 changes: 3 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2689,7 +2689,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) {
})
}

func TestHistoryIterator(t *testing.T) {
func TestHistoryIteratorMaxEventID(t *testing.T) {
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue}}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
Expand Down Expand Up @@ -2725,6 +2725,7 @@ func TestHistoryIterator(t *testing.T) {
WorkflowId: "test-workflow-id",
RunId: "test-run-id",
},
3,
metrics.NopHandler,
"test-task-queue",
),
Expand All @@ -2733,6 +2734,6 @@ func TestHistoryIterator(t *testing.T) {
_, err := historyIterator.GetNextPage()
require.NoError(t, err)
_, err = historyIterator.GetNextPage()
require.NoError(t, err)
require.Error(t, err)

}
29 changes: 24 additions & 5 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,14 @@ type (
}

historyIteratorImpl struct {
iteratorFunc func(nextPageToken []byte) (*historypb.History, []byte, error)
execution *commonpb.WorkflowExecution
nextPageToken []byte
namespace string
service workflowservice.WorkflowServiceClient
iteratorFunc func(nextPageToken []byte) (*historypb.History, []byte, error)
execution *commonpb.WorkflowExecution
nextPageToken []byte
namespace string
service workflowservice.WorkflowServiceClient
// maxEventID is the maximum eventID that the history iterator is expected to return.
// 0 means that the iterator will return all history events.
maxEventID int64
metricsHandler metrics.Handler
taskQueue string
}
Expand Down Expand Up @@ -869,6 +872,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *workflowservice.PollWork
nextPageToken: response.NextPageToken,
namespace: wtp.namespace,
service: wtp.service,
maxEventID: response.GetStartedEventId(),
metricsHandler: wtp.metricsHandler,
taskQueue: wtp.taskQueueName,
}
Expand All @@ -886,6 +890,7 @@ func (h *historyIteratorImpl) GetNextPage() (*historypb.History, error) {
h.service,
h.namespace,
h.execution,
h.maxEventID,
h.metricsHandler,
h.taskQueue,
)
Expand All @@ -912,6 +917,7 @@ func newGetHistoryPageFunc(
service workflowservice.WorkflowServiceClient,
namespace string,
execution *commonpb.WorkflowExecution,
lastEventID int64,
metricsHandler metrics.Handler,
taskQueue string,
) func(nextPageToken []byte) (*historypb.History, []byte, error) {
Expand Down Expand Up @@ -941,6 +947,19 @@ func newGetHistoryPageFunc(
} else {
h = resp.History
}

size := len(h.Events)
// While the SDK is processing a workflow task, the workflow task could timeout and server would start
// a new workflow task or the server looses the workflow task if it is a speculative workflow task. In either
// case, the new workflow task could have events that are beyond the last event ID that the SDK expects to process.
// In such cases, the SDK should return error indicating that the workflow task is stale since the result will not be used.
if size > 0 && lastEventID > 0 &&
h.Events[size-1].GetEventId() > lastEventID {
return nil, nil, fmt.Errorf("history contains events past expected last event ID (%v) "+
"likely this means the current workflow task is no longer valid", lastEventID)

}

return h, resp.NextPageToken, nil
}
}
Expand Down

0 comments on commit c82a8ac

Please sign in to comment.