Skip to content

Commit

Permalink
Solve issue with function invocation.
Browse files Browse the repository at this point in the history
  • Loading branch information
viferga committed Jun 4, 2024
1 parent f4faf86 commit 5bd30ed
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 43 deletions.
52 changes: 18 additions & 34 deletions src/controller/call.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { NextFunction, Request, Response } from 'express';
import { Applications } from '../app';

import AppError from '../utils/appError';
import { WorkerMessageType, WorkerMessageUnknown } from '../worker/protocol';
import { invokeQueue } from '../utils/invoke';
import { WorkerMessageType } from '../worker/protocol';

export const callFunction = (
req: Request,
Expand Down Expand Up @@ -36,37 +38,19 @@ export const callFunction = (
);
}

new Promise((resolve, reject) => {
application.proc?.send({
type: WorkerMessageType.Invoke,
data: {
name: func,
args
}
});

application.proc?.on('message', (message: WorkerMessageUnknown) => {
if (message.type === WorkerMessageType.InvokeResult) {
resolve(JSON.stringify(message.data));
}
});

application.proc?.on('exit', code => {
// The application may have been ended unexpectedly,
// probably segmentation fault (exit code 139 in Linux)
reject(
JSON.stringify({
error: `Deployment '${suffix}' process exited with code: ${
code || 'unknown'
}`
})
);
});
})
.then(data => {
res.send(data);
})
.catch(error => {
res.status(500).send(error);
});
application.proc?.send({
type: WorkerMessageType.Invoke,
data: {
id: invokeQueue.push({
resolve: (data: string) => {
res.send(data);
},
reject: (error: string) => {
res.status(500).send(error);
}
}),
name: func,
args
}
});
};
39 changes: 31 additions & 8 deletions src/utils/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { spawn } from 'child_process';
import path from 'path';
import { Applications, Resource } from '../app';
import { WorkerMessageType, WorkerMessageUnknown } from '../worker/protocol';
import { invokeQueue } from './invoke';
import { logProcessOutput } from './logger';

export const deployProcess = async (resource: Resource): Promise<void> => {
Expand Down Expand Up @@ -36,14 +37,31 @@ export const deployProcess = async (resource: Resource): Promise<void> => {
});

proc.on('message', (payload: WorkerMessageUnknown) => {
// Get the deploy data and store the process and app into our tables
if (payload.type === WorkerMessageType.MetaData) {
const application = Applications[resource.id];
const deployment = payload.data as Deployment;

application.proc = proc;
application.deployment = deployment;
deployResolve();
switch (payload.type) {
case WorkerMessageType.MetaData: {
// Get the deploy data and store the process and app into our tables
const application = Applications[resource.id];
const deployment = payload.data as Deployment;

application.proc = proc;
application.deployment = deployment;
deployResolve();
break;
}

case WorkerMessageType.InvokeResult: {
const invokeResult = payload.data as {
id: string;
result: unknown;
};

const invoke = invokeQueue.get(invokeResult.id);
invoke.resolve(JSON.stringify(invokeResult.result));
break;
}

default:
break;
}
});

Expand All @@ -57,6 +75,11 @@ export const deployProcess = async (resource: Resource): Promise<void> => {
}`
)
);

// TODO: How to implement the exit properly? We cannot reject easily
// the promise from the call if the process exits during the call.
// Also if exits during the call it will try to call deployReject
// which is completely out of scope and the promise was fullfilled already
});

return promise;
Expand Down
24 changes: 24 additions & 0 deletions src/utils/invoke.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import crypto from 'crypto';

interface Invocation {
resolve: (value: string) => void;
reject: (reason: string) => void;
}

class InvokeQueue {
private queue: Record<string, Invocation> = {};

public push(invoke: Invocation): string {
const id = crypto.randomBytes(16).toString('hex');
this.queue[id] = invoke;
return id;
}

public get(id: string): Invocation {
const invoke = this.queue[id];
delete this.queue[id];
return invoke;
}
}

export const invokeQueue = new InvokeQueue();
6 changes: 5 additions & 1 deletion src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,18 @@ process.on('message', (payload: WorkerMessageUnknown) => {
case WorkerMessageType.Invoke: {
const fn = (
payload as WorkerMessage<{
id: string;
name: string;
args: unknown[];
}>
).data;
if (process.send) {
process.send({
type: WorkerMessageType.InvokeResult,
data: functions[fn.name](...fn.args)
data: {
id: fn.id,
result: functions[fn.name](...fn.args)
}
});
}
break;
Expand Down

0 comments on commit 5bd30ed

Please sign in to comment.