Skip to content

Commit

Permalink
fix: support passthrough via server
Browse files Browse the repository at this point in the history
  • Loading branch information
kettanaito committed Sep 30, 2024
1 parent daec32b commit 8c37fe0
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 15 deletions.
123 changes: 117 additions & 6 deletions src/browser/sse.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import type { Constructor } from 'type-fest'
import type { ResponseResolver } from '~/core/handlers/RequestHandler'
import {
HttpHandler,
type HttpRequestResolverExtras,
type HttpRequestParsedResult,
} from '~/core/handlers/HttpHandler'
import { HttpResponse } from '~/core/HttpResponse'
import type { Path, PathParams } from '~/core/utils/matching/matchRequestUrl'

export type ServerSentEventResolverExtras<Params extends PathParams> =
HttpRequestResolverExtras<Params> & {
source: ServerSentEventSource
client: ServerSentEventClient
server: ServerSentEventServer
}

export type ServerSentEventResolver<Params extends PathParams> =
Expand Down Expand Up @@ -42,14 +43,23 @@ class ServerSentEventHandler extends HttpHandler {
super('GET', path, (info) => {
const stream = new ReadableStream({
start(controller) {
const client = new ServerSentEventClient({
controller,
})
const server = new ServerSentEventServer({
request: info.request,
client,
})

resolver({
...info,
source: new ServerSentEventSource(controller),
client,
server,
})
},
})

return new HttpResponse(stream, {
return new Response(stream, {
headers: {
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
Expand All @@ -75,11 +85,13 @@ class ServerSentEventHandler extends HttpHandler {
}
}

class ServerSentEventSource {
class ServerSentEventClient {
private encoder: TextEncoder
protected controller: ReadableStreamDefaultController

constructor(protected controller: ReadableStreamDefaultController) {
constructor(args: { controller: ReadableStreamDefaultController }) {
this.encoder = new TextEncoder()
this.controller = args.controller
}

/**
Expand Down Expand Up @@ -109,3 +121,102 @@ class ServerSentEventSource {
this.controller.error()
}
}

const kListener = Symbol('kListener')

class ServerSentEventServer {
protected request: Request
protected client: ServerSentEventClient

constructor(args: { request: Request; client: ServerSentEventClient }) {
this.request = args.request
this.client = args.client
}

/**
* Establishes an actual connection for this SSE request
* and returns the `EventSource` instance.
*/
public connect(): EventSource {
const requestUrl = new URL(this.request.url)
requestUrl.searchParams.set('x-msw-intention', 'bypass')

let source = new EventSource(requestUrl, {
withCredentials: this.request.credentials === 'include',
})

const addEventListener = source.addEventListener.bind(source)
source.addEventListener = (event: any, listener: any, options: any) => {
const wrappedListener = this.wrapEventListener(listener).bind(source)
Object.defineProperty(listener, kListener, {
value: wrappedListener,
})
addEventListener(event, wrappedListener, options)
}

const removeEventListener = source.removeEventListener.bind(source)
source.removeEventListener = (event: any, listener: any, options: any) => {
const wrappedListener = listener[kListener] || listener
removeEventListener(event, wrappedListener, options)
}

source = new Proxy(source, {
set: (target, property, value) => {
switch (property) {
case 'onopen':
case 'onmessage':
case 'onerror': {
return Reflect.set(target, property, this.wrapEventListener(value))
}
}

return Reflect.set(target, property, value)
},
})

return source
}

private wrapEventListener(listener: (event: Event) => void) {
const { client } = this

return function (this: EventSource, event: Event) {
const EventConstructor = event.constructor as Constructor<Event>
const cancelableEvent = new EventConstructor(event.type, {
...event,
cancelable: true,
})

listener.call(this, cancelableEvent)

if (event.type === 'open') {
return
}

if (!cancelableEvent.defaultPrevented) {
switch (event.type) {
case 'error': {
client.error()
break
}

default: {
if (event instanceof MessageEvent) {
client.send({
id: event.lastEventId,
event: event.type === 'message' ? undefined : event.type,
/**
* @fixme Data will already be stringified.
* `client.send()` will stringify it again.
*/
data: event.data,
})
}

break
}
}
}
}
}
}
5 changes: 4 additions & 1 deletion src/core/utils/handleRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ export async function handleRequest(
emitter.emit('request:start', { request, requestId })

// Perform bypassed requests (i.e. wrapped in "bypass()") as-is.
if (request.headers.get('x-msw-intention') === 'bypass') {
if (
request.headers.get('x-msw-intention') === 'bypass' ||
new URL(request.url).searchParams.get('x-msw-intention') === 'bypass'
) {
emitter.emit('request:end', { request, requestId })
handleRequestOptions?.onPassthroughResponse?.(request)
return
Expand Down
71 changes: 63 additions & 8 deletions test/browser/sse-api/sse.node.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { setupWorker, sse } from 'msw/browser'
import { HttpServer } from '@open-draft/test-server/http'
import { test, expect } from '../playwright.extend'

declare namespace window {
Expand All @@ -8,6 +9,26 @@ declare namespace window {
}
}

const httpServer = new HttpServer((app) => {
app.get('/stream', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
})

res.write('data: {"message": "hello"}\n\n')
})
})

test.beforeAll(async () => {
await httpServer.listen()
})

test.afterAll(async () => {
await httpServer.close()
})

test('sends a mock message event', async ({ loadExample, page }) => {
await loadExample(require.resolve('./sse.mocks.ts'), {
skipActivation: true,
Expand All @@ -17,8 +38,8 @@ test('sends a mock message event', async ({ loadExample, page }) => {
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse('http://localhost/stream', ({ source }) => {
source.send({
sse('http://localhost/stream', ({ client }) => {
client.send({
data: { username: 'john' },
})
}),
Expand Down Expand Up @@ -48,8 +69,8 @@ test('sends a mock custom event', async ({ loadExample, page }) => {
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse('http://localhost/stream', ({ source }) => {
source.send({
sse('http://localhost/stream', ({ client }) => {
client.send({
event: 'userconnect',
data: { username: 'john' },
})
Expand Down Expand Up @@ -83,8 +104,8 @@ test('sends a mock message event with custom id', async ({
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse('http://localhost/stream', ({ source }) => {
source.send({
sse('http://localhost/stream', ({ client }) => {
client.send({
id: 'abc-123',
event: 'userconnect',
data: { username: 'john' },
Expand Down Expand Up @@ -122,8 +143,8 @@ test('errors the connected source', async ({ loadExample, page, waitFor }) => {
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse('http://localhost/stream', ({ source }) => {
queueMicrotask(() => source.error())
sse('http://localhost/stream', ({ client }) => {
queueMicrotask(() => client.error())
}),
)
await worker.start()
Expand All @@ -147,3 +168,37 @@ test('errors the connected source', async ({ loadExample, page, waitFor }) => {
expect(pageErrors).toContain('Failed to fetch')
})
})

test.only('forwards original server message events to the client', async ({
loadExample,
page,
}) => {
await loadExample(require.resolve('./sse.mocks.ts'), {
skipActivation: true,
})
const url = httpServer.http.url('/stream')

await page.evaluate(async (url) => {
const { setupWorker, sse } = window.msw

const worker = setupWorker(
sse(url, async ({ server }) => {
const source = server.connect()

source.addEventListener('message', (event) => {
event.preventDefault()
})
}),
)
await worker.start()
}, url)

await page.evaluate((url) => {
const source = new EventSource(url)
source.addEventListener('message', (event) => {
console.warn('client received:', event)
})
}, url)

await page.pause()
})

0 comments on commit 8c37fe0

Please sign in to comment.