Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid initial version pinning #1519

Merged
merged 38 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d80bae5
Avoid initial version pinning
clementetb Sep 13, 2023
5ee6feb
Add changelog entry
clementetb Sep 13, 2023
369b4d8
Let the notifier handle GC snapshots
clementetb Sep 14, 2023
4cfc9f7
Revert "Let the notifier handle GC snapshots"
clementetb Sep 15, 2023
aca8768
Revert "Avoid initial version pinning"
clementetb Sep 15, 2023
15fe501
Split version tracker tracking functions
clementetb Sep 15, 2023
947d0ed
Remove on initial
clementetb Sep 15, 2023
6589248
Trigger clean up of GC tracked references
clementetb Sep 15, 2023
f77011b
Clean up
clementetb Sep 15, 2023
1bd1f6f
Merge branch 'releases' into ct/fix-initialversion-pinned
clementetb Sep 15, 2023
ef03369
Amend race condition
clementetb Sep 15, 2023
58b8b3f
move event
clementetb Sep 15, 2023
ab31164
Crash fix
clementetb Sep 16, 2023
664b8f8
linting
clementetb Sep 16, 2023
fffe381
Fix race condition
clementetb Sep 16, 2023
4b8be5d
linting
clementetb Sep 16, 2023
cf7fbdd
Remove prints
clementetb Sep 17, 2023
7794a45
do not notify on init
clementetb Sep 17, 2023
8c0082d
Fix race condition with lazy property
clementetb Sep 17, 2023
907bf96
Update versiontracker references atomically
clementetb Sep 18, 2023
ac32b82
track all references references
clementetb Sep 18, 2023
6c30a6a
Revert changes
clementetb Sep 18, 2023
aed35f4
Clean up
clementetb Sep 18, 2023
4a192e9
point to possible improvements
clementetb Sep 25, 2023
4e9c025
Use exisiting notification channel
clementetb Sep 27, 2023
38f7da9
Revert atomizing isInitialized
clementetb Sep 27, 2023
d5bd1f7
Remove main from tracking tests
clementetb Sep 27, 2023
c52a6f5
linting
clementetb Sep 27, 2023
5dfa145
Cleanup
clementetb Sep 27, 2023
5d67d1c
PR change requests
clementetb Sep 28, 2023
cecd2e1
Move initial realm instance lifecycle to notifier realm
clementetb Oct 16, 2023
2f29652
linting
clementetb Oct 16, 2023
963e505
Revert "linting"
clementetb Oct 23, 2023
da38e82
Revert "Move initial realm instance lifecycle to notifier realm"
clementetb Oct 23, 2023
4e8c51f
Update test cases
clementetb Oct 25, 2023
514a514
Merge branch 'releases' into ct/fix-initialversion-pinned
clementetb Oct 27, 2023
9305124
Update packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/…
clementetb Oct 30, 2023
edc44a4
PR change requests
clementetb Oct 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* `Realm.getNumberOfActiveVersions` now returns the actual number of active versions. (Core issue [#6960](https://github.com/realm/realm-core/pull/6960))
* Fixed memory leak on Darwin caused by a reference cycle between resources and the GC cleaner. (Issue [#1530](https://github.com/realm/realm-kotlin/pull/1530))
* Fixed memory leaks on the JVM platform, see PR for more information. (Issue [#1526](https://github.com/realm/realm-kotlin/pull/1526))
* Removed pin on the initial realm version after opening a Realm. (Issue [#1519](https://github.com/realm/realm-kotlin/pull/1519))
clementetb marked this conversation as resolved.
Show resolved Hide resolved

### Compatibility
* File format: Generates Realms with file format v23.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ internal abstract class LiveRealm(
internal fun gcTrackedSnapshot(): FrozenRealmReference {
return snapshotLock.withLock {
_snapshot.value.also { snapshot ->
if (_closeSnapshotWhenAdvancing) {
if (_closeSnapshotWhenAdvancing && !snapshot.isClosed()) {
log.trace("${this@LiveRealm} ENABLE-TRACKING ${snapshot.version()}")
_closeSnapshotWhenAdvancing = false
}
Expand Down Expand Up @@ -129,14 +129,14 @@ internal abstract class LiveRealm(
log.trace("${this@LiveRealm} CLOSE-UNTRACKED $version")
_snapshot.value.close()
} else {
// TODO Split into track and clean up as we don't need to hold headLock while
// cleaning up as version tracker is only accessed from the same thread
versionTracker.trackAndCloseExpiredReferences(_snapshot.value)
versionTracker.trackReference(_snapshot.value)
}
_snapshot.value = realmReference.snapshot(owner)
log.trace("${this@LiveRealm} ADVANCING $version -> ${_snapshot.value.version()}")
_closeSnapshotWhenAdvancing = true
}

versionTracker.closeExpiredReferences()
}

protected open fun onSchemaChanged(schema: RealmSchemaPointer) {
Expand All @@ -156,7 +156,7 @@ internal abstract class LiveRealm(
// Close actual live reference. From this point off the snapshot will not be updated.
realmReference.close()
// Close current reference
_snapshot.value?.let {
_snapshot.value.let {
log.trace("$this CLOSE-ACTIVE ${it.version()}")
it.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ public class RealmImpl private constructor(
// closed.
internal val realmStateFlow =
MutableSharedFlow<State>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

private var _realmReference: AtomicRef<FrozenRealmReference?> = atomic(null)
private val realmReferenceLock = SynchronizableObject()
// Initial realm reference that would be used until the notifier or writer are available.
internal var initialRealmReference: AtomicRef<FrozenRealmReference?> = atomic(null)

/**
* The current Realm reference that points to the underlying frozen C++ SharedRealm.
Expand All @@ -99,7 +98,7 @@ public class RealmImpl private constructor(
// Maybe we could just rely on the notifier to issue the initial frozen version, but that
// would require us to sync that up. Didn't address this as we already have a todo on fixing
// constructing the initial frozen version in the initialization of updatableRealm.
private val versionTracker = VersionTracker(this, log)
internal val versionTracker = VersionTracker(this, log)

// Injection point for synchronized Realms. This property should only be used to hold state
// required by synchronized realms. See `SyncedRealmContext` for more details.
Expand Down Expand Up @@ -131,13 +130,17 @@ public class RealmImpl private constructor(
}
val (frozenReference, fileCreated) = configuration.openRealm(this@RealmImpl)
realmFileCreated = assetFileCopied || fileCreated
versionTracker.trackAndCloseExpiredReferences(frozenReference)
_realmReference.value = frozenReference
versionTracker.trackReference(frozenReference)
initialRealmReference.value = frozenReference
configuration.initializeRealmData(this@RealmImpl, realmFileCreated)
}

realmScope.launch {
notifier.realmChanged().collect {
removeInitialRealmReference()
clementetb marked this conversation as resolved.
Show resolved Hide resolved
clementetb marked this conversation as resolved.
Show resolved Hide resolved
// Closing this reference might be done by the GC:
// https://github.com/realm/realm-kotlin/issues/1527
versionTracker.closeExpiredReferences()
notifierFlow.emit(UpdatedRealmImpl(this@RealmImpl))
}
}
Expand Down Expand Up @@ -226,41 +229,38 @@ public class RealmImpl private constructor(
return notifier.registerObserver(t)
}

public fun realmReference(): FrozenRealmReference {
realmReferenceLock.withLock {
val value1 = _realmReference.value
// We don't consider advancing the version if is is already closed.
value1?.let {
if (it.isClosed()) return it
}

// Consider versions of current realm, notifier and writer to identify if we should
// advance the user facing realms version to a newer frozen snapshot.
val version = value1?.version()
val notifierSnapshot = notifier.version
val writerSnapshot = writer.version

var newest: LiveRealmHolder<LiveRealm>? = null
if (notifierSnapshot != null && version != null && notifierSnapshot > version) {
newest = notifier
}
@Suppress("ComplexCondition")
if (writerSnapshot != null && version != null && ((writerSnapshot > version) || (notifierSnapshot != null && writerSnapshot > notifierSnapshot))) {
newest = writer
}
if (newest != null) {
_realmReference.value = newest.snapshot
log.debug("$this ADVANCING $version -> ${_realmReference.value?.version()}")
}
/**
* Removes the local reference to start relying on the notifier - writer for snapshots.
*/
private fun removeInitialRealmReference() {
clementetb marked this conversation as resolved.
Show resolved Hide resolved
clementetb marked this conversation as resolved.
Show resolved Hide resolved
if (initialRealmReference.value != null) {
log.trace("REMOVING INITIAL VERSION")
// There is at least a new version available in the notifier, stop tracking the local one
initialRealmReference.value = null
}
return _realmReference.value ?: sdkError("Accessing realmReference before realm has been opened")
}

public fun activeVersions(): VersionInfo {
val mainVersions: VersionData? = _realmReference.value?.let { VersionData(it.uncheckedVersion(), versionTracker.versions()) }
return VersionInfo(mainVersions, notifier.versions(), writer.versions())
public fun realmReference(): FrozenRealmReference {
// We don't require to return the latest snapshot to the user but the closest the best.
// `initialRealmReference` is accessed from different threads, grab a copy to safely operate on it.
return initialRealmReference.value.let { localReference ->
// Find whether the user-facing, notifier or writer has the latest snapshot.
// Sort is stable, it will try to preserve the following order.
listOf(
{ localReference } to localReference?.uncheckedVersion(),
{ notifier.snapshot } to notifier.version,
{ writer.snapshot } to writer.version
).sortedByDescending {
it.second
}.first().first()
clementetb marked this conversation as resolved.
Show resolved Hide resolved
} ?: sdkError("Accessing realmReference before realm has been opened")
}

public fun activeVersions(): VersionInfo = VersionInfo(
notifier = notifier.versions(),
writer = writer.versions()
)

override fun close() {
// TODO Reconsider this constraint. We have the primitives to check is we are on the
// writer thread and just close the realm in writer.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal class SuspendableNotifier(
private inner class NotifierRealm : LiveRealm(
owner = owner,
configuration = owner.configuration,
scheduler = scheduler
scheduler = scheduler,
) {
// This is guaranteed to be triggered before any other notifications for the same
// update as we get all callbacks on the same single thread dispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ internal class SuspendableWriter(
LiveRealm(
owner = owner,
configuration = owner.configuration,
scheduler = scheduler
scheduler = scheduler,
),
InternalMutableRealm,
InternalTypedRealm,
Expand All @@ -85,6 +85,7 @@ internal class SuspendableWriter(

// Must only be accessed from the dispatchers thread
override val realm: WriterRealm by realmInitializer

private val shouldClose = kotlinx.atomicfu.atomic<Boolean>(false)
private val transactionMutex = Mutex(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.realm.kotlin.VersionId
* Version meta data for an overall [Realm]-instance with [VersionData] for the user-facing [Realm]
* and the underlying [SuspendableNotifier]'s and [SuspendableWriter]'s live realms.
*/
public data class VersionInfo(val main: VersionData?, val notifier: VersionData?, val writer: VersionData?) {
val all: Set<VersionId> = setOf(main, notifier, writer).mapNotNull { it?.versions }.flatten().toSet()
val allTracked: Set<VersionId> = setOf(main, notifier, writer).mapNotNull { it?.active }.flatten().toSet()
public data class VersionInfo(val notifier: VersionData?, val writer: VersionData?) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we track the initial version anymore? Isn't it as important to know if the initial version is released as it is with the versions tracked by the notifier and writer 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed because I was testing it now in a different test case: initialVersionDereferencedAfterFirstWrite. But it is true that it is better to keep it.

val all: Set<VersionId> = setOf(notifier, writer).mapNotNull { it?.versions }.flatten().toSet()
val allTracked: Set<VersionId> = setOf(notifier, writer).mapNotNull { it?.active }.flatten().toSet()
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.realm.kotlin.internal.platform.WeakReference
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic

internal typealias IntermediateReference = Pair<RealmPointer, WeakReference<RealmReference>>
/**
* Bookkeeping of intermediate versions that needs to be closed when no longer referenced or when
* explicitly closing a realm.
Expand All @@ -32,14 +33,33 @@ import kotlinx.atomicfu.atomic
internal class VersionTracker(private val owner: BaseRealmImpl, private val log: ContextLogger) {
// Set of currently open realms. Storing the native pointer explicitly to enable us to close
// the realm when the RealmReference is no longer referenced anymore.
private val intermediateReferences: AtomicRef<Set<Pair<RealmPointer, WeakReference<RealmReference>>>> = atomic(mutableSetOf())
private val intermediateReferences: AtomicRef<MutableSet<IntermediateReference>> =
clementetb marked this conversation as resolved.
Show resolved Hide resolved
atomic(mutableSetOf())

fun trackAndCloseExpiredReferences(realmReference: FrozenRealmReference? = null) {
val references = mutableSetOf<Pair<RealmPointer, WeakReference<RealmReference>>>()
realmReference?.let {
fun trackReference(realmReference: FrozenRealmReference) {
// We need a new object to update the atomic reference
val references = mutableSetOf<IntermediateReference>().apply {
addAll(intermediateReferences.value)
}

realmReference.let {
log.trace("$owner TRACK-VERSION ${realmReference.version()}")
references.add(Pair(realmReference.dbPointer, WeakReference(it)))
}

intermediateReferences.value = references
}
/**
* Closes any realm reference that has been reclaimed by the GC.
*
* @return false if there is no reference left to clean.
*/
// Closing expired references might be done by the GC:
// https://github.com/realm/realm-kotlin/issues/1527
fun closeExpiredReferences() {
// We need a new object to update the atomic reference
val references = mutableSetOf<IntermediateReference>()

intermediateReferences.value.forEach { entry ->
val (pointer, ref) = entry
if (ref.get() == null) {
Expand All @@ -49,6 +69,7 @@ internal class VersionTracker(private val owner: BaseRealmImpl, private val log:
references.add(entry)
}
}

intermediateReferences.value = references
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.realm.kotlin.log.RealmLog
import io.realm.kotlin.notifications.RealmChange
import io.realm.kotlin.notifications.ResultsChange
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.util.receiveOrFail
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
Expand Down Expand Up @@ -79,8 +80,8 @@ class VersionTrackingTests {
@Test
fun open() = runBlocking {
realm.activeVersions().run {
assertEquals(1, all.size)
assertEquals(1, allTracked.size)
assertEquals(0, all.size)
assertEquals(0, allTracked.size)
// The notifier might or might not had time to run
notifier?.let {
assertEquals(2, it.current.version)
Expand All @@ -93,51 +94,48 @@ class VersionTrackingTests {
@Test
fun write_voidBlockIsNotTracked() = runBlocking {
realm.activeVersions().run {
assertEquals(1, all.size)
assertEquals(1, allTracked.size)
assertEquals(0, all.size)
assertEquals(0, allTracked.size)
assertNull(writer)
}

// Write that doesn't return objects does not trigger tracking additional versions
realm.write<Unit> { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(1, allTracked.size, toString())
assertEquals(0, allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(0, writer?.active?.size, toString())
}

// Until we actually query the object
realm.query<Sample>().find()
realm.activeVersions().run {
assertEquals(2, allTracked.size, toString())
assertEquals(1, allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(1, writer?.active?.size, toString())
}
Unit
}

@Test
fun write_returnedObjectIsTracked() = runBlocking {
realm.activeVersions().run {
assertEquals(1, all.size)
assertEquals(1, allTracked.size)
assertEquals(0, all.size)
assertNull(writer)
}

// Or if we immediately return the frozen object instance (object is returned even though
// not assigned to a variable unless the generic return type is <Unit>)
realm.write { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(2, allTracked.size, toString())
assertEquals(1, allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(1, writer?.active?.size, toString())
}
Unit
}

@Test
fun realmAsFlow_doesNotTrackVersions() = runBlocking {
realm.activeVersions().run {
assertEquals(1, all.size)
assertEquals(1, allTracked.size)
assertEquals(0, all.size)
assertNull(writer)
}

Expand All @@ -149,7 +147,7 @@ class VersionTrackingTests {
realm.write<Unit> { copyToRealm(Sample()) }
realm.write<Unit> { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(1, allTracked.size, toString())
assertEquals(0, allTracked.size, toString())
assertNotNull(notifier, toString())
assertEquals(0, notifier?.active?.size, toString())
assertNotNull(writer, toString())
Expand All @@ -161,8 +159,7 @@ class VersionTrackingTests {
@Test
fun objectNotificationsCausesTracking() = runBlocking {
realm.activeVersions().run {
assertEquals(1, all.size)
assertEquals(1, allTracked.size)
assertEquals(0, all.size)
assertNull(writer)
}

Expand Down Expand Up @@ -211,6 +208,41 @@ class VersionTrackingTests {
samples.map { it.list.version() }.joinToString { it.toString() }
)
}

@Test
@Suppress("invisible_member", "invisible_reference")
fun initialVersionDereferencedAfterFirstWrite() {
(realm as RealmImpl).let { realm ->
realm.initialRealmReference.value

clementetb marked this conversation as resolved.
Show resolved Hide resolved
assertNotNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())

val realmUpdates = Channel<Unit>(1)

runBlocking {
val deferred = async {
realm.asFlow().collect {
realmUpdates.trySend(Unit)
}
}

// Wait for the notifier to start
realmUpdates.receiveOrFail()

realm.write { }

// Wait for the notifier to start
realmUpdates.receiveOrFail()

assertNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())

deferred.cancel()
realmUpdates.close()
}
}
}
}

internal fun Realm.activeVersions(): VersionInfo = (this as RealmImpl).activeVersions()