From e16c5a3ec03ad1f8aead462e8c1bbb150d7dc73e Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 10 Feb 2020 10:47:29 -0800 Subject: [PATCH] Dedup activity heartbeat timeout timer task creation (#3032) (#3037) --- service/history/historyEngine_test.go | 2 +- service/history/timerQueueActiveProcessor.go | 7 +- .../history/timerQueueActiveProcessor_test.go | 93 ++++++++++++++++++- service/history/timerSequence.go | 3 + service/history/timerSequence_test.go | 45 +++++++++ 5 files changed, 143 insertions(+), 7 deletions(-) diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 7e090fc19d8..00afa40a667 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -5327,7 +5327,7 @@ func copyActivityInfo(sourceInfo *persistence.ActivityInfo) *persistence.Activit LastWorkerIdentity: sourceInfo.LastWorkerIdentity, LastFailureDetails: sourceInfo.LastFailureDetails, //// Not written to database - This is used only for deduping heartbeat timer creation - // LastHeartbeatTimeoutVisibility: sourceInfo.LastHeartbeatTimeoutVisibility, + LastHeartbeatTimeoutVisibility: sourceInfo.LastHeartbeatTimeoutVisibility, } } diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index 42a38b24096..e87c71dbb3a 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -378,8 +378,13 @@ func (t *timerQueueActiveProcessorImpl) processActivityTimeout( scheduleDecision := false // need to clear activity heartbeat timer task mask for new activity timer task creation + // NOTE: LastHeartbeatTimeoutVisibility is for deduping heartbeat timer creation as it's possible + // one heartbeat task was persisted multiple times with different taskIDs due to the retry logic + // for updating workflow execution. In that case, only one new heartbeat timeout task should be + // created. isHeartBeatTask := task.TimeoutType == int(workflow.TimeoutTypeHeartbeat) - if activityInfo, ok := mutableState.GetActivityInfo(task.EventID); isHeartBeatTask && ok { + activityInfo, ok := mutableState.GetActivityInfo(task.EventID) + if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibility <= task.VisibilityTimestamp.Unix() { activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ timerTaskStatusCreatedHeartbeat if err := mutableState.UpdateActivity(activityInfo); err != nil { return err diff --git a/service/history/timerQueueActiveProcessor_test.go b/service/history/timerQueueActiveProcessor_test.go index c1f1df6d034..ac1e0888092 100644 --- a/service/history/timerQueueActiveProcessor_test.go +++ b/service/history/timerQueueActiveProcessor_test.go @@ -354,7 +354,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_NoRetryPolic TaskType: persistence.TaskTypeActivityTimeout, TimeoutType: int(workflow.TimeoutTypeScheduleToClose), VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(), - EventID: di.ScheduleID, + EventID: scheduledEvent.GetEventId(), } persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) @@ -432,7 +432,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_NoRetryPolic TaskType: persistence.TaskTypeActivityTimeout, TimeoutType: int(workflow.TimeoutTypeScheduleToClose), VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(), - EventID: di.ScheduleID, + EventID: scheduledEvent.GetEventId(), } completeEvent := addActivityTaskCompletedEvent(mutableState, scheduledEvent.GetEventId(), startedEvent.GetEventId(), []byte(nil), identity) @@ -517,7 +517,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_ TaskType: persistence.TaskTypeActivityTimeout, TimeoutType: int(workflow.TimeoutTypeScheduleToClose), VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(), - EventID: di.ScheduleID, + EventID: scheduledEvent.GetEventId(), } persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) @@ -605,7 +605,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_ TaskType: persistence.TaskTypeActivityTimeout, TimeoutType: int(workflow.TimeoutTypeScheduleToClose), VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(), - EventID: di.ScheduleID, + EventID: scheduledEvent.GetEventId(), } persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) @@ -692,7 +692,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_ TaskType: persistence.TaskTypeActivityTimeout, TimeoutType: int(workflow.TimeoutTypeScheduleToClose), VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(), - EventID: di.ScheduleID, + EventID: scheduledEvent.GetEventId(), } completeEvent := addActivityTaskCompletedEvent(mutableState, scheduledEvent.GetEventId(), common.TransientEventID, []byte(nil), identity) @@ -705,6 +705,89 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_ s.NoError(err) } +func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_Heartbeat_Noop() { + execution := workflow.WorkflowExecution{ + WorkflowId: common.StringPtr("some random workflow ID"), + RunId: common.StringPtr(uuid.New()), + } + workflowType := "some random workflow type" + taskListName := "some random task list" + + mutableState := newMutableStateBuilderWithReplicationStateWithEventV2(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &history.StartWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(s.domainID), + StartRequest: &workflow.StartWorkflowExecutionRequest{ + WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, + TaskList: &workflow.TaskList{Name: common.StringPtr(taskListName)}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + }, + }, + ) + s.Nil(err) + + di := addDecisionTaskScheduledEvent(mutableState) + event := addDecisionTaskStartedEvent(mutableState, di.ScheduleID, taskListName, uuid.New()) + di.StartedID = event.GetEventId() + event = addDecisionTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, nil, "some random identity") + + identity := "identity" + tasklist := "tasklist" + activityID := "activity" + activityType := "activity type" + timerTimeout := 2 * time.Second + heartbeatTimerTimeout := time.Second + scheduledEvent, _ := addActivityTaskScheduledEventWithRetry( + mutableState, + event.GetEventId(), + activityID, + activityType, + tasklist, + []byte(nil), + int32(timerTimeout.Seconds()), + int32(timerTimeout.Seconds()), + int32(timerTimeout.Seconds()), + int32(heartbeatTimerTimeout.Seconds()), + &workflow.RetryPolicy{ + InitialIntervalInSeconds: common.Int32Ptr(1), + BackoffCoefficient: common.Float64Ptr(1.2), + MaximumIntervalInSeconds: common.Int32Ptr(5), + MaximumAttempts: common.Int32Ptr(5), + NonRetriableErrorReasons: []string{"(╯' - ')╯ ┻━┻ "}, + ExpirationIntervalInSeconds: common.Int32Ptr(999), + }, + ) + startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity) + s.Nil(startedEvent) + + timerSequence := newTimerSequence(s.timeSource, mutableState) + mutableState.insertTimerTasks = nil + modified, err := timerSequence.createNextActivityTimer() + s.NoError(err) + s.True(modified) + task := mutableState.insertTimerTasks[0] + s.Equal(int(timerTypeHeartbeat), task.(*persistence.ActivityTimeoutTask).TimeoutType) + timerTask := &persistence.TimerTaskInfo{ + Version: s.version, + DomainID: s.domainID, + WorkflowID: execution.GetWorkflowId(), + RunID: execution.GetRunId(), + TaskID: int64(100), + TaskType: persistence.TaskTypeActivityTimeout, + TimeoutType: int(workflow.TimeoutTypeHeartbeat), + VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp().Add(-time.Second), + EventID: scheduledEvent.GetEventId(), + } + + persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + + _, err = s.timerQueueActiveProcessor.process(newTaskInfo(nil, timerTask, s.logger)) + s.NoError(err) +} + func (s *timerQueueActiveProcessorSuite) TestDecisionTimeout_Fire() { execution := workflow.WorkflowExecution{ diff --git a/service/history/timerSequence.go b/service/history/timerSequence.go index 4b457e81d4a..fadcecc1e7b 100644 --- a/service/history/timerSequence.go +++ b/service/history/timerSequence.go @@ -163,6 +163,9 @@ func (t *timerSequenceImpl) createNextActivityTimer() (bool, error) { } // mark timer task mask as indication that timer task is generated activityInfo.TimerTaskStatus |= timerTypeToTimerMask(firstTimerTask.timerType) + if firstTimerTask.timerType == timerTypeHeartbeat { + activityInfo.LastHeartbeatTimeoutVisibility = firstTimerTask.timestamp.Unix() + } if err := t.mutableState.UpdateActivity(activityInfo); err != nil { return false, err } diff --git a/service/history/timerSequence_test.go b/service/history/timerSequence_test.go index 101b60eef13..8fa09224b4f 100644 --- a/service/history/timerSequence_test.go +++ b/service/history/timerSequence_test.go @@ -188,6 +188,51 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated() { s.True(modified) } +func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer() { + now := time.Now() + currentVersion := int64(999) + activityInfo := &persistence.ActivityInfo{ + Version: 123, + ScheduleID: 234, + ScheduledTime: now, + StartedID: 345, + StartedTime: now.Add(200 * time.Millisecond), + ActivityID: "some random activity ID", + ScheduleToStartTimeout: 10, + ScheduleToCloseTimeout: 1000, + StartToCloseTimeout: 100, + HeartbeatTimeout: 1, + LastHeartBeatUpdatedTime: time.Time{}, + TimerTaskStatus: timerTaskStatusNone, + Attempt: 12, + } + activityInfos := map[int64]*persistence.ActivityInfo{activityInfo.ScheduleID: activityInfo} + s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(activityInfos).Times(1) + s.mockMutableState.EXPECT().GetActivityInfo(activityInfo.ScheduleID).Return(activityInfo, true).Times(1) + + taskVisibilityTimestamp := activityInfo.StartedTime.Add( + time.Duration(activityInfo.HeartbeatTimeout) * time.Second, + ) + + var activityInfoUpdated = *activityInfo // make a copy + activityInfoUpdated.TimerTaskStatus = timerTaskStatusCreatedHeartbeat + activityInfoUpdated.LastHeartbeatTimeoutVisibility = taskVisibilityTimestamp.Unix() + s.mockMutableState.EXPECT().UpdateActivity(&activityInfoUpdated).Return(nil).Times(1) + s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion).Times(1) + s.mockMutableState.EXPECT().AddTimerTasks(&persistence.ActivityTimeoutTask{ + // TaskID is set by shard + VisibilityTimestamp: taskVisibilityTimestamp, + TimeoutType: int(shared.TimeoutTypeHeartbeat), + EventID: activityInfo.ScheduleID, + Attempt: int64(activityInfo.Attempt), + Version: currentVersion, + }).Times(1) + + modified, err := s.timerSequence.createNextActivityTimer() + s.NoError(err) + s.True(modified) +} + func (s *timerSequenceSuite) TestLoadAndSortUserTimers_None() { timerInfos := map[string]*persistence.TimerInfo{} s.mockMutableState.EXPECT().GetPendingTimerInfos().Return(timerInfos).Times(1)