Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guard notifying outdated scheduler #1559

Merged
merged 25 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
## 1.12.1-SNAPSHOT (YYYY-MM-DD)

### Breaking Changes
* None.

### Enhancements
* None.

### Fixed
* Fix crashes caused by posting to a released scheduler. (Issue [#1543](https://github.com/realm/realm-kotlin/issues/1543))

### Compatibility
* File format: Generates Realms with file format v23.
* Realm Studio 13.0.0 or above is required to open Realms created by this version.
* This release is compatible with the following Kotlin releases:
* Kotlin 1.8.0 and above. The K2 compiler is not supported yet.
* Ktor 2.1.2 and above.
* Coroutines 1.7.0 and above.
* AtomicFu 0.18.3 and above.
* The new memory model only. See https://github.com/realm/realm-kotlin#kotlin-memory-model-and-coroutine-compatibility
* Minimum Kbson 0.3.0.
* Minimum Gradle version: 6.8.3.
* Minimum Android Gradle Plugin version: 4.1.3.
* Minimum Android SDK: 16.

### Internal
* Updated to Realm Core 13.23.3, commit 7556b535aa7b27d49c13444894f7e9db778b3203.


## 1.12.0 (2023-11-02)

This release upgrades the Sync metadata in a way that is not compatible with older versions. To downgrade a Sync app from this version, you'll need to manually delete the metadata folder located at `$[SYNC-ROOT-DIRECTORY]/mongodb-realm/[APP-ID]/server-utility/metadata/`. This will log out all users.
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ val HOST_OS: OperatingSystem = findHostOs()

object Realm {
val ciBuild = (System.getenv("JENKINS_HOME") != null)
const val version = "1.12.0"
const val version = "1.12.1-SNAPSHOT"
const val group = "io.realm.kotlin"
const val projectUrl = "https://realm.io"
const val pluginPortalId = "io.realm.kotlin"
Expand Down
6 changes: 3 additions & 3 deletions dependencies.list
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Version of MongoDB Realm used by integration tests
# See https://github.com/realm/ci/packages/147854 for available versions
MONGODB_REALM_SERVER=2023-10-10
MONGODB_REALM_SERVER=2023-11-07

# `BAAS` and `BAAS-UI` projects commit hashes matching MONGODB_REALM_SERVER image version
# note that the MONGODB_REALM_SERVER image is a nightly build, find the matching commits
# for that date within the following repositories:
# https://github.com/10gen/baas/
# https://github.com/10gen/baas-ui/
REALM_BAAS_GIT_HASH=8246fc548763eb908b8090df864e9924e3330a0d
REALM_BAAS_UI_GIT_HASH=8a1843be2bf24f2faa705c5470a5bdd8d954f7ea
REALM_BAAS_GIT_HASH=41fa6cdbca47826c20a64f756e21b2c184393e90
REALM_BAAS_UI_GIT_HASH=b97a27ac858e0e8126aeb63f6ff9734d11029a91
Original file line number Diff line number Diff line change
Expand Up @@ -2138,10 +2138,22 @@ fun ObjectId.asRealmObjectIdT(): realm_object_id_t {

private class JVMScheduler(dispatcher: CoroutineDispatcher) {
val scope: CoroutineScope = CoroutineScope(dispatcher)
val lock = SynchronizableObject()
var cancelled = false

fun notifyCore(schedulerPointer: Long) {
scope.launch {
realmc.invoke_core_notify_callback(schedulerPointer)
lock.withLock {
if (!cancelled) {
realmc.invoke_core_notify_callback(schedulerPointer)
}
}
}
}

fun cancel() {
lock.withLock {
cancelled = true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ import realm_wrapper.realm_user_t
import realm_wrapper.realm_value_t
import realm_wrapper.realm_value_type
import realm_wrapper.realm_version_id_t
import realm_wrapper.realm_work_queue_t
import kotlin.collections.set
import kotlin.native.internal.createCleaner

Expand Down Expand Up @@ -560,17 +561,19 @@ actual object RealmInterop {
// free: realm_wrapper.realm_free_userdata_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
staticCFunction<COpaquePointer?, Unit> { userdata ->
printlntid("free")
userdata?.asStableRef<SingleThreadDispatcherScheduler>()?.dispose()
val stableSchedulerRef: StableRef<SingleThreadDispatcherScheduler>? = userdata?.asStableRef<SingleThreadDispatcherScheduler>()
stableSchedulerRef?.get()?.cancel()
stableSchedulerRef?.dispose()
},

// notify: realm_wrapper.realm_scheduler_notify_func_t? /* = kotlinx.cinterop.CPointer<kotlinx.cinterop.CFunction<(kotlinx.cinterop.COpaquePointer? /* = kotlinx.cinterop.CPointer<out kotlinx.cinterop.CPointed>? */) -> kotlin.Unit>>? */,
staticCFunction<COpaquePointer?, Unit> { userdata ->
staticCFunction<COpaquePointer?, CPointer<realm_work_queue_t>?, Unit> { userdata, work_queue ->
// Must be thread safe
val scheduler =
userdata!!.asStableRef<SingleThreadDispatcherScheduler>().get()
printlntid("$scheduler notify")
try {
scheduler.notify()
scheduler.notify(work_queue)
} catch (e: Exception) {
// Should never happen, but is included for development to get some indicators
// on errors instead of silent crashes.
Expand Down Expand Up @@ -3392,7 +3395,7 @@ actual object RealmInterop {
}

interface Scheduler {
fun notify()
fun notify(work_queue: CPointer<realm_work_queue_t>?)
}

class SingleThreadDispatcherScheduler(
Expand All @@ -3402,23 +3405,35 @@ actual object RealmInterop {
private val scope: CoroutineScope = CoroutineScope(dispatcher)
val ref: CPointer<out CPointed> = StableRef.create(this).asCPointer()
private lateinit var scheduler: CPointer<realm_scheduler_t>
private val lock = SynchronizableObject()
private var cancelled = false

fun setScheduler(scheduler: CPointer<realm_scheduler_t>) {
this.scheduler = scheduler
}

override fun notify() {
override fun notify(work_queue: CPointer<realm_work_queue_t>?) {
scope.launch {
try {
printlntid("on dispatcher")
realm_wrapper.realm_scheduler_perform_work(scheduler)
lock.withLock {
if (!cancelled) {
realm_wrapper.realm_scheduler_perform_work(work_queue)
}
}
} catch (e: Exception) {
// Should never happen, but is included for development to get some indicators
// on errors instead of silent crashes.
e.printStackTrace()
}
}
}

fun cancel() {
lock.withLock {
cancelled = true
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/external/core
Submodule core updated 74 files
+31 −1 CHANGELOG.md
+34 −1 CMakeLists.txt
+54 −0 Jenkinsfile
+1 −1 Package.swift
+2 −0 alpine.Dockerfile
+2 −2 dependencies.list
+2 −1 evergreen/config.yml
+0 −16 evergreen/config_overrides.json
+29 −21 evergreen/install_baas.sh
+3 −2 src/realm.h
+4 −0 src/realm/CMakeLists.txt
+33 −1 src/realm/array.cpp
+2 −0 src/realm/array.hpp
+1 −1 src/realm/array_integer.cpp
+7 −9 src/realm/array_integer.hpp
+18 −23 src/realm/array_integer_tpl.hpp
+7 −7 src/realm/array_with_find.cpp
+121 −211 src/realm/array_with_find.hpp
+1 −0 src/realm/cluster.cpp
+21 −1 src/realm/collection.hpp
+1 −3 src/realm/list.hpp
+6 −6 src/realm/object-store/c_api/scheduler.cpp
+10 −30 src/realm/object-store/set.cpp
+16 −10 src/realm/object-store/sync/sync_session.cpp
+3 −4 src/realm/object-store/sync/sync_session.hpp
+35 −1 src/realm/query_conditions_tpl.hpp
+3 −11 src/realm/query_engine.cpp
+1 −22 src/realm/query_engine.hpp
+17 −1 src/realm/query_state.hpp
+290 −93 src/realm/set.cpp
+100 −384 src/realm/set.hpp
+25 −11 src/realm/sync/client.cpp
+19 −7 src/realm/sync/noinst/client_history_impl.cpp
+3 −13 src/realm/sync/noinst/client_history_impl.hpp
+5 −7 src/realm/sync/noinst/client_impl_base.cpp
+2 −0 src/realm/sync/noinst/client_impl_base.hpp
+95 −87 src/realm/sync/noinst/client_reset.cpp
+4 −4 src/realm/sync/noinst/client_reset.hpp
+1 −1 src/realm/sync/noinst/client_reset_operation.cpp
+1 −1 src/realm/sync/noinst/pending_bootstrap_store.cpp
+2 −2 src/realm/sync/noinst/pending_bootstrap_store.hpp
+2 −2 src/realm/sync/noinst/protocol_codec.hpp
+0 −34 src/realm/sync/noinst/server/server.cpp
+2 −15 src/realm/sync/noinst/server/server_history.cpp
+1 −11 src/realm/sync/noinst/server/server_history.hpp
+52 −39 src/realm/sync/subscriptions.cpp
+12 −6 src/realm/sync/subscriptions.hpp
+2 −2 src/realm/sync/tools/apply_to_state_command.cpp
+118 −237 src/realm/sync/transform.cpp
+20 −65 src/realm/sync/transform.hpp
+2 −3 src/realm/table.cpp
+12 −2 src/realm/util/backtrace.cpp
+7 −2 src/realm/util/config.h.in
+4 −4 src/realm/util/file.cpp
+16 −6 src/realm/util/thread.cpp
+106 −0 test/benchmark-common-tasks/main.cpp
+5 −5 test/large_tests/test_column_large.cpp
+1 −1 test/object-store/audit.cpp
+76 −0 test/object-store/c_api/c_api.cpp
+4 −0 test/object-store/set.cpp
+2 −2 test/object-store/sync/client_reset.cpp
+6 −2 test/object-store/sync/flx_migration.cpp
+186 −51 test/object-store/sync/flx_sync.cpp
+1 −1 test/object-store/sync/session/wait_for_completion.cpp
+11 −5 test/object-store/util/sync/baas_admin_api.cpp
+2 −2 test/object-store/util/sync/sync_test_utils.cpp
+45 −54 test/peer.hpp
+7 −2 test/test_file.cpp
+4 −1 test/test_set.cpp
+5 −20 test/test_sync.cpp
+2 −2 test/test_sync_pending_bootstraps.cpp
+3 −3 test/test_sync_subscriptions.cpp
+5 −0 test/test_util_error.cpp
+3 −0 test/test_util_file.cpp
30 changes: 18 additions & 12 deletions packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,25 +264,22 @@ class CustomJVMScheduler {
JNIEnv *jenv = get_env();
jclass jvm_scheduler_class = jenv->FindClass("io/realm/kotlin/internal/interop/JVMScheduler");
m_notify_method = jenv->GetMethodID(jvm_scheduler_class, "notifyCore", "(J)V");
m_cancel_method = jenv->GetMethodID(jvm_scheduler_class, "cancel", "()V");
m_jvm_dispatch_scheduler = jenv->NewGlobalRef(dispatchScheduler);
}

~CustomJVMScheduler() {
get_env(true)->DeleteGlobalRef(m_jvm_dispatch_scheduler);
}

void set_scheduler(realm_scheduler_t* scheduler) {
m_scheduler = scheduler;
}

void notify() {
void notify(realm_work_queue_t* work_queue) {
// There is currently no signaling of creation/tear down of the core notifier thread, so we
// just attach it as a daemon thread here on first notification to allow the JVM to
// shut down properly. See https://github.com/realm/realm-core/issues/6429
auto jenv = get_env(true, true, "core-notifier");
jni_check_exception(jenv);
jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_notify_method,
reinterpret_cast<jlong>(m_scheduler));
reinterpret_cast<jlong>(work_queue));
}

bool is_on_thread() const noexcept {
Expand All @@ -293,12 +290,18 @@ class CustomJVMScheduler {
return true;
}

void cancel() {
auto jenv = get_env(true, true, "core-notifier");
jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_cancel_method);
jni_check_exception(jenv);
}


private:
std::thread::id m_id;
jmethodID m_notify_method;
jmethodID m_cancel_method;
jobject m_jvm_dispatch_scheduler;
realm_scheduler_t *m_scheduler;
};

// Note: using jlong here will create a linker issue
Expand All @@ -309,8 +312,8 @@ class CustomJVMScheduler {
//
// I suspect this could be related to the fact that jni.h defines jlong differently between Android (typedef int64_t)
// and JVM which is a (typedef long long) resulting in a different signature of the method that could be found by the linker.
void invoke_core_notify_callback(int64_t scheduler) {
realm_scheduler_perform_work(reinterpret_cast<realm_scheduler_t *>(scheduler));
void invoke_core_notify_callback(int64_t work_queue) {
realm_scheduler_perform_work(reinterpret_cast<realm_work_queue_t *>(work_queue));
}

realm_scheduler_t*
Expand All @@ -319,13 +322,16 @@ realm_create_scheduler(jobject dispatchScheduler) {
auto jvmScheduler = new CustomJVMScheduler(dispatchScheduler);
auto scheduler = realm_scheduler_new(
jvmScheduler,
[](void *userdata) { delete(static_cast<CustomJVMScheduler *>(userdata)); },
[](void *userdata) { static_cast<CustomJVMScheduler *>(userdata)->notify(); },
[](void *userdata) {
auto jvmScheduler = static_cast<CustomJVMScheduler *>(userdata);
jvmScheduler->cancel();
delete(jvmScheduler);
},
[](void *userdata, realm_work_queue_t* work_queue) { static_cast<CustomJVMScheduler *>(userdata)->notify(work_queue); },
[](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->is_on_thread(); },
[](const void *userdata, const void *userdata_other) { return userdata == userdata_other; },
[](void *userdata) { return static_cast<CustomJVMScheduler *>(userdata)->can_invoke(); }
);
jvmScheduler->set_scheduler(scheduler);
return scheduler;
}
throw std::runtime_error("Null dispatchScheduler");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import io.realm.kotlin.Realm
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.types.RealmInstant
import io.realm.kotlin.types.RealmObject
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlinx.datetime.Instant
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
Expand Down Expand Up @@ -92,8 +94,12 @@ fun Instant.toRealmInstant(): RealmInstant {
}

// Variant of `Channel.receive()` that will throw if a timeout is hit.
suspend fun <T : Any?> Channel<T>.receiveOrFail(timeout: Duration = 1.minutes): T {
return withTimeout(timeout) {
receive()
suspend fun <T : Any?> Channel<T>.receiveOrFail(timeout: Duration = 1.minutes, message: String? = null): T {
return select {
this@receiveOrFail.onReceive { it }
onTimeout(timeout) {
@Suppress("invisible_member")
throw TimeoutCancellationException("Timeout: $message")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue

class VersionTrackingTests {
private lateinit var initialLogLevel: LogLevel
Expand Down Expand Up @@ -150,7 +151,8 @@ class VersionTrackingTests {
realm.write<Unit> { copyToRealm(Sample()) }
realm.write<Unit> { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(1, allTracked.size, toString())
// Initially tracked version from user facing realm might have been released by now
assertTrue(allTracked.size <= 1, toString())
assertNotNull(notifier, toString())
assertEquals(0, notifier?.active?.size, toString())
assertNotNull(writer, toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ class MemoryTests {
@OptIn(ExperimentalStdlibApi::class)
println("NEW_MEMORY_MODEL: " + isExperimentalMM())

// Referencing things like
// NSProcessInfo.Companion.processInfo().operatingSystemVersionString
// platform.Foundation.NSFileManager.defaultManager
// as done in Darwin SystemUtils.kt triggers lazy initialization, so do a full realm-lifecycle
// to only measure increases over the actual test
// - Ensure that we clean up any released memory to get a nice baseline
platform.posix.sleep(1 * 5) // give chance to the Collector Thread to process out of scope references
triggerGC()
// - Record the baseline
val initialAllocation = parseSizeString(runSystemCommand(amountOfMemoryMappedInProcessCMD))

val referenceHolder = mutableListOf<Sample>();
Expand Down Expand Up @@ -91,11 +100,6 @@ class MemoryTests {
triggerGC()
platform.posix.sleep(1 * 5) // give chance to the Collector Thread to process out of scope references

// Referencing things like
// NSProcessInfo.Companion.processInfo().operatingSystemVersionString
// platform.Foundation.NSFileManager.defaultManager
// as done in Darwin SystemUtils.kt cause allocations so we just assert the increase over
// the test
val allocation = parseSizeString(runSystemCommand(amountOfMemoryMappedInProcessCMD))
assertEquals(initialAllocation, allocation, "mmap allocation exceeds expectations: initial=$initialAllocation current=$allocation")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ class SyncedRealmTests {
realm.writeBlocking { copyToRealm(masterObject) }
realm.syncSession.uploadAllLocalChanges()
}
assertEquals(42, counterValue.receiveOrFail(), "Failed to receive 42")
assertEquals(42, counterValue.receiveOrFail(message = "Failed to receive 42"))

// Increment counter asynchronously after download initial data (1)
val increment1 = async {
Expand All @@ -753,9 +753,10 @@ class SyncedRealmTests {
.mutableRealmIntField
.increment(1)
}
realm.syncSession.uploadAllLocalChanges(10.seconds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch 🙈, but any reason you are adding a timeout? The first upload doesn't have one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it because it would highlight what action is actually not executing as expected instead of just timing out on the recipient side. I just added it for the uploads that I inserted, but didn't walk over the rest of the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok 👍

}
}
assertEquals(43, counterValue.receiveOrFail(), "Failed to receive 43")
assertEquals(43, counterValue.receiveOrFail(message = "Failed to receive 43"))

// Increment counter asynchronously after download initial data (2)
val increment2 = async {
Expand All @@ -769,9 +770,10 @@ class SyncedRealmTests {
.mutableRealmIntField
.increment(1)
}
realm.syncSession.uploadAllLocalChanges(10.seconds)
}
}
assertEquals(44, counterValue.receiveOrFail(), "Failed to receive 44")
assertEquals(44, counterValue.receiveOrFail(message = "Failed to receive 44"))

increment1.cancel()
increment2.cancel()
Expand Down