Skip to content

Commit

Permalink
Fix UpdateWithStart untyped operation (#2288)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos authored Oct 22, 2024
1 parent 7bcade2 commit 3410677
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
* UpdateWithStartWorkflowOperation is an update workflow request that can be executed together with
Expand Down Expand Up @@ -284,26 +285,27 @@ public static <R> Builder<R> newBuilder(String updateName, Class<R> resultClass,

private final CompletableFuture<WorkflowUpdateHandle<R>> handle;

private final Functions.Proc request;
@Nullable private final Functions.Proc updateRequest;

private UpdateWithStartWorkflowOperation(
UpdateOptions<R> options, Functions.Proc request, Object[] updateArgs) {
UpdateOptions<R> options, Functions.Proc updateRequest, Object[] updateArgs) {
this.options = options;
this.updateArgs = updateArgs;
this.handle = new CompletableFuture<>();
this.request = request;
this.updateRequest = updateRequest;
}

WorkflowUpdateHandle<R> invoke(Functions.Proc workflow) {
WorkflowUpdateHandle<R> invoke(Functions.Proc workflowRequest) {
WorkflowInvocationHandler.initAsyncInvocation(
WorkflowInvocationHandler.InvocationType.UPDATE_WITH_START, this);
try {
// invokes `prepareUpdate` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler
request.apply();

// invokes `prepareStart` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler
workflow.apply();
workflowRequest.apply();

if (updateRequest != null) { // only present when using typed API
// invokes `prepareUpdate` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler
updateRequest.apply();
}
stub.updateWithStart(this, this.workflowArgs);
return this.handle.get();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -365,8 +367,8 @@ public Object[] getUpdateArgs() {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("UpdateWithStartWorkflowOperation{options=").append(options);
if (request != null) {
sb.append(", request=").append(request);
if (updateRequest != null) {
sb.append(", updateRequest=").append(updateRequest);
}
if (updateArgs != null) {
sb.append(", updateArgs=").append(Arrays.toString(updateArgs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,14 @@ public <R> R getResult(Class<R> resultClass) {
private static class UpdateWithStartInvocationHandler implements SpecificInvocationHandler {

enum State {
NOT_STARTED,
INIT,
START_RECEIVED,
UPDATE_RECEIVED,
}

private final UpdateWithStartWorkflowOperation operation;

private State state = State.NOT_STARTED;
private State state = State.INIT;

public UpdateWithStartInvocationHandler(UpdateWithStartWorkflowOperation operation) {
this.operation = operation;
Expand All @@ -471,7 +471,15 @@ public void invoke(

POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method);

if (state == State.NOT_STARTED) {
if (state == State.INIT) {
WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
if (workflowMethod == null) {
throw new IllegalArgumentException(
"Method '" + method.getName() + "' is not a WorkflowMethod");
}
this.operation.prepareStart(untyped, args);
state = State.START_RECEIVED;
} else if (state == State.START_RECEIVED) {
UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class);
if (updateMethod == null) {
throw new IllegalArgumentException(
Expand All @@ -483,14 +491,6 @@ public void invoke(
method.getReturnType(),
method.getGenericReturnType(),
args);
state = State.START_RECEIVED;
} else if (state == State.START_RECEIVED) {
WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
if (workflowMethod == null) {
throw new IllegalArgumentException(
"Method '" + method.getName() + "' is not a WorkflowMethod");
}
this.operation.prepareStart(untyped, args);
state = State.UPDATE_RECEIVED;
} else {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,35 @@ public void startAndSendUpdateTogether() throws ExecutionException, InterruptedE
assertEquals(options.getWorkflowId(), handle1.getExecution().getWorkflowId());
assertEquals("Hello Update", handle1.getResultAsync().get());

WorkflowUpdateHandle<String> updHandle = updateOp.getUpdateHandle().get();
assertEquals(updateOp.getResult(), updHandle.getResultAsync().get());
WorkflowUpdateHandle<String> handle2 = updateOp.getUpdateHandle().get();
assertEquals(updateOp.getResult(), handle2.getResultAsync().get());

workflow.complete();

assertEquals("Hello Update complete", WorkflowStub.fromTyped(workflow).getResult(String.class));
}

@Test
public void startAndSendUpdateTogetherUsingUntypedWorkflowOperation()
throws ExecutionException, InterruptedException {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();

WorkflowOptions options = createOptions();
TestWorkflows.WorkflowWithUpdate workflow =
workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options);

UpdateWithStartWorkflowOperation<String> updateOp =
UpdateWithStartWorkflowOperation.newBuilder(
"update", String.class, new Object[] {1, "Hello Update"}) // untyped!
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
.build();

WorkflowUpdateHandle<String> handle1 =
WorkflowClient.updateWithStart(workflow::execute, updateOp);
assertEquals("Hello Update", handle1.getResultAsync().get());

WorkflowUpdateHandle<String> handle2 = updateOp.getUpdateHandle().get();
assertEquals(updateOp.getResult(), handle2.getResultAsync().get());

workflow.complete();

Expand All @@ -110,8 +137,8 @@ public void startAndSendUpdateTogetherWithNullUpdateResult()
WorkflowClient.updateWithStart(workflow::execute, updateOp);
assertNull(handle1.getResultAsync().get());

WorkflowUpdateHandle<Void> updHandle = updateOp.getUpdateHandle().get();
assertEquals(updateOp.getResult(), updHandle.getResultAsync().get());
WorkflowUpdateHandle<Void> handle2 = updateOp.getUpdateHandle().get();
assertEquals(updateOp.getResult(), handle2.getResultAsync().get());

assertEquals("Hello Update", WorkflowStub.fromTyped(workflow).getResult(String.class));
}
Expand Down

0 comments on commit 3410677

Please sign in to comment.