Skip to content

Commit

Permalink
Fix wasm workers under node
Browse files Browse the repository at this point in the history
  • Loading branch information
sbc100 committed Oct 15, 2024
1 parent 9c9b764 commit d20e9e0
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 94 deletions.
19 changes: 9 additions & 10 deletions src/library_wasm_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,13 @@ addToLibrary({
},

// Executes a wasm function call received via a postMessage.
$_wasmWorkerRunPostMessage__deps: ['$callUserCallback'],
$_wasmWorkerRunPostMessage: (e) => {
// '_wsc' is short for 'wasm call', trying to use an identifier name that
// will never conflict with user code
#if ENVIRONMENT_MAY_BE_NODE
// In Node.js environment, message event 'e' containing the actual data sent,
// while in the browser environment it's contained by 'e.data'.
let data = ENVIRONMENT_IS_NODE ? e : e.data;
#else
let data = e.data;
#endif
let wasmCall = data['_wsc'];
wasmCall && getWasmTableEntry(wasmCall)(...data['x']);
wasmCall && callUserCallback(() => getWasmTableEntry(wasmCall)(...data['x']));
},

// src/postamble_minimal.js brings this symbol in to the build, and calls this
Expand Down Expand Up @@ -195,6 +190,12 @@ if (ENVIRONMENT_IS_WASM_WORKER
#endif
});
worker.onmessage = _wasmWorkerRunPostMessage;
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
/** @suppress {checkTypes} */
worker.on('message', (msg) => worker.onmessage({ data: msg }));
}
#endif
return _wasmWorkersID++;
},

Expand All @@ -212,9 +213,7 @@ if (ENVIRONMENT_IS_WASM_WORKER
#if ASSERTIONS
assert(!ENVIRONMENT_IS_WASM_WORKER, 'emscripten_terminate_all_wasm_workers() cannot be called from a Wasm Worker: only the main browser thread has visibility to terminate all Workers!');
#endif
Object.values(_wasmWorkers).forEach((worker) => {
worker.terminate();
});
Object.values(_wasmWorkers).forEach((worker) => worker.terminate());
_wasmWorkers = {};
},

Expand Down
2 changes: 1 addition & 1 deletion src/runtime_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ if (ENVIRONMENT_IS_PTHREAD) {
// Create as web-worker-like an environment as we can.

var parentPort = worker_threads['parentPort'];
parentPort.on('message', (data) => onmessage({ data: data }));
parentPort.on('message', (msg) => onmessage({ data: msg }));

Object.assign(globalThis, {
self: global,
Expand Down
18 changes: 15 additions & 3 deletions src/wasm_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@ if (ENVIRONMENT_IS_NODE) {

var parentPort = nodeWorkerThreads.parentPort;

parentPort.on('message', (data) => typeof onmessage === "function" && onmessage({ data: data }));
parentPort.on('message', (msg) => global.onmessage?.({ data: msg }));

// Weak map of handle functions to thier wrapper. Used to implement
// addEventListener/removeEventListener.
var wrappedHandlers = new WeakMap();
function wrapMsgHandler(h) {
var f = wrappedHandlers.get(h)
if (!f) {
f = (msg) => h({data: msg});
wrappedHandlers.set(h, f);
}
return f;
}

var fs = require('fs');
var vm = require('vm');
Expand All @@ -28,8 +40,8 @@ if (ENVIRONMENT_IS_NODE) {
importScripts: (f) => vm.runInThisContext(fs.readFileSync(f, 'utf8'), {filename: f}),
postMessage: (msg) => parentPort.postMessage(msg),
performance: global.performance || { now: Date.now },
addEventListener: (name, handler) => parentPort.on(name, handler),
removeEventListener: (name, handler) => parentPort.off(name, handler),
addEventListener: (name, handler) => parentPort.on(name, wrapMsgHandler(handler)),
removeEventListener: (name, handler) => parentPort.off(name, wrapMsgHandler(handler)),
});
}
#endif // ENVIRONMENT_MAY_BE_NODE
Expand Down
15 changes: 6 additions & 9 deletions test/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4985,21 +4985,21 @@ def test_system(self):
# Tests the hello_wasm_worker.c documentation example code.
@also_with_minimal_runtime
def test_wasm_worker_hello(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS'])

def test_wasm_worker_hello_minimal_runtime_2(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2'])

# Tests Wasm Workers build in Wasm2JS mode.
@requires_wasm2js
@also_with_minimal_runtime
def test_wasm_worker_hello_wasm2js(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sWASM=0'])
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sWASM=0'])

# Tests the WASM_WORKERS=2 build mode, which embeds the Wasm Worker bootstrap JS script file to the main JS file.
@also_with_minimal_runtime
def test_wasm_worker_embedded(self):
self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS=2'])
def test_wasm_worker_hello_embedded(self):
self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS=2'])

# Tests that it is possible to call emscripten_futex_wait() in Wasm Workers.
@parameterized({
Expand Down Expand Up @@ -5059,10 +5059,7 @@ def test_wasm_worker_sleep(self):
# Tests emscripten_terminate_wasm_worker()
@also_with_minimal_runtime
def test_wasm_worker_terminate(self):
self.set_setting('WASM_WORKERS')
# Test uses the dynCall library function in its EM_ASM code
self.set_setting('DEFAULT_LIBRARY_FUNCS_TO_INCLUDE', ['$dynCall'])
self.btest('wasm_worker/terminate_wasm_worker.c', expected='0')
self.btest_exit('wasm_worker/terminate_wasm_worker.c', args=['-sWASM_WORKERS'])

# Tests emscripten_terminate_all_wasm_workers()
@also_with_minimal_runtime
Expand Down
6 changes: 6 additions & 0 deletions test/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -13910,6 +13910,12 @@ def test_debug_opt_warning(self, should_fail, args):
else:
self.run_process([EMCC, test_file('hello_world.c'), '-Werror'] + args)

def test_wasm_worker_hello(self):
self.do_runf(test_file('wasm_worker/hello_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])

def test_wasm_worker_terminate(self):
self.do_runf(test_file('wasm_worker/terminate_wasm_worker.c'), emcc_args=['-sWASM_WORKERS'])

@also_with_minimal_runtime
def test_wasm_worker_closure(self):
self.run_process([EMCC, test_file('wasm_worker/lock_async_acquire.c'), '-O2', '-sWASM_WORKERS', '--closure=1'])
Expand Down
18 changes: 10 additions & 8 deletions test/wasm_worker/hello_wasm_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
#include <assert.h>

// This is the code example in site/source/docs/api_reference/wasm_workers.rst
void do_exit() {
emscripten_out("do_exit");
emscripten_terminate_all_wasm_workers();
emscripten_force_exit(0);
}

void run_in_worker()
{
emscripten_console_log("Hello from wasm worker!\n");
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
void run_in_worker() {
emscripten_out("Hello from wasm worker!");
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
}

int main()
{
int main() {
emscripten_wasm_worker_t worker = emscripten_malloc_wasm_worker(/*stack size: */1024);
assert(worker);
emscripten_wasm_worker_post_function_v(worker, run_in_worker);
emscripten_exit_with_live_runtime();
}
73 changes: 35 additions & 38 deletions test/wasm_worker/lock_async_acquire.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,75 +17,73 @@ bool testFinished = false;
int numTimesMainThreadAcquiredLock = 0;
int numTimesWasmWorkerAcquiredLock = 0;

void work()
{
// emscripten_console_log("work");
void do_exit() {
emscripten_out("do_exit");
emscripten_terminate_all_wasm_workers();
emscripten_force_exit(0);
}

void work() {
// emscripten_console_log("work");
volatile int x = sharedState0;
volatile int y = sharedState1;
assert(x == y+1 || y == x+1);

if (emscripten_current_thread_is_wasm_worker())
if (emscripten_current_thread_is_wasm_worker()) {
++numTimesWasmWorkerAcquiredLock;
else
} else {
++numTimesMainThreadAcquiredLock;
}

if (x < y)
{
if (x < y) {
x = y + 1;
if (emscripten_current_thread_is_wasm_worker())
if (emscripten_current_thread_is_wasm_worker()) {
emscripten_wasm_worker_sleep(/*nsecs=*/(rand()%100000));
}
sharedState0 = x;
}
else
{
} else {
y = x + 1;
if (emscripten_current_thread_is_wasm_worker())
if (emscripten_current_thread_is_wasm_worker()) {
emscripten_wasm_worker_sleep(/*nsecs=*/(rand()%100000));
}
sharedState1 = y;

if (y > 100 && numTimesMainThreadAcquiredLock && numTimesWasmWorkerAcquiredLock)
{
if (!testFinished)
{
emscripten_console_log("test finished");
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
if (y > 100 && numTimesMainThreadAcquiredLock && numTimesWasmWorkerAcquiredLock) {
if (!testFinished) {
testFinished = true;
emscripten_out("test finished");
emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit);
}
testFinished = true;
}
}
}

void schedule_work(void *userData);

void lock_async_acquired(volatile void *addr, uint32_t val, ATOMICS_WAIT_RESULT_T waitResult, void *userData)
{
// emscripten_console_log("async lock acquired");
void lock_async_acquired(volatile void *addr, uint32_t val, ATOMICS_WAIT_RESULT_T waitResult, void *userData) {
// emscripten_out("async lock acquired");
assert(addr == &lock);
assert(val == 0 || val == 1);
assert(waitResult == ATOMICS_WAIT_OK);
assert(userData == (void*)42);
work();
emscripten_lock_release(&lock);

if (!testFinished)
if (!testFinished) {
emscripten_set_timeout(schedule_work, 10, 0);
}
}

void schedule_work(void *userData)
{
if (emscripten_current_thread_is_wasm_worker() && emscripten_random() > 0.5)
{
void schedule_work(void *userData) {
if (emscripten_current_thread_is_wasm_worker() && emscripten_random() > 0.5) {
emscripten_lock_waitinf_acquire(&lock);
// emscripten_console_log("sync lock acquired");
// emscripten_out("sync lock acquired");
work();
emscripten_lock_release(&lock);
if (!testFinished)
if (!testFinished) {
emscripten_set_timeout(schedule_work, 0, 0);
}
else
{
}
} else {
emscripten_lock_async_acquire(&lock, lock_async_acquired, (void*)42, EMSCRIPTEN_WAIT_ASYNC_INFINITY);
}
}
Expand All @@ -94,14 +92,13 @@ void start_worker(int arg) {
schedule_work(0);
}

int main()
{
int main() {
#define NUM_THREADS 10
for(int i = 0; i < NUM_THREADS; ++i)
{
for (int i = 0; i < NUM_THREADS; ++i) {
emscripten_wasm_worker_t worker = emscripten_malloc_wasm_worker(1024);
emscripten_wasm_worker_post_function_vi(worker, start_worker, 0);
}

schedule_work(0);
emscripten_exit_with_live_runtime();
}
40 changes: 15 additions & 25 deletions test/wasm_worker/terminate_wasm_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,32 @@

static volatile int worker_started = 0;

void this_function_should_not_be_called(void *userData)
{
void this_function_should_not_be_called(void *userData) {
worker_started = -1;
emscripten_console_error("this_function_should_not_be_called");
#ifdef REPORT_RESULT
REPORT_RESULT(1/*fail*/);
#endif
emscripten_err("this_function_should_not_be_called");
emscripten_force_exit(1);
}

void test_passed(void *userData)
{
if (worker_started == 1)
{
emscripten_console_error("test_passed");
#ifdef REPORT_RESULT
REPORT_RESULT(0/*ok*/);
#endif
void test_passed(void *userData) {
if (worker_started == 1) {
emscripten_err("test_passed");
emscripten_force_exit(0);
}
}

void worker_main()
{
void worker_main() {
worker_started = 1;
emscripten_console_error("Hello from wasm worker!");
emscripten_err("Hello from wasm worker!");
// Schedule a function to be called, that should never happen, since the Worker
// dies before that.
emscripten_set_timeout(this_function_should_not_be_called, 2000, 0);
}

char stack[1024];

int should_throw(void(*func)())
{
EM_JS_DEPS(deps, "$dynCall");

int should_throw(void(*func)()) {
int threw = EM_ASM_INT({
try {
dynCall('v', Number($0));
Expand All @@ -57,20 +50,17 @@ int should_throw(void(*func)())

emscripten_wasm_worker_t worker = 0;

void post_bad_function()
{
void post_bad_function() {
// Try to post a function to the worker, this should throw
emscripten_wasm_worker_post_function_vi(worker, (void(*)(int))this_function_should_not_be_called, 0);
}

void terminate_worker(void *userData)
{
void terminate_worker(void *userData) {
emscripten_terminate_wasm_worker(worker);
assert(should_throw(post_bad_function));
}

int main()
{
int main() {
worker = emscripten_create_wasm_worker(stack, sizeof(stack));
emscripten_wasm_worker_post_function_v(worker, worker_main);

Expand Down

0 comments on commit d20e9e0

Please sign in to comment.