diff --git a/src/adapters/consumer.ts b/src/adapters/consumer.ts index 0b36d6d..6c65b3e 100644 --- a/src/adapters/consumer.ts +++ b/src/adapters/consumer.ts @@ -2,7 +2,6 @@ import { Message } from '@aws-sdk/client-sqs' import { AppComponents, ExtendedAvatar, QueueWorker } from '../types' import { sleep } from '../logic/sleep' import { sqsDeleteMessage, sqsReceiveMessage, sqsSendMessage } from '../logic/queue' -import { writeFile } from 'fs/promises' export async function createConsumerComponent({ config, @@ -65,7 +64,7 @@ export async function createConsumerComponent({ input.push(body) } - const results = await godot.generateImages(input) + const { avatars: results, output: outputGenerated } = await godot.generateImages(input) for (const result of results) { const message = messageByEntity.get(result.entity)! @@ -80,21 +79,15 @@ export async function createConsumerComponent({ metrics.increment('snapshot_generation_count', { status: 'failure' }, 1) logger.debug(`Giving up on entity=${result.entity} because of godot failure.`) const failure = { + timestamp: new Date().toISOString(), commitHash, version, entity: result.entity, - output: result.output + outputGenerated } await storage.storeFailure(result.entity, JSON.stringify(failure)) } else { logger.debug(`Godot failure, enqueue for individual retry, entity=${result.entity}`) - - if (result.output !== undefined) { - // log the failure into disk (reuslt.output.stderr && result.output.stdout) - const failureFilePath = `failure-${result.entity}.json` - await writeFile(failureFilePath, JSON.stringify(result.output)) - } - await sqsSendMessage(sqsClient, retryQueueUrl, { entity: result.entity, avatar: result.avatar }) } diff --git a/src/adapters/godot.ts b/src/adapters/godot.ts index ab61cc4..584771a 100644 --- a/src/adapters/godot.ts +++ b/src/adapters/godot.ts @@ -14,8 +14,13 @@ type GodotAvatarPayload = ExtendedAvatar & { faceHeight: number | undefined } +type GodotInput = { + baseUrl: string + payload: GodotAvatarPayload[] +} + export type GodotComponent = { - generateImages(profiles: ExtendedAvatar[]): Promise + generateImages(profiles: ExtendedAvatar[]): Promise<{ output?: string; avatars: AvatarGenerationResult[] }> } const outputPath = 'output' @@ -32,41 +37,66 @@ export async function createGodotSnapshotComponent({ const peerUrl = await config.requireString('PEER_URL') const logger = logs.getLogger('godot-snapshot') const explorerPath = process.env.EXPLORER_PATH || '.' + const baseTime = (await config.getNumber('GODOT_BASE_TIMEOUT')) || 15_000 + const timePerAvatar = (await config.getNumber('GODOT_AVATAR_TIMEOUT')) || 10_000 let executionNumber = 0 - function run(input: any): Promise { + function runGodot(input: GodotInput): Promise<{ error: boolean; stderr: string; stdout: string }> { return new Promise(async (resolve) => { executionNumber += 1 + + const timeout = baseTime + input.payload.length * timePerAvatar const avatarDataPath = `temp-avatars-${executionNumber}.json` + await writeFile(avatarDataPath, JSON.stringify(input)) await mkdir(outputPath, { recursive: true }) - - await writeFile(avatarDataPath, JSON.stringify(input)) const command = `${explorerPath}/decentraland.godot.client.x86_64 --rendering-driver opengl3 --avatar-renderer --avatars ${avatarDataPath}` - logger.debug(`about to exec: explorerPath: ${explorerPath}, display: ${process.env.DISPLAY}, command: ${command}`) + logger.debug( + `about to exec: explorerPath: ${explorerPath}, display: ${process.env.DISPLAY}, command: ${command}, timeout: ${timeout}` + ) + + let resolved = false + + const childProcess = exec(command, { timeout }, (error, stdout, stderr) => { + if (resolved) { + return + } - const childProcess = exec(command, { timeout: 45_000 }, (error, stdout, stderr) => { rm(avatarDataPath).catch(logger.error) if (error) { for (const f of globSync('core.*')) { rm(f).catch(logger.error) } - return resolve({ stdout, stderr }) + resolved = true + return resolve({ error: true, stdout, stderr }) } - resolve(undefined) + resolved = true + resolve({ error: false, stdout, stderr }) }) + const childProcessPid = childProcess.pid + childProcess.on('close', (_code, signal) => { // timeout sends SIGTERM, we might want to kill it harder if (signal === 'SIGTERM') { childProcess.kill('SIGKILL') } }) + + setTimeout(() => { + exec(`kill -9 ${childProcessPid}`, () => {}) + if (!resolved) { + resolve({ error: true, stdout: '', stderr: 'timeout' }) + resolved = true + } + }, timeout + 5_000) }) } - async function generateImages(avatars: ExtendedAvatar[]): Promise { + async function generateImages( + avatars: ExtendedAvatar[] + ): Promise<{ output?: string; avatars: AvatarGenerationResult[] }> { const payloads: GodotAvatarPayload[] = [] const results: AvatarGenerationResult[] = [] @@ -93,7 +123,7 @@ export async function createGodotSnapshotComponent({ } if (payloads.length === 0) { - return results + return { avatars: results } } const input = { @@ -101,28 +131,71 @@ export async function createGodotSnapshotComponent({ payload: payloads } + const [previousTopData, previousDiskUsage] = await Promise.all([getTopData(), getDiskUsage()]) + logger.debug(`Running godot to process ${payloads.length} avatars`) const start = Date.now() - const output = await run(input) + const { error, stdout, stderr } = await runGodot(input) const duration = Date.now() - start metrics.observe('snapshot_generation_duration_seconds', {}, duration / payloads.length / 1000) logger.log(`screenshots for ${payloads.length} entities: ${duration} ms`) + let failedGeneration = false for (const result of results) { try { await Promise.all([stat(result.avatarPath), stat(result.facePath)]) result.success = true } catch (err: any) { + failedGeneration = true logger.error(err) - result.output = output } } - return results + let output = undefined + if (failedGeneration) { + const [nextTopData, nextDiskUsage] = await Promise.all([getTopData(), getDiskUsage()]) + output = ` + > error: ${error}\n + > previousTopData: ${previousTopData}\n + > previousDiskUsage: ${previousDiskUsage}\n + > nextTopData: ${nextTopData}\n + > nextDiskUsage: ${nextDiskUsage}\n + > stdout: ${stdout}\n + > stderr: ${stderr}\n + > input: ${JSON.stringify(input)}\n + > duration: ${duration} ms\n + ` + } + + return { avatars: results, output } } return { generateImages } } + +// @returns the top 10 processes sorted by resident memory +function getTopData(): Promise { + return new Promise((resolve, reject) => { + exec('top -b -n 1 -o RES | head -n 17', (error, stdout, _stderr) => { + if (error) { + reject(error) + } + resolve(stdout) + }) + }) +} + +// @returns the disk usage +function getDiskUsage(): Promise { + return new Promise((resolve, reject) => { + exec('df -h', (error, stdout, _stderr) => { + if (error) { + reject(error) + } + resolve(stdout) + }) + }) +} diff --git a/src/controllers/handlers/set-schedule-processing-handler.ts b/src/controllers/handlers/set-schedule-processing-handler.ts index 3faa126..90ba621 100644 --- a/src/controllers/handlers/set-schedule-processing-handler.ts +++ b/src/controllers/handlers/set-schedule-processing-handler.ts @@ -9,7 +9,7 @@ export async function scheduleProcessingHandler( ): Promise { const { request, - components: { logs, sqsClient, storage, fetch, config } + components: { logs, sqsClient, storage: _storage, fetch, config } } = context const [mainQueueUrl, peerUrl] = await Promise.all([ @@ -23,7 +23,8 @@ export async function scheduleProcessingHandler( throw new InvalidRequestError('Invalid request. Request body is not valid') } - await storage.deleteFailures(body) + // TODO: failures will be deleted manually, let keep this comment to revise in the future + // await storage.deleteFailures(body) const response = await fetch.fetch( `${peerUrl}/content/deployments?` + diff --git a/src/types.ts b/src/types.ts index de168de..5fcd9e7 100644 --- a/src/types.ts +++ b/src/types.ts @@ -73,7 +73,6 @@ export type AvatarGenerationResult = ExtendedAvatar & { success: boolean avatarPath: string facePath: string - output?: { stderr: string; stdout: string } } export type QueueWorker = IBaseComponent & { diff --git a/test/unit/consumer.spec.ts b/test/unit/consumer.spec.ts index c7485af..c81e49e 100644 --- a/test/unit/consumer.spec.ts +++ b/test/unit/consumer.spec.ts @@ -123,13 +123,15 @@ describe('Consumer test', function () { ] generateImages.mockImplementation((input: ExtendedAvatar[]) => { - return input.map(({ avatar, entity }: ExtendedAvatar) => ({ - avatar, - entity, - success: false, - avatarPath: 'avatar0.png', - facePath: 'face0.png' - })) + return { + avatars: input.map(({ avatar, entity }: ExtendedAvatar) => ({ + avatar, + entity, + success: false, + avatarPath: 'avatar0.png', + facePath: 'face0.png' + })) + } }) await consumer.process(QUEUE_NAME, messages) @@ -180,13 +182,15 @@ describe('Consumer test', function () { ] generateImages.mockImplementation((input: ExtendedAvatar[]) => { - return input.map(({ avatar, entity }: ExtendedAvatar, i) => ({ - avatar, - entity, - success: false, - avatarPath: `avatar${i}.png`, - facePath: `face${i}.png` - })) + return { + avatars: input.map(({ avatar, entity }: ExtendedAvatar, i) => ({ + avatar, + entity, + success: false, + avatarPath: `avatar${i}.png`, + facePath: `face${i}.png` + })) + } }) await consumer.process(QUEUE_NAME, messages) @@ -250,13 +254,15 @@ describe('Consumer test', function () { ] generateImages.mockImplementation((input: ExtendedAvatar[]) => { - return input.map(({ avatar, entity }: ExtendedAvatar, i) => ({ - avatar, - entity, - success: true, - avatarPath: `avatar${i}.png`, - facePath: `face${i}.png` - })) + return { + avatars: input.map(({ avatar, entity }: ExtendedAvatar, i) => ({ + avatar, + entity, + success: true, + avatarPath: `avatar${i}.png`, + facePath: `face${i}.png` + })) + } }) await consumer.process(QUEUE_NAME, messages) diff --git a/test/unit/godot.spec.ts b/test/unit/godot.spec.ts new file mode 100644 index 0000000..069dc96 --- /dev/null +++ b/test/unit/godot.spec.ts @@ -0,0 +1,155 @@ +import { createTestMetricsComponent } from '@well-known-components/metrics' +import { createConfigComponent } from '@well-known-components/env-config-provider' +import { createLogComponent } from '@well-known-components/logger' +import { ExtendedAvatar } from '../../src/types' +import { createGodotSnapshotComponent } from '../../src/adapters/godot' +import { metricDeclarations } from '../../src/metrics' +import { exec } from 'child_process' +import { stat, writeFile, mkdir, rm } from 'fs/promises' +import { EventEmitter } from 'events' + +jest.mock('child_process') +jest.mock('fs/promises') + +describe('Godot Component', () => { + const config = createConfigComponent( + { PEER_URL: 'http://peer', GODOT_BASE_TIMEOUT: '1000', GODOT_AVATAR_TIMEOUT: '1000' }, + {} + ) + const metrics = createTestMetricsComponent(metricDeclarations) + + beforeEach(() => { + jest.resetAllMocks() + ;(mkdir as jest.Mock).mockResolvedValue(undefined) + ;(writeFile as jest.Mock).mockResolvedValue(undefined) + ;(rm as jest.Mock).mockResolvedValue(undefined) + }) + + it('should generate images successfully', async () => { + const logs = await createLogComponent({ config }) + const mockExec = exec as unknown as jest.Mock + const mockChildProcess: any = new EventEmitter() + mockChildProcess.pid = 123 + + mockExec.mockImplementation((...args) => { + // Get the callback which could be the 2nd or 3rd parameter + const callback = args.find((arg) => typeof arg === 'function') + callback(null, 'success', '') + return mockChildProcess + }) + ;(stat as jest.Mock).mockResolvedValue({}) + + const godot = await createGodotSnapshotComponent({ logs, metrics, config }) + const avatars: ExtendedAvatar[] = [ + { + entity: 'entity1', + avatar: {} as any + } + ] + + const result = await godot.generateImages(avatars) + + expect(result.avatars[0].success).toBe(true) + expect(result.output).toBeUndefined() + expect(mkdir).toHaveBeenCalledWith('output', { recursive: true }) + expect(writeFile).toHaveBeenCalled() + }) + + it('should handle process errors gracefully', async () => { + const logs = await createLogComponent({ config }) + const mockExec = exec as unknown as jest.Mock + const mockChildProcess: any = new EventEmitter() + mockChildProcess.pid = 123 + + mockExec.mockImplementation((...args) => { + const isGodot = args.find((arg) => typeof arg === 'string' && arg.includes('godot')) + const callback = args.find((arg) => typeof arg === 'function') + if (isGodot) { + callback(new Error('Process failed'), '', 'error output') + } else { + callback(null, 'success', '') + } + return mockChildProcess + }) + ;(stat as jest.Mock).mockRejectedValue(new Error('nope')) + + const godot = await createGodotSnapshotComponent({ logs, metrics, config }) + const avatars: ExtendedAvatar[] = [ + { + entity: 'entity1', + avatar: {} as any + } + ] + + const result = await godot.generateImages(avatars) + + expect(result.avatars[0].success).toBe(false) + expect(result.output).toContain('error output') + expect(mkdir).toHaveBeenCalledWith('output', { recursive: true }) + expect(writeFile).toHaveBeenCalled() + }) + + it('should handle multiple avatars', async () => { + const logs = await createLogComponent({ config }) + const mockExec = exec as unknown as jest.Mock + const mockChildProcess: any = new EventEmitter() + mockChildProcess.pid = 123 + + mockExec.mockImplementation((...args) => { + const callback = args.find((arg) => typeof arg === 'function') + callback(null, 'success', '') + return mockChildProcess + }) + ;(stat as jest.Mock).mockResolvedValue({}) + + const godot = await createGodotSnapshotComponent({ logs, metrics, config }) + const avatars: ExtendedAvatar[] = [ + { + entity: 'entity1', + avatar: {} as any + }, + { + entity: 'entity2', + avatar: {} as any + } + ] + + const result = await godot.generateImages(avatars) + + expect(result.avatars).toHaveLength(2) + expect(result.avatars[0].success).toBe(true) + expect(result.avatars[1].success).toBe(true) + }) + + it('should timeout when process takes too long', async () => { + const logs = await createLogComponent({ config }) + const mockExec = exec as unknown as jest.Mock + const mockChildProcess: any = new EventEmitter() + mockChildProcess.pid = 123 + + mockExec.mockImplementation((...args) => { + const isGodot = args.find((arg) => typeof arg === 'string' && arg.includes('godot')) + const callback = args.find((arg) => typeof arg === 'function') + if (isGodot) { + // Don't call the callback to simulate timeout + return mockChildProcess + } + callback(null, 'success', '') + return mockChildProcess + }) + ;(stat as jest.Mock).mockRejectedValue(new Error('nope')) + + const godot = await createGodotSnapshotComponent({ logs, metrics, config }) + const avatars: ExtendedAvatar[] = [ + { + entity: 'entity1', + avatar: {} as any + } + ] + + const result = await godot.generateImages(avatars) + + expect(result.avatars[0].success).toBe(false) + expect(result.output).toContain('timeout') + }, 10_000) +})