Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guard notifying outdated scheduler #1559

Merged
merged 25 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This release upgrades the Sync metadata in a way that is not compatible with old
### Fixed
* `Realm.close()` is now idempotent.
* Fix error in `RealmAny.equals` that would sometimes return `true` when comparing RealmAnys wrapping the same type but different values. (Issue [#1523](https://github.com/realm/realm-kotlin/pull/1523))
* Fix crashes caused by posting to a released scheduler. (Issue [#1543](https://github.com/realm/realm-kotlin/issues/1543))
* [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517))
* [Sync] Manual client reset on Windows would not trigger correctly when run inside `onManualResetFallback`. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515))
* [Sync] `ClientResetRequiredException.executeClientReset()` now returns a boolean indicating if the manual reset fully succeeded or not. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515))
Expand All @@ -34,7 +35,7 @@ GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517))
* Minimum Android SDK: 16.

### Internal
* Updated to Realm Core 13.23.2, commit e6271d72308b40399890060f58a88cf568c2ee22.
* Updated to Realm Core 13.23.2, commit ba96288b5c88096998591f94a64b0e3a00d4c92f.


## 1.11.1 (2023-09-07)
Expand Down
2 changes: 1 addition & 1 deletion dependencies.list
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Version of MongoDB Realm used by integration tests
# See https://github.com/realm/ci/packages/147854 for available versions
MONGODB_REALM_SERVER=2023-10-10
MONGODB_REALM_SERVER=2023-11-02

# `BAAS` and `BAAS-UI` projects commit hashes matching MONGODB_REALM_SERVER image version
# note that the MONGODB_REALM_SERVER image is a nightly build, find the matching commits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.mongodb.kbson.ObjectId
import java.util.concurrent.atomic.AtomicBoolean

// FIXME API-CLEANUP Rename io.realm.interop. to something with platform?
// https://github.com/realm/realm-kotlin/issues/56
Expand Down Expand Up @@ -2095,10 +2096,17 @@ fun ObjectId.asRealmObjectIdT(): realm_object_id_t {

private class JVMScheduler(dispatcher: CoroutineDispatcher) {
val scope: CoroutineScope = CoroutineScope(dispatcher)
val cancelled = AtomicBoolean(false)

fun notifyCore(schedulerPointer: Long) {
scope.launch {
realmc.invoke_core_notify_callback(schedulerPointer)
if (!cancelled.get()) {
realmc.invoke_core_notify_callback(schedulerPointer)
}
}
}

fun cancel() {
cancelled.set(true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ import realm_wrapper.realm_user_t
import realm_wrapper.realm_value_t
import realm_wrapper.realm_value_type
import realm_wrapper.realm_version_id_t
import realm_wrapper.realm_work_queue_t
import kotlin.collections.set
import kotlin.native.internal.createCleaner

Expand Down Expand Up @@ -545,17 +546,19 @@ actual object RealmInterop {
// free: realm_wrapper.realm_free_userdata_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
staticCFunction<COpaquePointer?, Unit> { userdata ->
printlntid("free")
userdata?.asStableRef<SingleThreadDispatcherScheduler>()?.dispose()
val stableSchedulerRef: StableRef<SingleThreadDispatcherScheduler>? = userdata?.asStableRef<SingleThreadDispatcherScheduler>()
stableSchedulerRef?.get()?.cancel()
stableSchedulerRef?.dispose()
},

// notify: realm_wrapper.realm_scheduler_notify_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
staticCFunction<COpaquePointer?, Unit> { userdata ->
staticCFunction<COpaquePointer?, CPointer<realm_work_queue_t>?, Unit> { userdata, work_queue ->
// Must be thread safe
val scheduler =
userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
printlntid("$scheduler notify")
try {
scheduler.notify()
scheduler.notify(work_queue)
} catch (e: Exception) {
// Should never happen, but is included for development to get some indicators
// on errors instead of silent crashes.
Expand Down Expand Up @@ -3382,7 +3385,7 @@ actual object RealmInterop {
)

interface Scheduler {
fun notify()
fun notify(work_queue: CPointer<realm_work_queue_t>?)
}

class SingleThreadDispatcherScheduler(
Expand All @@ -3392,6 +3395,7 @@ actual object RealmInterop {
val scope: CoroutineScope = CoroutineScope(dispatcher)
val ref: CPointer<out CPointed>
lateinit var scheduler: CPointer<realm_scheduler_t>
private val cancelled: AtomicBoolean = atomic(false)

init {
ref = StableRef.create(this).asCPointer()
Expand All @@ -3401,18 +3405,24 @@ actual object RealmInterop {
this.scheduler = scheduler
}

override fun notify() {
override fun notify(work_queue: CPointer<realm_work_queue_t>?) {
scope.launch {
try {
printlntid("on dispatcher")
realm_wrapper.realm_scheduler_perform_work(scheduler)
if (!cancelled.value) {
Copy link
Contributor

@clementetb clementetb Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't core wait until the jobs have been executed to release the scheduler?

Also, this guard does not cover 100% of the cases: the pointer might have been released just after accessing cancelled.

Because closing and job execution happen on the same thread, there is no way that the Realm gets closed while executing the Job.

realm_wrapper.realm_scheduler_perform_work(work_queue)
}
} catch (e: Exception) {
// Should never happen, but is included for development to get some indicators
// on errors instead of silent crashes.
e.printStackTrace()
}
}
}

fun cancel() {
cancelled.value = true
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/external/core
Submodule core updated 49 files
+9 −2 CHANGELOG.md
+34 −1 CMakeLists.txt
+54 −0 Jenkinsfile
+2 −0 alpine.Dockerfile
+1 −0 evergreen/config.yml
+3 −2 src/realm.h
+4 −0 src/realm/CMakeLists.txt
+33 −1 src/realm/array.cpp
+2 −0 src/realm/array.hpp
+1 −1 src/realm/array_integer.cpp
+7 −9 src/realm/array_integer.hpp
+18 −23 src/realm/array_integer_tpl.hpp
+7 −7 src/realm/array_with_find.cpp
+121 −211 src/realm/array_with_find.hpp
+1 −0 src/realm/cluster.cpp
+21 −1 src/realm/collection.hpp
+1 −3 src/realm/list.hpp
+6 −6 src/realm/object-store/c_api/scheduler.cpp
+10 −30 src/realm/object-store/set.cpp
+16 −10 src/realm/object-store/sync/sync_session.cpp
+3 −4 src/realm/object-store/sync/sync_session.hpp
+35 −1 src/realm/query_conditions_tpl.hpp
+3 −11 src/realm/query_engine.cpp
+1 −22 src/realm/query_engine.hpp
+17 −1 src/realm/query_state.hpp
+290 −93 src/realm/set.cpp
+100 −384 src/realm/set.hpp
+19 −5 src/realm/sync/client.cpp
+4 −6 src/realm/sync/noinst/client_impl_base.cpp
+2 −0 src/realm/sync/noinst/client_impl_base.hpp
+3 −2 src/realm/sync/noinst/client_reset.cpp
+2 −9 src/realm/sync/subscriptions.cpp
+1 −2 src/realm/sync/subscriptions.hpp
+2 −3 src/realm/table.cpp
+12 −2 src/realm/util/backtrace.cpp
+7 −2 src/realm/util/config.h.in
+4 −4 src/realm/util/file.cpp
+16 −6 src/realm/util/thread.cpp
+106 −0 test/benchmark-common-tasks/main.cpp
+5 −5 test/large_tests/test_column_large.cpp
+1 −1 test/object-store/audit.cpp
+76 −0 test/object-store/c_api/c_api.cpp
+4 −0 test/object-store/set.cpp
+75 −1 test/object-store/sync/flx_sync.cpp
+1 −1 test/object-store/sync/session/wait_for_completion.cpp
+4 −1 test/test_set.cpp
+3 −3 test/test_sync_subscriptions.cpp
+5 −0 test/test_util_error.cpp
+3 −0 test/test_util_file.cpp
30 changes: 18 additions & 12 deletions packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,25 +264,22 @@ class CustomJVMScheduler {
JNIEnv *jenv = get_env();
jclass jvm_scheduler_class = jenv->FindClass("io/realm/kotlin/internal/interop/JVMScheduler");
m_notify_method = jenv->GetMethodID(jvm_scheduler_class, "notifyCore", "(J)V");
m_cancel_method = jenv->GetMethodID(jvm_scheduler_class, "cancel", "()V");
m_jvm_dispatch_scheduler = jenv->NewGlobalRef(dispatchScheduler);
}

~CustomJVMScheduler() {
get_env(true)->DeleteGlobalRef(m_jvm_dispatch_scheduler);
}

void set_scheduler(realm_scheduler_t* scheduler) {
m_scheduler = scheduler;
}

void notify() {
void notify(realm_work_queue_t* work_queue) {
// There is currently no signaling of creation/tear down of the core notifier thread, so we
// just attach it as a daemon thread here on first notification to allow the JVM to
// shut down properly. See https://github.com/realm/realm-core/issues/6429
auto jenv = get_env(true, true, "core-notifier");
jni_check_exception(jenv);
jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_notify_method,
reinterpret_cast<jlong>(m_scheduler));
reinterpret_cast<jlong>(work_queue));
}

bool is_on_thread() const noexcept {
Expand All @@ -293,12 +290,18 @@ class CustomJVMScheduler {
return true;
}

void cancel() {
auto jenv = get_env(true, true, "core-notifier");
jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_cancel_method);
jni_check_exception(jenv);
}


private:
std::thread::id m_id;
jmethodID m_notify_method;
jmethodID m_cancel_method;
jobject m_jvm_dispatch_scheduler;
realm_scheduler_t *m_scheduler;
};

// Note: using jlong here will create a linker issue
Expand All @@ -309,8 +312,8 @@ class CustomJVMScheduler {
//
// I suspect this could be related to the fact that jni.h defines jlong differently between Android (typedef int64_t)
// and JVM which is a (typedef long long) resulting in a different signature of the method that could be found by the linker.
void invoke_core_notify_callback(int64_t scheduler) {
realm_scheduler_perform_work(reinterpret_cast<realm_scheduler_t *>(scheduler));
void invoke_core_notify_callback(int64_t work_queue) {
realm_scheduler_perform_work(reinterpret_cast<realm_work_queue_t *>(work_queue));
}

realm_scheduler_t*
Expand All @@ -319,13 +322,16 @@ realm_create_scheduler(jobject dispatchScheduler) {
auto jvmScheduler = new CustomJVMScheduler(dispatchScheduler);
auto scheduler = realm_scheduler_new(
jvmScheduler,
[](void *userdata) { delete(static_cast<CustomJVMScheduler *>(userdata)); },
[](void *userdata) { static_cast<CustomJVMScheduler *>(userdata)->notify(); },
[](void *userdata) {
auto jvmScheduler = static_cast<CustomJVMScheduler *>(userdata);
jvmScheduler->cancel();
delete(jvmScheduler);
},
[](void *userdata, realm_work_queue_t* work_queue) { static_cast<CustomJVMScheduler *>(userdata)->notify(work_queue); },
[](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->is_on_thread(); },
[](const void *userdata, const void *userdata_other) { return userdata == userdata_other; },
[](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->can_invoke(); }
);
jvmScheduler->set_scheduler(scheduler);
return scheduler;
}
throw std::runtime_error("Null dispatchScheduler");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ class SystemNotificationTests {
writer1.write { copyToRealm(Sample()) }
writer2.write { copyToRealm(Sample()) }
}
liveRealmContext.close()
}
}