From 763527f175364d6afc3afecee5abb957acf90d92 Mon Sep 17 00:00:00 2001 From: Bowen Xiao Date: Fri, 8 Nov 2024 15:05:53 -0800 Subject: [PATCH 1/2] Add test --- .../internal/worker/ActivityPollTaskTest.java | 194 ++++++++++++++++++ ...LocallyDispatchedActivityPollTaskTest.java | 69 +++++++ .../internal/worker/WorkflowPollTaskTest.java | 17 ++ 3 files changed, 280 insertions(+) create mode 100644 src/test/java/com/uber/cadence/internal/worker/ActivityPollTaskTest.java create mode 100644 src/test/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTaskTest.java create mode 100644 src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java diff --git a/src/test/java/com/uber/cadence/internal/worker/ActivityPollTaskTest.java b/src/test/java/com/uber/cadence/internal/worker/ActivityPollTaskTest.java new file mode 100644 index 00000000..0fbe15fe --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/worker/ActivityPollTaskTest.java @@ -0,0 +1,194 @@ +/** + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + *

Modifications copyright (C) 2017 Uber Technologies, Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file + * except in compliance with the License. A copy of the License is located at + * + *

http://aws.amazon.com/apache2.0 + * + *

or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.uber.cadence.internal.worker; + +import static com.uber.cadence.internal.metrics.MetricsTagValue.INTERNAL_SERVICE_ERROR; +import static com.uber.cadence.internal.metrics.MetricsTagValue.SERVICE_BUSY; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.google.common.collect.ImmutableMap; +import com.uber.cadence.InternalServiceError; +import com.uber.cadence.PollForActivityTaskRequest; +import com.uber.cadence.PollForActivityTaskResponse; +import com.uber.cadence.ServiceBusyError; +import com.uber.cadence.internal.metrics.MetricsTag; +import com.uber.cadence.internal.metrics.MetricsType; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.m3.tally.Counter; +import com.uber.m3.tally.Scope; +import com.uber.m3.tally.Stopwatch; +import com.uber.m3.tally.Timer; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; + +public class ActivityPollTaskTest { + + private IWorkflowService mockService; + private SingleWorkerOptions options; + private ActivityPollTask pollTask; + + @Before + public void setup() { + mockService = mock(IWorkflowService.class); + Scope metricsScope = mock(Scope.class); // Mock Scope to avoid NoopScope + + // Mock the Timer and Stopwatch + Timer timer = mock(Timer.class); + Stopwatch stopwatch = mock(Stopwatch.class); + when(metricsScope.timer(MetricsType.ACTIVITY_POLL_LATENCY)).thenReturn(timer); + when(timer.start()).thenReturn(stopwatch); + + // Mock the Counter and its inc() method + Counter counter = mock(Counter.class); + when(metricsScope.counter(MetricsType.ACTIVITY_POLL_COUNTER)).thenReturn(counter); + when(metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER)).thenReturn(counter); + when(metricsScope.counter(MetricsType.ACTIVITY_POLL_FAILED_COUNTER)).thenReturn(counter); + when(metricsScope.counter(MetricsType.ACTIVITY_POLL_TRANSIENT_FAILED_COUNTER)) + .thenReturn(counter); + + // Use doNothing() to stub the inc() method of Counter, as it returns void + doNothing().when(counter).inc(anyInt()); + + // Set up SingleWorkerOptions to return the mocked metricsScope + options = mock(SingleWorkerOptions.class); + when(options.getMetricsScope()) + .thenReturn(metricsScope); // Set options to return the mock Scope + when(options.getIdentity()).thenReturn("test-identity"); + when(options.getTaskListActivitiesPerSecond()).thenReturn(1.0); + + // Initialize pollTask with mocked options + pollTask = new ActivityPollTask(mockService, "test-domain", "test-taskList", options); + } + + @Test + public void testPollTaskSuccess() throws TException { + PollForActivityTaskResponse response = + new PollForActivityTaskResponse().setTaskToken("testToken".getBytes()); + when(mockService.PollForActivityTask(any(PollForActivityTaskRequest.class))) + .thenReturn(response); + + // Mock the timer and stopwatch behavior + Scope metricsScope = options.getMetricsScope(); + Timer timer = mock(Timer.class); + when(metricsScope.timer(MetricsType.ACTIVITY_POLL_LATENCY)).thenReturn(timer); + Stopwatch sw = mock(Stopwatch.class); + when(timer.start()).thenReturn(sw); + + PollForActivityTaskResponse result = pollTask.pollTask(); + + assertNotNull(result); + assertArrayEquals("testToken".getBytes(), result.getTaskToken()); + + // Verify the counters and the timer behavior + verify(metricsScope.counter(MetricsType.ACTIVITY_POLL_COUNTER), times(1)).inc(1); + verify(timer, times(1)).start(); + verify(sw, times(1)).stop(); + } + + @Test(expected = InternalServiceError.class) + public void testPollTaskInternalServiceError() throws TException { + // Set up mockService to throw an InternalServiceError exception + when(mockService.PollForActivityTask(any(PollForActivityTaskRequest.class))) + .thenThrow(new InternalServiceError()); + + // Mock taggedScope and taggedCounter to ensure the behavior + Scope metricsScope = options.getMetricsScope(); + Scope taggedScope = mock(Scope.class); + Counter taggedCounter = mock(Counter.class); + + // Set up taggedScope to return taggedCounter for specific counter calls + when(metricsScope.tagged(ImmutableMap.of(MetricsTag.CAUSE, INTERNAL_SERVICE_ERROR))) + .thenReturn(taggedScope); + when(taggedScope.counter(MetricsType.ACTIVITY_POLL_TRANSIENT_FAILED_COUNTER)) + .thenReturn(taggedCounter); + + try { + // Call pollTask.pollTask(), expecting an InternalServiceError to be thrown + pollTask.pollTask(); + } finally { + // Verify that taggedCounter.inc(1) is called once + verify(taggedCounter, times(1)).inc(1); + } + } + + @Test(expected = ServiceBusyError.class) + public void testPollTaskServiceBusyError() throws TException { + // Set up mockService to throw a ServiceBusyError exception + when(mockService.PollForActivityTask(any(PollForActivityTaskRequest.class))) + .thenThrow(new ServiceBusyError()); + + // Mock taggedScope and taggedCounter to ensure the behavior + Scope metricsScope = options.getMetricsScope(); + Scope taggedScope = mock(Scope.class); + Counter taggedCounter = mock(Counter.class); + + // Set up taggedScope to return taggedCounter for specific counter calls + when(metricsScope.tagged(ImmutableMap.of(MetricsTag.CAUSE, SERVICE_BUSY))) + .thenReturn(taggedScope); + when(taggedScope.counter(MetricsType.ACTIVITY_POLL_TRANSIENT_FAILED_COUNTER)) + .thenReturn(taggedCounter); + + try { + // Call pollTask.pollTask(), expecting a ServiceBusyError to be thrown + pollTask.pollTask(); + } finally { + // Verify that taggedCounter.inc(1) is called once + verify(taggedCounter, times(1)).inc(1); + } + } + + @Test(expected = TException.class) + public void testPollTaskGeneralTException() throws TException { + // Set up mockService to throw a TException + when(mockService.PollForActivityTask(any(PollForActivityTaskRequest.class))) + .thenThrow(new TException()); + + // Mock the metricsScope and counter to ensure proper behavior + Scope metricsScope = options.getMetricsScope(); + Counter failedCounter = mock(Counter.class); + when(metricsScope.counter(MetricsType.ACTIVITY_POLL_FAILED_COUNTER)).thenReturn(failedCounter); + + try { + // Call pollTask.pollTask(), expecting a TException to be thrown + pollTask.pollTask(); + } finally { + // Verify that failedCounter.inc(1) is called once + verify(failedCounter, times(1)).inc(1); + } + } + + @Test + public void testPollTaskNoTask() throws TException { + // Set up mockService to return an empty PollForActivityTaskResponse + when(mockService.PollForActivityTask(any(PollForActivityTaskRequest.class))) + .thenReturn(new PollForActivityTaskResponse()); + + // Mock the metricsScope and noTaskCounter to ensure proper behavior + Scope metricsScope = options.getMetricsScope(); + Counter noTaskCounter = mock(Counter.class); + when(metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER)).thenReturn(noTaskCounter); + + // Call pollTask.pollTask() and check the response + PollForActivityTaskResponse result = pollTask.pollTask(); + + // Verify that the result is null when there is no task + assertNull(result); + + // Verify that noTaskCounter.inc(1) is called once + verify(noTaskCounter, times(1)).inc(1); + } +} diff --git a/src/test/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTaskTest.java b/src/test/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTaskTest.java new file mode 100644 index 00000000..2a9148e2 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTaskTest.java @@ -0,0 +1,69 @@ +/** + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + *

Modifications copyright (C) 2017 Uber Technologies, Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file + * except in compliance with the License. A copy of the License is located at + * + *

http://aws.amazon.com/apache2.0 + * + *

or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.uber.cadence.internal.worker; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.serviceclient.ClientOptions; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.m3.tally.NoopScope; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; + +public class LocallyDispatchedActivityPollTaskTest { + + private WorkflowClient mockClient; + private IWorkflowService mockService; + private LocallyDispatchedActivityPollTask pollTask; + private LocallyDispatchedActivityWorker.Task mockTask; + private SingleWorkerOptions options; + + @Before + public void setup() throws Exception { + mockClient = mock(WorkflowClient.class); + mockService = mock(IWorkflowService.class); + + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setMetricsScope(new NoopScope()).build(); + when(mockClient.getOptions()).thenReturn(clientOptions); + when(mockClient.getService()).thenReturn(mockService); + when(mockService.getOptions()).thenReturn(ClientOptions.defaultInstance()); + + options = + SingleWorkerOptions.newBuilder().setMetricsScope(clientOptions.getMetricsScope()).build(); + pollTask = new LocallyDispatchedActivityPollTask(options); + mockTask = mock(LocallyDispatchedActivityWorker.Task.class); + } + + @Test + public void testPollTaskInterruptedException() throws Exception { + Thread.currentThread().interrupt(); + pollTask.apply(mockTask); + try { + pollTask.pollTask(); + fail("Expected RuntimeException due to interruption"); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("locally dispatch activity poll task interrupted")); + } catch (TException e) { + fail("Unexpected TException"); + } finally { + Thread.interrupted(); + } + } +} diff --git a/src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java b/src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java new file mode 100644 index 00000000..779110ea --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java @@ -0,0 +1,17 @@ +/** + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + *

Modifications copyright (C) 2017 Uber Technologies, Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file + * except in compliance with the License. A copy of the License is located at + * + *

http://aws.amazon.com/apache2.0 + * + *

or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.uber.cadence.internal.worker; + +public class WorkflowPollTaskTest {} From 39f8ec240540a9f2689c6d46e42d453d15c6364d Mon Sep 17 00:00:00 2001 From: Bowen Xiao Date: Fri, 8 Nov 2024 15:48:19 -0800 Subject: [PATCH 2/2] add test for workflowPollTask --- .../internal/worker/WorkflowPollTaskTest.java | 197 +++++++++++++++++- 1 file changed, 196 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java b/src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java index 779110ea..9b36f551 100644 --- a/src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java +++ b/src/test/java/com/uber/cadence/internal/worker/WorkflowPollTaskTest.java @@ -14,4 +14,199 @@ */ package com.uber.cadence.internal.worker; -public class WorkflowPollTaskTest {} +import static com.uber.cadence.internal.metrics.MetricsTagValue.INTERNAL_SERVICE_ERROR; +import static com.uber.cadence.internal.metrics.MetricsTagValue.SERVICE_BUSY; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.google.common.collect.ImmutableMap; +import com.uber.cadence.*; +import com.uber.cadence.internal.metrics.MetricsTag; +import com.uber.cadence.internal.metrics.MetricsType; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.m3.tally.Counter; +import com.uber.m3.tally.Scope; +import com.uber.m3.tally.Stopwatch; +import com.uber.m3.tally.Timer; +import com.uber.m3.util.Duration; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; + +public class WorkflowPollTaskTest { + + private IWorkflowService mockService; + private Scope mockMetricScope; + private WorkflowPollTask pollTask; + + @Before + public void setup() { + mockService = mock(IWorkflowService.class); + mockMetricScope = mock(Scope.class); + + // Mock the Timer and Stopwatch + Timer pollLatencyTimer = mock(Timer.class); + Timer scheduledToStartLatencyTimer = mock(Timer.class); + Stopwatch sw = mock(Stopwatch.class); + + // Ensure timers and stopwatch are not null and return expected values + when(mockMetricScope.timer(MetricsType.DECISION_POLL_LATENCY)).thenReturn(pollLatencyTimer); + when(pollLatencyTimer.start()).thenReturn(sw); + when(mockMetricScope.timer(MetricsType.DECISION_SCHEDULED_TO_START_LATENCY)) + .thenReturn(scheduledToStartLatencyTimer); + doNothing().when(scheduledToStartLatencyTimer).record(any(Duration.class)); + + // Mock counters for different metrics + Counter pollCounter = mock(Counter.class); + Counter succeedCounter = mock(Counter.class); + Counter noTaskCounter = mock(Counter.class); + Counter failedCounter = mock(Counter.class); + Counter transientFailedCounter = mock(Counter.class); + + // Set up mockMetricScope to return these counters for specific metrics + when(mockMetricScope.counter(MetricsType.DECISION_POLL_COUNTER)).thenReturn(pollCounter); + when(mockMetricScope.counter(MetricsType.DECISION_POLL_NO_TASK_COUNTER)) + .thenReturn(noTaskCounter); + when(mockMetricScope.counter(MetricsType.DECISION_POLL_FAILED_COUNTER)) + .thenReturn(failedCounter); + when(mockMetricScope.counter(MetricsType.DECISION_POLL_TRANSIENT_FAILED_COUNTER)) + .thenReturn(transientFailedCounter); + when(mockMetricScope.counter(MetricsType.DECISION_POLL_SUCCEED_COUNTER)) + .thenReturn(succeedCounter); + + // Initialize pollTask with the mocked dependencies + pollTask = + new WorkflowPollTask( + mockService, + "test-domain", + "test-taskList", + TaskListKind.TASK_LIST_KIND_NORMAL, + mockMetricScope, + "test-identity"); + } + + @Test + public void testPollSuccess() throws TException { + // Mock a successful response with all necessary fields + WorkflowType workflowType = new WorkflowType().setName("testWorkflowType"); + + PollForDecisionTaskResponse response = + new PollForDecisionTaskResponse() + .setTaskToken("testToken".getBytes()) + .setWorkflowType(workflowType) + .setScheduledTimestamp(1000L) // Ensure ScheduledTimestamp is non-null + .setStartedTimestamp(2000L); // Ensure StartedTimestamp is non-null + + when(mockService.PollForDecisionTask(any(PollForDecisionTaskRequest.class))) + .thenReturn(response); + + // Mock the timer and stopwatch behavior + Timer pollLatencyTimer = mock(Timer.class); + Timer scheduledToStartLatencyTimer = mock(Timer.class); + Stopwatch sw = mock(Stopwatch.class); + when(mockMetricScope.timer(MetricsType.DECISION_POLL_LATENCY)).thenReturn(pollLatencyTimer); + when(pollLatencyTimer.start()).thenReturn(sw); + + // Mock the tagged scope for workflow type + Scope taggedScope = mock(Scope.class); + when(mockMetricScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, "testWorkflowType"))) + .thenReturn(taggedScope); + + // Ensure DECISION_SCHEDULED_TO_START_LATENCY timer in taggedScope is not null + when(taggedScope.timer(MetricsType.DECISION_SCHEDULED_TO_START_LATENCY)) + .thenReturn(scheduledToStartLatencyTimer); + doNothing().when(scheduledToStartLatencyTimer).record(any(Duration.class)); + + // Mock counters for DECISION_POLL_COUNTER and DECISION_POLL_SUCCEED_COUNTER + Counter pollCounter = mock(Counter.class); + Counter succeedCounter = mock(Counter.class); + when(mockMetricScope.counter(MetricsType.DECISION_POLL_COUNTER)).thenReturn(pollCounter); + when(taggedScope.counter(MetricsType.DECISION_POLL_SUCCEED_COUNTER)).thenReturn(succeedCounter); + + PollForDecisionTaskResponse result = pollTask.poll(); + + // Verify that the result is not null and task token is as expected + assertNotNull(result); + assertArrayEquals("testToken".getBytes(), result.getTaskToken()); + + // Verify counter and timer behavior + verify(pollCounter, times(1)).inc(1); + verify(succeedCounter, times(1)).inc(1); + verify(pollLatencyTimer, times(1)).start(); + verify(sw, times(1)).stop(); + + // Verify that record() on scheduledToStartLatencyTimer was called with correct duration + Duration expectedDuration = + Duration.ofNanos(result.getStartedTimestamp() - result.getScheduledTimestamp()); + verify(scheduledToStartLatencyTimer, times(1)).record(eq(expectedDuration)); + } + + @Test(expected = InternalServiceError.class) + public void testPollInternalServiceError() throws TException { + when(mockService.PollForDecisionTask(any(PollForDecisionTaskRequest.class))) + .thenThrow(new InternalServiceError()); + + Scope taggedScope = mock(Scope.class); + Counter taggedCounter = mock(Counter.class); + when(mockMetricScope.tagged(ImmutableMap.of(MetricsTag.CAUSE, INTERNAL_SERVICE_ERROR))) + .thenReturn(taggedScope); + when(taggedScope.counter(MetricsType.DECISION_POLL_TRANSIENT_FAILED_COUNTER)) + .thenReturn(taggedCounter); + + try { + pollTask.poll(); + } finally { + verify(taggedCounter, times(1)).inc(1); + } + } + + @Test(expected = ServiceBusyError.class) + public void testPollServiceBusyError() throws TException { + when(mockService.PollForDecisionTask(any(PollForDecisionTaskRequest.class))) + .thenThrow(new ServiceBusyError()); + + Scope taggedScope = mock(Scope.class); + Counter taggedCounter = mock(Counter.class); + when(mockMetricScope.tagged(ImmutableMap.of(MetricsTag.CAUSE, SERVICE_BUSY))) + .thenReturn(taggedScope); + when(taggedScope.counter(MetricsType.DECISION_POLL_TRANSIENT_FAILED_COUNTER)) + .thenReturn(taggedCounter); + + try { + pollTask.poll(); + } finally { + verify(taggedCounter, times(1)).inc(1); + } + } + + @Test(expected = TException.class) + public void testPollGeneralTException() throws TException { + when(mockService.PollForDecisionTask(any(PollForDecisionTaskRequest.class))) + .thenThrow(new TException()); + + Counter failedCounter = mock(Counter.class); + when(mockMetricScope.counter(MetricsType.DECISION_POLL_FAILED_COUNTER)) + .thenReturn(failedCounter); + + try { + pollTask.poll(); + } finally { + verify(failedCounter, times(1)).inc(1); + } + } + + @Test + public void testPollNoTask() throws TException { + when(mockService.PollForDecisionTask(any(PollForDecisionTaskRequest.class))) + .thenReturn(new PollForDecisionTaskResponse()); + + Counter noTaskCounter = mock(Counter.class); + when(mockMetricScope.counter(MetricsType.DECISION_POLL_NO_TASK_COUNTER)) + .thenReturn(noTaskCounter); + + PollForDecisionTaskResponse result = pollTask.poll(); + + assertNull(result); + verify(noTaskCounter, times(1)).inc(1); + } +}