From f7abd79a690330c8ba178017a915425f6368ae1b Mon Sep 17 00:00:00 2001 From: Hugo Arregui <969314+hugoArregui@users.noreply.github.com> Date: Thu, 18 Jan 2024 12:39:42 -0300 Subject: [PATCH] chore: unify consumers (#23) --- src/adapters/consumer.ts | 28 ++++++++++++-- src/adapters/queue.ts | 9 ++++- src/adapters/retryConsumer.ts | 64 ------------------------------- src/components.ts | 10 ----- src/controllers/handlers/admin.ts | 15 +++----- src/types.ts | 10 +---- 6 files changed, 38 insertions(+), 98 deletions(-) delete mode 100644 src/adapters/retryConsumer.ts diff --git a/src/adapters/consumer.ts b/src/adapters/consumer.ts index e2361ff..bd0de7d 100644 --- a/src/adapters/consumer.ts +++ b/src/adapters/consumer.ts @@ -8,12 +8,21 @@ export async function createConsumerComponent({ godot, queue, storage, - retryQueue -}: Pick): Promise { + retryQueue, + metrics +}: Pick< + AppComponents, + 'config' | 'logs' | 'godot' | 'queue' | 'storage' | 'retryQueue' | 'metrics' +>): Promise { const logger = logs.getLogger('consumer') const maxJobs = (await config.getNumber('MAX_JOBS')) || 10 - let paused = false + const [commitHash, version] = await Promise.all([ + config.getString('COMMIT_HASH'), + config.getString('CURRENT_VERSION') + ]) + + let paused = false function setPaused(p: boolean): void { paused = p } @@ -26,7 +35,8 @@ export async function createConsumerComponent({ continue } - const messages = await queue.receive(maxJobs) + const messages = await Promise.race([queue.receive(maxJobs), retryQueue.receive(1)]) + if (messages.length === 0) { continue } @@ -67,6 +77,16 @@ export async function createConsumerComponent({ logger.error(`Error saving generated images to s3 for entity=${result.entity}`) continue } + } else if (messages.length === 1) { + metrics.increment('snapshot_generation_failures', {}, 1) + logger.debug(`Giving up on entity=${result.entity} because of godot failure.`) + const failure = { + commitHash, + version, + entity: result.entity, + output: result.output + } + await storage.store(`failures/${result.entity}.txt`, Buffer.from(JSON.stringify(failure)), 'text/plain') } else { logger.debug(`Godot failure, enqueue for individual retry, entity=${result.entity}`) await retryQueue.send({ entity: result.entity, avatar: result.avatar }) diff --git a/src/adapters/queue.ts b/src/adapters/queue.ts index beca262..3ad4825 100644 --- a/src/adapters/queue.ts +++ b/src/adapters/queue.ts @@ -6,7 +6,14 @@ import { SendMessageCommand, SQSClient } from '@aws-sdk/client-sqs' -import { AppComponents, ExtendedAvatar, QueueSendOptions, QueueService } from '../types' +import { AppComponents, ExtendedAvatar, QueueSendOptions } from '../types' + +export type QueueService = { + send(message: ExtendedAvatar, options?: QueueSendOptions): Promise + receive(max: number): Promise + deleteMessage(receiptHandle: string): Promise + status(): Promise> +} export async function createQueueComponent( { awsConfig }: Pick, diff --git a/src/adapters/retryConsumer.ts b/src/adapters/retryConsumer.ts deleted file mode 100644 index a6e36c7..0000000 --- a/src/adapters/retryConsumer.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { AppComponents, ExtendedAvatar, QueueWorker } from '../types' -import { sleep } from '../logic/sleep' - -export async function createRetryConsumerComponent({ - logs, - godot, - storage, - retryQueue, - config, - metrics -}: Pick): Promise { - const logger = logs.getLogger('retry-consumer') - let paused = false - - const [commitHash, version] = await Promise.all([ - config.getString('COMMIT_HASH'), - config.getString('CURRENT_VERSION') - ]) - - function setPaused(p: boolean): void { - paused = p - } - - async function start() { - logger.debug('Starting retry consumer') - while (true) { - if (paused) { - await sleep(60 * 1000) - continue - } - const messages = await retryQueue.receive(1) - if (messages.length === 0) { - continue - } - const message = messages[0] - const avatar: ExtendedAvatar = JSON.parse(message.Body!) - - const results = await godot.generateImages([avatar]) - const result = results[0] - - if (result.success) { - const success = await storage.storeImages(result.entity, result.avatarPath, result.facePath) - if (!success) { - logger.error(`Error saving generated images to s3 for entity=${result.entity}`) - return - } - } else { - metrics.increment('snapshot_generation_failures', {}, 1) - logger.debug(`Giving up on entity=${result.entity} because of godot failure.`) - const failure = { - commitHash, - version, - entity: result.entity, - output: result.output - } - await storage.store(`failures/${result.entity}.txt`, Buffer.from(JSON.stringify(failure)), 'text/plain') - } - - await retryQueue.deleteMessage(message.ReceiptHandle!) - } - } - - return { setPaused, start } -} diff --git a/src/components.ts b/src/components.ts index 77a7dee..4196e95 100644 --- a/src/components.ts +++ b/src/components.ts @@ -10,7 +10,6 @@ import { createStorageComponent } from './adapters/storage' import { createProducerComponent } from './adapters/producer' import { createGodotSnapshotComponent } from './adapters/godot' import { createQueueComponent } from './adapters/queue' -import { createRetryConsumerComponent } from './adapters/retryConsumer' // Initialize all the components of the app export async function initComponents(): Promise { @@ -62,14 +61,6 @@ export async function initComponents(): Promise { godot, queue, retryQueue, - storage - }) - - const retryConsumer = await createRetryConsumerComponent({ - config, - logs, - godot, - retryQueue, storage, metrics }) @@ -93,7 +84,6 @@ export async function initComponents(): Promise { queue, retryQueue, consumer, - retryConsumer, server, storage, statusChecks diff --git a/src/controllers/handlers/admin.ts b/src/controllers/handlers/admin.ts index 268353b..3d6061f 100644 --- a/src/controllers/handlers/admin.ts +++ b/src/controllers/handlers/admin.ts @@ -2,11 +2,11 @@ import { HandlerContextWithPath } from '../../types' import { IHttpServerComponent } from '@well-known-components/interfaces' export async function adminHandler( - context: HandlerContextWithPath<'jobProducer' | 'logs' | 'consumer' | 'retryConsumer', '/tools'> + context: HandlerContextWithPath<'jobProducer' | 'logs' | 'consumer', '/tools'> ): Promise { const { request, - components: { jobProducer, logs, consumer, retryConsumer } + components: { jobProducer, logs, consumer } } = context // TODO: add auth @@ -21,14 +21,9 @@ export async function adminHandler( console.log(body) - if (typeof body.consumer !== 'undefined') { - consumer.setPaused(body.consumer) - logger.debug(`Consumer is now: ${body.consumer ? 'paused' : 'running'}`) - } - - if (typeof body.retryConsumer !== 'undefined') { - retryConsumer.setPaused(body.retryConsumer) - logger.debug(`RetryConsumer is now: ${body.retryConsumer ? 'paused' : 'running'}`) + if (typeof body.pauseConsumer !== 'undefined') { + consumer.setPaused(body.pauseConsumer) + logger.debug(`Consumer is now: ${body.pauseConsumer ? 'paused' : 'running'}`) } return { diff --git a/src/types.ts b/src/types.ts index ad5fd63..1f86e7e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,9 +7,9 @@ import type { IMetricsComponent } from '@well-known-components/interfaces' import { metricDeclarations } from './metrics' -import { Message } from '@aws-sdk/client-sqs' import { GodotComponent } from './adapters/godot' import { AvatarInfo } from '@dcl/schemas' +import { QueueService } from './adapters/queue' export type GlobalContext = { components: BaseComponents @@ -27,7 +27,6 @@ export type BaseComponents = { queue: QueueService retryQueue: QueueService consumer: QueueWorker - retryConsumer: QueueWorker server: IHttpServerComponent storage: IStorageComponent } @@ -106,13 +105,6 @@ export type QueueSendOptions = { delay?: number } -export type QueueService = { - send(message: ExtendedAvatar, options?: QueueSendOptions): Promise - receive(max: number): Promise - deleteMessage(receiptHandle: string): Promise - status(): Promise> -} - export type QueueWorker = IBaseComponent & { setPaused(paused: boolean): void }