Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry workflows grpc stream #66

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 76 additions & 37 deletions src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export class TaskHubGrpcWorker {
private _tls?: boolean;
private _grpcChannelOptions?: grpc.ChannelOptions;
private _isRunning: boolean;
private _stopWorker: boolean;
private _stub: stubs.TaskHubSidecarServiceClient | null;

constructor(hostAddress?: string, options?: grpc.ChannelOptions, useTLS?: boolean) {
Expand All @@ -33,6 +34,7 @@ export class TaskHubGrpcWorker {
this._grpcChannelOptions = options;
this._responseStream = null;
this._isRunning = false;
this._stopWorker = false;
this._stub = null;
}

Expand Down Expand Up @@ -99,52 +101,83 @@ export class TaskHubGrpcWorker {
* Therefore, we open the stream and simply listen through the eventemitter behind the scenes
*/
async start(): Promise<void> {
const client = new GrpcClient(this._hostAddress, this._grpcChannelOptions, this._tls);

if (this._isRunning) {
throw new Error("The worker is already running.");
}

// send a "Hello" message to the sidecar to ensure that it's listening
const prom = promisify(client.stub.hello.bind(client.stub));
await prom(new Empty());

// Stream work items from the sidecar
const stubGetWorkItemsReq = new pb.GetWorkItemsRequest();
const client = new GrpcClient(this._hostAddress, this._grpcChannelOptions, this._tls);
this._stub = client.stub;
this._responseStream = client.stub.getWorkItems(stubGetWorkItemsReq);

console.log(`Successfully connected to ${this._hostAddress}. Waiting for work items...`);

// Wait for a work item to be received
this._responseStream.on("data", (workItem: pb.WorkItem) => {
if (workItem.hasOrchestratorrequest()) {
console.log(
`Received "Orchestrator Request" work item with instance id '${workItem
?.getOrchestratorrequest()
?.getInstanceid()}'`,
);
this._executeOrchestrator(workItem.getOrchestratorrequest() as any, client.stub);
} else if (workItem.hasActivityrequest()) {
console.log(`Received "Activity Request" work item`);
this._executeActivity(workItem.getActivityrequest() as any, client.stub);
} else {
console.log(`Received unknown work item`);
}
});

// Wait for the stream to end or error
this._responseStream.on("end", () => {
console.log("Stream ended");
});

this._responseStream.on("error", (err: Error) => {
console.log("Stream error", err);
});
// do not await so it runs in the background
this.internalRunWorker(client);

this._isRunning = true;
}

/**
 * Connects to the sidecar and listens for work items until the worker is stopped.
 *
 * Sends a "Hello" request to check that the sidecar is listening, opens the
 * `getWorkItems` stream and hands every incoming work item to the orchestrator
 * or activity executor. When the stream ends while the worker has not been
 * stopped, or when a retry attempt fails, a new attempt is scheduled after a
 * fixed delay.
 *
 * @param client - gRPC client whose stub is used for the handshake, the stream and the executors.
 * @param isRetry - `true` when this call is a reconnection attempt; failures are then retried
 *   instead of being rethrown.
 * @returns A promise that resolves once the stream listeners are registered (or a retry has
 *   been scheduled) - not when the stream itself finishes.
 * @throws Rethrows the connection error when the first (non-retry) attempt fails and the
 *   worker has not been stopped.
 */
async internalRunWorker(client: GrpcClient, isRetry: boolean = false): Promise<void> {
  // TODO consider exponential backoff
  const retryDelayMs = 5000;

  // Waits for the back-off delay and then starts a new attempt in the background.
  const reconnectAfterDelay = async (): Promise<void> => {
    await sleep(retryDelayMs);
    // The worker may have been stopped while we were waiting; reconnecting now
    // would open a stream that nobody cancels.
    if (this._stopWorker) {
      return;
    }
    // do not await so the new attempt runs in the background
    void this.internalRunWorker(client, true);
  };

  try {
    // send a "Hello" message to the sidecar to ensure that it's listening
    const prom = promisify(client.stub.hello.bind(client.stub));
    await prom(new Empty());

    // The worker may have been stopped during the handshake; do not open a new stream then.
    if (this._stopWorker) {
      return;
    }

    // Stream work items from the sidecar
    const stream = client.stub.getWorkItems(new pb.GetWorkItemsRequest());
    this._responseStream = stream;

    console.log(`Successfully connected to ${this._hostAddress}. Waiting for work items...`);

    // Wait for a work item to be received
    stream.on("data", (workItem: pb.WorkItem) => {
      if (workItem.hasOrchestratorrequest()) {
        console.log(
          `Received "Orchestrator Request" work item with instance id '${workItem
            ?.getOrchestratorrequest()
            ?.getInstanceid()}'`,
        );
        this._executeOrchestrator(workItem.getOrchestratorrequest() as any, client.stub);
      } else if (workItem.hasActivityrequest()) {
        console.log(`Received "Activity Request" work item`);
        this._executeActivity(workItem.getActivityrequest() as any, client.stub);
      } else {
        console.log(`Received unknown work item`);
      }
    });

    // Wait for the stream to end or error
    stream.on("end", async () => {
      // Release the finished stream before deciding whether to reconnect.
      stream.cancel();
      stream.destroy();
      if (this._stopWorker) {
        console.log("Stream ended");
        return;
      }
      console.log("Stream abruptly closed, will retry the connection...");
      await reconnectAfterDelay();
    });

    // Errors are only logged here; reconnection is driven by the "end" handler above.
    stream.on("error", (err: Error) => {
      console.log("Stream error", err);
    });
  } catch (err) {
    if (this._stopWorker) {
      // ignoring the error because the worker has been stopped
      return;
    }
    console.log(`Error on grpc stream: ${err}`);
    if (!isRetry) {
      // The very first attempt reports the failure through the returned promise.
      throw err;
    }
    console.log("Connection will be retried...");
    await reconnectAfterDelay();
  }
}

/**
* Stop the worker and wait for any pending work items to complete
*/
Expand All @@ -153,6 +186,8 @@ export class TaskHubGrpcWorker {
throw new Error("The worker is not running.");
}

this._stopWorker = true;

this._responseStream?.cancel();
this._responseStream?.destroy();

Expand All @@ -162,7 +197,7 @@ export class TaskHubGrpcWorker {

// Wait a bit to let the async operations finish
// https://github.com/grpc/grpc-node/issues/1563#issuecomment-829483711
await new Promise((resolve) => setTimeout(resolve, 1000));
await sleep(1000);
}

/**
Expand Down Expand Up @@ -265,3 +300,7 @@ export class TaskHubGrpcWorker {
}
}
}

/**
 * Returns a promise that resolves, with no value, once `ms` milliseconds have elapsed.
 *
 * @param ms - Delay in milliseconds, passed straight to `setTimeout`.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
Loading