From ce02feabbab77c65a80979620b0cf3f133f40b8b Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Thu, 30 May 2019 14:42:34 -0700 Subject: [PATCH] Fix getVersion without decision event (#309) * Fix getVersion without decision event * Add sleep before signal * Use completable future instead of sleep --- .../internal/replay/DecisionsHelper.java | 4 -- .../internal/replay/HistoryHelper.java | 10 +--- .../internal/replay/MarkerHandler.java | 7 +-- .../uber/cadence/workflow/WorkflowTest.java | 54 ++++++++++++++++++- 4 files changed, 58 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java index 21390152f..07cdd7e70 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java +++ b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java @@ -701,10 +701,6 @@ String getAndIncrementNextId() { return String.valueOf(idCounter++); } - HistoryEvent getDecisionEvent(long eventId) { - return decisionEvents.getDecisionEvent(eventId); - } - Optional getOptionalDecisionEvent(long eventId) { return decisionEvents.getOptionalDecisionEvent(eventId); } diff --git a/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java b/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java index 68b879ceb..bef3cba5c 100644 --- a/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java +++ b/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java @@ -78,18 +78,10 @@ public List getEvents() { return events; } - public List getDecisionEvents() { + List getDecisionEvents() { return decisionEvents; } - HistoryEvent getDecisionEvent(long eventId) { - int index = (int) (eventId - nextDecisionEventId); - if (index < 0 || index >= decisionEvents.size()) { - throw new IllegalArgumentException("No decision event found at eventId=" + eventId); - } - return decisionEvents.get(index); - } - Optional getOptionalDecisionEvent(long eventId) { int index = (int) (eventId - nextDecisionEventId); if (index < 0 || index >= decisionEvents.size()) { diff --git a/src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java b/src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java index 992bfaac3..ac9fd007e 100644 --- a/src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java +++ b/src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java @@ -134,11 +134,12 @@ Optional handle( private Optional getMarkerDataFromHistory( long eventId, String markerId, int expectedAcccessCount, DataConverter converter) { - HistoryEvent event = decisions.getDecisionEvent(eventId); - if (event.getEventType() != EventType.MarkerRecorded) { + Optional event = decisions.getOptionalDecisionEvent(eventId); + if (!event.isPresent() || event.get().getEventType() != EventType.MarkerRecorded) { return Optional.empty(); } - MarkerRecordedEventAttributes attributes = event.getMarkerRecordedEventAttributes(); + + MarkerRecordedEventAttributes attributes = event.get().getMarkerRecordedEventAttributes(); String name = attributes.getMarkerName(); if (!markerName.equals(name)) { return Optional.empty(); diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 9d73fdee8..6abf8d9ab 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -114,7 +114,7 @@ public class WorkflowTest { */ private static final boolean DEBUGGER_TIMEOUTS = false; - public static final String ANNOTATION_TASK_LIST = "WorkflowTest-testExecute[Docker]"; + private static final String ANNOTATION_TASK_LIST = "WorkflowTest-testExecute[Docker]"; private TracingWorkflowInterceptorFactory tracer; private static final boolean useDockerService = @@ -3733,6 +3733,58 @@ public void testGetVersion() { "executeActivity customActivity1"); } + static CompletableFuture executionStarted = new CompletableFuture<>(); + public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflowSignaled { + + CompletablePromise signalReceived = Workflow.newPromise(); + + @Override + public String execute() { + try { + if (!getVersionExecuted.contains("getVersionWithoutDecisionEvent")) { + // Execute getVersion in non-replay mode. + getVersionExecuted.add("getVersionWithoutDecisionEvent"); + executionStarted.complete(true); + signalReceived.get(); + } else { + // Execute getVersion in replay mode. In this case we have no decision event, only a signal. + int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1); + if (version == Workflow.DEFAULT_VERSION) { + signalReceived.get(); + return "result 1"; + } else { + return "result 2"; + } + } + } catch (Exception e) { + throw new RuntimeException("failed to get from signal"); + } + + throw new RuntimeException("unreachable"); + } + + @Override + public void signal1(String arg) { + signalReceived.complete(true); + } + } + + @Test + public void testGetVersionWithoutDecisionEvent() throws Exception { + Assume.assumeTrue("skipping as there will be no replay", disableStickyExecution); + executionStarted = new CompletableFuture<>(); + getVersionExecuted.remove("getVersionWithoutDecisionEvent"); + startWorkerFor(TestGetVersionWithoutDecisionEventWorkflowImpl.class); + TestWorkflowSignaled workflowStub = + workflowClient.newWorkflowStub( + TestWorkflowSignaled.class, newWorkflowOptionsBuilder(taskList).build()); + WorkflowClient.start(workflowStub::execute); + executionStarted.get(); + workflowStub.signal1("test signal"); + String result = workflowStub.execute(); + assertEquals("result 1", result); + } + // The following test covers the scenario where getVersion call is removed before a // non-version-marker decision. public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflow1 {