Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RJS-2854: Adding a ThreadSafeFunction-based scheduler for Core on Node.js #6800

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
"--import=tsx",
"--expose_gc",
"--enable-source-maps",
"--no-warnings",
"--force-node-api-uncaught-exceptions-policy",
"${workspaceRoot}/node_modules/mocha/lib/cli/cli.js",
"--require",
"src/node/inject-dev-environment.ts",
Expand All @@ -85,6 +85,28 @@
"${input:integrationTestFilter}"
]
},
{
"type": "lldb",
"request": "launch",
"name": "LLDB: Integration tests (throwing from listeners)",
"program": "node",
"cwd": "${workspaceRoot}/integration-tests/tests",
"args": [
"--inspect",
"--import=tsx",
"--expose_gc",
"--enable-source-maps",
"--force-node-api-uncaught-exceptions-policy",
"${workspaceRoot}/node_modules/mocha/lib/cli/cli.js",
"--require",
"src/node/inject-dev-environment.ts",
"src/node/index.ts",
"--timeout",
"10000",
"--grep",
"can throw from a listener"
]
},
{
"type": "node",
"presentation": {
Expand All @@ -99,7 +121,7 @@
"--import=tsx",
"--expose_gc",
"--enable-source-maps",
"--no-warnings"
"--force-node-api-uncaught-exceptions-policy"
],
"args": [
"--require",
Expand Down
6 changes: 3 additions & 3 deletions integration-tests/environments/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
"scripts": {
"test": "wireit",
"test:commonjs": "wireit",
"test:ci": "mocha-remote --reporter @realm/mocha-reporter -- tsx index.mjs",
"test:ci": "mocha-remote --reporter @realm/mocha-reporter -- tsx --force-node-api-uncaught-exceptions-policy index.mjs",
"lint": "eslint --ext js,mjs ."
},
"wireit": {
"test": {
"command": "mocha-remote --reporter @realm/mocha-reporter -- tsx index.mjs",
"command": "mocha-remote --reporter @realm/mocha-reporter -- tsx --force-node-api-uncaught-exceptions-policy index.mjs",
"dependencies": [
"../../../packages/realm:build:ts",
"../../../packages/realm:build:node",
"../../../packages/mocha-reporter:bundle"
]
},
"test:commonjs": {
"command": "mocha-remote --reporter @realm/mocha-reporter -- tsx index.cjs",
"command": "mocha-remote --reporter @realm/mocha-reporter -- tsx --force-node-api-uncaught-exceptions-policy index.cjs",
"dependencies": [
"../../../packages/realm:build:ts",
"../../../packages/realm:build:node",
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/.mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"import=tsx",
"expose_gc",
"enable-source-maps",
"no-warnings"
"force-node-api-uncaught-exceptions-policy"
],
"reporter": "@realm/mocha-reporter",
"require": [
Expand Down
23 changes: 23 additions & 0 deletions integration-tests/tests/src/node/setup-globals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,29 @@ Object.assign(globalThis, {
},
},
gc: vm.runInNewContext("gc"),
async nextUncaughtException(timeoutMs = 5000) {
// Remove any other listeners, storing them later so they can be restored
const listenersBefore = process.listeners("uncaughtException");
process.removeAllListeners("uncaughtException");
try {
return await new Promise<Error>((resolve, reject) => {
const timeoutTimer = setTimeout(() => {
process.off("uncaughtException", handleException);
const error = new Error(`Timed out waiting for uncaught exception (waited ${timeoutMs} ms)`);
reject(error);
}, timeoutMs);
function handleException(error: Error) {
clearTimeout(timeoutTimer);
resolve(error);
}
process.once("uncaughtException", handleException);
});
} finally {
for (const listener of listenersBefore) {
process.addListener("uncaughtException", listener);
}
}
},
});

// Indicate that the tests are running in Node
Expand Down
28 changes: 28 additions & 0 deletions integration-tests/tests/src/tests/observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,20 @@ describe("Observable", () => {
expectObservableMethods(this.object);
});

it.skipIf(
typeof nextUncaughtException !== "function",
"can throw from a listener",
async function (this: RealmObjectContext<Person>) {
assert(typeof nextUncaughtException === "function", "Expected ability to await an uncaught exception");
const uncaughtException = nextUncaughtException();
this.object.addListener(() => {
throw new Error("boom!");
});
const error = await uncaughtException;
expect(error.message).equals("boom!");
},
);

it("throws on listener reuse", function (this: RealmObjectContext<Person>) {
this.object.addListener(noop);
expect(() => {
Expand Down Expand Up @@ -869,6 +883,20 @@ describe("Observable", () => {
expectObservableMethods(this.realm.objects("Person"));
});

it.skipIf(
typeof nextUncaughtException !== "function",
"can throw from a listener",
async function (this: RealmObjectContext<Person>) {
assert(typeof nextUncaughtException === "function", "Expected ability to await an uncaught exception");
const uncaughtException = nextUncaughtException();
this.realm.objects("Person").addListener(() => {
throw new Error("boom!");
});
const error = await uncaughtException;
expect(error.message).equals("boom!");
},
);

it("throws on listener reuse", function (this: RealmObjectContext<Person>) {
const collection = this.realm.objects("Person");
collection.addListener(noop);
Expand Down
1 change: 1 addition & 0 deletions integration-tests/tests/src/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ declare const fs: fs;
declare const path: path;
declare const environment: Environment;
declare const gc: undefined | (() => void);
declare const nextUncaughtException: undefined | (() => Promise<Error>);

// Extend the mocha test function with the skipIf that we patch in from index.ts
declare namespace Mocha {
Expand Down
1 change: 1 addition & 0 deletions packages/realm/bindgen/js_opt_in_spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ records:
- schema_version
- schema_mode
- disable_format_upgrade
- scheduler
- sync_config
- force_sync_history
- migration_function
Expand Down
2 changes: 2 additions & 0 deletions packages/realm/bindgen/src/templates/node-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export function generate(context: TemplateContext): void {

// We know that node always has real WeakRefs so just use them.
export const WeakRef = global.WeakRef;
// Export a special function to get the env specific scheduler
export const getPlatformScheduler = nativeModule.getPlatformScheduler;
`);

generateNativeBigIntSupport(out);
Expand Down
22 changes: 20 additions & 2 deletions packages/realm/bindgen/src/templates/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import { strict as assert } from "assert";

import { TemplateContext } from "@realm/bindgen/context";
import { CppVar, CppFunc, CppFuncProps, CppCtor, CppMethod, CppClass, CppDecls } from "@realm/bindgen/cpp";
import { CppVar, CppFunc, CppFuncProps, CppCtor, CppMethod, CppClass, CppDecls, CppMemInit } from "@realm/bindgen/cpp";
import {
BoundSpec,
Class,
Expand Down Expand Up @@ -83,6 +83,7 @@ class NodeAddon extends CppClass {
this.withCrtpBase("Napi::Addon");

this.members.push(new CppVar("std::deque<std::string>", "m_string_bufs"));
this.members.push(new CppVar("std::shared_ptr<util::Scheduler>", "m_scheduler"));
this.addMethod(
new CppMethod("wrapString", "const std::string&", [new CppVar("std::string", "str")], {
attributes: "inline",
Expand All @@ -104,6 +105,8 @@ class NodeAddon extends CppClass {
this.classes.forEach((t) =>
this.members.push(new CppVar("Napi::FunctionReference", NodeAddon.memberNameForExtractor(t))),
);

// Injectables
this.addMethod(
new CppMethod("injectInjectables", "void", [node_callback_info], {
body: `
Expand All @@ -122,14 +125,26 @@ class NodeAddon extends CppClass {
}),
);

// Env specific scheduler
this.addMethod(
new CppMethod("getPlatformScheduler", "Napi::Value", [node_callback_info], {
body: `
const auto env = info.Env();
return NODE_FROM_SHARED_Scheduler(env, env.GetInstanceData<RealmAddon>()->m_scheduler);
`,
}),
);

this.addMethod(
new CppCtor(this.name, [new CppVar("Napi::Env", env), new CppVar("Napi::Object", "exports")], {
mem_inits: [new CppMemInit("m_scheduler", `std::make_shared<NapiScheduler>(${env})`)],
body: `
DefineAddon(exports, {
${Object.entries(this.exports)
.map(([name, val]) => `InstanceValue("${name}", ${val}, napi_enumerable),`)
.join("\n")}
InstanceMethod<&${this.name}::injectInjectables>("injectInjectables"),
InstanceMethod<&${this.name}::getPlatformScheduler>("getPlatformScheduler"),
});
`,
}),
Expand Down Expand Up @@ -585,7 +600,9 @@ function convertFromNode(addon: NodeAddon, type: Type, expr: string): string {
// For now assuming that all void-returning functions are "notifications" and don't need to block until done.
// Non-void returning functions *must* block so they have something to return.
const shouldBlock = !type.ret.isVoid();
return shouldBlock ? `schedulerWrapBlockingFunction(${lambda})` : `util::EventLoopDispatcher(${lambda})`;
return shouldBlock
? `schedulerWrapBlockingFunction(${lambda}, ${env}.GetInstanceData<RealmAddon>()->m_scheduler)`
: `util::EventLoopDispatcher(${lambda}, ${env}.GetInstanceData<RealmAddon>()->m_scheduler)`;

case "Enum":
return `${type.cppName}((${expr}).As<Napi::Number>().DoubleValue())`;
Expand Down Expand Up @@ -936,6 +953,7 @@ export function generate({ rawSpec, spec, file: makeFile }: TemplateContext): vo
#include <napi.h>
#include <realm_helpers.h>
#include <realm_js_node_helpers.h>
#include <napi_scheduler.h>

namespace realm::js::node {
namespace {
Expand Down
3 changes: 3 additions & 0 deletions packages/realm/bindgen/src/templates/typescript.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ export function generate({ rawSpec, spec: boundSpec, file }: TemplateContext): v
out("export type AppError = Error & {code: number};");
out("export type CppErrorCode = Error & {code: number, category: string};");

out("// Special functions");
out("export const getPlatformScheduler: undefined | (() => binding.SharedScheduler);");

out(`
// WeakRef polyfill for Hermes.
export class WeakRef<T extends object> {
Expand Down
2 changes: 1 addition & 1 deletion packages/realm/bindgen/vendor/realm-core
Submodule realm-core updated 127 files
5 changes: 2 additions & 3 deletions packages/realm/binding/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ else()
target_compile_options(realm-js PRIVATE -Wall -Wextra)
endif()

target_include_directories(realm-js PRIVATE "${BINDGEN_DIR}/src")
target_include_directories(realm-js PRIVATE "${BINDING_DIR}")
target_include_directories(realm-js PRIVATE "${BINDGEN_DIR}/src" "${BINDING_DIR}" "${BINDING_DIR}/node")


file(GLOB_RECURSE SDK_TS_FILES
Expand Down Expand Up @@ -137,4 +136,4 @@ bindgen(
SOURCES ${SDK_TS_FILES}
)

target_sources(realm-js PRIVATE node_init.cpp ${CMAKE_JS_SRC} ${BINDING_DIR}/node/platform.cpp)
target_sources(realm-js PRIVATE node_init.cpp ${CMAKE_JS_SRC} ${BINDING_DIR}/node/platform.cpp ${BINDING_DIR}/node/napi_scheduler.cpp)
79 changes: 79 additions & 0 deletions packages/realm/binding/node/napi_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2024 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////

#include "napi_scheduler.h"

#include <realm/util/functional.hpp>
#include <realm/object-store/util/scheduler.hpp>

#include <napi.h>

#include <functional>
#include <memory>
#include <type_traits>

namespace realm::js::node {

namespace {
/**
* Assumes called exactly once per data value:
* An absent call results in a leak and multiple calls result in use-after-free.
*/
void call_func_from_data(Napi::Env, Napi::Function, std::nullptr_t*, VoidUniqueFunctionImpl* data)
{
(realm::util::UniqueFunction<void()>(data))();
}

/**
* A NAPI thread-safe function which use the data to construct and call a `UniqueFunction`:
* Simpler and faster than passing and calling a `Napi::Function` to `NonBlockingCall`.
*/
using SchedulerThreadSafeFunction =
Napi::TypedThreadSafeFunction<std::nullptr_t, VoidUniqueFunctionImpl, &call_func_from_data>;

} // namespace

NapiScheduler::NapiScheduler(Napi::Env& env)
: m_env(env)
// TODO: Consider including an id from the env in the resource name
, m_tsf(SchedulerThreadSafeFunction::New(env, "realm::NapiScheduler", 0, 1))
{
}

bool NapiScheduler::is_on_thread() const noexcept
{
return m_id == std::this_thread::get_id();
}

bool NapiScheduler::is_same_as(const Scheduler* other) const noexcept
{
auto o = dynamic_cast<const NapiScheduler*>(other);
return (o && (o->m_env == m_env));
}

bool NapiScheduler::can_invoke() const noexcept
{
return true;
}

void NapiScheduler::invoke(realm::util::UniqueFunction<void()>&& func)
{
m_tsf.NonBlockingCall(func.release());
}

} // namespace realm::js::node
Loading
Loading