From aa1f2dc14f6dd711ce05362b358b0b0e8154c8d6 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sat, 15 Jul 2023 13:52:08 -0400 Subject: [PATCH 1/2] Refactor: add shared utility functions --- features/child_workflow/result/feature.py | 7 +------ features/child_workflow/signal/feature.java | 13 +------------ features/child_workflow/signal/feature.py | 7 +------ features/child_workflow/signal/feature.ts | 8 +------- harness/python/feature.py | 5 ++++- harness/ts/harness.ts | 5 ++++- 6 files changed, 12 insertions(+), 33 deletions(-) diff --git a/features/child_workflow/result/feature.py b/features/child_workflow/result/feature.py index 35b99656..cdc8063c 100644 --- a/features/child_workflow/result/feature.py +++ b/features/child_workflow/result/feature.py @@ -28,12 +28,7 @@ async def run(self, input: str) -> str: async def start(runner: Runner) -> WorkflowHandle: - return await runner.client.start_workflow( - Workflow, - id=f"{runner.feature.rel_dir}-{uuid4()}", - task_queue=runner.task_queue, - execution_timeout=timedelta(minutes=1), - ) + return await runner.start_parameterless_workflow(Workflow) register_feature( diff --git a/features/child_workflow/signal/feature.java b/features/child_workflow/signal/feature.java index 83c1e049..3896bed7 100644 --- a/features/child_workflow/signal/feature.java +++ b/features/child_workflow/signal/feature.java @@ -1,7 +1,5 @@ package child_workflow.signal; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowOptions; import io.temporal.sdkfeatures.Assertions; import io.temporal.sdkfeatures.Feature; import io.temporal.sdkfeatures.Run; @@ -13,7 +11,6 @@ import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; -import java.time.Duration; @WorkflowInterface public interface feature extends Feature { @@ -78,15 +75,7 @@ public String workflow() { @Override public Run execute(Runner runner) throws Exception { - var options = - WorkflowOptions.newBuilder() - .setTaskQueue(runner.config.taskQueue) - .setWorkflowExecutionTimeout(Duration.ofMinutes(1)) - .build(); - var stub = runner.client.newWorkflowStub(feature.class, options); - var execution = WorkflowClient.start(stub::workflow); - var method = runner.featureInfo.metadata.getWorkflowMethods().get(0); - return new Run(method, execution); + return runner.executeSingleParameterlessWorkflow(); } @Override diff --git a/features/child_workflow/signal/feature.py b/features/child_workflow/signal/feature.py index ed679c4d..9ecbe223 100644 --- a/features/child_workflow/signal/feature.py +++ b/features/child_workflow/signal/feature.py @@ -44,12 +44,7 @@ def unblock(self, message: Optional[str]) -> None: async def start(runner: Runner) -> WorkflowHandle: - return await runner.client.start_workflow( - Workflow, - id=f"{runner.feature.rel_dir}-{uuid4()}", - task_queue=runner.task_queue, - execution_timeout=timedelta(minutes=1), - ) + return await runner.start_parameterless_workflow(Workflow) register_feature( diff --git a/features/child_workflow/signal/feature.ts b/features/child_workflow/signal/feature.ts index ad8c5936..5b9a9653 100644 --- a/features/child_workflow/signal/feature.ts +++ b/features/child_workflow/signal/feature.ts @@ -1,4 +1,3 @@ -import { randomUUID } from 'crypto'; import * as assert from 'assert'; import { Feature } from '@temporalio/harness'; import * as wf from '@temporalio/workflow'; @@ -28,12 +27,7 @@ export async function childWorkflow(): Promise { export const feature = new Feature({ workflow, async execute(runner) { - return await runner.client.start(workflow, { - taskQueue: runner.options.taskQueue, - workflowId: `${runner.source.relDir}-${randomUUID()}`, - workflowExecutionTimeout: 60000, - ...(runner.feature.options.workflowStartOptions ?? {}), - }); + return await runner.executeParameterlessWorkflow(workflow); }, async checkResult(runner, handle) { const result = await handle.result(); diff --git a/harness/python/feature.py b/harness/python/feature.py index 33f913cc..49a8bfeb 100644 --- a/harness/python/feature.py +++ b/harness/python/feature.py @@ -150,7 +150,10 @@ async def run(self) -> None: async def start_single_parameterless_workflow(self) -> WorkflowHandle: if len(self.feature.workflows) != 1: raise ValueError("Must have a single workflow") - defn = workflow._Definition.must_from_class(self.feature.workflows[0]) + return await self.start_parameterless_workflow(self.feature.workflows[0]) + + async def start_parameterless_workflow(self, workflow_cls: Type) -> WorkflowHandle: + defn = workflow._Definition.must_from_class(workflow_cls) start_options = { "id": f"{self.feature.rel_dir}-{uuid.uuid4()}", "task_queue": self.task_queue, diff --git a/harness/ts/harness.ts b/harness/ts/harness.ts index f0527c40..902229d8 100644 --- a/harness/ts/harness.ts +++ b/harness/ts/harness.ts @@ -243,7 +243,10 @@ export class Runner { } async executeSingleParameterlessWorkflow(): Promise { - const workflow = this.feature.options.workflow ?? 'workflow'; + return this.executeParameterlessWorkflow(this.feature.options.workflow ?? 'workflow'); + } + + async executeParameterlessWorkflow(workflow: W | 'workflow'): Promise { const startOptions: WorkflowStartOptions = { taskQueue: this.options.taskQueue, workflowId: `${this.source.relDir}-${randomUUID()}`, From cf9c8410dd33dd504fcf2d71c61c23bc0aeb05db Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sat, 15 Jul 2023 14:45:59 -0400 Subject: [PATCH 2/2] Refactor: single-file java features --- .../child_workflow/result/ChildWorkflow.java | 18 ---- features/child_workflow/result/feature.java | 92 +++++++++++-------- features/child_workflow/signal/feature.java | 32 +++---- 3 files changed, 69 insertions(+), 73 deletions(-) delete mode 100644 features/child_workflow/result/ChildWorkflow.java diff --git a/features/child_workflow/result/ChildWorkflow.java b/features/child_workflow/result/ChildWorkflow.java deleted file mode 100644 index 9d7193ef..00000000 --- a/features/child_workflow/result/ChildWorkflow.java +++ /dev/null @@ -1,18 +0,0 @@ -package child_workflow.result; - -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; - - -@WorkflowInterface -public interface ChildWorkflow { - @WorkflowMethod - public String executeChild(String input ); -} - -class ChildWorkflowImpl implements ChildWorkflow { - @Override - public String executeChild(String input) { - return input; - } -} diff --git a/features/child_workflow/result/feature.java b/features/child_workflow/result/feature.java index 161b17b7..662619b6 100644 --- a/features/child_workflow/result/feature.java +++ b/features/child_workflow/result/feature.java @@ -5,52 +5,66 @@ import io.temporal.sdkfeatures.Feature; import io.temporal.sdkfeatures.Run; import io.temporal.sdkfeatures.Runner; +import io.temporal.worker.Worker; +import io.temporal.workflow.Async; +import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import org.junit.jupiter.api.Assertions; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; -import io.temporal.workflow.Async; -import io.temporal.workflow.Promise; - import java.time.Duration; +import org.junit.jupiter.api.Assertions; @WorkflowInterface public interface feature extends Feature { + + @WorkflowInterface + interface ChildWorkflow { @WorkflowMethod - public String workflow(); - - class Impl implements feature, ChildWorkflow { - private static final String CHILDWORKFLOW_INPUT = "test"; - - @Override - public String executeChild(String input) { - return input; - } - - @Override - public String workflow() { - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class); - Promise result = Async.function(child::executeChild, CHILDWORKFLOW_INPUT); - return result.get(); - } - - @Override - public Run execute(Runner runner) throws Exception { - var options = WorkflowOptions.newBuilder() - .setTaskQueue(runner.config.taskQueue) - .setWorkflowExecutionTimeout(Duration.ofMinutes(1)) - .build(); - - var methods = runner.featureInfo.metadata.getWorkflowMethods(); - - var stub = runner.client.newWorkflowStub(feature.class, options); - return new Run(methods.get(0), WorkflowClient.start(stub::workflow)); - } - - @Override - public void checkResult(Runner runner, Run run) { - var resultStr = runner.waitForRunResult(run, String.class); - Assertions.assertEquals(CHILDWORKFLOW_INPUT, resultStr); - } + String execute(String input); + + class Impl implements ChildWorkflow { + public String execute(String input) { + return input; + } + } + } + + @WorkflowMethod + public String workflow(); + + class Impl implements feature { + private static final String CHILDWORKFLOW_INPUT = "test"; + + @Override + public void prepareWorker(Worker worker) { + worker.registerWorkflowImplementationTypes(ChildWorkflow.Impl.class); + } + + @Override + public String workflow() { + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class); + Promise result = Async.function(child::execute, CHILDWORKFLOW_INPUT); + return result.get(); + } + + @Override + public Run execute(Runner runner) throws Exception { + var options = + WorkflowOptions.newBuilder() + .setTaskQueue(runner.config.taskQueue) + .setWorkflowExecutionTimeout(Duration.ofMinutes(1)) + .build(); + + var methods = runner.featureInfo.metadata.getWorkflowMethods(); + + var stub = runner.client.newWorkflowStub(feature.class, options); + return new Run(methods.get(0), WorkflowClient.start(stub::workflow)); + } + + @Override + public void checkResult(Runner runner, Run run) { + var resultStr = runner.waitForRunResult(run, String.class); + Assertions.assertEquals(CHILDWORKFLOW_INPUT, resultStr); } + } } diff --git a/features/child_workflow/signal/feature.java b/features/child_workflow/signal/feature.java index 3896bed7..1904272a 100644 --- a/features/child_workflow/signal/feature.java +++ b/features/child_workflow/signal/feature.java @@ -16,31 +16,31 @@ public interface feature extends Feature { @WorkflowInterface - public interface ChildWorkflow { + interface ChildWorkflow { @WorkflowMethod String workflow(); @SignalMethod void unblock(String message); - } - class ChildWorkflowImpl implements ChildWorkflow { - /* - * A workflow that waits for a signal and returns the data received. - */ + class Impl implements ChildWorkflow { + /* + * A workflow that waits for a signal and returns the data received. + */ - private String childWorkflowUnblockMessage; + private String childWorkflowUnblockMessage; - @Override - public String workflow() { - Workflow.await(() -> childWorkflowUnblockMessage != null); - return childWorkflowUnblockMessage; - } + @Override + public String workflow() { + Workflow.await(() -> childWorkflowUnblockMessage != null); + return childWorkflowUnblockMessage; + } - @Override - public void unblock(String message) { - childWorkflowUnblockMessage = message; + @Override + public void unblock(String message) { + childWorkflowUnblockMessage = message; + } } } @@ -51,7 +51,7 @@ class Impl implements feature { @Override public void prepareWorker(Worker worker) { - worker.registerWorkflowImplementationTypes(ChildWorkflowImpl.class); + worker.registerWorkflowImplementationTypes(ChildWorkflow.Impl.class); } private static final String UNBLOCK_MESSAGE = "unblock";