Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add fastify cpu profiler server #28

Merged
merged 5 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ Please see each tool's source directory for additional documentation
* Node.js signal handlers that provide a way to shut down long-running application components
gracefully on unhandled exceptions or interrupt signals.

### CPU Profiler

* Fastify server that controls a profiler capable of generating:
* `.cpuprofile` files for CPU usage analysis
* `.heapsnapshot` files for memory usage analysis

### Logger

* Standardized logger configuration using [pino](https://github.com/pinojs/pino)
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
"test": "jest --runInBand",
"lint:eslint": "eslint . --ext .js,.jsx,.ts,.tsx -f unix",
"lint:prettier": "prettier --check src/**/*.ts",
"testenv:run": "docker-compose -f docker/docker-compose.dev.postgres.yml up",
"testenv:stop": "docker-compose -f docker/docker-compose.dev.postgres.yml down -v -t 0",
"testenv:logs": "docker-compose -f docker/docker-compose.dev.postgres.yml logs -t -f"
"testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml up",
"testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml down -v -t 0",
"testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml logs -t -f"
},
"bin": {
"api-toolkit-git-info": "./bin/api-toolkit-git-info.js"
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export * from './fastify';
export * from './helpers';
export * from './logger';
export * from './postgres';
export * from './profiler';
export * from './server-version';
export * from './shutdown-handler';
67 changes: 67 additions & 0 deletions src/profiler/__tests__/profiler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { FastifyInstance } from 'fastify';
import { buildProfilerServer } from '../server';
import { timeout } from '../../helpers';

describe('CPU profiler', () => {
let fastify: FastifyInstance;

beforeAll(async () => {
fastify = await buildProfilerServer();
});

test('CPU profiler snapshot bad duration', async () => {
const query1 = await fastify.inject({
method: 'GET',
url: `/profile/cpu?duration=-100`,
});
expect(query1.statusCode).toBe(400);
});

test('generate CPU profiler snapshot', async () => {
const duration = 0.25; // 250 milliseconds
const query1 = await fastify.inject({
method: 'GET',
url: `/profile/cpu?duration=${duration}`,
});
expect(query1.statusCode).toBe(200);
expect(query1.headers['content-type']).toBe('application/json; charset=utf-8');
let cpuProfileBody: any;
// Ensure entire profile result was streamed/returned
expect(() => {
cpuProfileBody = query1.json();
}).not.toThrow();
// Cursory check for the expected JSON format of a `.cpuprofile` file
expect(cpuProfileBody).toEqual(
expect.objectContaining({
nodes: expect.any(Array),
samples: expect.any(Array),
timeDeltas: expect.any(Array),
startTime: expect.any(Number),
endTime: expect.any(Number),
})
);
});

test('cancel CPU profiler snapshot', async () => {
const duration = 150; // 150 seconds
// init a cpu profile request, hold on to the promise for reading the request response
const promise = fastify.inject({
method: 'GET',
url: `/profile/cpu?duration=${duration}`,
});
await timeout(200);
// perform a request to cancel the previous profile session
const endQuery = await fastify.inject({
method: 'GET',
url: `/profile/cancel`,
});
expect(endQuery.statusCode).toBe(200);
// ensure the initial request failed
const result = await promise;
expect(result.statusCode).toBe(500);
});

afterAll(async () => {
await fastify.close();
});
});
2 changes: 2 additions & 0 deletions src/profiler/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './inspector-util';
export * from './server';
275 changes: 275 additions & 0 deletions src/profiler/inspector-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
import * as inspector from 'inspector';
import * as stream from 'stream';
import { stopwatch, Stopwatch } from '../helpers';
import { logger } from '../logger';

export type CpuProfileResult = inspector.Profiler.Profile;

export interface ProfilerInstance<TStopResult = void> {
start: () => Promise<void>;
stop: () => Promise<TStopResult>;
dispose: () => Promise<void>;
session: inspector.Session;
sessionType: 'cpu' | 'memory';
stopwatch: Stopwatch;
}

function isInspectorNotConnectedError(error: unknown): boolean {
const ERR_INSPECTOR_NOT_CONNECTED = 'ERR_INSPECTOR_NOT_CONNECTED';
const isNodeError = (r: unknown): r is NodeJS.ErrnoException => r instanceof Error && 'code' in r;
return isNodeError(error) && error.code === ERR_INSPECTOR_NOT_CONNECTED;
}

/**
* Connects and enables a new `inspector` session, then starts an internal v8 CPU profiling process.
* @returns A function to stop the profiling, and return the CPU profile result object.
* The result object can be used to create a `.cpuprofile` file using JSON.stringify.
* Use VSCode or Chrome's 'DevTools for Node' (under chrome://inspect) to visualize the `.cpuprofile` file.
* @param samplingInterval - Optionally set sampling interval in microseconds, default is 1000 microseconds.
*/
export function initCpuProfiling(samplingInterval?: number): ProfilerInstance<CpuProfileResult> {
const sessionStopwatch = stopwatch();
const session = new inspector.Session();
session.connect();
logger.info(`[CpuProfiler] Connect session took ${sessionStopwatch.getElapsedAndRestart()}ms`);
const start = async () => {
const sw = stopwatch();
logger.info(`[CpuProfiler] Enabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.enable', error => {
if (error) {
logger.error(error, '[CpuProfiler] Error enabling profiling');
reject(error);
} else {
logger.info(`[CpuProfiler] Profiling enabled`);
resolve();
}
});
} catch (error) {
logger.error(error, '[CpuProfiler] Error enabling profiling');
reject(error);
}
});
logger.info(`[CpuProfiler] Enable session took ${sw.getElapsedAndRestart()}ms`);

if (samplingInterval !== undefined) {
logger.info(`[CpuProfiler] Setting sampling interval to ${samplingInterval} microseconds`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.setSamplingInterval', { interval: samplingInterval }, error => {
if (error) {
logger.error(error, '[CpuProfiler] Error setting sampling interval');
reject(error);
} else {
logger.info(`[CpuProfiler] Set sampling interval`);
resolve();
}
});
} catch (error) {
logger.error(error, '[CpuProfiler] Error setting sampling interval');
reject(error);
}
});
logger.info(`[CpuProfiler] Set sampling interval took ${sw.getElapsedAndRestart()}ms`);
}

logger.info(`[CpuProfiler] Profiling starting...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.start', error => {
if (error) {
logger.error(error, '[CpuProfiler] Error starting profiling');
reject(error);
} else {
sessionStopwatch.restart();
logger.info(`[CpuProfiler] Profiling started`);
resolve();
}
});
} catch (error) {
logger.error(error, '[CpuProfiler] Error starting profiling');
reject(error);
}
});
logger.info(`[CpuProfiler] Start profiler took ${sw.getElapsedAndRestart()}ms`);
};

const stop = async () => {
const sw = stopwatch();
logger.info(`[CpuProfiler] Profiling stopping...`);
try {
return await new Promise<CpuProfileResult>((resolve, reject) => {
try {
session.post('Profiler.stop', (error, profileResult) => {
if (error) {
logger.error(error, '[CpuProfiler] Error stopping profiling');
reject(error);
} else {
logger.info(`[CpuProfiler] Profiling stopped`);
resolve(profileResult.profile);
}
});
} catch (error) {
reject(error);
}
});
} finally {
logger.info(`[CpuProfiler] Stop profiler took ${sw.getElapsedAndRestart()}ms`);
}
};

const dispose = async () => {
const sw = stopwatch();
try {
logger.info(`[CpuProfiler] Disabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.disable', error => {
if (error && isInspectorNotConnectedError(error)) {
logger.info(`[CpuProfiler] Profiler already disconnected`);
resolve();
} else if (error) {
logger.error(error, '[CpuProfiler] Error disabling profiling');
reject(error);
} else {
logger.info(`[CpuProfiler] Profiling disabled`);
resolve();
}
});
} catch (error) {
if (isInspectorNotConnectedError(error)) {
logger.info(`[CpuProfiler] Profiler already disconnected`);
resolve();
} else {
reject();
}
}
});
} finally {
session.disconnect();
logger.info(
`[CpuProfiler] Disable and disconnect profiler took ${sw.getElapsedAndRestart()}ms`
);
}
};

return { start, stop, dispose, session, sessionType: 'cpu', stopwatch: sessionStopwatch };
}

/**
* Connects and enables a new `inspector` session, then creates an internal v8 Heap profiler snapshot.
* @param outputStream - An output stream that heap snapshot chunks are written to.
* The result stream can be used to create a `.heapsnapshot` file.
* Use Chrome's 'DevTools for Node' (under chrome://inspect) to visualize the `.heapsnapshot` file.
*/
export function initHeapSnapshot(
outputStream: stream.Writable
): ProfilerInstance<{ totalSnapshotByteSize: number }> {
const sw = stopwatch();
const session = new inspector.Session();
session.connect();
let totalSnapshotByteSize = 0;
const start = async () => {
logger.info(`[HeapProfiler] Enabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('HeapProfiler.enable', error => {
if (error) {
logger.error(error, '[HeapProfiler] Error enabling profiling');
reject(error);
} else {
sw.restart();
logger.info(`[HeapProfiler] Profiling enabled`);
resolve();
}
});
} catch (error) {
logger.error(error, '[HeapProfiler] Error enabling profiling');
reject(error);
}
});

session.on('HeapProfiler.addHeapSnapshotChunk', message => {
// Note: this doesn't handle stream back-pressure, but we don't have control over the
// `HeapProfiler.addHeapSnapshotChunk` callback in order to use something like piping.
// So in theory on a slow `outputStream` (usually an http connection response) this can cause OOM.
logger.info(
`[HeapProfiler] Writing heap snapshot chunk of size ${message.params.chunk.length}`
);
totalSnapshotByteSize += message.params.chunk.length;
outputStream.write(message.params.chunk, error => {
if (error) {
logger.error(
error,
`[HeapProfiler] Error writing heap profile chunk to output stream: ${error.message}`
);
}
});
});
};

const stop = async () => {
logger.info(`[HeapProfiler] Taking snapshot...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('HeapProfiler.takeHeapSnapshot', undefined, (error: Error | null) => {
if (error) {
logger.error(error, '[HeapProfiler] Error taking snapshot');
reject(error);
} else {
logger.info(
`[HeapProfiler] Taking snapshot completed, ${totalSnapshotByteSize} bytes...`
);
resolve();
}
});
} catch (error) {
logger.error(error, '[HeapProfiler] Error taking snapshot');
reject(error);
}
});
logger.info(`[HeapProfiler] Draining snapshot buffer to stream...`);
const writeFinishedPromise = new Promise<void>((resolve, reject) => {
outputStream.on('finish', () => resolve());
outputStream.on('error', error => reject(error));
});
outputStream.end();
await writeFinishedPromise;
logger.info(`[HeapProfiler] Finished draining snapshot buffer to stream`);
return { totalSnapshotByteSize };
};

const dispose = async () => {
try {
logger.info(`[HeapProfiler] Disabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('HeapProfiler.disable', error => {
if (error && isInspectorNotConnectedError(error)) {
logger.info(`[HeapProfiler] Profiler already disconnected`);
resolve();
} else if (error) {
logger.error(error, '[HeapProfiler] Error disabling profiling');
reject(error);
} else {
logger.info(`[HeapProfiler] Profiling disabled`);
resolve();
}
});
} catch (error) {
if (isInspectorNotConnectedError(error)) {
logger.info(`[HeapProfiler] Profiler already disconnected`);
resolve();
} else {
reject();
}
}
});
} finally {
session.disconnect();
}
};

return { start, stop, dispose, session, sessionType: 'memory', stopwatch: sw };
}
Loading
Loading