Skip to content

Commit

Permalink
Avoid initial version pinning (#1519)
Browse files Browse the repository at this point in the history
  • Loading branch information
clementetb authored Oct 31, 2023
1 parent e0ad270 commit 161ea05
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* `Realm.getNumberOfActiveVersions` now returns the actual number of active versions. (Core issue [#6960](https://github.com/realm/realm-core/pull/6960))
* Fixed memory leak on Darwin caused by a reference cycle between resources and the GC cleaner. (Issue [#1530](https://github.com/realm/realm-kotlin/pull/1530))
* Fixed memory leaks on the JVM platform, see PR for more information. (Issue [#1526](https://github.com/realm/realm-kotlin/pull/1526))
* Removed pin on the initial realm version after opening a Realm. (Issue [#1519](https://github.com/realm/realm-kotlin/pull/1519))

### Compatibility
* File format: Generates Realms with file format v23.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ internal abstract class LiveRealm(
internal fun gcTrackedSnapshot(): FrozenRealmReference {
return snapshotLock.withLock {
_snapshot.value.also { snapshot ->
if (_closeSnapshotWhenAdvancing) {
if (_closeSnapshotWhenAdvancing && !snapshot.isClosed()) {
log.trace("${this@LiveRealm} ENABLE-TRACKING ${snapshot.version()}")
_closeSnapshotWhenAdvancing = false
}
Expand Down Expand Up @@ -129,14 +129,14 @@ internal abstract class LiveRealm(
log.trace("${this@LiveRealm} CLOSE-UNTRACKED $version")
_snapshot.value.close()
} else {
// TODO Split into track and clean up as we don't need to hold headLock while
// cleaning up as version tracker is only accessed from the same thread
versionTracker.trackAndCloseExpiredReferences(_snapshot.value)
versionTracker.trackReference(_snapshot.value)
}
_snapshot.value = realmReference.snapshot(owner)
log.trace("${this@LiveRealm} ADVANCING $version -> ${_snapshot.value.version()}")
_closeSnapshotWhenAdvancing = true
}

versionTracker.closeExpiredReferences()
}

protected open fun onSchemaChanged(schema: RealmSchemaPointer) {
Expand All @@ -156,7 +156,7 @@ internal abstract class LiveRealm(
// Close actual live reference. From this point off the snapshot will not be updated.
realmReference.close()
// Close current reference
_snapshot.value?.let {
_snapshot.value.let {
log.trace("$this CLOSE-ACTIVE ${it.version()}")
it.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ public class RealmImpl private constructor(
// closed.
internal val realmStateFlow =
MutableSharedFlow<State>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

private var _realmReference: AtomicRef<FrozenRealmReference?> = atomic(null)
private val realmReferenceLock = SynchronizableObject()
// Initial realm reference that would be used until the notifier or writer are available.
internal var initialRealmReference: AtomicRef<FrozenRealmReference?> = atomic(null)

/**
* The current Realm reference that points to the underlying frozen C++ SharedRealm.
Expand All @@ -99,7 +98,7 @@ public class RealmImpl private constructor(
// Maybe we could just rely on the notifier to issue the initial frozen version, but that
// would require us to sync that up. Didn't address this as we already have a todo on fixing
// constructing the initial frozen version in the initialization of updatableRealm.
private val versionTracker = VersionTracker(this, log)
internal val versionTracker = VersionTracker(this, log)

// Injection point for synchronized Realms. This property should only be used to hold state
// required by synchronized realms. See `SyncedRealmContext` for more details.
Expand Down Expand Up @@ -131,13 +130,17 @@ public class RealmImpl private constructor(
}
val (frozenReference, fileCreated) = configuration.openRealm(this@RealmImpl)
realmFileCreated = assetFileCopied || fileCreated
versionTracker.trackAndCloseExpiredReferences(frozenReference)
_realmReference.value = frozenReference
versionTracker.trackReference(frozenReference)
initialRealmReference.value = frozenReference
configuration.initializeRealmData(this@RealmImpl, realmFileCreated)
}

realmScope.launch {
notifier.realmChanged().collect {
removeInitialRealmReference()
// Closing this reference might be done by the GC:
// https://github.com/realm/realm-kotlin/issues/1527
versionTracker.closeExpiredReferences()
notifierFlow.emit(UpdatedRealmImpl(this@RealmImpl))
}
}
Expand Down Expand Up @@ -226,39 +229,44 @@ public class RealmImpl private constructor(
return notifier.registerObserver(t)
}

public fun realmReference(): FrozenRealmReference {
realmReferenceLock.withLock {
val value1 = _realmReference.value
// We don't consider advancing the version if is is already closed.
value1?.let {
if (it.isClosed()) return it
}

// Consider versions of current realm, notifier and writer to identify if we should
// advance the user facing realms version to a newer frozen snapshot.
val version = value1?.version()
val notifierSnapshot = notifier.version
val writerSnapshot = writer.version

var newest: LiveRealmHolder<LiveRealm>? = null
if (notifierSnapshot != null && version != null && notifierSnapshot > version) {
newest = notifier
}
@Suppress("ComplexCondition")
if (writerSnapshot != null && version != null && ((writerSnapshot > version) || (notifierSnapshot != null && writerSnapshot > notifierSnapshot))) {
newest = writer
}
if (newest != null) {
_realmReference.value = newest.snapshot
log.debug("$this ADVANCING $version -> ${_realmReference.value?.version()}")
}
/**
* Removes the local reference to start relying on the notifier - writer for snapshots.
*/
private fun removeInitialRealmReference() {
if (initialRealmReference.value != null) {
log.trace("REMOVING INITIAL VERSION")
// There is at least a new version available in the notifier, stop tracking the local one
initialRealmReference.value = null
}
return _realmReference.value ?: sdkError("Accessing realmReference before realm has been opened")
}

public fun realmReference(): FrozenRealmReference {
// We don't require to return the latest snapshot to the user but the closest the best.
// `initialRealmReference` is accessed from different threads, grab a copy to safely operate on it.
return initialRealmReference.value.let { localReference ->
// Find whether the user-facing, notifier or writer has the latest snapshot.
// Sort is stable, it will try to preserve the following order.
listOf(
{ localReference } to localReference?.uncheckedVersion(),
{ writer.snapshot } to writer.version,
{ notifier.snapshot } to notifier.version,
).sortedByDescending {
it.second
}.first().first.invoke()
} ?: sdkError("Accessing realmReference before realm has been opened")
}

public fun activeVersions(): VersionInfo {
val mainVersions: VersionData? = _realmReference.value?.let { VersionData(it.uncheckedVersion(), versionTracker.versions()) }
return VersionInfo(mainVersions, notifier.versions(), writer.versions())
val mainVersions: VersionData = VersionData(
current = initialRealmReference.value?.uncheckedVersion(),
active = versionTracker.versions()
)

return VersionInfo(
main = mainVersions,
notifier = notifier.versions(),
writer = writer.versions()
)
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal class SuspendableNotifier(
private inner class NotifierRealm : LiveRealm(
owner = owner,
configuration = owner.configuration,
scheduler = scheduler
scheduler = scheduler,
) {
// This is guaranteed to be triggered before any other notifications for the same
// update as we get all callbacks on the same single thread dispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ internal class SuspendableWriter(
LiveRealm(
owner = owner,
configuration = owner.configuration,
scheduler = scheduler
scheduler = scheduler,
),
InternalMutableRealm,
InternalTypedRealm,
Expand All @@ -85,6 +85,7 @@ internal class SuspendableWriter(

// Must only be accessed from the dispatchers thread
override val realm: WriterRealm by realmInitializer

private val shouldClose = kotlinx.atomicfu.atomic<Boolean>(false)
private val transactionMutex = Mutex(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import io.realm.kotlin.VersionId
/**
* Version meta data for a single instance.
*/
public data class VersionData(val current: VersionId, val active: Set<VersionId>) {
val versions: Set<VersionId> = setOf(current) + active
public data class VersionData(val current: VersionId?, val active: Set<VersionId>) {
val versions: Set<VersionId> = (current?.let { setOf(it) } ?: emptySet()) + active
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.realm.kotlin.VersionId
* Version meta data for an overall [Realm]-instance with [VersionData] for the user-facing [Realm]
* and the underlying [SuspendableNotifier]'s and [SuspendableWriter]'s live realms.
*/
public data class VersionInfo(val main: VersionData?, val notifier: VersionData?, val writer: VersionData?) {
public data class VersionInfo(val main: VersionData, val notifier: VersionData?, val writer: VersionData?) {
val all: Set<VersionId> = setOf(main, notifier, writer).mapNotNull { it?.versions }.flatten().toSet()
val allTracked: Set<VersionId> = setOf(main, notifier, writer).mapNotNull { it?.active }.flatten().toSet()
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.realm.kotlin.internal.platform.WeakReference
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic

internal typealias IntermediateReference = Pair<RealmPointer, WeakReference<RealmReference>>
/**
* Bookkeeping of intermediate versions that needs to be closed when no longer referenced or when
* explicitly closing a realm.
Expand All @@ -32,14 +33,33 @@ import kotlinx.atomicfu.atomic
internal class VersionTracker(private val owner: BaseRealmImpl, private val log: ContextLogger) {
// Set of currently open realms. Storing the native pointer explicitly to enable us to close
// the realm when the RealmReference is no longer referenced anymore.
private val intermediateReferences: AtomicRef<Set<Pair<RealmPointer, WeakReference<RealmReference>>>> = atomic(mutableSetOf())
private val intermediateReferences: AtomicRef<Set<IntermediateReference>> =
atomic(mutableSetOf())

fun trackAndCloseExpiredReferences(realmReference: FrozenRealmReference? = null) {
val references = mutableSetOf<Pair<RealmPointer, WeakReference<RealmReference>>>()
realmReference?.let {
fun trackReference(realmReference: FrozenRealmReference) {
// We need a new object to update the atomic reference
val references = mutableSetOf<IntermediateReference>().apply {
addAll(intermediateReferences.value)
}

realmReference.let {
log.trace("$owner TRACK-VERSION ${realmReference.version()}")
references.add(Pair(realmReference.dbPointer, WeakReference(it)))
}

intermediateReferences.value = references
}
/**
* Closes any realm reference that has been reclaimed by the GC.
*
* @return false if there is no reference left to clean.
*/
// Closing expired references might be done by the GC:
// https://github.com/realm/realm-kotlin/issues/1527
fun closeExpiredReferences() {
// We need a new object to update the atomic reference
val references = mutableSetOf<IntermediateReference>()

intermediateReferences.value.forEach { entry ->
val (pointer, ref) = entry
if (ref.get() == null) {
Expand All @@ -49,6 +69,7 @@ internal class VersionTracker(private val owner: BaseRealmImpl, private val log:
references.add(entry)
}
}

intermediateReferences.value = references
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.realm.kotlin.log.RealmLog
import io.realm.kotlin.notifications.RealmChange
import io.realm.kotlin.notifications.ResultsChange
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.util.receiveOrFail
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
Expand Down Expand Up @@ -83,7 +84,7 @@ class VersionTrackingTests {
assertEquals(1, allTracked.size)
// The notifier might or might not had time to run
notifier?.let {
assertEquals(2, it.current.version)
assertEquals(2, it.current?.version)
assertEquals(0, it.active.size)
}
assertNull(writer)
Expand Down Expand Up @@ -211,6 +212,45 @@ class VersionTrackingTests {
samples.map { it.list.version() }.joinToString { it.toString() }
)
}

@Test
@Suppress("invisible_member", "invisible_reference")
fun initialVersionDereferencedAfterFirstWrite() {
(realm as RealmImpl).let { realm ->
assertNotNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())

val realmUpdates = Channel<Unit>(1)

runBlocking {
val deferred = async {
realm.asFlow().collect {
realmUpdates.trySend(Unit)
}
}

// Wait for the notifier to start
realmUpdates.receiveOrFail()

realm.write { }

// Wait for the notifier to start
realmUpdates.receiveOrFail()

assertNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())

deferred.cancel()
realmUpdates.close()
}
}
}
}

@Suppress("invisible_member", "invisible_reference")
internal fun Realm.userFacingRealmVersions(): Int = (this as RealmImpl).let { realm ->
if (realm.initialRealmReference.value != null) 1
else 0
}

internal fun Realm.activeVersions(): VersionInfo = (this as RealmImpl).activeVersions()

0 comments on commit 161ea05

Please sign in to comment.