Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wasm workers under node #22721

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions src/library_wasm_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,13 @@ addToLibrary({
},

// Executes a wasm function call received via a postMessage.
$_wasmWorkerRunPostMessage__deps: ['$callUserCallback'],
$_wasmWorkerRunPostMessage: (e) => {
// '_wsc' is short for 'wasm call', trying to use an identifier name that
// will never conflict with user code
#if ENVIRONMENT_MAY_BE_NODE
// In Node.js environment, message event 'e' containing the actual data sent,
// while in the browser environment it's contained by 'e.data'.
let data = ENVIRONMENT_IS_NODE ? e : e.data;
#else
let data = e.data;
#endif
let wasmCall = data['_wsc'];
wasmCall && getWasmTableEntry(wasmCall)(...data['x']);
wasmCall && callUserCallback(() => getWasmTableEntry(wasmCall)(...data['x']));
},

// src/postamble_minimal.js brings this symbol in to the build, and calls this
Expand All @@ -87,6 +82,11 @@ addToLibrary({
assert(m['sz'] % 16 == 0);
#endif

#if !MINIMAL_RUNTIME && isSymbolNeeded('$noExitRuntime')
// Wasm workers basically never exit their runtime
noExitRuntime = 1;
#endif

#if STACK_OVERFLOW_CHECK >= 2
// _emscripten_wasm_worker_initialize() initializes the stack for this
// Worker, but it cannot call to extern __set_stack_limits() function, or
Expand Down Expand Up @@ -209,6 +209,12 @@ if (ENVIRONMENT_IS_WASM_WORKER
#endif
});
worker.onmessage = _wasmWorkerRunPostMessage;
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
/** @suppress {checkTypes} */
worker.on('message', (msg) => worker.onmessage({ data: msg }));
}
#endif
return _wasmWorkersID++;
},

Expand All @@ -226,9 +232,7 @@ if (ENVIRONMENT_IS_WASM_WORKER
#if ASSERTIONS
assert(!ENVIRONMENT_IS_WASM_WORKER, 'emscripten_terminate_all_wasm_workers() cannot be called from a Wasm Worker: only the main browser thread has visibility to terminate all Workers!');
#endif
Object.values(_wasmWorkers).forEach((worker) => {
worker.terminate();
});
Object.values(_wasmWorkers).forEach((worker) => worker.terminate());
_wasmWorkers = {};
},

Expand Down
2 changes: 1 addition & 1 deletion src/runtime_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ if (ENVIRONMENT_IS_PTHREAD) {
// Create as web-worker-like an environment as we can.

var parentPort = worker_threads['parentPort'];
parentPort.on('message', (data) => onmessage({ data: data }));
parentPort.on('message', (msg) => onmessage({ data: msg }));

Object.assign(globalThis, {
self: global,
Expand Down
18 changes: 15 additions & 3 deletions src/wasm_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@ if (ENVIRONMENT_IS_NODE) {

var parentPort = nodeWorkerThreads.parentPort;

parentPort.on('message', (data) => typeof onmessage === "function" && onmessage({ data: data }));
parentPort.on('message', (msg) => global.onmessage?.({ data: msg }));

// Weak map of handle functions to their wrapper. Used to implement
// addEventListener/removeEventListener.
var wrappedHandlers = new WeakMap();
function wrapMsgHandler(h) {
var f = wrappedHandlers.get(h)
if (!f) {
f = (msg) => h({data: msg});
wrappedHandlers.set(h, f);
}
return f;
}
sbc100 marked this conversation as resolved.
Show resolved Hide resolved

var fs = require('fs');
var vm = require('vm');
Expand All @@ -28,8 +40,8 @@ if (ENVIRONMENT_IS_NODE) {
importScripts: (f) => vm.runInThisContext(fs.readFileSync(f, 'utf8'), {filename: f}),
postMessage: (msg) => parentPort.postMessage(msg),
performance: global.performance || { now: Date.now },
addEventListener: (name, handler) => parentPort.on(name, handler),
removeEventListener: (name, handler) => parentPort.off(name, handler),
addEventListener: (name, handler) => parentPort.on(name, wrapMsgHandler(handler)),
removeEventListener: (name, handler) => parentPort.off(name, wrapMsgHandler(handler)),
});
}
#endif // ENVIRONMENT_MAY_BE_NODE
Expand Down
14 changes: 7 additions & 7 deletions test/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4981,21 +4981,21 @@ def test_system(self):
# Tests the hello_wasm_worker.c documentation example code.
@also_with_minimal_runtime
def test_wasm_worker_hello(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS'])

def test_wasm_worker_hello_minimal_runtime_2(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])

# Tests Wasm Workers build in Wasm2JS mode.
@requires_wasm2js
@also_with_minimal_runtime
def test_wasm_worker_hello_wasm2js(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sWASM=0'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sWASM=0'])

# Tests the WASM_WORKERS=2 build mode, which embeds the Wasm Worker bootstrap JS script file to the main JS file.
@also_with_minimal_runtime
def test_wasm_worker_embedded(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS=2'])
def test_wasm_worker_hello_embedded(self):
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS=2'])

# Tests that it is possible to call emscripten_futex_wait() in Wasm Workers.
@parameterized({
Expand Down Expand Up @@ -5059,7 +5059,7 @@ def test_wasm_worker_sleep(self):
# Tests emscripten_terminate_wasm_worker()
@also_with_minimal_runtime
def test_wasm_worker_terminate(self):
self.btest('wasm_worker/terminate_wasm_worker.c', expected='0', args=['-sWASM_WORKERS'])
self.btest_exit('wasm_worker/terminate_wasm_worker.c', args=['-sWASM_WORKERS'])

# Tests emscripten_terminate_all_wasm_workers()
@also_with_minimal_runtime
Expand Down Expand Up @@ -5133,7 +5133,7 @@ def test_wasm_worker_lock_wait2(self):
# Tests emscripten_lock_async_acquire() function.
@also_with_minimal_runtime
def test_wasm_worker_lock_async_acquire(self):
self.btest('wasm_worker/lock_async_acquire.c', expected='0', args=['--closure=1', '-sWASM_WORKERS'])
self.btest_exit('wasm_worker/lock_async_acquire.c', args=['--closure=1', '-sWASM_WORKERS'])

# Tests emscripten_lock_busyspin_wait_acquire() in Worker and main thread.
@also_with_minimal_runtime
Expand Down
6 changes: 6 additions & 0 deletions test/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -13904,6 +13904,12 @@ def test_debug_opt_warning(self, should_fail, args):
else:
self.run_process([EMCC, test_file('hello_world.c'), '-Werror'] + args)

def test_wasm_worker_hello(self):
self.do_runf(test_file('wasm_worker/hello_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])

def test_wasm_worker_terminate(self):
self.do_runf(test_file('wasm_worker/terminate_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])

@also_with_minimal_runtime
def test_wasm_worker_closure(self):
self.run_process([EMCC, test_file('wasm_worker/lock_async_acquire.c'), '-O2', '-sWASM_WORKERS', '--closure=1'])
Expand Down
11 changes: 8 additions & 3 deletions test/wasm_worker/hello_wasm_worker.c
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
#include <emscripten/emscripten.h>
#include <emscripten/console.h>
#include <emscripten/em_asm.h>
#include <emscripten/wasm_worker.h>
#include <assert.h>

// This is the code example in site/source/docs/api_reference/wasm_workers.rst
void do_exit() {
emscripten_out("do_exit");
emscripten_terminate_all_wasm_workers();
emscripten_force_exit(0);
}

void run_in_worker() {
emscripten_out("Hello from wasm worker!\n");
EM_ASM(typeof checkStackCookie == 'function' && checkStackCookie());
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
}

int main() {
emscripten_wasm_worker_t worker = emscripten_malloc_wasm_worker(/*stack size: */1024);
assert(worker);
emscripten_wasm_worker_post_function_v(worker, run_in_worker);
emscripten_exit_with_live_runtime();
}
19 changes: 13 additions & 6 deletions test/wasm_worker/lock_async_acquire.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ bool testFinished = false;
int numTimesMainThreadAcquiredLock = 0;
int numTimesWasmWorkerAcquiredLock = 0;

void do_exit() {
emscripten_out("do_exit");
emscripten_terminate_all_wasm_workers();
emscripten_force_exit(0);
}

void work() {
// emscripten_out("work");
volatile int x = sharedState0;
Expand All @@ -37,18 +43,17 @@ void work() {
sharedState0 = x;
} else {
y = x + 1;
if (emscripten_current_thread_is_wasm_worker())
if (emscripten_current_thread_is_wasm_worker()) {
emscripten_wasm_worker_sleep(/*nsecs=*/(rand()%100000));
}
sharedState1 = y;

if (y > 100 && numTimesMainThreadAcquiredLock && numTimesWasmWorkerAcquiredLock) {
if (!testFinished) {
testFinished = true;
emscripten_out("test finished");
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
}
testFinished = true;
}
}
}
Expand All @@ -75,8 +80,9 @@ void schedule_work(void *userData) {
// emscripten_out("sync lock acquired");
work();
emscripten_lock_release(&lock);
if (!testFinished)
if (!testFinished) {
emscripten_set_timeout(schedule_work, 0, 0);
}
} else {
emscripten_lock_async_acquire(&lock, lock_async_acquired, (void*)42, EMSCRIPTEN_WAIT_ASYNC_INFINITY);
}
Expand All @@ -94,4 +100,5 @@ int main() {
}

schedule_work(0);
emscripten_exit_with_live_runtime();
}
9 changes: 3 additions & 6 deletions test/wasm_worker/terminate_wasm_worker.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <emscripten/emscripten.h>
#include <emscripten/console.h>
#include <emscripten/em_asm.h>
#include <emscripten/em_js.h>
Expand All @@ -13,17 +14,13 @@ static volatile int worker_started = 0;
void this_function_should_not_be_called(void *userData) {
worker_started = -1;
emscripten_err("this_function_should_not_be_called");
#ifdef REPORT_RESULT
REPORT_RESULT(1/*fail*/);
#endif
emscripten_force_exit(1);
}

void test_passed(void *userData) {
if (worker_started == 1) {
emscripten_err("test_passed");
#ifdef REPORT_RESULT
REPORT_RESULT(0/*ok*/);
#endif
emscripten_force_exit(0);
}
}

Expand Down
Loading