Skip to content

Commit

Permalink
retry workflows grpc stream
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting committed Oct 17, 2024
1 parent ad93993 commit 2fb07ef
Showing 1 changed file with 76 additions and 37 deletions.
113 changes: 76 additions & 37 deletions src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export class TaskHubGrpcWorker {
private _tls?: boolean;
private _grpcChannelOptions?: grpc.ChannelOptions;
private _isRunning: boolean;
private _stopWorker: boolean;
private _stub: stubs.TaskHubSidecarServiceClient | null;

constructor(hostAddress?: string, options?: grpc.ChannelOptions, useTLS?: boolean) {
Expand All @@ -33,6 +34,7 @@ export class TaskHubGrpcWorker {
this._grpcChannelOptions = options;
this._responseStream = null;
this._isRunning = false;
this._stopWorker = false;
this._stub = null;
}

Expand Down Expand Up @@ -99,52 +101,83 @@ export class TaskHubGrpcWorker {
* Therefore, we open the stream and simply listen through the eventemitter behind the scenes
*/
async start(): Promise<void> {
const client = new GrpcClient(this._hostAddress, this._grpcChannelOptions, this._tls);

if (this._isRunning) {
throw new Error("The worker is already running.");
}

// send a "Hello" message to the sidecar to ensure that it's listening
const prom = promisify(client.stub.hello.bind(client.stub));
await prom(new Empty());

// Stream work items from the sidecar
const stubGetWorkItemsReq = new pb.GetWorkItemsRequest();
const client = new GrpcClient(this._hostAddress, this._grpcChannelOptions, this._tls);
this._stub = client.stub;
this._responseStream = client.stub.getWorkItems(stubGetWorkItemsReq);

console.log(`Successfully connected to ${this._hostAddress}. Waiting for work items...`);

// Wait for a work item to be received
this._responseStream.on("data", (workItem: pb.WorkItem) => {
if (workItem.hasOrchestratorrequest()) {
console.log(
`Received "Orchestrator Request" work item with instance id '${workItem
?.getOrchestratorrequest()
?.getInstanceid()}'`,
);
this._executeOrchestrator(workItem.getOrchestratorrequest() as any, client.stub);
} else if (workItem.hasActivityrequest()) {
console.log(`Received "Activity Request" work item`);
this._executeActivity(workItem.getActivityrequest() as any, client.stub);
} else {
console.log(`Received unknown work item`);
}
});

// Wait for the stream to end or error
this._responseStream.on("end", () => {
console.log("Stream ended");
});

this._responseStream.on("error", (err: Error) => {
console.log("Stream error", err);
});
// do not await so it runs in the background
this.internalRunWorker(client);

this._isRunning = true;
}

async internalRunWorker(client: GrpcClient, isRetry: boolean = false): Promise<void> {
try {
// send a "Hello" message to the sidecar to ensure that it's listening
const prom = promisify(client.stub.hello.bind(client.stub));
await prom(new Empty());

// Stream work items from the sidecar
const stream = client.stub.getWorkItems(new pb.GetWorkItemsRequest());
this._responseStream = stream;

console.log(`Successfully connected to ${this._hostAddress}. Waiting for work items...`);

// Wait for a work item to be received
stream.on("data", (workItem: pb.WorkItem) => {
if (workItem.hasOrchestratorrequest()) {
console.log(
`Received "Orchestrator Request" work item with instance id '${workItem
?.getOrchestratorrequest()
?.getInstanceid()}'`,
);
this._executeOrchestrator(workItem.getOrchestratorrequest() as any, client.stub);
} else if (workItem.hasActivityrequest()) {
console.log(`Received "Activity Request" work item`);
this._executeActivity(workItem.getActivityrequest() as any, client.stub);
} else {
console.log(`Received unknown work item`);
}
});

// Wait for the stream to end or error
stream.on("end", async () => {
stream.cancel();
stream.destroy();
if (this._stopWorker) {
console.log("Stream ended");
return;
}
console.log("Stream abruptly closed, will retry the connection...");
// TODO consider exponential backoff
await sleep(5000);
// do not await
this.internalRunWorker(client, true);
});

stream.on("error", (err: Error) => {
console.log("Stream error", err);
});
} catch (err) {
if (this._stopWorker) {
// ignoring the error because the worker has been stopped
return;
}
console.log(`Error on grpc stream: ${err}`);
if (!isRetry) {
throw err;
}
console.log("Connection will be retried...");
// TODO consider exponential backoff
await sleep(5000);
this.internalRunWorker(client, true);
return;
}
}

/**
* Stop the worker and wait for any pending work items to complete
*/
Expand All @@ -153,6 +186,8 @@ export class TaskHubGrpcWorker {
throw new Error("The worker is not running.");
}

this._stopWorker = true;

this._responseStream?.cancel();
this._responseStream?.destroy();

Expand All @@ -162,7 +197,7 @@ export class TaskHubGrpcWorker {

// Wait a bit to let the async operations finish
// https://github.com/grpc/grpc-node/issues/1563#issuecomment-829483711
await new Promise((resolve) => setTimeout(resolve, 1000));
await sleep(1000);
}

/**
Expand Down Expand Up @@ -265,3 +300,7 @@ export class TaskHubGrpcWorker {
}
}
}

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

0 comments on commit 2fb07ef

Please sign in to comment.