Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

startUpdate require wait stage #1448

Merged
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ jobs:
typescript-repo-path: ${{github.event.pull_request.head.repo.full_name}}
version: ${{github.event.pull_request.head.ref}}
version-is-repo-ref: true
features-repo-ref: sdk-1403-ts-startUpdate-require-wait-stage

stress-tests-no-reuse-context:
name: Stress Tests (No Reuse V8 Context)
Expand Down
1 change: 1 addition & 0 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export * from './workflow-options';
export * from './schedule-types';
export * from './schedule-client';
export * from './task-queue-client';
export { WorkflowUpdateStage } from './workflow-update-stage';
export {
WorkerBuildIdVersionSets,
BuildIdVersionSet,
Expand Down
77 changes: 41 additions & 36 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ import {
WithDefaults,
} from './base-client';
import { mapAsyncIterable } from './iterators-utils';
import { WorkflowUpdateStage } from './workflow-update-stage';
import * as workflowUpdateStage from './workflow-update-stage';

/**
* A client side handle to a single Workflow instance.
Expand Down Expand Up @@ -140,8 +142,8 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
): Promise<Ret>;

/**
* Start an Update and receive a handle to the Update.
* The Update validator (if present) is run before the handle is returned.
* Start an Update and receive a handle to the Update. The Update validator (if present) is run
* before the handle is returned.
*
* @experimental Update is an experimental feature.
*
Expand All @@ -150,22 +152,41 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
* mean the update itself was timed out or cancelled.
*
* @param def an Update definition as returned from {@link defineUpdate}
* @param options Update arguments
* @param options update arguments, and update lifecycle stage to wait for
*
* Currently, startUpdate always waits until a worker is accepting tasks for the workflow and the
* update is accepted or rejected, and the options object must be at least
* ```ts
* {
* waitForStage: WorkflowUpdateStage.ACCEPTED
* }
* ```
* If the update takes arguments, then the options object must additionally contain an `args`
* property with an array of argument values.
*
* @example
* ```ts
* const updateHandle = await handle.startUpdate(incrementAndGetValueUpdate, { args: [2] });
* const updateHandle = await handle.startUpdate(incrementAndGetValueUpdate, {
* args: [2],
* waitForStage: WorkflowUpdateStage.ACCEPTED,
* });
* const updateResult = await updateHandle.result();
* ```
*/
startUpdate<Ret, Args extends [any, ...any[]], Name extends string = string>(
def: UpdateDefinition<Ret, Args, Name> | string,
options: WorkflowUpdateOptions & { args: Args }
options: WorkflowUpdateOptions & {
args: Args;
waitForStage: WorkflowUpdateStage.ACCEPTED;
}
): Promise<WorkflowUpdateHandle<Ret>>;

startUpdate<Ret, Args extends [], Name extends string = string>(
def: UpdateDefinition<Ret, Args, Name> | string,
options?: WorkflowUpdateOptions & { args?: Args }
options: WorkflowUpdateOptions & {
args?: Args;
waitForStage: WorkflowUpdateStage.ACCEPTED;
}
): Promise<WorkflowUpdateHandle<Ret>>;

/**
Expand Down Expand Up @@ -782,15 +803,17 @@ export class WorkflowClient extends BaseClient {
* Used as the final function of the interceptor chain during startUpdate and executeUpdate.
*/
protected async _startUpdateHandler(
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
waitForStage: WorkflowUpdateStage,
input: WorkflowStartUpdateInput
): Promise<WorkflowStartUpdateOutput> {
const updateId = input.options?.updateId ?? uuid4();
const req: temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest = {
namespace: this.options.namespace,
workflowExecution: input.workflowExecution,
firstExecutionRunId: input.firstExecutionRunId,
waitPolicy: { lifecycleStage: waitForStage },
waitPolicy: {
lifecycleStage: workflowUpdateStage.toProtoEnum(waitForStage),
},
request: {
meta: {
updateId,
Expand All @@ -811,11 +834,7 @@ export class WorkflowClient extends BaseClient {
try {
do {
response = await this.workflowService.updateWorkflowExecution(req);
} while (
response.stage < waitForStage &&
response.stage <
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
);
} while (response.stage < waitForStage && response.stage < WorkflowUpdateStage.ACCEPTED);
} catch (err) {
this.rethrowUpdateGrpcError(err, 'Workflow Update failed', input.workflowExecution);
}
Expand Down Expand Up @@ -865,9 +884,7 @@ export class WorkflowClient extends BaseClient {
updateRef: { workflowExecution, updateId },
identity: this.options.identity,
waitPolicy: {
lifecycleStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
lifecycleStage: workflowUpdateStage.toProtoEnum(WorkflowUpdateStage.COMPLETED),
},
};
for (;;) {
Expand Down Expand Up @@ -1076,7 +1093,7 @@ export class WorkflowClient extends BaseClient {
}: WorkflowHandleOptions): WorkflowHandle<T> {
const _startUpdate = async <Ret, Args extends unknown[]>(
def: UpdateDefinition<Ret, Args> | string,
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
waitForStage: WorkflowUpdateStage,
options?: WorkflowUpdateOptions & { args?: Args }
): Promise<WorkflowUpdateHandle<Ret>> => {
const next = this._startUpdateHandler.bind(this, waitForStage);
Expand All @@ -1098,12 +1115,7 @@ export class WorkflowClient extends BaseClient {
output.workflowRunId,
output.outcome
);
if (
!output.outcome &&
waitForStage ===
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
) {
if (!output.outcome && waitForStage === WorkflowUpdateStage.COMPLETED) {
await this._pollForUpdateOutcome(handle.updateId, input.workflowExecution);
}
return handle;
Expand Down Expand Up @@ -1162,25 +1174,18 @@ export class WorkflowClient extends BaseClient {
},
async startUpdate<Ret, Args extends any[]>(
def: UpdateDefinition<Ret, Args> | string,
options?: WorkflowUpdateOptions & { args?: Args }
options: WorkflowUpdateOptions & {
args?: Args;
waitForStage: WorkflowUpdateStage.ACCEPTED;
}
): Promise<WorkflowUpdateHandle<Ret>> {
return await _startUpdate(
def,
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
options
);
return await _startUpdate(def, options.waitForStage, options);
},
async executeUpdate<Ret, Args extends any[]>(
def: UpdateDefinition<Ret, Args> | string,
options?: WorkflowUpdateOptions & { args?: Args }
): Promise<Ret> {
const handle = await _startUpdate(
def,
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
options
);
const handle = await _startUpdate(def, WorkflowUpdateStage.COMPLETED, options);
return await handle.result();
},
getUpdateHandle<Ret>(updateId: string): WorkflowUpdateHandle<Ret> {
Expand Down
33 changes: 33 additions & 0 deletions packages/client/src/workflow-update-stage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { temporal } from '@temporalio/proto';
import { checkExtends } from '@temporalio/common/lib/type-helpers';

export enum WorkflowUpdateStage {
/** This is not an allowed value. */
UNSPECIFIED = 0,
mjameswh marked this conversation as resolved.
Show resolved Hide resolved
/** Admitted stage. This stage is reached when the server accepts the update request. It is not
* allowed to wait for this stage when using startUpdate, since the update request has not yet
* been durably persisted at this stage. */
ADMITTED = 1,
/** Accepted stage. This stage is reached when a workflow has received the update and either
* accepted it (i.e. it has passed validation, or there was no validator configured on the update
* handler) or rejected it. This is currently the only allowed value when using startUpdate. */
ACCEPTED = 2,
/** Completed stage. This stage is reached when a workflow has completed processing the
* update with either a success or failure. */
COMPLETED = 3,
}

checkExtends<
`UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_${keyof typeof WorkflowUpdateStage}`,
keyof typeof temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
>();
checkExtends<
keyof typeof temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
`UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_${keyof typeof WorkflowUpdateStage}`
>();

export function toProtoEnum(stage: WorkflowUpdateStage): temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage {
return temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage[
`UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_${WorkflowUpdateStage[stage] as keyof typeof WorkflowUpdateStage}`
];
}
7 changes: 5 additions & 2 deletions packages/test/src/test-integration-update-interceptors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkflowStartUpdateInput, WorkflowStartUpdateOutput } from '@temporalio/client';
import { WorkflowStartUpdateInput, WorkflowStartUpdateOutput, WorkflowUpdateStage } from '@temporalio/client';
import * as wf from '@temporalio/workflow';
import { Next, UpdateInput, WorkflowInboundCallsInterceptor, WorkflowInterceptors } from '@temporalio/workflow';
import { helpers, makeTestFunction } from './helpers-integration';
Expand Down Expand Up @@ -66,7 +66,10 @@ test('Update client and inbound interceptors work for startUpdate', async (t) =>
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdate);

const updateHandle = await wfHandle.startUpdate(update, { args: ['1'] });
const updateHandle = await wfHandle.startUpdate(update, {
args: ['1'],
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
const updateResult = await updateHandle.result();
t.deepEqual(updateResult, '1-clientIntercepted-inboundIntercepted');
});
Expand Down
52 changes: 41 additions & 11 deletions packages/test/src/test-integration-update.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkflowUpdateRPCTimeoutOrCancelledError } from '@temporalio/client';
import { WorkflowUpdateStage, WorkflowUpdateRPCTimeoutOrCancelledError } from '@temporalio/client';
import * as wf from '@temporalio/workflow';
import { helpers, makeTestFunction } from './helpers-integration';

Expand Down Expand Up @@ -64,11 +64,16 @@ test('Update can be executed via startUpdate() and handle.result()', async (t) =
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdates);

const updateHandle = await wfHandle.startUpdate(update, { args: ['1'] });
const updateHandle = await wfHandle.startUpdate(update, {
args: ['1'],
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
const updateResult = await updateHandle.result();
t.deepEqual(updateResult, ['1']);

const doneUpdateHandle = await wfHandle.startUpdate(doneUpdate);
const doneUpdateHandle = await wfHandle.startUpdate(doneUpdate, {
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
const doneUpdateResult = await doneUpdateHandle.result();
t.is(doneUpdateResult, undefined);

Expand All @@ -83,7 +88,11 @@ test('Update handle can be created from identifiers and used to obtain result',
await worker.runUntil(async () => {
const updateId = 'my-update-id';
const wfHandle = await startWorkflow(workflowWithUpdates);
const updateHandleFromStartUpdate = await wfHandle.startUpdate(update, { args: ['1'], updateId });
const updateHandleFromStartUpdate = await wfHandle.startUpdate(update, {
args: ['1'],
updateId,
waitForStage: WorkflowUpdateStage.ACCEPTED,
});

// Obtain update handle on workflow handle from start update.
const updateHandle = wfHandle.getUpdateHandle(updateId);
Expand Down Expand Up @@ -175,9 +184,15 @@ test('Update validator can reject when using handle.result() but handle can be o
const worker = await createWorker();
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdateValidator);
let updateHandle = await wfHandle.startUpdate(stringToStringUpdate, { args: ['arg'] });
let updateHandle = await wfHandle.startUpdate(stringToStringUpdate, {
args: ['arg'],
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
t.is(await updateHandle.result(), 'update-result');
updateHandle = await wfHandle.startUpdate(stringToStringUpdate, { args: ['bad-arg'] });
updateHandle = await wfHandle.startUpdate(stringToStringUpdate, {
args: ['bad-arg'],
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
await assertWorkflowUpdateFailed(updateHandle.result(), wf.ApplicationFailure, 'Validation failed');
});
});
Expand Down Expand Up @@ -246,7 +261,10 @@ test('Update id can be assigned and is present on returned handle', async (t) =>
const worker = await createWorker();
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdates);
const updateHandle = await wfHandle.startUpdate(doneUpdate, { updateId: 'my-update-id' });
const updateHandle = await wfHandle.startUpdate(doneUpdate, {
updateId: 'my-update-id',
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
t.is(updateHandle.updateId, 'my-update-id');
});
});
Expand Down Expand Up @@ -425,7 +443,10 @@ test('Update/Signal/Query example in WorkflowHandle docstrings works', async (t)
t.is(queryResult, 4);
const updateResult = await wfHandle.executeUpdate(incrementAndGetValueUpdate, { args: [2] });
t.is(updateResult, 6);
const secondUpdateHandle = await wfHandle.startUpdate(incrementAndGetValueUpdate, { args: [2] });
const secondUpdateHandle = await wfHandle.startUpdate(incrementAndGetValueUpdate, {
args: [2],
waitForStage: WorkflowUpdateStage.ACCEPTED,
});
const secondUpdateResult = await secondUpdateHandle.result();
t.is(secondUpdateResult, 8);
await wfHandle.cancel();
Expand All @@ -436,7 +457,12 @@ test('Update/Signal/Query example in WorkflowHandle docstrings works', async (t)
test('startUpdate does not return handle before update has reached requested stage', async (t) => {
const { startWorkflow } = helpers(t);
const wfHandle = await startWorkflow(workflowWithUpdates);
const updatePromise = wfHandle.startUpdate(update, { args: ['1'] }).then(() => 'update');
const updatePromise = wfHandle
.startUpdate(update, {
args: ['1'],
waitForStage: WorkflowUpdateStage.ACCEPTED,
})
.then(() => 'update');
const timeoutPromise = new Promise<string>((f) =>
setTimeout(() => f('timeout'), 500 + LONG_POLL_EXPIRATION_INTERVAL_SECONDS * 1000)
);
Expand Down Expand Up @@ -536,14 +562,18 @@ test('startUpdate throws WorkflowUpdateRPCTimeoutOrCancelledError with no worker
const { startWorkflow } = helpers(t);
const wfHandle = await startWorkflow(workflowWithUpdates);
await t.context.env.client.withDeadline(Date.now() + 100, async () => {
const err = await t.throwsAsync(wfHandle.startUpdate(update, { args: ['1'] }));
const err = await t.throwsAsync(
wfHandle.startUpdate(update, { args: ['1'], waitForStage: WorkflowUpdateStage.ACCEPTED })
);
t.true(err instanceof WorkflowUpdateRPCTimeoutOrCancelledError);
});

const ctrl = new AbortController();
setTimeout(() => ctrl.abort(), 10);
await t.context.env.client.withAbortSignal(ctrl.signal, async () => {
const err = await t.throwsAsync(wfHandle.startUpdate(update, { args: ['1'] }));
const err = await t.throwsAsync(
wfHandle.startUpdate(update, { args: ['1'], waitForStage: WorkflowUpdateStage.ACCEPTED })
);
t.true(err instanceof WorkflowUpdateRPCTimeoutOrCancelledError);
});
});
Expand Down
Loading
Loading