From cb80dfaa5a9d7fc16dc51c5617178338086f95b5 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Wed, 24 Jan 2024 22:27:20 -0800 Subject: [PATCH] feat: GC finalization for preview2-shims (#357) --- packages/preview2-shim/lib/io/calls.js | 8 +- packages/preview2-shim/lib/io/worker-http.js | 2 +- packages/preview2-shim/lib/io/worker-io.js | 173 ++++++++++++++++-- .../preview2-shim/lib/io/worker-socket-tcp.js | 14 +- .../preview2-shim/lib/io/worker-thread.js | 28 ++- packages/preview2-shim/lib/nodejs/clocks.js | 3 +- .../preview2-shim/lib/nodejs/filesystem.js | 74 ++++---- packages/preview2-shim/lib/nodejs/http.js | 141 ++++++++------ packages/preview2-shim/lib/nodejs/sockets.js | 118 +++++++++--- packages/preview2-shim/package.json | 2 +- packages/preview2-shim/test/test.js | 69 ++++--- 11 files changed, 457 insertions(+), 175 deletions(-) diff --git a/packages/preview2-shim/lib/io/calls.js b/packages/preview2-shim/lib/io/calls.js index fdea3be64..d0e4df534 100644 --- a/packages/preview2-shim/lib/io/calls.js +++ b/packages/preview2-shim/lib/io/calls.js @@ -62,6 +62,7 @@ export const HTTP_SERVER_STOP = ++call_id << CALL_SHIFT; export const HTTP_SERVER_INCOMING_HANDLER = ++call_id << CALL_SHIFT; export const HTTP_SERVER_SET_OUTGOING_RESPONSE = ++call_id << CALL_SHIFT; export const HTTP_SERVER_CLEAR_OUTGOING_RESPONSE = ++call_id << CALL_SHIFT; +export const HTTP_OUTGOING_BODY_DISPOSE = ++call_id << CALL_SHIFT; // Clocks export const CLOCKS_NOW = ++call_id << CALL_SHIFT; @@ -102,7 +103,8 @@ export const SOCKET_UDP_SET_RECEIVE_BUFFER_SIZE = ++call_id << CALL_SHIFT; export const SOCKET_UDP_SET_SEND_BUFFER_SIZE = ++call_id << CALL_SHIFT; export const SOCKET_UDP_SET_UNICAST_HOP_LIMIT = ++call_id << CALL_SHIFT; export const SOCKET_INCOMING_DATAGRAM_STREAM_RECEIVE = ++call_id << CALL_SHIFT; -export const SOCKET_OUTGOING_DATAGRAM_STREAM_CHECK_SEND = ++call_id << CALL_SHIFT; +export const SOCKET_OUTGOING_DATAGRAM_STREAM_CHECK_SEND = + ++call_id << CALL_SHIFT; export const SOCKET_OUTGOING_DATAGRAM_STREAM_SEND = ++call_id << CALL_SHIFT; export const SOCKET_DATAGRAM_STREAM_SUBSCRIBE = ++call_id << CALL_SHIFT; export const SOCKET_DATAGRAM_STREAM_DISPOSE = ++call_id << CALL_SHIFT; @@ -118,9 +120,9 @@ export const SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST = ++call_id << CALL_SHIFT; export const reverseMap = {}; -import * as calls from './calls.js'; +import * as calls from "./calls.js"; for (const name of Object.getOwnPropertyNames(calls)) { - if (name === 'reverseMap') continue; + if (name === "reverseMap") continue; reverseMap[calls[name]] = name; } diff --git a/packages/preview2-shim/lib/io/worker-http.js b/packages/preview2-shim/lib/io/worker-http.js index 15d3be710..6e98edc09 100644 --- a/packages/preview2-shim/lib/io/worker-http.js +++ b/packages/preview2-shim/lib/io/worker-http.js @@ -156,7 +156,7 @@ export async function createHttpRequest( return { status: res.statusCode, headers: Array.from(Object.entries(res.headers)), - bodyStreamId: bodyStreamId, + bodyStreamId }; } catch (e) { if (e?.tag) throw e; diff --git a/packages/preview2-shim/lib/io/worker-io.js b/packages/preview2-shim/lib/io/worker-io.js index aea91d851..21f63265a 100644 --- a/packages/preview2-shim/lib/io/worker-io.js +++ b/packages/preview2-shim/lib/io/worker-io.js @@ -3,7 +3,9 @@ import { createSyncFn } from "../synckit/index.js"; import { CALL_MASK, CALL_TYPE_MASK, + FILE, HTTP_SERVER_INCOMING_HANDLER, + HTTP, INPUT_STREAM_BLOCKING_READ, INPUT_STREAM_BLOCKING_SKIP, INPUT_STREAM_DISPOSE, @@ -25,9 +27,12 @@ import { POLL_POLLABLE_BLOCK, POLL_POLLABLE_DISPOSE, POLL_POLLABLE_READY, + SOCKET_TCP, + STDERR, + STDIN, + STDOUT, reverseMap, } from "./calls.js"; -import { STDERR } from "./calls.js"; import { _rawDebug, exit, stderr, stdout, env } from "node:process"; const workerPath = fileURLToPath( @@ -92,6 +97,43 @@ if (DEBUG) { const symbolDispose = Symbol.dispose || Symbol.for("dispose"); +const finalizationRegistry = new FinalizationRegistry( + (dispose) => void dispose() +); + +const dummySymbol = Symbol(); + +/** + * + * @param {any} resource + * @param {any} parentResource + * @param {number} id + * @param {(number) => void} disposeFn + */ +export function registerDispose(resource, parentResource, id, disposeFn) { + // While strictly speaking all components should handle their disposal, + // this acts as a last-resort to catch all missed drops through the JS GC. + // Mainly for two cases - (1) components which are long lived, that get shut + // down and (2) users that interface with low-level WASI APIs directly in JS + // for various reasons may end up leaning on JS GC inadvertantly. + function finalizer() { + // This has no functional purpose other than to pin a strong reference + // from the child resource's finalizer to the parent resource, to ensure + // that we can never finalize a parent resource before a child resource. + // This makes the generational JS GC become piecewise over child resource + // graphs (generational at each resource hierarchy level at least). + if (parentResource?.[dummySymbol]) return; + disposeFn(id); + } + finalizationRegistry.register(resource, finalizer, finalizer); + return finalizer; +} + +export function earlyDispose(finalizer) { + finalizationRegistry.unregister(finalizer); + finalizer(); +} + const _Error = Error; const IoError = class Error extends _Error { constructor(payload) { @@ -121,6 +163,7 @@ function streamIoErrorCall(call, id, payload) { class InputStream { #id; #streamType; + #finalizer; read(len) { return streamIoErrorCall( INPUT_STREAM_READ | this.#streamType, @@ -151,24 +194,65 @@ class InputStream { } subscribe() { return pollableCreate( - ioCall(INPUT_STREAM_SUBSCRIBE | this.#streamType, this.#id) + ioCall(INPUT_STREAM_SUBSCRIBE | this.#streamType, this.#id), + this ); } - [symbolDispose]() { - ioCall(INPUT_STREAM_DISPOSE | this.#streamType, this.#id); - } static _id(stream) { return stream.#id; } /** - * @param {InputStreamType} streamType + * @param {FILE | SOCKET_TCP | STDIN | HTTP} streamType */ static _create(streamType, id) { const stream = new InputStream(); stream.#id = id; stream.#streamType = streamType; + let disposeFn; + switch (streamType) { + case FILE: + disposeFn = fileInputStreamDispose; + break; + case SOCKET_TCP: + disposeFn = socketTcpInputStreamDispose; + break; + case STDIN: + disposeFn = stdinInputStreamDispose; + break; + case HTTP: + disposeFn = httpInputStreamDispose; + break; + default: + throw new Error( + "wasi-io trap: Dispose function not created for stream type " + + reverseMap[streamType] + ); + } + stream.#finalizer = registerDispose(stream, null, id, disposeFn); return stream; } + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } +} + +function fileInputStreamDispose(id) { + ioCall(INPUT_STREAM_DISPOSE | FILE, id, null); +} + +function socketTcpInputStreamDispose(id) { + ioCall(INPUT_STREAM_DISPOSE | SOCKET_TCP, id, null); +} + +function stdinInputStreamDispose(id) { + ioCall(INPUT_STREAM_DISPOSE | STDIN, id, null); +} + +function httpInputStreamDispose(id) { + ioCall(INPUT_STREAM_DISPOSE | HTTP, id, null); } export const inputStreamCreate = InputStream._create; @@ -180,6 +264,7 @@ delete InputStream._id; class OutputStream { #id; #streamType; + #finalizer; checkWrite(len) { return streamIoErrorCall( OUTPUT_STREAM_CHECK_WRITE | this.#streamType, @@ -248,9 +333,6 @@ class OutputStream { ioCall(OUTPUT_STREAM_SUBSCRIBE | this.#streamType, this.#id) ); } - [symbolDispose]() { - ioCall(OUTPUT_STREAM_DISPOSE | this.#streamType, this.#id); - } static _id(outputStream) { return outputStream.#id; @@ -263,8 +345,54 @@ class OutputStream { const stream = new OutputStream(); stream.#id = id; stream.#streamType = streamType; + let disposeFn; + switch (streamType) { + case STDOUT: + disposeFn = stdoutOutputStreamDispose; + break; + case STDERR: + disposeFn = stderrOutputStreamDispose; + break; + case SOCKET_TCP: + disposeFn = socketTcpOutputStreamDispose; + break; + case FILE: + disposeFn = fileOutputStreamDispose; + break; + case HTTP: + return stream; + default: + throw new Error( + "wasi-io trap: Dispose function not created for stream type " + + reverseMap[streamType] + ); + } + stream.#finalizer = registerDispose(stream, null, id, disposeFn); return stream; } + + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } +} + +function stdoutOutputStreamDispose(id) { + ioCall(OUTPUT_STREAM_DISPOSE | STDOUT, id); +} + +function stderrOutputStreamDispose(id) { + ioCall(OUTPUT_STREAM_DISPOSE | STDERR, id); +} + +function socketTcpOutputStreamDispose(id) { + ioCall(OUTPUT_STREAM_DISPOSE | SOCKET_TCP, id); +} + +function fileOutputStreamDispose(id) { + ioCall(OUTPUT_STREAM_DISPOSE | FILE, id); } export const outputStreamCreate = OutputStream._create; @@ -277,8 +405,13 @@ export const error = { Error: IoError }; export const streams = { InputStream, OutputStream }; +function pollableDispose(id) { + ioCall(POLL_POLLABLE_DISPOSE, id); +} + class Pollable { #id; + #finalizer; ready() { if (this.#id === 0) return true; return ioCall(POLL_POLLABLE_READY, this.#id); @@ -288,27 +421,31 @@ class Pollable { ioCall(POLL_POLLABLE_BLOCK, this.#id); } } - [symbolDispose]() { - if (this.#id !== 0) { - ioCall(POLL_POLLABLE_DISPOSE, this.#id); - this.#id = 0; - } - } static _getId(pollable) { return pollable.#id; } - static _create(id) { + static _create(id, parent) { const pollable = new Pollable(); pollable.#id = id; + pollable.#finalizer = registerDispose( + pollable, + parent, + id, + pollableDispose + ); return pollable; } + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } } export const pollableCreate = Pollable._create; delete Pollable._create; -export const resolvedPoll = pollableCreate(0); - const pollableGetId = Pollable._getId; delete Pollable._getId; diff --git a/packages/preview2-shim/lib/io/worker-socket-tcp.js b/packages/preview2-shim/lib/io/worker-socket-tcp.js index 35e7c2307..77283fdb0 100644 --- a/packages/preview2-shim/lib/io/worker-socket-tcp.js +++ b/packages/preview2-shim/lib/io/worker-socket-tcp.js @@ -33,6 +33,7 @@ import { Socket, Server } from "node:net"; /** * @typedef {import("../../types/interfaces/wasi-sockets-network.js").IpSocketAddress} IpSocketAddress * @typedef {import("../../../types/interfaces/wasi-sockets-tcp.js").IpAddressFamily} IpAddressFamily + * @typedef {import("node:net").Socket} TcpSocket * * @typedef {{ * tcpSocket: number, @@ -43,6 +44,7 @@ import { Socket, Server } from "node:net"; * @typedef {{ * state: number, * future: number | null, + * socket: TcpSocket | null, * listenBacklogSize: number, * handle: TCP, * pendingAccepts: PendingAccept[], @@ -140,11 +142,11 @@ export function socketTcpConnectStart(id, remoteAddress, family) { socket.state = SOCKET_STATE_CONNECT; socket.future = createFuture( new Promise((resolve, reject) => { - const tcpSocket = new Socket({ + const tcpSocket = (socket.tcpSocket = new Socket({ handle: socket.handle, pauseOnCreate: true, allowHalfOpen: true, - }); + })); function handleErr(err) { tcpSocket.off("connect", handleConnect); reject(convertSocketError(err)); @@ -258,12 +260,12 @@ export function socketTcpGetRemoteAddress(id) { return ipSocketAddress(out.family.toLowerCase(), out.address, out.port); } -// Node.js only supports a write shutdown -// so we don't actually check the shutdown type -export function socketTcpShutdown(id, _shutdownType) { +export function socketTcpShutdown(id, shutdownType) { const socket = tcpSockets.get(id); if (socket.state !== SOCKET_STATE_CONNECTION) throw "invalid-state"; - if (socket.socket) socket.socket.end(); + // Node.js only supports a write shutdown, which is triggered on end + if (shutdownType === "send" || shutdownType === "both") + socket.tcpSocket.end(); } export function socketTcpSetKeepAlive(id, { keepAlive, keepAliveIdleTime }) { diff --git a/packages/preview2-shim/lib/io/worker-thread.js b/packages/preview2-shim/lib/io/worker-thread.js index 42b50c213..94003a366 100644 --- a/packages/preview2-shim/lib/io/worker-thread.js +++ b/packages/preview2-shim/lib/io/worker-thread.js @@ -25,6 +25,7 @@ import { FUTURE_TAKE_VALUE, HTTP, HTTP_CREATE_REQUEST, + HTTP_OUTGOING_BODY_DISPOSE, HTTP_OUTPUT_STREAM_FINISH, HTTP_SERVER_CLEAR_OUTGOING_RESPONSE, HTTP_SERVER_SET_OUTGOING_RESPONSE, @@ -347,10 +348,6 @@ function handle(call, id, payload) { // otherwise fall through to generic implementation return handle(call & ~HTTP, id, payload); } - case OUTPUT_STREAM_DISPOSE | HTTP: - throw new Error( - "wasi-io trap: HTTP output stream dispose is bypassed for FINISH" - ); case OUTPUT_STREAM_WRITE | HTTP: { const { stream } = getStreamOrThrow(id); stream.bytesRemaining -= payload.byteLength; @@ -366,8 +363,20 @@ function handle(call, id, payload) { const output = handle(OUTPUT_STREAM_WRITE, id, payload); return output; } + case OUTPUT_STREAM_DISPOSE | HTTP: + throw new Error( + "wasi-io trap: Output stream dispose not implemented as an IO-call for HTTP" + ); case HTTP_OUTPUT_STREAM_FINISH: { - const { stream } = getStreamOrThrow(id); + let stream; + try { + ({ stream } = getStreamOrThrow(id)); + } catch (e) { + if (e.tag === "closed") + throw { tag: "internal-error", val: "stream closed" }; + if (e.tag === "last-operation-failed") + throw { tag: "internal-error", val: e.val.message }; + } if (stream.bytesRemaining > 0) { throw { tag: "HTTP-request-body-size", @@ -383,6 +392,10 @@ function handle(call, id, payload) { stream.end(); return; } + case HTTP_OUTGOING_BODY_DISPOSE: + if (!streams.delete(id)) + throw new Error("wasi-io trap: stream not found to dispose"); + return; case HTTP_SERVER_START: return startHttpServer(id, payload); case HTTP_SERVER_STOP: @@ -600,9 +613,6 @@ function handle(call, id, payload) { return createPoll(streams.get(id).pollState); case INPUT_STREAM_DISPOSE: { const stream = streams.get(id); - // TODO: fix double drop where IncomingBody drops IncomingStream, - // instead implementing proper core WASI GC - if (!stream) return; verifyPollsDroppedForDrop(stream.pollState, "input stream"); streams.delete(id); return; @@ -757,6 +767,8 @@ function handle(call, id, payload) { payload = [id]; // [intentional case fall-through] case POLL_POLL_LIST: { + if (payload.length === 0) + throw new Error("wasi-io trap: attempt to poll on empty list"); const doneList = []; const pollList = payload.map((pollId) => polls.get(pollId)); for (const [idx, pollState] of pollList.entries()) { diff --git a/packages/preview2-shim/lib/nodejs/clocks.js b/packages/preview2-shim/lib/nodejs/clocks.js index f1d861a7f..a676b2546 100644 --- a/packages/preview2-shim/lib/nodejs/clocks.js +++ b/packages/preview2-shim/lib/nodejs/clocks.js @@ -1,4 +1,4 @@ -import { ioCall, createPoll, resolvedPoll } from "../io/worker-io.js"; +import { ioCall, createPoll } from "../io/worker-io.js"; import { CLOCKS_NOW, CLOCKS_INSTANT_SUBSCRIBE, @@ -17,7 +17,6 @@ export const monotonicClock = { }, subscribeDuration(duration) { duration = BigInt(duration); - if (duration === 0n) return resolvedPoll; return createPoll(CLOCKS_DURATION_SUBSCRIBE, null, duration); }, }; diff --git a/packages/preview2-shim/lib/nodejs/filesystem.js b/packages/preview2-shim/lib/nodejs/filesystem.js index 625286125..ff43d1e5d 100644 --- a/packages/preview2-shim/lib/nodejs/filesystem.js +++ b/packages/preview2-shim/lib/nodejs/filesystem.js @@ -1,7 +1,9 @@ import { - ioCall, + earlyDispose, inputStreamCreate, + ioCall, outputStreamCreate, + registerDispose, } from "../io/worker-io.js"; import { INPUT_STREAM_CREATE, OUTPUT_STREAM_CREATE } from "../io/calls.js"; import { FILE } from "../io/calls.js"; @@ -53,15 +55,17 @@ function lookupType(obj) { return "unknown"; } -// Note: This should implement per-segment semantics of openAt, but we cannot currently -// due to the lack of support for openat() in Node.js. +// Note: This should implement per-segment semantics of openAt, but we cannot +// currently due to the lack of support for openat() in Node.js. // Tracking issue: https://github.com/libuv/libuv/issues/4167 + /** * @implements {DescriptorProps} */ class Descriptor { #hostPreopen; #fd; + #finalizer; #mode; #fullPath; @@ -81,11 +85,19 @@ class Descriptor { static _create(fd, mode, fullPath) { const descriptor = new Descriptor(); descriptor.#fd = fd; + descriptor.#finalizer = registerDispose(descriptor, null, fd, closeSync); descriptor.#mode = mode; descriptor.#fullPath = fullPath; return descriptor; } + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } + readViaStream(offset) { if (this.#hostPreopen) throw "is-directory"; return inputStreamCreate( @@ -118,8 +130,7 @@ class Descriptor { try { fdatasyncSync(this.#fd); } catch (e) { - if (e.code === 'EPERM') - return; + if (e.code === "EPERM") return; throw convertFsError(e); } } @@ -214,8 +225,7 @@ class Descriptor { try { fsyncSync(this.#fd); } catch (e) { - if (e.code === 'EPERM') - return; + if (e.code === "EPERM") return; throw convertFsError(e); } } @@ -333,23 +343,19 @@ class Descriptor { let isSymlink = false; try { isSymlink = lstatSync(fullPath).isSymbolicLink(); - } - catch (e) { + } catch (e) { // } - if (isSymlink) - throw openFlags.directory ? "not-directory" : "loop"; + if (isSymlink) throw openFlags.directory ? "not-directory" : "loop"; } if (pathFlags.symlinkFollow && openFlags.directory) { let isFile = false; try { isFile = !statSync(fullPath).isDirectory(); - } - catch (e) { + } catch (e) { // } - if (isFile) - throw "not-directory"; + if (isFile) throw "not-directory"; } } try { @@ -369,7 +375,8 @@ class Descriptor { } return descriptor; } catch (e) { - if (e.code === "ERR_INVALID_ARG_VALUE") throw isWindows ? "no-entry" : "invalid"; + if (e.code === "ERR_INVALID_ARG_VALUE") + throw isWindows ? "no-entry" : "invalid"; throw convertFsError(e); } } @@ -410,19 +417,17 @@ class Descriptor { try { symlinkSync(target, fullPath); } catch (e) { - if (fullPath.endsWith("/") && e.code === 'EEXIST') { + if (fullPath.endsWith("/") && e.code === "EEXIST") { let isDir = false; try { isDir = statSync(fullPath).isDirectory(); } catch (_) { // } - if (!isDir) - throw isWindows ? "no-entry" : "not-directory"; + if (!isDir) throw isWindows ? "no-entry" : "not-directory"; } if (isWindows) { - if (e.code === "EPERM" || e.code === "EEXIST") - throw "no-entry"; + if (e.code === "EPERM" || e.code === "EEXIST") throw "no-entry"; } throw convertFsError(e); } @@ -435,16 +440,14 @@ class Descriptor { let isDir = false; try { isDir = statSync(fullPath).isDirectory(); - } - catch (e) { + } catch (e) { // } throw isDir ? (isWindows ? "access" : "is-directory") : "not-directory"; } unlinkSync(fullPath); } catch (e) { - if (isWindows && e.code === "EPERM") - throw "access"; + if (isWindows && e.code === "EPERM") throw "access"; throw convertFsError(e); } } @@ -532,10 +535,6 @@ class Descriptor { ); return descriptor.#fullPath + (subpath.length > 0 ? "/" : "") + subpath; } - - [symbolDispose]() { - if (this.#fd) closeSync(this.#fd); - } } const descriptorCreatePreopen = Descriptor._createPreopen; delete Descriptor._createPreopen; @@ -544,6 +543,7 @@ delete Descriptor._create; class DirectoryEntryStream { #dir; + #finalizer; readDirectoryEntry() { let entry; try { @@ -558,15 +558,23 @@ class DirectoryEntryStream { const type = lookupType(entry); return { name, type }; } - [symbolDispose]() { - this.#dir.closeSync(); - } - static _create(dir) { const dirStream = new DirectoryEntryStream(); + dirStream.#finalizer = registerDispose( + dirStream, + null, + null, + dir.closeSync.bind(dir) + ); dirStream.#dir = dir; return dirStream; } + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } } const directoryEntryStreamCreate = DirectoryEntryStream._create; delete DirectoryEntryStream._create; diff --git a/packages/preview2-shim/lib/nodejs/http.js b/packages/preview2-shim/lib/nodejs/http.js index 9460134ed..d54b09150 100644 --- a/packages/preview2-shim/lib/nodejs/http.js +++ b/packages/preview2-shim/lib/nodejs/http.js @@ -1,23 +1,25 @@ import { - INPUT_STREAM_DISPOSE, + FUTURE_DISPOSE, + FUTURE_SUBSCRIBE, + FUTURE_TAKE_VALUE, HTTP_CREATE_REQUEST, + HTTP_OUTGOING_BODY_DISPOSE, HTTP_OUTPUT_STREAM_FINISH, + HTTP_SERVER_CLEAR_OUTGOING_RESPONSE, + HTTP_SERVER_SET_OUTGOING_RESPONSE, HTTP_SERVER_START, HTTP_SERVER_STOP, OUTPUT_STREAM_CREATE, - FUTURE_DISPOSE, - FUTURE_TAKE_VALUE, - FUTURE_SUBSCRIBE, - HTTP_SERVER_SET_OUTGOING_RESPONSE, - HTTP_SERVER_CLEAR_OUTGOING_RESPONSE, + OUTPUT_STREAM_DISPOSE, } from "../io/calls.js"; import { + earlyDispose, inputStreamCreate, ioCall, outputStreamCreate, pollableCreate, + registerDispose, registerIncomingHttpHandler, - resolvedPoll, } from "../io/worker-io.js"; import { validateHeaderName, validateHeaderValue } from "node:http"; import { HTTP } from "../io/calls.js"; @@ -27,12 +29,12 @@ export const _forbiddenHeaders = new Set(["connection", "keep-alive"]); class IncomingBody { #finished = false; - #calledStream = false; - #streamId = undefined; + #stream = undefined; stream() { - if (this.#calledStream) throw undefined; - this.#calledStream = true; - return inputStreamCreate(HTTP, this.#streamId); + if (!this.#stream) throw undefined; + const stream = this.#stream; + this.#stream = null; + return stream; } static finish(incomingBody) { if (incomingBody.#finished) @@ -40,15 +42,10 @@ class IncomingBody { incomingBody.#finished = true; return futureTrailersCreate(); } - [symbolDispose]() { - if (!this.#finished) { - ioCall(INPUT_STREAM_DISPOSE | HTTP, this.#streamId); - this.#streamId = undefined; - } - } + [symbolDispose]() {} static _create(streamId) { const incomingBody = new IncomingBody(); - incomingBody.#streamId = streamId; + incomingBody.#stream = inputStreamCreate(HTTP, streamId); return incomingBody; } } @@ -80,6 +77,7 @@ class IncomingRequest { consume() { return incomingBodyCreate(this.#streamId); } + [symbolDispose]() {} static _create(method, pathWithQuery, scheme, authority, headers, streamId) { const incomingRequest = new IncomingRequest(); incomingRequest.#method = method; @@ -97,7 +95,7 @@ delete IncomingRequest._create; class FutureTrailers { #requested = false; subscribe() { - return resolvedPoll; + return pollableCreate(0, this); } get() { if (this.#requested) return { tag: "err" }; @@ -149,7 +147,7 @@ class OutgoingResponse { let contentLength; if (contentLengthValues.length > 0) contentLength = Number(new TextDecoder().decode(contentLengthValues[0])); - this.#body = outgoingBodyCreate(contentLength, true); + this.#body = outgoingBodyCreate(contentLength); return this.#body; } @@ -216,7 +214,7 @@ class OutgoingRequest { let contentLength; if (contentLengthValues.length > 0) contentLength = Number(new TextDecoder().decode(contentLengthValues[0])); - this.#body = outgoingBodyCreate(contentLength, false); + this.#body = outgoingBodyCreate(contentLength); } body() { if (this.#bodyRequested) throw new Error("Body already requested"); @@ -303,25 +301,14 @@ class OutgoingBody { #outputStream = null; #outputStreamId = null; #contentLength = undefined; + #finalizer; write() { - if (!this.#outputStreamId) this.#createOutputStream(); // can only call write once const outputStream = this.#outputStream; if (outputStream === null) throw undefined; this.#outputStream = null; return outputStream; } - #createOutputStream() { - this.#outputStream = outputStreamCreate( - HTTP, - (this.#outputStreamId = ioCall( - OUTPUT_STREAM_CREATE | HTTP, - null, - this.#contentLength - )) - ); - this.#outputStream[symbolDispose] = () => {}; - } /** * @param {OutgoingBody} body * @param {Fields | undefined} trailers @@ -330,19 +317,43 @@ class OutgoingBody { if (trailers) throw { tag: "internal-error", val: "trailers unsupported" }; // this will verify content length, and also verify not already finished // throwing errors as appropriate - if (body.#outputStreamId) - ioCall(HTTP_OUTPUT_STREAM_FINISH, body.#outputStreamId, null); + ioCall(HTTP_OUTPUT_STREAM_FINISH, body.#outputStreamId, null); } static _outputStreamId(outgoingBody) { return outgoingBody.#outputStreamId; } - static _create(contentLength, createBodyStream) { + static _create(contentLength) { const outgoingBody = new OutgoingBody(); outgoingBody.#contentLength = contentLength; - if (createBodyStream) outgoingBody.#createOutputStream(); + outgoingBody.#outputStreamId = ioCall( + OUTPUT_STREAM_CREATE | HTTP, + null, + outgoingBody.#contentLength + ); + outgoingBody.#outputStream = outputStreamCreate( + HTTP, + outgoingBody.#outputStreamId + ); + outgoingBody.#finalizer = registerDispose( + outgoingBody, + null, + outgoingBody.#outputStreamId, + outgoingBodyDispose + ); return outgoingBody; } + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } +} + +function outgoingBodyDispose(id) { + ioCall(HTTP_OUTGOING_BODY_DISPOSE, id, null); } + const outgoingBodyOutputStreamId = OutgoingBody._outputStreamId; delete OutgoingBody._outputStreamId; @@ -352,7 +363,7 @@ delete OutgoingBody._create; class IncomingResponse { /** @type {Fields} */ #headers = undefined; #status = 0; - /** @type {number} */ #bodyStreamId; + /** @type {number} */ #bodyStream; status() { return this.#status; } @@ -360,22 +371,19 @@ class IncomingResponse { return this.#headers; } consume() { - if (this.#bodyStreamId === undefined) throw undefined; - const bodyStreamId = this.#bodyStreamId; - this.#bodyStreamId = undefined; - return incomingBodyCreate(bodyStreamId); + if (this.#bodyStream === undefined) throw undefined; + const bodyStream = this.#bodyStream; + this.#bodyStream = undefined; + return bodyStream; } [symbolDispose]() { - if (this.#bodyStreamId) { - ioCall(INPUT_STREAM_DISPOSE | HTTP, this.#bodyStreamId, null); - this.#bodyStreamId = undefined; - } + if (this.#bodyStream) this.#bodyStream[symbolDispose](); } static _create(status, headers, bodyStreamId) { const res = new IncomingResponse(); res.#status = status; res.#headers = headers; - res.#bodyStreamId = bodyStreamId; + res.#bodyStream = incomingBodyCreate(bodyStreamId); return res; } } @@ -385,8 +393,12 @@ delete IncomingResponse._create; class FutureIncomingResponse { #id; + #finalizer; subscribe() { - return pollableCreate(ioCall(FUTURE_SUBSCRIBE | HTTP, this.#id, null)); + return pollableCreate( + ioCall(FUTURE_SUBSCRIBE | HTTP, this.#id, null), + this + ); } get() { const ret = ioCall(FUTURE_TAKE_VALUE | HTTP, this.#id, null); @@ -404,9 +416,6 @@ class FutureIncomingResponse { } return ret; } - [symbolDispose]() { - ioCall(FUTURE_DISPOSE | HTTP, this.#id, null); - } static _create( method, scheme, @@ -430,8 +439,24 @@ class FutureIncomingResponse { betweenBytesTimeout, firstByteTimeout, }); + res.#finalizer = registerDispose( + res, + null, + res.#id, + futureIncomingResponseDispose + ); return res; } + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } +} + +function futureIncomingResponseDispose(id) { + ioCall(FUTURE_DISPOSE | HTTP, id, null); } const futureIncomingResponseCreate = FutureIncomingResponse._create; @@ -645,26 +670,29 @@ export class HTTPServer { ), streamId ); + let outgoingBodyStreamId; const responseOutparam = responseOutparamCreate((response) => { if (response.tag === "ok") { const outgoingResponse = response.val; const statusCode = outgoingResponse.statusCode(); const headers = outgoingResponse.headers().entries(); const body = outgoingResponseBody(outgoingResponse); - const streamId = outgoingBodyOutputStreamId(body); + outgoingBodyStreamId = outgoingBodyOutputStreamId(body); ioCall(HTTP_SERVER_SET_OUTGOING_RESPONSE, responseId, { statusCode, headers, - streamId, + streamId: outgoingBodyStreamId, }); } else { ioCall(HTTP_SERVER_CLEAR_OUTGOING_RESPONSE, responseId, null); - console.error("TODO: handle outparam error"); - console.error(response); + console.error(response.val); process.exit(1); } }); incomingHandler.handle(request, responseOutparam); + if (outgoingBodyStreamId) { + ioCall(OUTPUT_STREAM_DISPOSE, outgoingBodyStreamId, null); + } } ); } @@ -676,5 +704,6 @@ export class HTTPServer { stop() { clearInterval(this.#liveEventLoopInterval); ioCall(HTTP_SERVER_STOP, this.#id, null); + httpServers.delete(this.#id); } } diff --git a/packages/preview2-shim/lib/nodejs/sockets.js b/packages/preview2-shim/lib/nodejs/sockets.js index 07e7989cd..bcc69feb8 100644 --- a/packages/preview2-shim/lib/nodejs/sockets.js +++ b/packages/preview2-shim/lib/nodejs/sockets.js @@ -43,20 +43,21 @@ import { SOCKET_UDP_SUBSCRIBE, } from "../io/calls.js"; import { + earlyDispose, inputStreamCreate, ioCall, outputStreamCreate, pollableCreate, - resolvedPoll, + registerDispose, } from "../io/worker-io.js"; +const symbolDispose = Symbol.dispose || Symbol.for("dispose"); + /** * @typedef {import("../../types/interfaces/wasi-sockets-network").IpSocketAddress} IpSocketAddress * @typedef {import("../../types/interfaces/wasi-sockets-network").IpAddressFamily} IpAddressFamily */ -const symbolDispose = Symbol.dispose || Symbol.for("dispose"); - // Network class privately stores capabilities class Network { #allowDnsLookup = true; @@ -116,6 +117,7 @@ class ResolveAddressStream { #data; #curItem = 0; #error = false; + #finalizer; resolveNextAddress() { if (!this.#data) { const res = ioCall(SOCKET_RESOLVE_ADDRESS_TAKE_REQUEST, this.#id, null); @@ -127,22 +129,32 @@ class ResolveAddressStream { return undefined; } subscribe() { - if (this.#id) - return pollableCreate( - ioCall(SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST, this.#id, null) - ); - return resolvedPoll; - } - [symbolDispose]() { - if (this.#id) - ioCall(SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, this.#id, null); + return pollableCreate( + ioCall(SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST, this.#id, null), + this + ); } static _resolveAddresses(network, name) { if (!mayDnsLookup(network)) throw "permanent-resolver-failure"; const res = new ResolveAddressStream(); res.#id = ioCall(SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST, null, name); + res.#finalizer = registerDispose( + res, + null, + res.#id, + resolveAddressStreamDispose + ); return res; } + [symbolDispose]() { + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } + } +} +function resolveAddressStreamDispose(id) { + ioCall(SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, id, null); } const resolveAddresses = ResolveAddressStream._resolveAddresses; @@ -157,6 +169,7 @@ class TcpSocket { #id; #network; #family; + #finalizer; #options = { // defaults per https://nodejs.org/docs/latest/api/net.html#socketsetkeepaliveenable-initialdelay keepAlive: false, @@ -188,6 +201,7 @@ class TcpSocket { const socket = new TcpSocket(); socket.#id = id; socket.#family = addressFamily; + socket.#finalizer = registerDispose(socket, null, id, socketTcpDispose); return socket; } startBind(network, localAddress) { @@ -308,7 +322,11 @@ class TcpSocket { } receiveBufferSize() { if (!this.#options.receiveBufferSize) - this.#options.receiveBufferSize = ioCall(SOCKET_GET_DEFAULT_RECEIVE_BUFFER_SIZE, null, null); + this.#options.receiveBufferSize = ioCall( + SOCKET_GET_DEFAULT_RECEIVE_BUFFER_SIZE, + null, + null + ); return this.#options.receiveBufferSize; } setReceiveBufferSize(value) { @@ -317,7 +335,11 @@ class TcpSocket { } sendBufferSize() { if (!this.#options.sendBufferSize) - this.#options.sendBufferSize = ioCall(SOCKET_GET_DEFAULT_SEND_BUFFER_SIZE, null, null); + this.#options.sendBufferSize = ioCall( + SOCKET_GET_DEFAULT_SEND_BUFFER_SIZE, + null, + null + ); return this.#options.sendBufferSize; } setSendBufferSize(value) { @@ -325,16 +347,23 @@ class TcpSocket { this.#options.sendBufferSize = value; } subscribe() { - return pollableCreate(ioCall(SOCKET_TCP_SUBSCRIBE, this.#id, null)); + return pollableCreate(ioCall(SOCKET_TCP_SUBSCRIBE, this.#id, null), this); } shutdown(shutdownType) { ioCall(SOCKET_TCP_SHUTDOWN, this.#id, shutdownType); } [symbolDispose]() { - ioCall(SOCKET_TCP_DISPOSE, this.#id, null); + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } } } +function socketTcpDispose(id) { + ioCall(SOCKET_TCP_DISPOSE, id, null); +} + const tcpSocketCreate = TcpSocket._create; delete TcpSocket._create; @@ -357,6 +386,7 @@ class UdpSocket { #id; #network; #family; + #finalizer; static _create(addressFamily) { if (addressFamily !== "ipv4" && addressFamily !== "ipv6") throw "not-supported"; @@ -369,6 +399,12 @@ class UdpSocket { unicastHopLimit: 64, }); socket.#family = addressFamily; + socket.#finalizer = registerDispose( + socket, + null, + socket.#id, + socketUdpDispose + ); return socket; } startBind(network, localAddress) { @@ -425,21 +461,35 @@ class UdpSocket { ioCall(SOCKET_UDP_SET_SEND_BUFFER_SIZE, this.#id, value); } subscribe() { - return pollableCreate(ioCall(SOCKET_UDP_SUBSCRIBE, this.#id, null)); + return pollableCreate(ioCall(SOCKET_UDP_SUBSCRIBE, this.#id, null), this); } [symbolDispose]() { - ioCall(SOCKET_UDP_DISPOSE, this.#id, null); + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } } } +function socketUdpDispose(id) { + ioCall(SOCKET_UDP_DISPOSE, id, null); +} + const createUdpSocket = UdpSocket._create; delete UdpSocket._create; class IncomingDatagramStream { #id; + #finalizer; static _create(id) { const stream = new IncomingDatagramStream(); stream.#id = id; + stream.#finalizer = registerDispose( + stream, + null, + id, + incomingDatagramStreamDispose + ); return stream; } receive(maxResults) { @@ -451,21 +501,37 @@ class IncomingDatagramStream { } subscribe() { return pollableCreate( - ioCall(SOCKET_DATAGRAM_STREAM_SUBSCRIBE, this.#id, null) + ioCall(SOCKET_DATAGRAM_STREAM_SUBSCRIBE, this.#id, null), + this ); } [symbolDispose]() { - ioCall(SOCKET_DATAGRAM_STREAM_DISPOSE, this.#id, null); + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } } } + +function incomingDatagramStreamDispose(id) { + ioCall(SOCKET_DATAGRAM_STREAM_DISPOSE, id, null); +} + const incomingDatagramStreamCreate = IncomingDatagramStream._create; delete IncomingDatagramStream._create; class OutgoingDatagramStream { #id = 0; + #finalizer; static _create(id) { const stream = new OutgoingDatagramStream(); stream.#id = id; + stream.#finalizer = registerDispose( + stream, + null, + id, + outgoingDatagramStreamDispose + ); return stream; } checkSend() { @@ -476,13 +542,21 @@ class OutgoingDatagramStream { } subscribe() { return pollableCreate( - ioCall(SOCKET_DATAGRAM_STREAM_SUBSCRIBE, this.#id, null) + ioCall(SOCKET_DATAGRAM_STREAM_SUBSCRIBE, this.#id, null), + this ); } [symbolDispose]() { - ioCall(SOCKET_DATAGRAM_STREAM_DISPOSE, this.#id, null); + if (this.#finalizer) { + earlyDispose(this.#finalizer); + this.#finalizer = null; + } } } +function outgoingDatagramStreamDispose(id) { + ioCall(SOCKET_DATAGRAM_STREAM_DISPOSE, id, null); +} + const outgoingDatagramStreamCreate = OutgoingDatagramStream._create; delete OutgoingDatagramStream._create; diff --git a/packages/preview2-shim/package.json b/packages/preview2-shim/package.json index 97b7211df..bc210a8a6 100644 --- a/packages/preview2-shim/package.json +++ b/packages/preview2-shim/package.json @@ -18,7 +18,7 @@ } }, "scripts": { - "test": "mocha -u tdd test/test.js --timeout 10000" + "test": "node --expose-gc ../../node_modules/mocha/bin/mocha.js -u tdd test/test.js --timeout 30000" }, "files": [ "types", diff --git a/packages/preview2-shim/test/test.js b/packages/preview2-shim/test/test.js index 39fa3e4ad..2f0c3cc9d 100644 --- a/packages/preview2-shim/test/test.js +++ b/packages/preview2-shim/test/test.js @@ -10,10 +10,18 @@ import { fileURLToPath } from "node:url"; const symbolDispose = Symbol.dispose || Symbol.for("dispose"); +function testWithGCWrap (asyncTestFn) { + return async () => { + await asyncTestFn(); + // Force the JS GC to run finalizers + gc(); + await new Promise(resolve => setTimeout(resolve, 200)); + }; +} + suite("Node.js Preview2", () => { test("Stdio", async () => { const { cli } = await import("@bytecodealliance/preview2-shim"); - // todo: wrap in a process call to not spill to test output cli.stdout .getStdout() .blockingWriteAndFlush(new TextEncoder().encode("test stdout")); @@ -101,27 +109,38 @@ suite("Node.js Preview2", () => { }); test("FS read", async () => { - const { filesystem } = await import("@bytecodealliance/preview2-shim"); - const [[rootDescriptor]] = filesystem.preopens.getDirectories(); - const childDescriptor = rootDescriptor.openAt( - {}, - fileURLToPath(import.meta.url).slice(1), - {}, - {} - ); - const stream = childDescriptor.readViaStream(0); - const poll = stream.subscribe(); - poll.block(); - let buf = stream.read(10000n); - while (buf.byteLength === 0) buf = stream.read(10000n); - const source = new TextDecoder().decode(buf); - ok(source.includes("UNIQUE STRING")); - poll[Symbol.dispose](); - stream[Symbol.dispose](); - childDescriptor[Symbol.dispose](); + let toDispose = []; + await (async () => { + const { filesystem } = await import("@bytecodealliance/preview2-shim"); + const [[rootDescriptor]] = filesystem.preopens.getDirectories(); + const childDescriptor = rootDescriptor.openAt( + {}, + fileURLToPath(import.meta.url).slice(1), + {}, + {} + ); + const stream = childDescriptor.readViaStream(0); + const poll = stream.subscribe(); + poll.block(); + let buf = stream.read(10000n); + while (buf.byteLength === 0) buf = stream.read(10000n); + const source = new TextDecoder().decode(buf); + ok(source.includes("UNIQUE STRING")); + toDispose.push(stream); + toDispose.push(childDescriptor); + })(); + + + // Force the Poll to GC so the next dispose doesn't trap + gc(); + await new Promise(resolve => setTimeout(resolve, 200)); + + for (const item of toDispose) { + item[symbolDispose](); + } }); - test("WASI HTTP", async () => { + test("WASI HTTP", testWithGCWrap(async () => { const { http } = await import("@bytecodealliance/preview2-shim"); const { handle } = http.outgoingHandler; const { OutgoingRequest, OutgoingBody, Fields } = http.types; @@ -175,7 +194,7 @@ suite("Node.js Preview2", () => { strictEqual(status, 200); ok(headers["content-type"].startsWith("text/html")); ok(responseBody.includes("WebAssembly")); - }); + })); suite("WASI Sockets (TCP)", async () => { test("sockets.instanceNetwork() should be a singleton", async () => { @@ -316,7 +335,7 @@ suite("Node.js Preview2", () => { // const [socket, input, output] = tcpSocket.accept(); }); - test("tcp.connect(): should connect to a valid ipv4 address and port=0", async () => { + test("tcp.connect(): should connect to a valid ipv4 address and port=0", testWithGCWrap(async () => { const { lookup } = await import("node:dns"); const { sockets } = await import("@bytecodealliance/preview2-shim"); const network = sockets.instanceNetwork.instanceNetwork(); @@ -372,7 +391,7 @@ suite("Node.js Preview2", () => { tcpSocket.remoteAddress().val.address, googleIp.split(".").map((n) => Number(n)) ); - }); + })); }); suite("WASI Sockets (UDP)", async () => { @@ -462,7 +481,7 @@ suite("Node.js Preview2", () => { strictEqual(boundAddress.val.port > 0, true); }); - test("udp.stream(): should connect to a valid ipv6 address", async () => { + test("udp.stream(): should connect to a valid ipv6 address", testWithGCWrap(async () => { const { sockets } = await import("@bytecodealliance/preview2-shim"); const network = sockets.instanceNetwork.instanceNetwork(); const socket = sockets.udpCreateSocket.createUdpSocket('ipv6'); @@ -483,6 +502,6 @@ suite("Node.js Preview2", () => { const boundAddress = socket.localAddress(); deepStrictEqual(boundAddress.val.address, [0, 0, 0, 0, 0, 0, 0, 0]); strictEqual(boundAddress.val.port, 1337); - }); + })); }); });