Skip to content

Commit

Permalink
add gracefull shutdown to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Joao-vi committed Oct 22, 2024
1 parent 67eddd9 commit ffa8140
Showing 1 changed file with 55 additions and 42 deletions.
97 changes: 55 additions & 42 deletions app/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,57 +39,70 @@ DB.connect(config.DBHOST, dbAuth)
setupWorkerSockets();

await tenants.run(async () => {
permissionsContext.setCommandContext();

console.info('==> 📡 starting external services...');
ocrManager.start();
new ATServiceListener().start();
new InformationExtraction().start();

new ConvertToPdfWorker().start();
permissionsContext.setCommandContext();

const stopList = [
ocrManager.start(),
new ATServiceListener().start(),
new InformationExtraction().start(),
new ConvertToPdfWorker().start(),
];

const segmentationConnector = new PDFSegmentation();
segmentationConnector.start();
const segmentationRepeater = new DistributedLoop(
'segmentation_repeat',
segmentationConnector.segmentPdfs,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 }
);
stopList.push(segmentationConnector.start());

// eslint-disable-next-line no-void
void segmentationRepeater.start();
stopList.push(
new DistributedLoop('segmentation_repeat', segmentationConnector.segmentPdfs, {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 5000,
}).start()
);

const twitterIntegration = new TwitterIntegration();
twitterIntegration.start();
const twitterRepeater = new DistributedLoop(
'twitter_repeat',
twitterIntegration.addTweetsRequestsToQueue,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 }
stopList.push(twitterIntegration.start());

stopList.push(
new DistributedLoop('twitter_repeat', twitterIntegration.addTweetsRequestsToQueue, {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 120000,
}).start()
);

// eslint-disable-next-line no-void
void twitterRepeater.start();

// eslint-disable-next-line no-void
void new DistributedLoop('preserve_integration', async () => preserveSync.syncAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}).start();

// eslint-disable-next-line no-void
void new DistributedLoop('toc_service', async () => tocService.processAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}).start();

// eslint-disable-next-line no-void
void new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 1000,
}).start();
stopList.push(
new DistributedLoop('preserve_integration', async () => preserveSync.syncAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}).start()
);

stopList.push(
new DistributedLoop('toc_service', async () => tocService.processAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}).start()
);

stopList.push(
new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 1000,
}).start()
);

process.on('SIGINT', async () => {
console.log('Received SIGINT, waiting for graceful stop...');
await Promise.all(stopList.map(async stop => stop()));
console.log('Graceful stop process has finished, now exiting...');

process.exit(0);
});
});
})
.catch(error => {
Expand Down

0 comments on commit ffa8140

Please sign in to comment.