From 9ce59a5622505425c24ace6c1dfe0d0ad961a38f Mon Sep 17 00:00:00 2001 From: Christian Melchior Date: Mon, 11 Dec 2023 21:01:58 +0100 Subject: [PATCH] Fix Realm.asFlow() potentially missing an update when doing a write immediately after opening the Realm --- CHANGELOG.md | 1 + .../io/realm/kotlin/internal/RealmImpl.kt | 12 +++++- .../kotlin/internal/SuspendableNotifier.kt | 7 +++- .../test/common/RealmDictionaryTests.kt | 2 +- .../kotlin/test/common/RealmListTests.kt | 2 +- .../realm/kotlin/test/common/RealmSetTests.kt | 2 +- .../test/common/VersionTrackingTests.kt | 16 ++++---- .../notifications/RealmNotificationsTests.kt | 40 +++++++++++++++++++ 8 files changed, 68 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 917ec1c80f..474a60acad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ### Fixed * Cache notification callback JNI references at startup to ensure that symbols can be resolved in core callbacks. (Issue [#1577](https://github.com/realm/realm-kotlin/issues/1577)) +* Using `Realm.asFlow()` could miss an update if a write was started right after opening the Realm. (Issue [#1582](https://github.com/realm/realm-kotlin/issues/1582)) ### Compatibility * File format: Generates Realms with file format v23. diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt index e99972f1ce..2cf547befe 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt @@ -46,6 +46,7 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch import kotlin.reflect.KClass @@ -65,7 +66,7 @@ public class RealmImpl private constructor( internal val realmScope = CoroutineScope(SupervisorJob() + notificationScheduler.dispatcher) private val notifierFlow: MutableSharedFlow> = - MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + MutableSharedFlow(onBufferOverflow = BufferOverflow.DROP_OLDEST, replay = 1) private val notifier = SuspendableNotifier( owner = this, @@ -208,7 +209,14 @@ public class RealmImpl private constructor( } override fun asFlow(): Flow> = scopedFlow { - notifierFlow.onStart { emit(InitialRealmImpl(this@RealmImpl)) } + val firstItem = atomic(true) + return@scopedFlow notifierFlow.map { + if (firstItem.compareAndSet(expect = true, update = false)) { + InitialRealmImpl(this) + } else { + it + } + } } override fun writeCopyTo(configuration: Configuration) { diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt index e075e86dae..ef64cc8da2 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt @@ -44,7 +44,7 @@ internal class SuspendableNotifier( // see https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt#L78 private val _realmChanged = MutableSharedFlow( onBufferOverflow = BufferOverflow.DROP_OLDEST, - extraBufferCapacity = 1 + replay = 1 ) val dispatcher: CoroutineDispatcher = scheduler.dispatcher @@ -89,7 +89,10 @@ internal class SuspendableNotifier( // Touching realm will open the underlying realm and register change listeners, but must // happen on the dispatcher as the realm can only be touched on the dispatcher's thread. if (!realmInitializer.isInitialized()) { - withContext(dispatcher) { realm } + withContext(dispatcher) { + realm + _realmChanged.emit(realm.version()) + } } return _realmChanged.asSharedFlow() } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt index 4c903effe0..3ccc1f648e 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt @@ -1261,7 +1261,7 @@ class RealmDictionaryTests : EmbeddedObjectCollectionQueryTests { val listener = async { withTimeout(10.seconds) { flow.collect { current -> - delay(30.milliseconds) + delay(100.milliseconds) } } } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt index 4e1bcf0b8f..a25a0fd109 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt @@ -570,7 +570,7 @@ class RealmListTests : EmbeddedObjectCollectionQueryTests { val listener = async { withTimeout(10.seconds) { flow.collect { current -> - delay(30.milliseconds) + delay(100.milliseconds) } } } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt index 437ff45ebc..f03eede904 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt @@ -575,7 +575,7 @@ class RealmSetTests : CollectionQueryTests { val listener = async { withTimeout(10.seconds) { flow.collect { current -> - delay(30.milliseconds) + delay(100.milliseconds) } } } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt index d79035d188..1dc47b670d 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt @@ -103,7 +103,7 @@ class VersionTrackingTests { // Write that doesn't return objects does not trigger tracking additional versions realm.write { copyToRealm(Sample()) } realm.activeVersions().run { - assertEquals(1, allTracked.size, toString()) + assertTrue(1 >= allTracked.size, toString()) assertNotNull(writer, toString()) assertEquals(0, writer?.active?.size, toString()) } @@ -111,7 +111,7 @@ class VersionTrackingTests { // Until we actually query the object realm.query().find() realm.activeVersions().run { - assertEquals(2, allTracked.size, toString()) + assertTrue(2 >= allTracked.size, toString()) assertNotNull(writer, toString()) assertEquals(1, writer?.active?.size, toString()) } @@ -129,7 +129,7 @@ class VersionTrackingTests { // not assigned to a variable unless the generic return type is ) realm.write { copyToRealm(Sample()) } realm.activeVersions().run { - assertEquals(2, allTracked.size, toString()) + assertTrue(2 >= allTracked.size, toString()) assertNotNull(writer, toString()) assertEquals(1, writer?.active?.size, toString()) } @@ -219,8 +219,8 @@ class VersionTrackingTests { @Suppress("invisible_member", "invisible_reference") fun initialVersionDereferencedAfterFirstWrite() { (realm as RealmImpl).let { realm -> - assertNotNull(realm.initialRealmReference.value, toString()) - assertEquals(1, realm.versionTracker.versions().size, toString()) + val intermediateVersions = realm.versionTracker.versions() + assertEquals(1, intermediateVersions.size, intermediateVersions.toString()) val realmUpdates = TestChannel() @@ -238,9 +238,11 @@ class VersionTrackingTests { // Wait for the notifier to start realmUpdates.receiveOrFail() - assertNull(realm.initialRealmReference.value, toString()) - assertEquals(1, realm.versionTracker.versions().size, toString()) + // Depending on the exact timing, the first version might or might not have been + // GC'ed. If GC'ed, there are no intermediate versions. + val trackedVersions = realm.versionTracker.versions() + assertTrue(1 >= trackedVersions.size, trackedVersions.toString()) deferred.cancel() realmUpdates.close() diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt index 2fec01968d..ae4220029b 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt @@ -90,6 +90,46 @@ class RealmNotificationsTests : FlowableTests { } } + @Test + fun registerTwoFlows() = runBlocking { + val c1 = TestChannel>() + val c2 = TestChannel>() + val startingVersion = realm.version() + val observer1 = async { + realm.asFlow().collect { + c1.send(it) + } + } + c1.receiveOrFail(message = "Failed to receive initial event on Channel 1").let { realmChange -> + assertIs>(realmChange) + assertEquals(startingVersion, realmChange.realm.version()) + } + + realm.write { /* Do nothing */ } + val nextVersion = realm.version() + + val observer2 = async { + realm.asFlow().collect { + c2.send(it) + } + } + + c1.receiveOrFail(message = "Failed to receive update event on Channel 1").let { realmChange -> + assertIs>(realmChange) + assertEquals(nextVersion, realmChange.realm.version()) + } + c2.receiveOrFail(message = "Failed to receive initial event on Channel 2").let { realmChange -> + assertIs>(realmChange) + assertEquals(nextVersion, realmChange.realm.version()) + } + + observer1.cancel() + observer2.cancel() + c1.cancel() + c2.cancel() + } + + @Test override fun asFlow() { runBlocking {