Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ClientRequest): support intercepting buffered requests #643

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 58 additions & 15 deletions src/interceptors/ClientRequest/MockHttpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export class MockHttpSocket extends MockSocket {
private requestParser: HTTPParser<0>
private requestStream?: Readable
private shouldKeepAlive?: boolean
private requestHeaderSent: boolean

private responseType: 'mock' | 'bypassed' = 'bypassed'
private responseParser: HTTPParser<1>
Expand Down Expand Up @@ -89,6 +90,8 @@ export class MockHttpSocket extends MockSocket {
this.onRequest = options.onRequest
this.onResponse = options.onResponse

this.requestHeaderSent = false

this.baseUrl = baseUrlFromConnectionOptions(this.connectionOptions)

// Request parser.
Expand Down Expand Up @@ -124,6 +127,28 @@ export class MockHttpSocket extends MockSocket {
}
}

_read(): void {
/**
* @note If the request header hasn't been sent by the time the socket is read,
* it means that Node.js is buffering the socket writes in memory while
* the client finishes preparing the request. In that case, trigger the
* request start pipeline preemptively so the socket emits the "connect"
* event correctly.
*
* This is triggered if `.end()` is delegated to the "connect" socket event
* listener (i.e. finish the request once the connection is successful).
*/
if (!this.requestHeaderSent) {
this._onRequestStart({
path: this.connectionOptions.pathname,
headers: new Headers(this.connectionOptions.headers || {}),
keepAlive: this.connectionOptions.agent?.keepAlive,
})

this.requestHeaderSent = true
}
}

public emit(event: string | symbol, ...args: any[]): boolean {
const emitEvent = super.emit.bind(this, event as any, ...args)

Expand Down Expand Up @@ -428,22 +453,22 @@ export class MockHttpSocket extends MockSocket {
}
}

private onRequestStart: RequestHeadersCompleteCallback = (
versionMajor,
versionMinor,
rawHeaders,
_,
path,
__,
___,
____,
shouldKeepAlive
) => {
this.shouldKeepAlive = shouldKeepAlive
/**
* Internal method that triggers the start of the request processing
* and lets the parent interceptor know that a request has happened.
*/
private _onRequestStart(args: {
path: string
headers: Headers
keepAlive?: boolean
}) {
const { path, headers, keepAlive } = args

this.requestHeaderSent = true
this.shouldKeepAlive = keepAlive

const url = new URL(path, this.baseUrl)
const method = this.connectionOptions.method?.toUpperCase() || 'GET'
const headers = parseRawHeaders(rawHeaders)
const url = new URL(path, this.baseUrl)
const canHaveBody = method !== 'GET' && method !== 'HEAD'

// Translate the basic authorization in the URL to the request header.
Expand All @@ -467,7 +492,7 @@ export class MockHttpSocket extends MockSocket {
* used as the actual request body (the stream calls "read()").
* We control the queue in the onRequestBody/End functions.
*/
read: () => {
read: (size) => {
// If the user attempts to read the request body,
// flush the write buffer to trigger the callbacks.
// This way, if the request stream ends in the write callback,
Expand Down Expand Up @@ -511,6 +536,24 @@ export class MockHttpSocket extends MockSocket {
})
}

private onRequestStart: RequestHeadersCompleteCallback = (
versionMajor,
versionMinor,
rawHeaders,
_,
path,
__,
___,
____,
keepAlive
) => {
return this._onRequestStart({
path,
headers: parseRawHeaders(rawHeaders),
keepAlive,
})
}

private onRequestBody(chunk: Buffer): void {
invariant(
this.requestStream,
Expand Down
52 changes: 52 additions & 0 deletions test/modules/http/compliance/http-keepalive.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// @vitest-environment node
import { vi, it, expect, afterAll, afterEach, beforeAll } from 'vitest'
import http from 'node:http'
import { HttpServer } from '@open-draft/test-server/http'
import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest'
import { waitForClientRequest } from '../../..//helpers'

const interceptor = new ClientRequestInterceptor()

const httpServer = new HttpServer((app) => {
app.get('/resource', (req, res) => {
res.send('original response')
})
})

beforeAll(async () => {
interceptor.apply()
await httpServer.listen()
})

afterEach(() => {
interceptor.removeAllListeners()
})

afterAll(async () => {
interceptor.dispose()
await httpServer.close()
})

it('dispatches the "connect" socket event when reusing sockets ("keepAlive": true)', async () => {
const connectListener = vi.fn()

const agent = new http.Agent({
keepAlive: true,
})

async function makeRequest() {
const request = http.request(httpServer.http.url('/resource'), {
method: 'GET',
agent,
})
request.on('socket', (socket) => {
socket.on('connect', connectListener)
})
request.end()
await waitForClientRequest(request)
}

await Promise.all([makeRequest(), makeRequest(), makeRequest()])

expect(connectListener).toHaveBeenCalledTimes(3)
})
95 changes: 95 additions & 0 deletions test/modules/http/compliance/http-req-end.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// @vitest-environment node
import { it, expect, afterAll, afterEach, beforeAll } from 'vitest'
import http from 'node:http'
import { HttpServer } from '@open-draft/test-server/http'
import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest'
import { waitForClientRequest } from '../../..//helpers'
import { DeferredPromise } from '@open-draft/deferred-promise'

const interceptor = new ClientRequestInterceptor()

const httpServer = new HttpServer((app) => {
app.post('/resource', (req, res) => {
res.send('original response')
})
})

beforeAll(async () => {
interceptor.apply()
await httpServer.listen()
})

afterEach(() => {
interceptor.removeAllListeners()
})

afterAll(async () => {
interceptor.dispose()
await httpServer.close()
})

it('allows calling "req.end()" in the "connect" socket event (bypass)', async () => {
const request = http.request(httpServer.http.url('/resource'), {
method: 'POST',
headers: { 'X-My-Header': '1' },
})
request.on('socket', (socket) => {
socket.on('connect', () => {
request.end()
})
})

const { text } = await waitForClientRequest(request)
await expect(text()).resolves.toBe('original response')
})

it('allows calling "req.end()" in the "connect" socket event (interceptor + bypass)', async () => {
const requestPromise = new DeferredPromise<Request>()
interceptor.on('request', ({ request }) => {
requestPromise.resolve(request)
})

const request = http.request(httpServer.http.url('/resource'), {
method: 'POST',
headers: { 'X-My-Header': '1' },
})
request.on('socket', (socket) => {
socket.on('connect', () => {
request.end()
})
})

const { text } = await waitForClientRequest(request)
await expect(text()).resolves.toBe('original response')

const interceptedRequest = await requestPromise
expect(interceptedRequest.method).toBe('POST')
expect(interceptedRequest.url).toBe(httpServer.http.url('/resource'))
expect(Array.from(interceptedRequest.headers)).toEqual([['x-my-header', '1']])
})

it('allows calling "req.end()" in the "connect" socket event (mocked)', async () => {
const requestPromise = new DeferredPromise<Request>()
interceptor.on('request', ({ request, controller }) => {
requestPromise.resolve(request)
controller.respondWith(new Response('mocked response'))
})

const request = http.request('http://localhost/irrelevant', {
method: 'POST',
headers: { 'X-My-Header': '1' },
})
request.on('socket', (socket) => {
socket.on('connect', () => {
request.end()
})
})

const { text } = await waitForClientRequest(request)
await expect(text()).resolves.toBe('mocked response')

const interceptedRequest = await requestPromise
expect(interceptedRequest.method).toBe('POST')
expect(interceptedRequest.url).toBe('http://localhost/irrelevant')
expect(Array.from(interceptedRequest.headers)).toEqual([['x-my-header', '1']])
})
Loading