Skip to content

Commit

Permalink
feat: GC finalization for preview2-shims (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
guybedford authored Jan 25, 2024
1 parent ff236ae commit cb80dfa
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 175 deletions.
8 changes: 5 additions & 3 deletions packages/preview2-shim/lib/io/calls.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export const HTTP_SERVER_STOP = ++call_id << CALL_SHIFT;
export const HTTP_SERVER_INCOMING_HANDLER = ++call_id << CALL_SHIFT;
export const HTTP_SERVER_SET_OUTGOING_RESPONSE = ++call_id << CALL_SHIFT;
export const HTTP_SERVER_CLEAR_OUTGOING_RESPONSE = ++call_id << CALL_SHIFT;
export const HTTP_OUTGOING_BODY_DISPOSE = ++call_id << CALL_SHIFT;

// Clocks
export const CLOCKS_NOW = ++call_id << CALL_SHIFT;
Expand Down Expand Up @@ -102,7 +103,8 @@ export const SOCKET_UDP_SET_RECEIVE_BUFFER_SIZE = ++call_id << CALL_SHIFT;
export const SOCKET_UDP_SET_SEND_BUFFER_SIZE = ++call_id << CALL_SHIFT;
export const SOCKET_UDP_SET_UNICAST_HOP_LIMIT = ++call_id << CALL_SHIFT;
export const SOCKET_INCOMING_DATAGRAM_STREAM_RECEIVE = ++call_id << CALL_SHIFT;
export const SOCKET_OUTGOING_DATAGRAM_STREAM_CHECK_SEND = ++call_id << CALL_SHIFT;
export const SOCKET_OUTGOING_DATAGRAM_STREAM_CHECK_SEND =
++call_id << CALL_SHIFT;
export const SOCKET_OUTGOING_DATAGRAM_STREAM_SEND = ++call_id << CALL_SHIFT;
export const SOCKET_DATAGRAM_STREAM_SUBSCRIBE = ++call_id << CALL_SHIFT;
export const SOCKET_DATAGRAM_STREAM_DISPOSE = ++call_id << CALL_SHIFT;
Expand All @@ -118,9 +120,9 @@ export const SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST = ++call_id << CALL_SHIFT;

export const reverseMap = {};

import * as calls from './calls.js';
import * as calls from "./calls.js";

for (const name of Object.getOwnPropertyNames(calls)) {
if (name === 'reverseMap') continue;
if (name === "reverseMap") continue;
reverseMap[calls[name]] = name;
}
2 changes: 1 addition & 1 deletion packages/preview2-shim/lib/io/worker-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export async function createHttpRequest(
return {
status: res.statusCode,
headers: Array.from(Object.entries(res.headers)),
bodyStreamId: bodyStreamId,
bodyStreamId
};
} catch (e) {
if (e?.tag) throw e;
Expand Down
173 changes: 155 additions & 18 deletions packages/preview2-shim/lib/io/worker-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import { createSyncFn } from "../synckit/index.js";
import {
CALL_MASK,
CALL_TYPE_MASK,
FILE,
HTTP_SERVER_INCOMING_HANDLER,
HTTP,
INPUT_STREAM_BLOCKING_READ,
INPUT_STREAM_BLOCKING_SKIP,
INPUT_STREAM_DISPOSE,
Expand All @@ -25,9 +27,12 @@ import {
POLL_POLLABLE_BLOCK,
POLL_POLLABLE_DISPOSE,
POLL_POLLABLE_READY,
SOCKET_TCP,
STDERR,
STDIN,
STDOUT,
reverseMap,
} from "./calls.js";
import { STDERR } from "./calls.js";
import { _rawDebug, exit, stderr, stdout, env } from "node:process";

const workerPath = fileURLToPath(
Expand Down Expand Up @@ -92,6 +97,43 @@ if (DEBUG) {

const symbolDispose = Symbol.dispose || Symbol.for("dispose");

const finalizationRegistry = new FinalizationRegistry(
(dispose) => void dispose()
);

const dummySymbol = Symbol();

/**
*
* @param {any} resource
* @param {any} parentResource
* @param {number} id
* @param {(number) => void} disposeFn
*/
export function registerDispose(resource, parentResource, id, disposeFn) {
// While strictly speaking all components should handle their disposal,
// this acts as a last-resort to catch all missed drops through the JS GC.
// Mainly for two cases - (1) components which are long lived, that get shut
// down and (2) users that interface with low-level WASI APIs directly in JS
// for various reasons may end up leaning on JS GC inadvertantly.
function finalizer() {
// This has no functional purpose other than to pin a strong reference
// from the child resource's finalizer to the parent resource, to ensure
// that we can never finalize a parent resource before a child resource.
// This makes the generational JS GC become piecewise over child resource
// graphs (generational at each resource hierarchy level at least).
if (parentResource?.[dummySymbol]) return;
disposeFn(id);
}
finalizationRegistry.register(resource, finalizer, finalizer);
return finalizer;
}

export function earlyDispose(finalizer) {
finalizationRegistry.unregister(finalizer);
finalizer();
}

const _Error = Error;
const IoError = class Error extends _Error {
constructor(payload) {
Expand Down Expand Up @@ -121,6 +163,7 @@ function streamIoErrorCall(call, id, payload) {
class InputStream {
#id;
#streamType;
#finalizer;
read(len) {
return streamIoErrorCall(
INPUT_STREAM_READ | this.#streamType,
Expand Down Expand Up @@ -151,24 +194,65 @@ class InputStream {
}
subscribe() {
return pollableCreate(
ioCall(INPUT_STREAM_SUBSCRIBE | this.#streamType, this.#id)
ioCall(INPUT_STREAM_SUBSCRIBE | this.#streamType, this.#id),
this
);
}
[symbolDispose]() {
ioCall(INPUT_STREAM_DISPOSE | this.#streamType, this.#id);
}
static _id(stream) {
return stream.#id;
}
/**
* @param {InputStreamType} streamType
* @param {FILE | SOCKET_TCP | STDIN | HTTP} streamType
*/
static _create(streamType, id) {
const stream = new InputStream();
stream.#id = id;
stream.#streamType = streamType;
let disposeFn;
switch (streamType) {
case FILE:
disposeFn = fileInputStreamDispose;
break;
case SOCKET_TCP:
disposeFn = socketTcpInputStreamDispose;
break;
case STDIN:
disposeFn = stdinInputStreamDispose;
break;
case HTTP:
disposeFn = httpInputStreamDispose;
break;
default:
throw new Error(
"wasi-io trap: Dispose function not created for stream type " +
reverseMap[streamType]
);
}
stream.#finalizer = registerDispose(stream, null, id, disposeFn);
return stream;
}
[symbolDispose]() {
if (this.#finalizer) {
earlyDispose(this.#finalizer);
this.#finalizer = null;
}
}
}

function fileInputStreamDispose(id) {
ioCall(INPUT_STREAM_DISPOSE | FILE, id, null);
}

function socketTcpInputStreamDispose(id) {
ioCall(INPUT_STREAM_DISPOSE | SOCKET_TCP, id, null);
}

function stdinInputStreamDispose(id) {
ioCall(INPUT_STREAM_DISPOSE | STDIN, id, null);
}

function httpInputStreamDispose(id) {
ioCall(INPUT_STREAM_DISPOSE | HTTP, id, null);
}

export const inputStreamCreate = InputStream._create;
Expand All @@ -180,6 +264,7 @@ delete InputStream._id;
class OutputStream {
#id;
#streamType;
#finalizer;
checkWrite(len) {
return streamIoErrorCall(
OUTPUT_STREAM_CHECK_WRITE | this.#streamType,
Expand Down Expand Up @@ -248,9 +333,6 @@ class OutputStream {
ioCall(OUTPUT_STREAM_SUBSCRIBE | this.#streamType, this.#id)
);
}
[symbolDispose]() {
ioCall(OUTPUT_STREAM_DISPOSE | this.#streamType, this.#id);
}

static _id(outputStream) {
return outputStream.#id;
Expand All @@ -263,8 +345,54 @@ class OutputStream {
const stream = new OutputStream();
stream.#id = id;
stream.#streamType = streamType;
let disposeFn;
switch (streamType) {
case STDOUT:
disposeFn = stdoutOutputStreamDispose;
break;
case STDERR:
disposeFn = stderrOutputStreamDispose;
break;
case SOCKET_TCP:
disposeFn = socketTcpOutputStreamDispose;
break;
case FILE:
disposeFn = fileOutputStreamDispose;
break;
case HTTP:
return stream;
default:
throw new Error(
"wasi-io trap: Dispose function not created for stream type " +
reverseMap[streamType]
);
}
stream.#finalizer = registerDispose(stream, null, id, disposeFn);
return stream;
}

[symbolDispose]() {
if (this.#finalizer) {
earlyDispose(this.#finalizer);
this.#finalizer = null;
}
}
}

function stdoutOutputStreamDispose(id) {
ioCall(OUTPUT_STREAM_DISPOSE | STDOUT, id);
}

function stderrOutputStreamDispose(id) {
ioCall(OUTPUT_STREAM_DISPOSE | STDERR, id);
}

function socketTcpOutputStreamDispose(id) {
ioCall(OUTPUT_STREAM_DISPOSE | SOCKET_TCP, id);
}

function fileOutputStreamDispose(id) {
ioCall(OUTPUT_STREAM_DISPOSE | FILE, id);
}

export const outputStreamCreate = OutputStream._create;
Expand All @@ -277,8 +405,13 @@ export const error = { Error: IoError };

export const streams = { InputStream, OutputStream };

function pollableDispose(id) {
ioCall(POLL_POLLABLE_DISPOSE, id);
}

class Pollable {
#id;
#finalizer;
ready() {
if (this.#id === 0) return true;
return ioCall(POLL_POLLABLE_READY, this.#id);
Expand All @@ -288,27 +421,31 @@ class Pollable {
ioCall(POLL_POLLABLE_BLOCK, this.#id);
}
}
[symbolDispose]() {
if (this.#id !== 0) {
ioCall(POLL_POLLABLE_DISPOSE, this.#id);
this.#id = 0;
}
}
static _getId(pollable) {
return pollable.#id;
}
static _create(id) {
static _create(id, parent) {
const pollable = new Pollable();
pollable.#id = id;
pollable.#finalizer = registerDispose(
pollable,
parent,
id,
pollableDispose
);
return pollable;
}
[symbolDispose]() {
if (this.#finalizer) {
earlyDispose(this.#finalizer);
this.#finalizer = null;
}
}
}

export const pollableCreate = Pollable._create;
delete Pollable._create;

export const resolvedPoll = pollableCreate(0);

const pollableGetId = Pollable._getId;
delete Pollable._getId;

Expand Down
14 changes: 8 additions & 6 deletions packages/preview2-shim/lib/io/worker-socket-tcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { Socket, Server } from "node:net";
/**
* @typedef {import("../../types/interfaces/wasi-sockets-network.js").IpSocketAddress} IpSocketAddress
* @typedef {import("../../../types/interfaces/wasi-sockets-tcp.js").IpAddressFamily} IpAddressFamily
* @typedef {import("node:net").Socket} TcpSocket
*
* @typedef {{
* tcpSocket: number,
Expand All @@ -43,6 +44,7 @@ import { Socket, Server } from "node:net";
* @typedef {{
* state: number,
* future: number | null,
* socket: TcpSocket | null,
* listenBacklogSize: number,
* handle: TCP,
* pendingAccepts: PendingAccept[],
Expand Down Expand Up @@ -140,11 +142,11 @@ export function socketTcpConnectStart(id, remoteAddress, family) {
socket.state = SOCKET_STATE_CONNECT;
socket.future = createFuture(
new Promise((resolve, reject) => {
const tcpSocket = new Socket({
const tcpSocket = (socket.tcpSocket = new Socket({
handle: socket.handle,
pauseOnCreate: true,
allowHalfOpen: true,
});
}));
function handleErr(err) {
tcpSocket.off("connect", handleConnect);
reject(convertSocketError(err));
Expand Down Expand Up @@ -258,12 +260,12 @@ export function socketTcpGetRemoteAddress(id) {
return ipSocketAddress(out.family.toLowerCase(), out.address, out.port);
}

// Node.js only supports a write shutdown
// so we don't actually check the shutdown type
export function socketTcpShutdown(id, _shutdownType) {
export function socketTcpShutdown(id, shutdownType) {
const socket = tcpSockets.get(id);
if (socket.state !== SOCKET_STATE_CONNECTION) throw "invalid-state";
if (socket.socket) socket.socket.end();
// Node.js only supports a write shutdown, which is triggered on end
if (shutdownType === "send" || shutdownType === "both")
socket.tcpSocket.end();
}

export function socketTcpSetKeepAlive(id, { keepAlive, keepAliveIdleTime }) {
Expand Down
Loading

0 comments on commit cb80dfa

Please sign in to comment.