Skip to content

Commit

Permalink
test: add debug info when failing generation (#81)
Browse files Browse the repository at this point in the history
* add debug info when failing generation

* fix tests

* disable clean failures temporary

* add tests

* add tests
  • Loading branch information
leanmendoza authored Dec 3, 2024
1 parent d05cda3 commit 456d14a
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 47 deletions.
13 changes: 3 additions & 10 deletions src/adapters/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Message } from '@aws-sdk/client-sqs'
import { AppComponents, ExtendedAvatar, QueueWorker } from '../types'
import { sleep } from '../logic/sleep'
import { sqsDeleteMessage, sqsReceiveMessage, sqsSendMessage } from '../logic/queue'
import { writeFile } from 'fs/promises'

export async function createConsumerComponent({
config,
Expand Down Expand Up @@ -65,7 +64,7 @@ export async function createConsumerComponent({
input.push(body)
}

const results = await godot.generateImages(input)
const { avatars: results, output: outputGenerated } = await godot.generateImages(input)

for (const result of results) {
const message = messageByEntity.get(result.entity)!
Expand All @@ -80,21 +79,15 @@ export async function createConsumerComponent({
metrics.increment('snapshot_generation_count', { status: 'failure' }, 1)
logger.debug(`Giving up on entity=${result.entity} because of godot failure.`)
const failure = {
timestamp: new Date().toISOString(),
commitHash,
version,
entity: result.entity,
output: result.output
outputGenerated
}
await storage.storeFailure(result.entity, JSON.stringify(failure))
} else {
logger.debug(`Godot failure, enqueue for individual retry, entity=${result.entity}`)

if (result.output !== undefined) {
// log the failure into disk (reuslt.output.stderr && result.output.stdout)
const failureFilePath = `failure-${result.entity}.json`
await writeFile(failureFilePath, JSON.stringify(result.output))
}

await sqsSendMessage(sqsClient, retryQueueUrl, { entity: result.entity, avatar: result.avatar })
}

Expand Down
99 changes: 86 additions & 13 deletions src/adapters/godot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ type GodotAvatarPayload = ExtendedAvatar & {
faceHeight: number | undefined
}

type GodotInput = {
baseUrl: string
payload: GodotAvatarPayload[]
}

export type GodotComponent = {
generateImages(profiles: ExtendedAvatar[]): Promise<AvatarGenerationResult[]>
generateImages(profiles: ExtendedAvatar[]): Promise<{ output?: string; avatars: AvatarGenerationResult[] }>
}

const outputPath = 'output'
Expand All @@ -32,41 +37,66 @@ export async function createGodotSnapshotComponent({
const peerUrl = await config.requireString('PEER_URL')
const logger = logs.getLogger('godot-snapshot')
const explorerPath = process.env.EXPLORER_PATH || '.'
const baseTime = (await config.getNumber('GODOT_BASE_TIMEOUT')) || 15_000
const timePerAvatar = (await config.getNumber('GODOT_AVATAR_TIMEOUT')) || 10_000

let executionNumber = 0

function run(input: any): Promise<undefined | { stderr: string; stdout: string }> {
function runGodot(input: GodotInput): Promise<{ error: boolean; stderr: string; stdout: string }> {
return new Promise(async (resolve) => {
executionNumber += 1

const timeout = baseTime + input.payload.length * timePerAvatar
const avatarDataPath = `temp-avatars-${executionNumber}.json`
await writeFile(avatarDataPath, JSON.stringify(input))

await mkdir(outputPath, { recursive: true })

await writeFile(avatarDataPath, JSON.stringify(input))
const command = `${explorerPath}/decentraland.godot.client.x86_64 --rendering-driver opengl3 --avatar-renderer --avatars ${avatarDataPath}`
logger.debug(`about to exec: explorerPath: ${explorerPath}, display: ${process.env.DISPLAY}, command: ${command}`)
logger.debug(
`about to exec: explorerPath: ${explorerPath}, display: ${process.env.DISPLAY}, command: ${command}, timeout: ${timeout}`
)

let resolved = false

const childProcess = exec(command, { timeout }, (error, stdout, stderr) => {
if (resolved) {
return
}

const childProcess = exec(command, { timeout: 45_000 }, (error, stdout, stderr) => {
rm(avatarDataPath).catch(logger.error)
if (error) {
for (const f of globSync('core.*')) {
rm(f).catch(logger.error)
}
return resolve({ stdout, stderr })
resolved = true
return resolve({ error: true, stdout, stderr })
}
resolve(undefined)
resolved = true
resolve({ error: false, stdout, stderr })
})

const childProcessPid = childProcess.pid

childProcess.on('close', (_code, signal) => {
// timeout sends SIGTERM, we might want to kill it harder
if (signal === 'SIGTERM') {
childProcess.kill('SIGKILL')
}
})

setTimeout(() => {
exec(`kill -9 ${childProcessPid}`, () => {})
if (!resolved) {
resolve({ error: true, stdout: '', stderr: 'timeout' })
resolved = true
}
}, timeout + 5_000)
})
}

async function generateImages(avatars: ExtendedAvatar[]): Promise<AvatarGenerationResult[]> {
async function generateImages(
avatars: ExtendedAvatar[]
): Promise<{ output?: string; avatars: AvatarGenerationResult[] }> {
const payloads: GodotAvatarPayload[] = []
const results: AvatarGenerationResult[] = []

Expand All @@ -93,36 +123,79 @@ export async function createGodotSnapshotComponent({
}

if (payloads.length === 0) {
return results
return { avatars: results }
}

const input = {
baseUrl: `${peerUrl}/content`,
payload: payloads
}

const [previousTopData, previousDiskUsage] = await Promise.all([getTopData(), getDiskUsage()])

logger.debug(`Running godot to process ${payloads.length} avatars`)
const start = Date.now()
const output = await run(input)
const { error, stdout, stderr } = await runGodot(input)
const duration = Date.now() - start

metrics.observe('snapshot_generation_duration_seconds', {}, duration / payloads.length / 1000)
logger.log(`screenshots for ${payloads.length} entities: ${duration} ms`)

let failedGeneration = false
for (const result of results) {
try {
await Promise.all([stat(result.avatarPath), stat(result.facePath)])
result.success = true
} catch (err: any) {
failedGeneration = true
logger.error(err)
result.output = output
}
}

return results
let output = undefined
if (failedGeneration) {
const [nextTopData, nextDiskUsage] = await Promise.all([getTopData(), getDiskUsage()])
output = `
> error: ${error}\n
> previousTopData: ${previousTopData}\n
> previousDiskUsage: ${previousDiskUsage}\n
> nextTopData: ${nextTopData}\n
> nextDiskUsage: ${nextDiskUsage}\n
> stdout: ${stdout}\n
> stderr: ${stderr}\n
> input: ${JSON.stringify(input)}\n
> duration: ${duration} ms\n
`
}

return { avatars: results, output }
}

return {
generateImages
}
}

// @returns the top 10 processes sorted by resident memory
function getTopData(): Promise<string> {
return new Promise<string>((resolve, reject) => {
exec('top -b -n 1 -o RES | head -n 17', (error, stdout, _stderr) => {
if (error) {
reject(error)
}
resolve(stdout)
})
})
}

// @returns the disk usage
function getDiskUsage(): Promise<string> {
return new Promise<string>((resolve, reject) => {
exec('df -h', (error, stdout, _stderr) => {
if (error) {
reject(error)
}
resolve(stdout)
})
})
}
5 changes: 3 additions & 2 deletions src/controllers/handlers/set-schedule-processing-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export async function scheduleProcessingHandler(
): Promise<IHttpServerComponent.IResponse> {
const {
request,
components: { logs, sqsClient, storage, fetch, config }
components: { logs, sqsClient, storage: _storage, fetch, config }
} = context

const [mainQueueUrl, peerUrl] = await Promise.all([
Expand All @@ -23,7 +23,8 @@ export async function scheduleProcessingHandler(
throw new InvalidRequestError('Invalid request. Request body is not valid')
}

await storage.deleteFailures(body)
// TODO: failures will be deleted manually, let keep this comment to revise in the future
// await storage.deleteFailures(body)

const response = await fetch.fetch(
`${peerUrl}/content/deployments?` +
Expand Down
1 change: 0 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ export type AvatarGenerationResult = ExtendedAvatar & {
success: boolean
avatarPath: string
facePath: string
output?: { stderr: string; stdout: string }
}

export type QueueWorker = IBaseComponent & {
Expand Down
48 changes: 27 additions & 21 deletions test/unit/consumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@ describe('Consumer test', function () {
]

generateImages.mockImplementation((input: ExtendedAvatar[]) => {
return input.map(({ avatar, entity }: ExtendedAvatar) => ({
avatar,
entity,
success: false,
avatarPath: 'avatar0.png',
facePath: 'face0.png'
}))
return {
avatars: input.map(({ avatar, entity }: ExtendedAvatar) => ({
avatar,
entity,
success: false,
avatarPath: 'avatar0.png',
facePath: 'face0.png'
}))
}
})

await consumer.process(QUEUE_NAME, messages)
Expand Down Expand Up @@ -180,13 +182,15 @@ describe('Consumer test', function () {
]

generateImages.mockImplementation((input: ExtendedAvatar[]) => {
return input.map(({ avatar, entity }: ExtendedAvatar, i) => ({
avatar,
entity,
success: false,
avatarPath: `avatar${i}.png`,
facePath: `face${i}.png`
}))
return {
avatars: input.map(({ avatar, entity }: ExtendedAvatar, i) => ({
avatar,
entity,
success: false,
avatarPath: `avatar${i}.png`,
facePath: `face${i}.png`
}))
}
})

await consumer.process(QUEUE_NAME, messages)
Expand Down Expand Up @@ -250,13 +254,15 @@ describe('Consumer test', function () {
]

generateImages.mockImplementation((input: ExtendedAvatar[]) => {
return input.map(({ avatar, entity }: ExtendedAvatar, i) => ({
avatar,
entity,
success: true,
avatarPath: `avatar${i}.png`,
facePath: `face${i}.png`
}))
return {
avatars: input.map(({ avatar, entity }: ExtendedAvatar, i) => ({
avatar,
entity,
success: true,
avatarPath: `avatar${i}.png`,
facePath: `face${i}.png`
}))
}
})

await consumer.process(QUEUE_NAME, messages)
Expand Down
Loading

0 comments on commit 456d14a

Please sign in to comment.