Skip to content

Commit

Permalink
Guard notifying outdated scheduler (#1559)
Browse files Browse the repository at this point in the history
  • Loading branch information
rorbech authored Nov 9, 2023
1 parent 5f7b8fa commit 0dcdbfb
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 36 deletions.
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
## 1.12.1-SNAPSHOT (YYYY-MM-DD)

### Breaking Changes
* None.

### Enhancements
* None.

### Fixed
* Fix craches caused by posting to a released scheduler. (Issue [#1543](https://github.com/realm/realm-kotlin/issues/1543))

### Compatibility
* File format: Generates Realms with file format v23.
* Realm Studio 13.0.0 or above is required to open Realms created by this version.
* This release is compatible with the following Kotlin releases:
* Kotlin 1.8.0 and above. The K2 compiler is not supported yet.
* Ktor 2.1.2 and above.
* Coroutines 1.7.0 and above.
* AtomicFu 0.18.3 and above.
* The new memory model only. See https://github.com/realm/realm-kotlin#kotlin-memory-model-and-coroutine-compatibility
* Minimum Kbson 0.3.0.
* Minimum Gradle version: 6.8.3.
* Minimum Android Gradle Plugin version: 4.1.3.
* Minimum Android SDK: 16.

### Internal
* Updated to Realm Core 13.23.3, commit 7556b535aa7b27d49c13444894f7e9db778b3203.


## 1.12.0 (2023-11-02)

This release upgrades the Sync metadata in a way that is not compatible with older versions. To downgrade a Sync app from this version, you'll need to manually delete the metadata folder located at `$[SYNC-ROOT-DIRECTORY]/mongodb-realm/[APP-ID]/server-utility/metadata/`. This will log out all users.
Expand Down
6 changes: 3 additions & 3 deletions dependencies.list
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Version of MongoDB Realm used by integration tests
# See https://github.com/realm/ci/packages/147854 for available versions
MONGODB_REALM_SERVER=2023-10-10
MONGODB_REALM_SERVER=2023-11-07

# `BAAS` and `BAAS-UI` projects commit hashes matching MONGODB_REALM_SERVER image version
# note that the MONGODB_REALM_SERVER image is a nightly build, find the matching commits
# for that date within the following repositories:
# https://github.com/10gen/baas/
# https://github.com/10gen/baas-ui/
REALM_BAAS_GIT_HASH=8246fc548763eb908b8090df864e9924e3330a0d
REALM_BAAS_UI_GIT_HASH=8a1843be2bf24f2faa705c5470a5bdd8d954f7ea
REALM_BAAS_GIT_HASH=41fa6cdbca47826c20a64f756e21b2c184393e90
REALM_BAAS_UI_GIT_HASH=b97a27ac858e0e8126aeb63f6ff9734d11029a91
Original file line number Diff line number Diff line change
Expand Up @@ -2138,10 +2138,22 @@ fun ObjectId.asRealmObjectIdT(): realm_object_id_t {

private class JVMScheduler(dispatcher: CoroutineDispatcher) {
val scope: CoroutineScope = CoroutineScope(dispatcher)
val lock = SynchronizableObject()
var cancelled = false

fun notifyCore(schedulerPointer: Long) {
scope.launch {
realmc.invoke_core_notify_callback(schedulerPointer)
lock.withLock {
if (!cancelled) {
realmc.invoke_core_notify_callback(schedulerPointer)
}
}
}
}

fun cancel() {
lock.withLock {
cancelled = true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ import realm_wrapper.realm_user_t
import realm_wrapper.realm_value_t
import realm_wrapper.realm_value_type
import realm_wrapper.realm_version_id_t
import realm_wrapper.realm_work_queue_t
import kotlin.collections.set
import kotlin.native.internal.createCleaner

Expand Down Expand Up @@ -560,17 +561,19 @@ actual object RealmInterop {
// free: realm_wrapper.realm_free_userdata_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
staticCFunction<COpaquePointer?, Unit> { userdata ->
printlntid("free")
userdata?.asStableRef<SingleThreadDispatcherScheduler>()?.dispose()
val stableSchedulerRef: StableRef<SingleThreadDispatcherScheduler>? = userdata?.asStableRef<SingleThreadDispatcherScheduler>()
stableSchedulerRef?.get()?.cancel()
stableSchedulerRef?.dispose()
},

// notify: realm_wrapper.realm_scheduler_notify_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
staticCFunction<COpaquePointer?, Unit> { userdata ->
staticCFunction<COpaquePointer?, CPointer<realm_work_queue_t>?, Unit> { userdata, work_queue ->
// Must be thread safe
val scheduler =
userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
printlntid("$scheduler notify")
try {
scheduler.notify()
scheduler.notify(work_queue)
} catch (e: Exception) {
// Should never happen, but is included for development to get some indicators
// on errors instead of silent crashes.
Expand Down Expand Up @@ -3392,7 +3395,7 @@ actual object RealmInterop {
}

interface Scheduler {
fun notify()
fun notify(work_queue: CPointer<realm_work_queue_t>?)
}

class SingleThreadDispatcherScheduler(
Expand All @@ -3402,23 +3405,35 @@ actual object RealmInterop {
private val scope: CoroutineScope = CoroutineScope(dispatcher)
val ref: CPointer<out CPointed> = StableRef.create(this).asCPointer()
private lateinit var scheduler: CPointer<realm_scheduler_t>
private val lock = SynchronizableObject()
private var cancelled = false

fun setScheduler(scheduler: CPointer<realm_scheduler_t>) {
this.scheduler = scheduler
}

override fun notify() {
override fun notify(work_queue: CPointer<realm_work_queue_t>?) {
scope.launch {
try {
printlntid("on dispatcher")
realm_wrapper.realm_scheduler_perform_work(scheduler)
lock.withLock {
if (!cancelled) {
realm_wrapper.realm_scheduler_perform_work(work_queue)
}
}
} catch (e: Exception) {
// Should never happen, but is included for development to get some indicators
// on errors instead of silent crashes.
e.printStackTrace()
}
}
}

fun cancel() {
lock.withLock {
cancelled = true
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/external/core
Submodule core updated 74 files
+31 −1 CHANGELOG.md
+34 −1 CMakeLists.txt
+54 −0 Jenkinsfile
+1 −1 Package.swift
+2 −0 alpine.Dockerfile
+2 −2 dependencies.list
+2 −1 evergreen/config.yml
+0 −16 evergreen/config_overrides.json
+29 −21 evergreen/install_baas.sh
+3 −2 src/realm.h
+4 −0 src/realm/CMakeLists.txt
+33 −1 src/realm/array.cpp
+2 −0 src/realm/array.hpp
+1 −1 src/realm/array_integer.cpp
+7 −9 src/realm/array_integer.hpp
+18 −23 src/realm/array_integer_tpl.hpp
+7 −7 src/realm/array_with_find.cpp
+121 −211 src/realm/array_with_find.hpp
+1 −0 src/realm/cluster.cpp
+21 −1 src/realm/collection.hpp
+1 −3 src/realm/list.hpp
+6 −6 src/realm/object-store/c_api/scheduler.cpp
+10 −30 src/realm/object-store/set.cpp
+16 −10 src/realm/object-store/sync/sync_session.cpp
+3 −4 src/realm/object-store/sync/sync_session.hpp
+35 −1 src/realm/query_conditions_tpl.hpp
+3 −11 src/realm/query_engine.cpp
+1 −22 src/realm/query_engine.hpp
+17 −1 src/realm/query_state.hpp
+290 −93 src/realm/set.cpp
+100 −384 src/realm/set.hpp
+25 −11 src/realm/sync/client.cpp
+19 −7 src/realm/sync/noinst/client_history_impl.cpp
+3 −13 src/realm/sync/noinst/client_history_impl.hpp
+5 −7 src/realm/sync/noinst/client_impl_base.cpp
+2 −0 src/realm/sync/noinst/client_impl_base.hpp
+95 −87 src/realm/sync/noinst/client_reset.cpp
+4 −4 src/realm/sync/noinst/client_reset.hpp
+1 −1 src/realm/sync/noinst/client_reset_operation.cpp
+1 −1 src/realm/sync/noinst/pending_bootstrap_store.cpp
+2 −2 src/realm/sync/noinst/pending_bootstrap_store.hpp
+2 −2 src/realm/sync/noinst/protocol_codec.hpp
+0 −34 src/realm/sync/noinst/server/server.cpp
+2 −15 src/realm/sync/noinst/server/server_history.cpp
+1 −11 src/realm/sync/noinst/server/server_history.hpp
+52 −39 src/realm/sync/subscriptions.cpp
+12 −6 src/realm/sync/subscriptions.hpp
+2 −2 src/realm/sync/tools/apply_to_state_command.cpp
+118 −237 src/realm/sync/transform.cpp
+20 −65 src/realm/sync/transform.hpp
+2 −3 src/realm/table.cpp
+12 −2 src/realm/util/backtrace.cpp
+7 −2 src/realm/util/config.h.in
+4 −4 src/realm/util/file.cpp
+16 −6 src/realm/util/thread.cpp
+106 −0 test/benchmark-common-tasks/main.cpp
+5 −5 test/large_tests/test_column_large.cpp
+1 −1 test/object-store/audit.cpp
+76 −0 test/object-store/c_api/c_api.cpp
+4 −0 test/object-store/set.cpp
+2 −2 test/object-store/sync/client_reset.cpp
+6 −2 test/object-store/sync/flx_migration.cpp
+186 −51 test/object-store/sync/flx_sync.cpp
+1 −1 test/object-store/sync/session/wait_for_completion.cpp
+11 −5 test/object-store/util/sync/baas_admin_api.cpp
+2 −2 test/object-store/util/sync/sync_test_utils.cpp
+45 −54 test/peer.hpp
+7 −2 test/test_file.cpp
+4 −1 test/test_set.cpp
+5 −20 test/test_sync.cpp
+2 −2 test/test_sync_pending_bootstraps.cpp
+3 −3 test/test_sync_subscriptions.cpp
+5 −0 test/test_util_error.cpp
+3 −0 test/test_util_file.cpp
30 changes: 18 additions & 12 deletions packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,25 +264,22 @@ class CustomJVMScheduler {
JNIEnv *jenv = get_env();
jclass jvm_scheduler_class = jenv->FindClass("io/realm/kotlin/internal/interop/JVMScheduler");
m_notify_method = jenv->GetMethodID(jvm_scheduler_class, "notifyCore", "(J)V");
m_cancel_method = jenv->GetMethodID(jvm_scheduler_class, "cancel", "()V");
m_jvm_dispatch_scheduler = jenv->NewGlobalRef(dispatchScheduler);
}

~CustomJVMScheduler() {
get_env(true)->DeleteGlobalRef(m_jvm_dispatch_scheduler);
}

void set_scheduler(realm_scheduler_t* scheduler) {
m_scheduler = scheduler;
}

void notify() {
void notify(realm_work_queue_t* work_queue) {
// There is currently no signaling of creation/tear down of the core notifier thread, so we
// just attach it as a daemon thread here on first notification to allow the JVM to
// shutdown propertly. See https://github.com/realm/realm-core/issues/6429
auto jenv = get_env(true, true, "core-notifier");
jni_check_exception(jenv);
jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_notify_method,
reinterpret_cast<jlong>(m_scheduler));
reinterpret_cast<jlong>(work_queue));
}

bool is_on_thread() const noexcept {
Expand All @@ -293,12 +290,18 @@ class CustomJVMScheduler {
return true;
}

void cancel() {
auto jenv = get_env(true, true, "core-notifier");
jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_cancel_method);
jni_check_exception(jenv);
}


private:
std::thread::id m_id;
jmethodID m_notify_method;
jmethodID m_cancel_method;
jobject m_jvm_dispatch_scheduler;
realm_scheduler_t *m_scheduler;
};

// Note: using jlong here will create a linker issue
Expand All @@ -309,8 +312,8 @@ class CustomJVMScheduler {
//
// I suspect this could be related to the fact that jni.h defines jlong differently between Android (typedef int64_t)
// and JVM which is a (typedef long long) resulting in a different signature of the method that could be found by the linker.
void invoke_core_notify_callback(int64_t scheduler) {
realm_scheduler_perform_work(reinterpret_cast<realm_scheduler_t *>(scheduler));
void invoke_core_notify_callback(int64_t work_queue) {
realm_scheduler_perform_work(reinterpret_cast<realm_work_queue_t *>(work_queue));
}

realm_scheduler_t*
Expand All @@ -319,13 +322,16 @@ realm_create_scheduler(jobject dispatchScheduler) {
auto jvmScheduler = new CustomJVMScheduler(dispatchScheduler);
auto scheduler = realm_scheduler_new(
jvmScheduler,
[](void *userdata) { delete(static_cast<CustomJVMScheduler *>(userdata)); },
[](void *userdata) { static_cast<CustomJVMScheduler *>(userdata)->notify(); },
[](void *userdata) {
auto jvmScheduler = static_cast<CustomJVMScheduler *>(userdata);
jvmScheduler->cancel();
delete(jvmScheduler);
},
[](void *userdata, realm_work_queue_t* work_queue) { static_cast<CustomJVMScheduler *>(userdata)->notify(work_queue); },
[](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->is_on_thread(); },
[](const void *userdata, const void *userdata_other) { return userdata == userdata_other; },
[](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->can_invoke(); }
);
jvmScheduler->set_scheduler(scheduler);
return scheduler;
}
throw std::runtime_error("Null dispatchScheduler");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import io.realm.kotlin.Realm
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.types.RealmInstant
import io.realm.kotlin.types.RealmObject
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlinx.datetime.Instant
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
Expand Down Expand Up @@ -92,8 +94,12 @@ fun Instant.toRealmInstant(): RealmInstant {
}

// Variant of `Channel.receiveOrFail()` that will will throw if a timeout is hit.
suspend fun <T : Any?> Channel<T>.receiveOrFail(timeout: Duration = 1.minutes): T {
return withTimeout(timeout) {
receive()
suspend fun <T : Any?> Channel<T>.receiveOrFail(timeout: Duration = 1.minutes, message: String? = null): T {
return select {
this@receiveOrFail.onReceive { it }
onTimeout(timeout) {
@Suppress("invisible_member")
throw TimeoutCancellationException("Timeout: $message")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue

class VersionTrackingTests {
private lateinit var initialLogLevel: LogLevel
Expand Down Expand Up @@ -150,7 +151,8 @@ class VersionTrackingTests {
realm.write<Unit> { copyToRealm(Sample()) }
realm.write<Unit> { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(1, allTracked.size, toString())
// Initially tracked version from user facing realm might have been released by now
assertTrue(allTracked.size <= 1, toString())
assertNotNull(notifier, toString())
assertEquals(0, notifier?.active?.size, toString())
assertNotNull(writer, toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ class MemoryTests {
@OptIn(ExperimentalStdlibApi::class)
println("NEW_MEMORY_MODEL: " + isExperimentalMM())

// Referencing things like
// NSProcessInfo.Companion.processInfo().operatingSystemVersionString
// platform.Foundation.NSFileManager.defaultManager
// as done in Darwin SystemUtils.kt and initialized lazily, so do a full realm-lifecycle
// to only measure increases over the actual test
// - Ensure that we clean up any released memory to get a nice baseline
platform.posix.sleep(1 * 5) // give chance to the Collector Thread to process out of scope references
triggerGC()
// - Record the baseline
val initialAllocation = parseSizeString(runSystemCommand(amountOfMemoryMappedInProcessCMD))

val referenceHolder = mutableListOf<Sample>();
Expand Down Expand Up @@ -91,11 +100,6 @@ class MemoryTests {
triggerGC()
platform.posix.sleep(1 * 5) // give chance to the Collector Thread to process out of scope references

// Referencing things like
// NSProcessInfo.Companion.processInfo().operatingSystemVersionString
// platform.Foundation.NSFileManager.defaultManager
// as done in Darwin SystemUtils.kt cause allocations so we just assert the increase over
// the test
val allocation = parseSizeString(runSystemCommand(amountOfMemoryMappedInProcessCMD))
assertEquals(initialAllocation, allocation, "mmap allocation exceeds expectations: initial=$initialAllocation current=$allocation")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ class SyncedRealmTests {
realm.writeBlocking { copyToRealm(masterObject) }
realm.syncSession.uploadAllLocalChanges()
}
assertEquals(42, counterValue.receiveOrFail(), "Failed to receive 42")
assertEquals(42, counterValue.receiveOrFail(message = "Failed to receive 42"))

// Increment counter asynchronously after download initial data (1)
val increment1 = async {
Expand All @@ -753,9 +753,10 @@ class SyncedRealmTests {
.mutableRealmIntField
.increment(1)
}
realm.syncSession.uploadAllLocalChanges(10.seconds)
}
}
assertEquals(43, counterValue.receiveOrFail(), "Failed to receive 43")
assertEquals(43, counterValue.receiveOrFail(message = "Failed to receive 43"))

// Increment counter asynchronously after download initial data (2)
val increment2 = async {
Expand All @@ -769,9 +770,10 @@ class SyncedRealmTests {
.mutableRealmIntField
.increment(1)
}
realm.syncSession.uploadAllLocalChanges(10.seconds)
}
}
assertEquals(44, counterValue.receiveOrFail(), "Failed to receive 44")
assertEquals(44, counterValue.receiveOrFail(message = "Failed to receive 44"))

increment1.cancel()
increment2.cancel()
Expand Down

0 comments on commit 0dcdbfb

Please sign in to comment.