From d8b3ed1408f372a5da89381bfe24de75e3d065c0 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 27 Sep 2024 12:09:43 +0200 Subject: [PATCH 1/3] test(http): add "keepAlive: true" socket connect event test --- .../http/compliance/http-keepalive.test.ts | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 test/modules/http/compliance/http-keepalive.test.ts diff --git a/test/modules/http/compliance/http-keepalive.test.ts b/test/modules/http/compliance/http-keepalive.test.ts new file mode 100644 index 00000000..3742129d --- /dev/null +++ b/test/modules/http/compliance/http-keepalive.test.ts @@ -0,0 +1,52 @@ +// @vitest-environment node +import { vi, it, expect, afterAll, afterEach, beforeAll } from 'vitest' +import http from 'node:http' +import { HttpServer } from '@open-draft/test-server/http' +import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest' +import { waitForClientRequest } from '../../..//helpers' + +const interceptor = new ClientRequestInterceptor() + +const httpServer = new HttpServer((app) => { + app.get('/resource', (req, res) => { + res.send('original response') + }) +}) + +beforeAll(async () => { + interceptor.apply() + await httpServer.listen() +}) + +afterEach(() => { + interceptor.removeAllListeners() +}) + +afterAll(async () => { + interceptor.dispose() + await httpServer.close() +}) + +it('dispatches the "connect" socket event when reusing sockets ("keepAlive": true)', async () => { + const connectListener = vi.fn() + + const agent = new http.Agent({ + keepAlive: true, + }) + + async function makeRequest() { + const request = http.request(httpServer.http.url('/resource'), { + method: 'GET', + agent, + }) + request.on('socket', (socket) => { + socket.on('connect', connectListener) + }) + request.end() + await waitForClientRequest(request) + } + + await Promise.all([makeRequest(), makeRequest(), makeRequest()]) + + expect(connectListener).toHaveBeenCalledTimes(3) +}) From c3a6b62cd8ef578c8e3b9918136ff5455c66edba Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 27 Sep 2024 14:35:15 +0200 Subject: [PATCH 2/3] fix(ClientRequest): support intercepting buffered requests --- .../ClientRequest/MockHttpSocket.ts | 78 ++++++++++++--- .../http/compliance/http-req-end.test.ts | 95 +++++++++++++++++++ 2 files changed, 158 insertions(+), 15 deletions(-) create mode 100644 test/modules/http/compliance/http-req-end.test.ts diff --git a/src/interceptors/ClientRequest/MockHttpSocket.ts b/src/interceptors/ClientRequest/MockHttpSocket.ts index 545faee9..04f0b1f6 100644 --- a/src/interceptors/ClientRequest/MockHttpSocket.ts +++ b/src/interceptors/ClientRequest/MockHttpSocket.ts @@ -59,6 +59,7 @@ export class MockHttpSocket extends MockSocket { private requestParser: HTTPParser<0> private requestStream?: Readable private shouldKeepAlive?: boolean + private requestHeaderSent: boolean private responseType: 'mock' | 'bypassed' = 'bypassed' private responseParser: HTTPParser<1> @@ -67,6 +68,12 @@ export class MockHttpSocket extends MockSocket { constructor(options: MockHttpSocketOptions) { super({ write: (chunk, encoding, callback) => { + // Mark the request header as sent once the + // socket receives its first write (which is the request header). + if (!this.requestHeaderSent) { + this.requestHeaderSent = true + } + this.writeBuffer.push([chunk, encoding, callback]) if (chunk) { @@ -89,6 +96,8 @@ export class MockHttpSocket extends MockSocket { this.onRequest = options.onRequest this.onResponse = options.onResponse + this.requestHeaderSent = false + this.baseUrl = baseUrlFromConnectionOptions(this.connectionOptions) // Request parser. @@ -124,6 +133,28 @@ export class MockHttpSocket extends MockSocket { } } + _read(): void { + /** + * @note If the request header hasn't been sent by the time the socket is read, + * it means that Node.js is buffering the socket writes in memory while + * the client finishes preparing the request. In that case, trigger the + * request start pipeline preemptively so the socket emits the "connect" + * event correctly. + * + * This is triggered if `.end()` is delegated to the "connect" socket event + * listener (i.e. finish the request once the connection is successful). + */ + if (!this.requestHeaderSent) { + this._onRequestStart({ + path: this.connectionOptions.pathname, + headers: new Headers(this.connectionOptions.headers || {}), + keepAlive: this.connectionOptions.agent?.keepAlive, + }) + + this.requestHeaderSent = true + } + } + public emit(event: string | symbol, ...args: any[]): boolean { const emitEvent = super.emit.bind(this, event as any, ...args) @@ -428,22 +459,21 @@ export class MockHttpSocket extends MockSocket { } } - private onRequestStart: RequestHeadersCompleteCallback = ( - versionMajor, - versionMinor, - rawHeaders, - _, - path, - __, - ___, - ____, - shouldKeepAlive - ) => { - this.shouldKeepAlive = shouldKeepAlive + /** + * Internal method that triggers the start of the request processing + * and lets the parent interceptor know that a request has happened. + */ + private _onRequestStart(args: { + path: string + headers: Headers + keepAlive?: boolean + }) { + const { path, headers, keepAlive } = args + + this.shouldKeepAlive = keepAlive - const url = new URL(path, this.baseUrl) const method = this.connectionOptions.method?.toUpperCase() || 'GET' - const headers = parseRawHeaders(rawHeaders) + const url = new URL(path, this.baseUrl) const canHaveBody = method !== 'GET' && method !== 'HEAD' // Translate the basic authorization in the URL to the request header. @@ -467,7 +497,7 @@ export class MockHttpSocket extends MockSocket { * used as the actual request body (the stream calls "read()"). * We control the queue in the onRequestBody/End functions. */ - read: () => { + read: (size) => { // If the user attempts to read the request body, // flush the write buffer to trigger the callbacks. // This way, if the request stream ends in the write callback, @@ -511,6 +541,24 @@ export class MockHttpSocket extends MockSocket { }) } + private onRequestStart: RequestHeadersCompleteCallback = ( + versionMajor, + versionMinor, + rawHeaders, + _, + path, + __, + ___, + ____, + keepAlive + ) => { + return this._onRequestStart({ + path, + headers: parseRawHeaders(rawHeaders), + keepAlive, + }) + } + private onRequestBody(chunk: Buffer): void { invariant( this.requestStream, diff --git a/test/modules/http/compliance/http-req-end.test.ts b/test/modules/http/compliance/http-req-end.test.ts new file mode 100644 index 00000000..36bc38e3 --- /dev/null +++ b/test/modules/http/compliance/http-req-end.test.ts @@ -0,0 +1,95 @@ +// @vitest-environment node +import { it, expect, afterAll, afterEach, beforeAll } from 'vitest' +import http from 'node:http' +import { HttpServer } from '@open-draft/test-server/http' +import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest' +import { waitForClientRequest } from '../../..//helpers' +import { DeferredPromise } from '@open-draft/deferred-promise' + +const interceptor = new ClientRequestInterceptor() + +const httpServer = new HttpServer((app) => { + app.post('/resource', (req, res) => { + res.send('original response') + }) +}) + +beforeAll(async () => { + interceptor.apply() + await httpServer.listen() +}) + +afterEach(() => { + interceptor.removeAllListeners() +}) + +afterAll(async () => { + interceptor.dispose() + await httpServer.close() +}) + +it('allows calling "req.end()" in the "connect" socket event (bypass)', async () => { + const request = http.request(httpServer.http.url('/resource'), { + method: 'POST', + headers: { 'X-My-Header': '1' }, + }) + request.on('socket', (socket) => { + socket.on('connect', () => { + request.end() + }) + }) + + const { text } = await waitForClientRequest(request) + await expect(text()).resolves.toBe('original response') +}) + +it('allows calling "req.end()" in the "connect" socket event (interceptor + bypass)', async () => { + const requestPromise = new DeferredPromise() + interceptor.on('request', ({ request }) => { + requestPromise.resolve(request) + }) + + const request = http.request(httpServer.http.url('/resource'), { + method: 'POST', + headers: { 'X-My-Header': '1' }, + }) + request.on('socket', (socket) => { + socket.on('connect', () => { + request.end() + }) + }) + + const { text } = await waitForClientRequest(request) + await expect(text()).resolves.toBe('original response') + + const interceptedRequest = await requestPromise + expect(interceptedRequest.method).toBe('POST') + expect(interceptedRequest.url).toBe(httpServer.http.url('/resource')) + expect(Array.from(interceptedRequest.headers)).toEqual([['x-my-header', '1']]) +}) + +it('allows calling "req.end()" in the "connect" socket event (mocked)', async () => { + const requestPromise = new DeferredPromise() + interceptor.on('request', ({ request, controller }) => { + requestPromise.resolve(request) + controller.respondWith(new Response('mocked response')) + }) + + const request = http.request('http://localhost/irrelevant', { + method: 'POST', + headers: { 'X-My-Header': '1' }, + }) + request.on('socket', (socket) => { + socket.on('connect', () => { + request.end() + }) + }) + + const { text } = await waitForClientRequest(request) + await expect(text()).resolves.toBe('mocked response') + + const interceptedRequest = await requestPromise + expect(interceptedRequest.method).toBe('POST') + expect(interceptedRequest.url).toBe('http://localhost/irrelevant') + expect(Array.from(interceptedRequest.headers)).toEqual([['x-my-header', '1']]) +}) From 7949662e5362975f7d1caf5424f0aad518ee568f Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 27 Sep 2024 14:49:35 +0200 Subject: [PATCH 3/3] chore: set `requestHeaderSent` in `_onRequestStart` --- src/interceptors/ClientRequest/MockHttpSocket.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/interceptors/ClientRequest/MockHttpSocket.ts b/src/interceptors/ClientRequest/MockHttpSocket.ts index 04f0b1f6..be9d99b1 100644 --- a/src/interceptors/ClientRequest/MockHttpSocket.ts +++ b/src/interceptors/ClientRequest/MockHttpSocket.ts @@ -68,12 +68,6 @@ export class MockHttpSocket extends MockSocket { constructor(options: MockHttpSocketOptions) { super({ write: (chunk, encoding, callback) => { - // Mark the request header as sent once the - // socket receives its first write (which is the request header). - if (!this.requestHeaderSent) { - this.requestHeaderSent = true - } - this.writeBuffer.push([chunk, encoding, callback]) if (chunk) { @@ -470,6 +464,7 @@ export class MockHttpSocket extends MockSocket { }) { const { path, headers, keepAlive } = args + this.requestHeaderSent = true this.shouldKeepAlive = keepAlive const method = this.connectionOptions.method?.toUpperCase() || 'GET'