diff --git a/src/library_wasm_worker.js b/src/library_wasm_worker.js index ec50bdb1f8df..1eaa0a1d7381 100644 --- a/src/library_wasm_worker.js +++ b/src/library_wasm_worker.js @@ -55,18 +55,13 @@ addToLibrary({ }, // Executes a wasm function call received via a postMessage. + $_wasmWorkerRunPostMessage__deps: ['$callUserCallback'], $_wasmWorkerRunPostMessage: (e) => { // '_wsc' is short for 'wasm call', trying to use an identifier name that // will never conflict with user code -#if ENVIRONMENT_MAY_BE_NODE - // In Node.js environment, message event 'e' containing the actual data sent, - // while in the browser environment it's contained by 'e.data'. - let data = ENVIRONMENT_IS_NODE ? e : e.data; -#else let data = e.data; -#endif let wasmCall = data['_wsc']; - wasmCall && getWasmTableEntry(wasmCall)(...data['x']); + wasmCall && callUserCallback(() => getWasmTableEntry(wasmCall)(...data['x'])); }, // src/postamble_minimal.js brings this symbol in to the build, and calls this @@ -195,6 +190,12 @@ if (ENVIRONMENT_IS_WASM_WORKER #endif }); worker.onmessage = _wasmWorkerRunPostMessage; +#if ENVIRONMENT_MAY_BE_NODE + if (ENVIRONMENT_IS_NODE) { + /** @suppress {checkTypes} */ + worker.on('message', (msg) => worker.onmessage({ data: msg })); + } +#endif return _wasmWorkersID++; }, @@ -212,9 +213,7 @@ if (ENVIRONMENT_IS_WASM_WORKER #if ASSERTIONS assert(!ENVIRONMENT_IS_WASM_WORKER, 'emscripten_terminate_all_wasm_workers() cannot be called from a Wasm Worker: only the main browser thread has visibility to terminate all Workers!'); #endif - Object.values(_wasmWorkers).forEach((worker) => { - worker.terminate(); - }); + Object.values(_wasmWorkers).forEach((worker) => worker.terminate()); _wasmWorkers = {}; }, diff --git a/src/runtime_pthread.js b/src/runtime_pthread.js index 6444102a16ab..8c194c12156d 100644 --- a/src/runtime_pthread.js +++ b/src/runtime_pthread.js @@ -27,7 +27,7 @@ if (ENVIRONMENT_IS_PTHREAD) { // Create as web-worker-like an environment as we can. var parentPort = worker_threads['parentPort']; - parentPort.on('message', (data) => onmessage({ data: data })); + parentPort.on('message', (msg) => onmessage({ data: msg })); Object.assign(globalThis, { self: global, diff --git a/src/wasm_worker.js b/src/wasm_worker.js index 914d8151ebbf..fd3b7bc98281 100644 --- a/src/wasm_worker.js +++ b/src/wasm_worker.js @@ -14,7 +14,19 @@ if (ENVIRONMENT_IS_NODE) { var parentPort = nodeWorkerThreads.parentPort; - parentPort.on('message', (data) => typeof onmessage === "function" && onmessage({ data: data })); + parentPort.on('message', (msg) => global.onmessage?.({ data: msg })); + + // Weak map of handle functions to thier wrapper. Used to implement + // addEventListener/removeEventListener. + var wrappedHandlers = new WeakMap(); + function wrapMsgHandler(h) { + var f = wrappedHandlers.get(h) + if (!f) { + f = (msg) => h({data: msg}); + wrappedHandlers.set(h, f); + } + return f; + } var fs = require('fs'); var vm = require('vm'); @@ -28,8 +40,8 @@ if (ENVIRONMENT_IS_NODE) { importScripts: (f) => vm.runInThisContext(fs.readFileSync(f, 'utf8'), {filename: f}), postMessage: (msg) => parentPort.postMessage(msg), performance: global.performance || { now: Date.now }, - addEventListener: (name, handler) => parentPort.on(name, handler), - removeEventListener: (name, handler) => parentPort.off(name, handler), + addEventListener: (name, handler) => parentPort.on(name, wrapMsgHandler(handler)), + removeEventListener: (name, handler) => parentPort.off(name, wrapMsgHandler(handler)), }); } #endif // ENVIRONMENT_MAY_BE_NODE diff --git a/test/test_browser.py b/test/test_browser.py index b89def707d33..149fd905c8b6 100644 --- a/test/test_browser.py +++ b/test/test_browser.py @@ -4985,21 +4985,21 @@ def test_system(self): # Tests the hello_wasm_worker.c documentation example code. @also_with_minimal_runtime def test_wasm_worker_hello(self): - self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS']) + self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS']) def test_wasm_worker_hello_minimal_runtime_2(self): - self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2']) + self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sMINIMAL_RUNTIME=2']) # Tests Wasm Workers build in Wasm2JS mode. @requires_wasm2js @also_with_minimal_runtime def test_wasm_worker_hello_wasm2js(self): - self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS', '-sWASM=0']) + self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS', '-sWASM=0']) # Tests the WASM_WORKERS=2 build mode, which embeds the Wasm Worker bootstrap JS script file to the main JS file. @also_with_minimal_runtime - def test_wasm_worker_embedded(self): - self.btest('wasm_worker/hello_wasm_worker.c', expected='0', args=['-sWASM_WORKERS=2']) + def test_wasm_worker_hello_embedded(self): + self.btest_exit('wasm_worker/hello_wasm_worker.c', args=['-sWASM_WORKERS=2']) # Tests that it is possible to call emscripten_futex_wait() in Wasm Workers. @parameterized({ @@ -5059,10 +5059,7 @@ def test_wasm_worker_sleep(self): # Tests emscripten_terminate_wasm_worker() @also_with_minimal_runtime def test_wasm_worker_terminate(self): - self.set_setting('WASM_WORKERS') - # Test uses the dynCall library function in its EM_ASM code - self.set_setting('DEFAULT_LIBRARY_FUNCS_TO_INCLUDE', ['$dynCall']) - self.btest('wasm_worker/terminate_wasm_worker.c', expected='0') + self.btest_exit('wasm_worker/terminate_wasm_worker.c', args=['-sWASM_WORKERS']) # Tests emscripten_terminate_all_wasm_workers() @also_with_minimal_runtime diff --git a/test/test_other.py b/test/test_other.py index f8ee4f1281f2..6655c53d48af 100644 --- a/test/test_other.py +++ b/test/test_other.py @@ -13910,6 +13910,12 @@ def test_debug_opt_warning(self, should_fail, args): else: self.run_process([EMCC, test_file('hello_world.c'), '-Werror'] + args) + def test_wasm_worker_hello(self): + self.do_runf(test_file('wasm_worker/hello_wasm_worker.c'), emcc_args=['-sWASM_WORKERS']) + + def test_wasm_worker_terminate(self): + self.do_runf(test_file('wasm_worker/terminate_wasm_worker.c'), emcc_args=['-sWASM_WORKERS']) + @also_with_minimal_runtime def test_wasm_worker_closure(self): self.run_process([EMCC, test_file('wasm_worker/lock_async_acquire.c'), '-O2', '-sWASM_WORKERS', '--closure=1']) diff --git a/test/wasm_worker/hello_wasm_worker.c b/test/wasm_worker/hello_wasm_worker.c index cae9ac83cabe..50cd0693ad9d 100644 --- a/test/wasm_worker/hello_wasm_worker.c +++ b/test/wasm_worker/hello_wasm_worker.c @@ -4,18 +4,20 @@ #include // This is the code example in site/source/docs/api_reference/wasm_workers.rst +void do_exit() { + emscripten_out("do_exit"); + emscripten_terminate_all_wasm_workers(); + emscripten_force_exit(0); +} -void run_in_worker() -{ - emscripten_console_log("Hello from wasm worker!\n"); -#ifdef REPORT_RESULT - REPORT_RESULT(0); -#endif +void run_in_worker() { + emscripten_out("Hello from wasm worker!"); + emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit); } -int main() -{ +int main() { emscripten_wasm_worker_t worker = emscripten_malloc_wasm_worker(/*stack size: */1024); assert(worker); emscripten_wasm_worker_post_function_v(worker, run_in_worker); + emscripten_exit_with_live_runtime(); } diff --git a/test/wasm_worker/lock_async_acquire.c b/test/wasm_worker/lock_async_acquire.c index f27f52b26cff..0fa7fb10a7fd 100644 --- a/test/wasm_worker/lock_async_acquire.c +++ b/test/wasm_worker/lock_async_acquire.c @@ -17,51 +17,51 @@ bool testFinished = false; int numTimesMainThreadAcquiredLock = 0; int numTimesWasmWorkerAcquiredLock = 0; -void work() -{ -// emscripten_console_log("work"); +void do_exit() { + emscripten_out("do_exit"); + emscripten_terminate_all_wasm_workers(); + emscripten_force_exit(0); +} + +void work() { + // emscripten_console_log("work"); volatile int x = sharedState0; volatile int y = sharedState1; assert(x == y+1 || y == x+1); - if (emscripten_current_thread_is_wasm_worker()) + if (emscripten_current_thread_is_wasm_worker()) { ++numTimesWasmWorkerAcquiredLock; - else + } else { ++numTimesMainThreadAcquiredLock; + } - if (x < y) - { + if (x < y) { x = y + 1; - if (emscripten_current_thread_is_wasm_worker()) + if (emscripten_current_thread_is_wasm_worker()) { emscripten_wasm_worker_sleep(/*nsecs=*/(rand()%100000)); + } sharedState0 = x; - } - else - { + } else { y = x + 1; - if (emscripten_current_thread_is_wasm_worker()) + if (emscripten_current_thread_is_wasm_worker()) { emscripten_wasm_worker_sleep(/*nsecs=*/(rand()%100000)); + } sharedState1 = y; - if (y > 100 && numTimesMainThreadAcquiredLock && numTimesWasmWorkerAcquiredLock) - { - if (!testFinished) - { - emscripten_console_log("test finished"); -#ifdef REPORT_RESULT - REPORT_RESULT(0); -#endif + if (y > 100 && numTimesMainThreadAcquiredLock && numTimesWasmWorkerAcquiredLock) { + if (!testFinished) { + testFinished = true; + emscripten_out("test finished"); + emscripten_wasm_worker_post_function_v(EMSCRIPTEN_WASM_WORKER_ID_PARENT, do_exit); } - testFinished = true; } } } void schedule_work(void *userData); -void lock_async_acquired(volatile void *addr, uint32_t val, ATOMICS_WAIT_RESULT_T waitResult, void *userData) -{ -// emscripten_console_log("async lock acquired"); +void lock_async_acquired(volatile void *addr, uint32_t val, ATOMICS_WAIT_RESULT_T waitResult, void *userData) { + // emscripten_out("async lock acquired"); assert(addr == &lock); assert(val == 0 || val == 1); assert(waitResult == ATOMICS_WAIT_OK); @@ -69,23 +69,21 @@ void lock_async_acquired(volatile void *addr, uint32_t val, ATOMICS_WAIT_RESULT_ work(); emscripten_lock_release(&lock); - if (!testFinished) + if (!testFinished) { emscripten_set_timeout(schedule_work, 10, 0); + } } -void schedule_work(void *userData) -{ - if (emscripten_current_thread_is_wasm_worker() && emscripten_random() > 0.5) - { +void schedule_work(void *userData) { + if (emscripten_current_thread_is_wasm_worker() && emscripten_random() > 0.5) { emscripten_lock_waitinf_acquire(&lock); -// emscripten_console_log("sync lock acquired"); + // emscripten_out("sync lock acquired"); work(); emscripten_lock_release(&lock); - if (!testFinished) + if (!testFinished) { emscripten_set_timeout(schedule_work, 0, 0); - } - else - { + } + } else { emscripten_lock_async_acquire(&lock, lock_async_acquired, (void*)42, EMSCRIPTEN_WAIT_ASYNC_INFINITY); } } @@ -94,14 +92,13 @@ void start_worker(int arg) { schedule_work(0); } -int main() -{ +int main() { #define NUM_THREADS 10 - for(int i = 0; i < NUM_THREADS; ++i) - { + for (int i = 0; i < NUM_THREADS; ++i) { emscripten_wasm_worker_t worker = emscripten_malloc_wasm_worker(1024); emscripten_wasm_worker_post_function_vi(worker, start_worker, 0); } schedule_work(0); + emscripten_exit_with_live_runtime(); } diff --git a/test/wasm_worker/terminate_wasm_worker.c b/test/wasm_worker/terminate_wasm_worker.c index d3dd554817f0..90b303eb21e3 100644 --- a/test/wasm_worker/terminate_wasm_worker.c +++ b/test/wasm_worker/terminate_wasm_worker.c @@ -9,30 +9,22 @@ static volatile int worker_started = 0; -void this_function_should_not_be_called(void *userData) -{ +void this_function_should_not_be_called(void *userData) { worker_started = -1; - emscripten_console_error("this_function_should_not_be_called"); -#ifdef REPORT_RESULT - REPORT_RESULT(1/*fail*/); -#endif + emscripten_err("this_function_should_not_be_called"); + emscripten_force_exit(1); } -void test_passed(void *userData) -{ - if (worker_started == 1) - { - emscripten_console_error("test_passed"); -#ifdef REPORT_RESULT - REPORT_RESULT(0/*ok*/); -#endif +void test_passed(void *userData) { + if (worker_started == 1) { + emscripten_err("test_passed"); + emscripten_force_exit(0); } } -void worker_main() -{ +void worker_main() { worker_started = 1; - emscripten_console_error("Hello from wasm worker!"); + emscripten_err("Hello from wasm worker!"); // Schedule a function to be called, that should never happen, since the Worker // dies before that. emscripten_set_timeout(this_function_should_not_be_called, 2000, 0); @@ -40,8 +32,9 @@ void worker_main() char stack[1024]; -int should_throw(void(*func)()) -{ +EM_JS_DEPS(deps, "$dynCall"); + +int should_throw(void(*func)()) { int threw = EM_ASM_INT({ try { dynCall('v', Number($0)); @@ -57,20 +50,17 @@ int should_throw(void(*func)()) emscripten_wasm_worker_t worker = 0; -void post_bad_function() -{ +void post_bad_function() { // Try to post a function to the worker, this should throw emscripten_wasm_worker_post_function_vi(worker, (void(*)(int))this_function_should_not_be_called, 0); } -void terminate_worker(void *userData) -{ +void terminate_worker(void *userData) { emscripten_terminate_wasm_worker(worker); assert(should_throw(post_bad_function)); } -int main() -{ +int main() { worker = emscripten_create_wasm_worker(stack, sizeof(stack)); emscripten_wasm_worker_post_function_v(worker, worker_main);