Skip to content

Commit

Permalink
chore: unify consumers (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
hugoArregui authored Jan 18, 2024
1 parent ece6eac commit f7abd79
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 98 deletions.
28 changes: 24 additions & 4 deletions src/adapters/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@ export async function createConsumerComponent({
godot,
queue,
storage,
retryQueue
}: Pick<AppComponents, 'config' | 'logs' | 'godot' | 'queue' | 'storage' | 'retryQueue'>): Promise<QueueWorker> {
retryQueue,
metrics
}: Pick<
AppComponents,
'config' | 'logs' | 'godot' | 'queue' | 'storage' | 'retryQueue' | 'metrics'
>): Promise<QueueWorker> {
const logger = logs.getLogger('consumer')
const maxJobs = (await config.getNumber('MAX_JOBS')) || 10
let paused = false

const [commitHash, version] = await Promise.all([
config.getString('COMMIT_HASH'),
config.getString('CURRENT_VERSION')
])

let paused = false
function setPaused(p: boolean): void {
paused = p
}
Expand All @@ -26,7 +35,8 @@ export async function createConsumerComponent({
continue
}

const messages = await queue.receive(maxJobs)
const messages = await Promise.race([queue.receive(maxJobs), retryQueue.receive(1)])

if (messages.length === 0) {
continue
}
Expand Down Expand Up @@ -67,6 +77,16 @@ export async function createConsumerComponent({
logger.error(`Error saving generated images to s3 for entity=${result.entity}`)
continue
}
} else if (messages.length === 1) {
metrics.increment('snapshot_generation_failures', {}, 1)
logger.debug(`Giving up on entity=${result.entity} because of godot failure.`)
const failure = {
commitHash,
version,
entity: result.entity,
output: result.output
}
await storage.store(`failures/${result.entity}.txt`, Buffer.from(JSON.stringify(failure)), 'text/plain')
} else {
logger.debug(`Godot failure, enqueue for individual retry, entity=${result.entity}`)
await retryQueue.send({ entity: result.entity, avatar: result.avatar })
Expand Down
9 changes: 8 additions & 1 deletion src/adapters/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ import {
SendMessageCommand,
SQSClient
} from '@aws-sdk/client-sqs'
import { AppComponents, ExtendedAvatar, QueueSendOptions, QueueService } from '../types'
import { AppComponents, ExtendedAvatar, QueueSendOptions } from '../types'

export type QueueService = {
send(message: ExtendedAvatar, options?: QueueSendOptions): Promise<void>
receive(max: number): Promise<Message[]>
deleteMessage(receiptHandle: string): Promise<void>
status(): Promise<Record<string, any>>
}

export async function createQueueComponent(
{ awsConfig }: Pick<AppComponents, 'awsConfig'>,
Expand Down
64 changes: 0 additions & 64 deletions src/adapters/retryConsumer.ts

This file was deleted.

10 changes: 0 additions & 10 deletions src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { createStorageComponent } from './adapters/storage'
import { createProducerComponent } from './adapters/producer'
import { createGodotSnapshotComponent } from './adapters/godot'
import { createQueueComponent } from './adapters/queue'
import { createRetryConsumerComponent } from './adapters/retryConsumer'

// Initialize all the components of the app
export async function initComponents(): Promise<AppComponents> {
Expand Down Expand Up @@ -62,14 +61,6 @@ export async function initComponents(): Promise<AppComponents> {
godot,
queue,
retryQueue,
storage
})

const retryConsumer = await createRetryConsumerComponent({
config,
logs,
godot,
retryQueue,
storage,
metrics
})
Expand All @@ -93,7 +84,6 @@ export async function initComponents(): Promise<AppComponents> {
queue,
retryQueue,
consumer,
retryConsumer,
server,
storage,
statusChecks
Expand Down
15 changes: 5 additions & 10 deletions src/controllers/handlers/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import { HandlerContextWithPath } from '../../types'
import { IHttpServerComponent } from '@well-known-components/interfaces'

export async function adminHandler(
context: HandlerContextWithPath<'jobProducer' | 'logs' | 'consumer' | 'retryConsumer', '/tools'>
context: HandlerContextWithPath<'jobProducer' | 'logs' | 'consumer', '/tools'>
): Promise<IHttpServerComponent.IResponse> {
const {
request,
components: { jobProducer, logs, consumer, retryConsumer }
components: { jobProducer, logs, consumer }
} = context

// TODO: add auth
Expand All @@ -21,14 +21,9 @@ export async function adminHandler(

console.log(body)

if (typeof body.consumer !== 'undefined') {
consumer.setPaused(body.consumer)
logger.debug(`Consumer is now: ${body.consumer ? 'paused' : 'running'}`)
}

if (typeof body.retryConsumer !== 'undefined') {
retryConsumer.setPaused(body.retryConsumer)
logger.debug(`RetryConsumer is now: ${body.retryConsumer ? 'paused' : 'running'}`)
if (typeof body.pauseConsumer !== 'undefined') {
consumer.setPaused(body.pauseConsumer)
logger.debug(`Consumer is now: ${body.pauseConsumer ? 'paused' : 'running'}`)
}

return {
Expand Down
10 changes: 1 addition & 9 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import type {
IMetricsComponent
} from '@well-known-components/interfaces'
import { metricDeclarations } from './metrics'
import { Message } from '@aws-sdk/client-sqs'
import { GodotComponent } from './adapters/godot'
import { AvatarInfo } from '@dcl/schemas'
import { QueueService } from './adapters/queue'

export type GlobalContext = {
components: BaseComponents
Expand All @@ -27,7 +27,6 @@ export type BaseComponents = {
queue: QueueService
retryQueue: QueueService
consumer: QueueWorker
retryConsumer: QueueWorker
server: IHttpServerComponent<GlobalContext>
storage: IStorageComponent
}
Expand Down Expand Up @@ -106,13 +105,6 @@ export type QueueSendOptions = {
delay?: number
}

export type QueueService = {
send(message: ExtendedAvatar, options?: QueueSendOptions): Promise<void>
receive(max: number): Promise<Message[]>
deleteMessage(receiptHandle: string): Promise<void>
status(): Promise<Record<string, any>>
}

export type QueueWorker = IBaseComponent & {
setPaused(paused: boolean): void
}
Expand Down

0 comments on commit f7abd79

Please sign in to comment.