diff --git a/CHANGELOG.md b/CHANGELOG.md index b91fe6a523..dd9ffe5935 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * `Realm.getNumberOfActiveVersions` now returns the actual number of active versions. (Core issue [#6960](https://github.com/realm/realm-core/pull/6960)) * Fixed memory leak on Darwin caused by a reference cycle between resources and the GC cleaner. (Issue [#1530](https://github.com/realm/realm-kotlin/pull/1530)) * Fixed memory leaks on the JVM platform, see PR for more information. (Issue [#1526](https://github.com/realm/realm-kotlin/pull/1526)) +* Removed pin on the initial realm version after opening a Realm. (Issue [#1519](https://github.com/realm/realm-kotlin/pull/1519)) ### Compatibility * File format: Generates Realms with file format v23. diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/LiveRealm.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/LiveRealm.kt index f0c7b7fd97..b2704d976b 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/LiveRealm.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/LiveRealm.kt @@ -99,7 +99,7 @@ internal abstract class LiveRealm( internal fun gcTrackedSnapshot(): FrozenRealmReference { return snapshotLock.withLock { _snapshot.value.also { snapshot -> - if (_closeSnapshotWhenAdvancing) { + if (_closeSnapshotWhenAdvancing && !snapshot.isClosed()) { log.trace("${this@LiveRealm} ENABLE-TRACKING ${snapshot.version()}") _closeSnapshotWhenAdvancing = false } @@ -129,14 +129,14 @@ internal abstract class LiveRealm( log.trace("${this@LiveRealm} CLOSE-UNTRACKED $version") _snapshot.value.close() } else { - // TODO Split into track and clean up as we don't need to hold headLock while - // cleaning up as version tracker is only accessed from the same thread - versionTracker.trackAndCloseExpiredReferences(_snapshot.value) + versionTracker.trackReference(_snapshot.value) } _snapshot.value = realmReference.snapshot(owner) log.trace("${this@LiveRealm} ADVANCING $version -> ${_snapshot.value.version()}") _closeSnapshotWhenAdvancing = true } + + versionTracker.closeExpiredReferences() } protected open fun onSchemaChanged(schema: RealmSchemaPointer) { @@ -156,7 +156,7 @@ internal abstract class LiveRealm( // Close actual live reference. From this point off the snapshot will not be updated. realmReference.close() // Close current reference - _snapshot.value?.let { + _snapshot.value.let { log.trace("$this CLOSE-ACTIVE ${it.version()}") it.close() } diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt index 64d6ed3eaf..85f415f028 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt @@ -82,9 +82,8 @@ public class RealmImpl private constructor( // closed. internal val realmStateFlow = MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) - - private var _realmReference: AtomicRef = atomic(null) - private val realmReferenceLock = SynchronizableObject() + // Initial realm reference that would be used until the notifier or writer are available. + internal var initialRealmReference: AtomicRef = atomic(null) /** * The current Realm reference that points to the underlying frozen C++ SharedRealm. @@ -99,7 +98,7 @@ public class RealmImpl private constructor( // Maybe we could just rely on the notifier to issue the initial frozen version, but that // would require us to sync that up. Didn't address this as we already have a todo on fixing // constructing the initial frozen version in the initialization of updatableRealm. - private val versionTracker = VersionTracker(this, log) + internal val versionTracker = VersionTracker(this, log) // Injection point for synchronized Realms. This property should only be used to hold state // required by synchronized realms. See `SyncedRealmContext` for more details. @@ -131,13 +130,17 @@ public class RealmImpl private constructor( } val (frozenReference, fileCreated) = configuration.openRealm(this@RealmImpl) realmFileCreated = assetFileCopied || fileCreated - versionTracker.trackAndCloseExpiredReferences(frozenReference) - _realmReference.value = frozenReference + versionTracker.trackReference(frozenReference) + initialRealmReference.value = frozenReference configuration.initializeRealmData(this@RealmImpl, realmFileCreated) } realmScope.launch { notifier.realmChanged().collect { + removeInitialRealmReference() + // Closing this reference might be done by the GC: + // https://github.com/realm/realm-kotlin/issues/1527 + versionTracker.closeExpiredReferences() notifierFlow.emit(UpdatedRealmImpl(this@RealmImpl)) } } @@ -226,39 +229,44 @@ public class RealmImpl private constructor( return notifier.registerObserver(t) } - public fun realmReference(): FrozenRealmReference { - realmReferenceLock.withLock { - val value1 = _realmReference.value - // We don't consider advancing the version if is is already closed. - value1?.let { - if (it.isClosed()) return it - } - - // Consider versions of current realm, notifier and writer to identify if we should - // advance the user facing realms version to a newer frozen snapshot. - val version = value1?.version() - val notifierSnapshot = notifier.version - val writerSnapshot = writer.version - - var newest: LiveRealmHolder? = null - if (notifierSnapshot != null && version != null && notifierSnapshot > version) { - newest = notifier - } - @Suppress("ComplexCondition") - if (writerSnapshot != null && version != null && ((writerSnapshot > version) || (notifierSnapshot != null && writerSnapshot > notifierSnapshot))) { - newest = writer - } - if (newest != null) { - _realmReference.value = newest.snapshot - log.debug("$this ADVANCING $version -> ${_realmReference.value?.version()}") - } + /** + * Removes the local reference to start relying on the notifier - writer for snapshots. + */ + private fun removeInitialRealmReference() { + if (initialRealmReference.value != null) { + log.trace("REMOVING INITIAL VERSION") + // There is at least a new version available in the notifier, stop tracking the local one + initialRealmReference.value = null } - return _realmReference.value ?: sdkError("Accessing realmReference before realm has been opened") + } + + public fun realmReference(): FrozenRealmReference { + // We don't require to return the latest snapshot to the user but the closest the best. + // `initialRealmReference` is accessed from different threads, grab a copy to safely operate on it. + return initialRealmReference.value.let { localReference -> + // Find whether the user-facing, notifier or writer has the latest snapshot. + // Sort is stable, it will try to preserve the following order. + listOf( + { localReference } to localReference?.uncheckedVersion(), + { writer.snapshot } to writer.version, + { notifier.snapshot } to notifier.version, + ).sortedByDescending { + it.second + }.first().first.invoke() + } ?: sdkError("Accessing realmReference before realm has been opened") } public fun activeVersions(): VersionInfo { - val mainVersions: VersionData? = _realmReference.value?.let { VersionData(it.uncheckedVersion(), versionTracker.versions()) } - return VersionInfo(mainVersions, notifier.versions(), writer.versions()) + val mainVersions: VersionData = VersionData( + current = initialRealmReference.value?.uncheckedVersion(), + active = versionTracker.versions() + ) + + return VersionInfo( + main = mainVersions, + notifier = notifier.versions(), + writer = writer.versions() + ) } override fun close() { diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt index 3d8bfb49e4..260b254399 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt @@ -53,7 +53,7 @@ internal class SuspendableNotifier( private inner class NotifierRealm : LiveRealm( owner = owner, configuration = owner.configuration, - scheduler = scheduler + scheduler = scheduler, ) { // This is guaranteed to be triggered before any other notifications for the same // update as we get all callbacks on the same single thread dispatcher diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableWriter.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableWriter.kt index e18f20b11a..295c2637f7 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableWriter.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableWriter.kt @@ -59,7 +59,7 @@ internal class SuspendableWriter( LiveRealm( owner = owner, configuration = owner.configuration, - scheduler = scheduler + scheduler = scheduler, ), InternalMutableRealm, InternalTypedRealm, @@ -85,6 +85,7 @@ internal class SuspendableWriter( // Must only be accessed from the dispatchers thread override val realm: WriterRealm by realmInitializer + private val shouldClose = kotlinx.atomicfu.atomic(false) private val transactionMutex = Mutex(false) diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionData.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionData.kt index e964136c3f..7e1fe0656a 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionData.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionData.kt @@ -21,6 +21,6 @@ import io.realm.kotlin.VersionId /** * Version meta data for a single instance. */ -public data class VersionData(val current: VersionId, val active: Set) { - val versions: Set = setOf(current) + active +public data class VersionData(val current: VersionId?, val active: Set) { + val versions: Set = (current?.let { setOf(it) } ?: emptySet()) + active } diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionInfo.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionInfo.kt index 2d73960d9b..d18bac8d4c 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionInfo.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionInfo.kt @@ -22,7 +22,7 @@ import io.realm.kotlin.VersionId * Version meta data for an overall [Realm]-instance with [VersionData] for the user-facing [Realm] * and the underlying [SuspendableNotifier]'s and [SuspendableWriter]'s live realms. */ -public data class VersionInfo(val main: VersionData?, val notifier: VersionData?, val writer: VersionData?) { +public data class VersionInfo(val main: VersionData, val notifier: VersionData?, val writer: VersionData?) { val all: Set = setOf(main, notifier, writer).mapNotNull { it?.versions }.flatten().toSet() val allTracked: Set = setOf(main, notifier, writer).mapNotNull { it?.active }.flatten().toSet() } diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionTracker.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionTracker.kt index e4b47096bd..f0d75d5831 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionTracker.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/VersionTracker.kt @@ -23,6 +23,7 @@ import io.realm.kotlin.internal.platform.WeakReference import kotlinx.atomicfu.AtomicRef import kotlinx.atomicfu.atomic +internal typealias IntermediateReference = Pair> /** * Bookkeeping of intermediate versions that needs to be closed when no longer referenced or when * explicitly closing a realm. @@ -32,14 +33,33 @@ import kotlinx.atomicfu.atomic internal class VersionTracker(private val owner: BaseRealmImpl, private val log: ContextLogger) { // Set of currently open realms. Storing the native pointer explicitly to enable us to close // the realm when the RealmReference is no longer referenced anymore. - private val intermediateReferences: AtomicRef>>> = atomic(mutableSetOf()) + private val intermediateReferences: AtomicRef> = + atomic(mutableSetOf()) - fun trackAndCloseExpiredReferences(realmReference: FrozenRealmReference? = null) { - val references = mutableSetOf>>() - realmReference?.let { + fun trackReference(realmReference: FrozenRealmReference) { + // We need a new object to update the atomic reference + val references = mutableSetOf().apply { + addAll(intermediateReferences.value) + } + + realmReference.let { log.trace("$owner TRACK-VERSION ${realmReference.version()}") references.add(Pair(realmReference.dbPointer, WeakReference(it))) } + + intermediateReferences.value = references + } + /** + * Closes any realm reference that has been reclaimed by the GC. + * + * @return false if there is no reference left to clean. + */ + // Closing expired references might be done by the GC: + // https://github.com/realm/realm-kotlin/issues/1527 + fun closeExpiredReferences() { + // We need a new object to update the atomic reference + val references = mutableSetOf() + intermediateReferences.value.forEach { entry -> val (pointer, ref) = entry if (ref.get() == null) { @@ -49,6 +69,7 @@ internal class VersionTracker(private val owner: BaseRealmImpl, private val log: references.add(entry) } } + intermediateReferences.value = references } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt index 919c21ba0d..6a28982495 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt @@ -32,6 +32,7 @@ import io.realm.kotlin.log.RealmLog import io.realm.kotlin.notifications.RealmChange import io.realm.kotlin.notifications.ResultsChange import io.realm.kotlin.test.platform.PlatformUtils +import io.realm.kotlin.test.util.receiveOrFail import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -83,7 +84,7 @@ class VersionTrackingTests { assertEquals(1, allTracked.size) // The notifier might or might not had time to run notifier?.let { - assertEquals(2, it.current.version) + assertEquals(2, it.current?.version) assertEquals(0, it.active.size) } assertNull(writer) @@ -211,6 +212,45 @@ class VersionTrackingTests { samples.map { it.list.version() }.joinToString { it.toString() } ) } + + @Test + @Suppress("invisible_member", "invisible_reference") + fun initialVersionDereferencedAfterFirstWrite() { + (realm as RealmImpl).let { realm -> + assertNotNull(realm.initialRealmReference.value, toString()) + assertEquals(1, realm.versionTracker.versions().size, toString()) + + val realmUpdates = Channel(1) + + runBlocking { + val deferred = async { + realm.asFlow().collect { + realmUpdates.trySend(Unit) + } + } + + // Wait for the notifier to start + realmUpdates.receiveOrFail() + + realm.write { } + + // Wait for the notifier to start + realmUpdates.receiveOrFail() + + assertNull(realm.initialRealmReference.value, toString()) + assertEquals(1, realm.versionTracker.versions().size, toString()) + + deferred.cancel() + realmUpdates.close() + } + } + } +} + +@Suppress("invisible_member", "invisible_reference") +internal fun Realm.userFacingRealmVersions(): Int = (this as RealmImpl).let { realm -> + if (realm.initialRealmReference.value != null) 1 + else 0 } internal fun Realm.activeVersions(): VersionInfo = (this as RealmImpl).activeVersions()