Skip to content

Commit

Permalink
Fix Realm.asFlow() potentially missing an update when doing a write i…
Browse files Browse the repository at this point in the history
…mmediately after opening the Realm
  • Loading branch information
Christian Melchior committed Dec 11, 2023
1 parent d94e70c commit 9ce59a5
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Fixed
* Cache notification callback JNI references at startup to ensure that symbols can be resolved in core callbacks. (Issue [#1577](https://github.com/realm/realm-kotlin/issues/1577))
* Using `Realm.asFlow()` could miss an update if a write was started right after opening the Realm. (Issue [#1582](https://github.com/realm/realm-kotlin/issues/1582))

### Compatibility
* File format: Generates Realms with file format v23.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlin.reflect.KClass
Expand All @@ -65,7 +66,7 @@ public class RealmImpl private constructor(
internal val realmScope =
CoroutineScope(SupervisorJob() + notificationScheduler.dispatcher)
private val notifierFlow: MutableSharedFlow<RealmChange<Realm>> =
MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
MutableSharedFlow(onBufferOverflow = BufferOverflow.DROP_OLDEST, replay = 1)

private val notifier = SuspendableNotifier(
owner = this,
Expand Down Expand Up @@ -208,7 +209,14 @@ public class RealmImpl private constructor(
}

override fun asFlow(): Flow<RealmChange<Realm>> = scopedFlow {
notifierFlow.onStart { emit(InitialRealmImpl(this@RealmImpl)) }
val firstItem = atomic(true)
return@scopedFlow notifierFlow.map {
if (firstItem.compareAndSet(expect = true, update = false)) {
InitialRealmImpl(this)
} else {
it
}
}
}

override fun writeCopyTo(configuration: Configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class SuspendableNotifier(
// see https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt#L78
private val _realmChanged = MutableSharedFlow<VersionId>(
onBufferOverflow = BufferOverflow.DROP_OLDEST,
extraBufferCapacity = 1
replay = 1
)

val dispatcher: CoroutineDispatcher = scheduler.dispatcher
Expand Down Expand Up @@ -89,7 +89,10 @@ internal class SuspendableNotifier(
// Touching realm will open the underlying realm and register change listeners, but must
// happen on the dispatcher as the realm can only be touched on the dispatcher's thread.
if (!realmInitializer.isInitialized()) {
withContext(dispatcher) { realm }
withContext(dispatcher) {
realm
_realmChanged.emit(realm.version())
}
}
return _realmChanged.asSharedFlow()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ class RealmDictionaryTests : EmbeddedObjectCollectionQueryTests {
val listener = async {
withTimeout(10.seconds) {
flow.collect { current ->
delay(30.milliseconds)
delay(100.milliseconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class RealmListTests : EmbeddedObjectCollectionQueryTests {
val listener = async {
withTimeout(10.seconds) {
flow.collect { current ->
delay(30.milliseconds)
delay(100.milliseconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ class RealmSetTests : CollectionQueryTests {
val listener = async {
withTimeout(10.seconds) {
flow.collect { current ->
delay(30.milliseconds)
delay(100.milliseconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ class VersionTrackingTests {
// Write that doesn't return objects does not trigger tracking additional versions
realm.write<Unit> { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(1, allTracked.size, toString())
assertTrue(1 >= allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(0, writer?.active?.size, toString())
}

// Until we actually query the object
realm.query<Sample>().find()
realm.activeVersions().run {
assertEquals(2, allTracked.size, toString())
assertTrue(2 >= allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(1, writer?.active?.size, toString())
}
Expand All @@ -129,7 +129,7 @@ class VersionTrackingTests {
// not assigned to a variable unless the generic return type is <Unit>)
realm.write { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(2, allTracked.size, toString())
assertTrue(2 >= allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(1, writer?.active?.size, toString())
}
Expand Down Expand Up @@ -219,8 +219,8 @@ class VersionTrackingTests {
@Suppress("invisible_member", "invisible_reference")
fun initialVersionDereferencedAfterFirstWrite() {
(realm as RealmImpl).let { realm ->
assertNotNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())
val intermediateVersions = realm.versionTracker.versions()
assertEquals(1, intermediateVersions.size, intermediateVersions.toString())

val realmUpdates = TestChannel<Unit>()

Expand All @@ -238,9 +238,11 @@ class VersionTrackingTests {

// Wait for the notifier to start
realmUpdates.receiveOrFail()

assertNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())
// Depending on the exact timing, the first version might or might not have been
// GC'ed. If GC'ed, there are no intermediate versions.
val trackedVersions = realm.versionTracker.versions()
assertTrue(1 >= trackedVersions.size, trackedVersions.toString())

deferred.cancel()
realmUpdates.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,46 @@ class RealmNotificationsTests : FlowableTests {
}
}

@Test
fun registerTwoFlows() = runBlocking {
val c1 = TestChannel<RealmChange<Realm>>()
val c2 = TestChannel<RealmChange<Realm>>()
val startingVersion = realm.version()
val observer1 = async {
realm.asFlow().collect {
c1.send(it)
}
}
c1.receiveOrFail(message = "Failed to receive initial event on Channel 1").let { realmChange ->
assertIs<InitialRealm<Realm>>(realmChange)
assertEquals(startingVersion, realmChange.realm.version())
}

realm.write { /* Do nothing */ }
val nextVersion = realm.version()

val observer2 = async {
realm.asFlow().collect {
c2.send(it)
}
}

c1.receiveOrFail(message = "Failed to receive update event on Channel 1").let { realmChange ->
assertIs<UpdatedRealm<Realm>>(realmChange)
assertEquals(nextVersion, realmChange.realm.version())
}
c2.receiveOrFail(message = "Failed to receive initial event on Channel 2").let { realmChange ->
assertIs<InitialRealm<Realm>>(realmChange)
assertEquals(nextVersion, realmChange.realm.version())
}

observer1.cancel()
observer2.cancel()
c1.cancel()
c2.cancel()
}


@Test
override fun asFlow() {
runBlocking {
Expand Down

0 comments on commit 9ce59a5

Please sign in to comment.