Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid initial version pinning #1519

Merged
merged 38 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d80bae5
Avoid initial version pinning
clementetb Sep 13, 2023
5ee6feb
Add changelog entry
clementetb Sep 13, 2023
369b4d8
Let the notifier handle GC snapshots
clementetb Sep 14, 2023
4cfc9f7
Revert "Let the notifier handle GC snapshots"
clementetb Sep 15, 2023
aca8768
Revert "Avoid initial version pinning"
clementetb Sep 15, 2023
15fe501
Split version tracker tracking functions
clementetb Sep 15, 2023
947d0ed
Remove on initial
clementetb Sep 15, 2023
6589248
Trigger clean up of GC tracked references
clementetb Sep 15, 2023
f77011b
Clean up
clementetb Sep 15, 2023
1bd1f6f
Merge branch 'releases' into ct/fix-initialversion-pinned
clementetb Sep 15, 2023
ef03369
Amend race condition
clementetb Sep 15, 2023
58b8b3f
move event
clementetb Sep 15, 2023
ab31164
Crash fix
clementetb Sep 16, 2023
664b8f8
linting
clementetb Sep 16, 2023
fffe381
Fix race condition
clementetb Sep 16, 2023
4b8be5d
linting
clementetb Sep 16, 2023
cf7fbdd
Remove prints
clementetb Sep 17, 2023
7794a45
do not notify on init
clementetb Sep 17, 2023
8c0082d
Fix race condition with lazy property
clementetb Sep 17, 2023
907bf96
Update versiontracker references atomically
clementetb Sep 18, 2023
ac32b82
track all references references
clementetb Sep 18, 2023
6c30a6a
Revert changes
clementetb Sep 18, 2023
aed35f4
Clean up
clementetb Sep 18, 2023
4a192e9
point to possible improvements
clementetb Sep 25, 2023
4e9c025
Use exisiting notification channel
clementetb Sep 27, 2023
38f7da9
Revert atomizing isInitialized
clementetb Sep 27, 2023
d5bd1f7
Remove main from tracking tests
clementetb Sep 27, 2023
c52a6f5
linting
clementetb Sep 27, 2023
5dfa145
Cleanup
clementetb Sep 27, 2023
5d67d1c
PR change requests
clementetb Sep 28, 2023
cecd2e1
Move initial realm instance lifecycle to notifier realm
clementetb Oct 16, 2023
2f29652
linting
clementetb Oct 16, 2023
963e505
Revert "linting"
clementetb Oct 23, 2023
da38e82
Revert "Move initial realm instance lifecycle to notifier realm"
clementetb Oct 23, 2023
4e8c51f
Update test cases
clementetb Oct 25, 2023
514a514
Merge branch 'releases' into ct/fix-initialversion-pinned
clementetb Oct 27, 2023
9305124
Update packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/…
clementetb Oct 30, 2023
edc44a4
PR change requests
clementetb Oct 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

### Fixed
* `Realm.getNumberOfActiveVersions` now returns the actual number of active versions. (Core issue [#6960](https://github.com/realm/realm-core/pull/6960))
* Removed pin on the initial realm version after opening a Realm. (Issue [#1519](https://github.com/realm/realm-kotlin/pull/1519))
clementetb marked this conversation as resolved.
Show resolved Hide resolved

### Compatibility
* File format: Generates Realms with file format v23.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ internal abstract class LiveRealm(
val owner: RealmImpl,
configuration: InternalConfiguration,
private val scheduler: LiveRealmContext,
private val onSnapshotAvailable: () -> Unit = { },
) : BaseRealmImpl(configuration) {

private val realmChangeRegistration: NotificationToken
Expand Down Expand Up @@ -99,7 +100,7 @@ internal abstract class LiveRealm(
internal fun gcTrackedSnapshot(): FrozenRealmReference {
return snapshotLock.withLock {
_snapshot.value.also { snapshot ->
if (_closeSnapshotWhenAdvancing) {
if (_closeSnapshotWhenAdvancing && !snapshot.isClosed()) {
log.trace("${this@LiveRealm} ENABLE-TRACKING ${snapshot.version()}")
_closeSnapshotWhenAdvancing = false
}
Expand All @@ -117,6 +118,7 @@ internal abstract class LiveRealm(
// Always executed on the live realm's backing thread
internal open fun onRealmChanged() {
updateSnapshot()
onSnapshotAvailable()
}
// Always executed on the live realm's backing thread
internal fun updateSnapshot() {
Expand All @@ -129,14 +131,14 @@ internal abstract class LiveRealm(
log.trace("${this@LiveRealm} CLOSE-UNTRACKED $version")
_snapshot.value.close()
} else {
// TODO Split into track and clean up as we don't need to hold headLock while
// cleaning up as version tracker is only accessed from the same thread
versionTracker.trackAndCloseExpiredReferences(_snapshot.value)
versionTracker.trackReference(_snapshot.value)
}
_snapshot.value = realmReference.snapshot(owner)
log.trace("${this@LiveRealm} ADVANCING $version -> ${_snapshot.value.version()}")
_closeSnapshotWhenAdvancing = true
}

versionTracker.closeExpiredReferences()
}

protected open fun onSchemaChanged(schema: RealmSchemaPointer) {
Expand All @@ -156,7 +158,7 @@ internal abstract class LiveRealm(
// Close actual live reference. From this point off the snapshot will not be updated.
realmReference.close()
// Close current reference
_snapshot.value?.let {
_snapshot.value.let {
log.trace("$this CLOSE-ACTIVE ${it.version()}")
it.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,43 @@
package io.realm.kotlin.internal

import io.realm.kotlin.VersionId
import kotlinx.atomicfu.AtomicBoolean
import kotlinx.atomicfu.atomic

/**
* A **live realm holder** encapsulated common properties of [SuspendableWriter] and
* [SuspendableNotifier] for easier access to version information and GC-tracked snapshot
* references when advancing the version of [RealmImpl].
*/
internal abstract class LiveRealmHolder<out LiveRealm> {

abstract val realmInitializer: Lazy<LiveRealm>
abstract val realm: io.realm.kotlin.internal.LiveRealm

/**
* Indicates whether the live realm has been initialized or not.
*/
val isInitialized: AtomicBoolean = atomic(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the realmInitializer not good enough for this? Seems to be serving the exact same purpose 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was runnning into some race conditions so I made it atomic, seems it is no longer an issue.


/**
* Current version of the frozen snapshot reference of the live realm. This is not guaranteed
* to the same version as the actual live realm, but can be used to indicate that we can
* request a more recent GC-tracked snapshot from the [LiveRealmHolder] through [snapshot].
*/
val version: VersionId?
get() = if (realmInitializer.isInitialized()) { realm.snapshotVersion } else null
get() = if (isInitialized.value) { realm.snapshotVersion } else null

/**
* Returns a GC-tracked snapshot from the underlying [realm]. See [LiveRealm.gcTrackedSnapshot]
* for details of the tracking.
*/
val snapshot: FrozenRealmReference?
get() = if (realmInitializer.isInitialized()) {
get() = if (isInitialized.value) {
realm.gcTrackedSnapshot()
} else null

/**
* Dump the current snapshot and tracked versions of the LiveRealm used for debugging purpose.
*/
fun versions(): VersionData? = if (realmInitializer.isInitialized()) {
fun versions(): VersionData? = if (isInitialized.value) {
realm.versions()
} else {
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.realm.kotlin.internal
import io.realm.kotlin.Configuration
import io.realm.kotlin.MutableRealm
import io.realm.kotlin.Realm
import io.realm.kotlin.VersionId
import io.realm.kotlin.dynamic.DynamicRealm
import io.realm.kotlin.internal.dynamic.DynamicRealmImpl
import io.realm.kotlin.internal.interop.RealmInterop
Expand All @@ -36,6 +37,7 @@ import io.realm.kotlin.notifications.internal.InitialRealmImpl
import io.realm.kotlin.notifications.internal.UpdatedRealmImpl
import io.realm.kotlin.query.RealmQuery
import io.realm.kotlin.types.TypedRealmObject
import kotlinx.atomicfu.AtomicBoolean
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -72,19 +74,22 @@ public class RealmImpl private constructor(
private val notifier = SuspendableNotifier(
owner = this,
scheduler = notificationScheduler,
onSnapshotAvailable = ::removeInitialRealmReference
clementetb marked this conversation as resolved.
Show resolved Hide resolved
)
private val writer = SuspendableWriter(
owner = this,
scheduler = writeScheduler,
onSnapshotAvailable = ::removeInitialRealmReference
)

// Internal flow to ease monitoring of realm state for closing active flows then the realm is
// closed.
internal val realmStateFlow =
MutableSharedFlow<State>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

private var _realmReference: AtomicRef<FrozenRealmReference?> = atomic(null)
private val realmReferenceLock = SynchronizableObject()
// Initial realm reference that would be used until the notifier or writer are available.
private var initialRealmReference: AtomicRef<FrozenRealmReference?> = atomic(null)
// Controls whether we have to clean any tracked references.
private val _skipClosingReferences: AtomicBoolean = atomic(false)

/**
* The current Realm reference that points to the underlying frozen C++ SharedRealm.
Expand Down Expand Up @@ -131,8 +136,8 @@ public class RealmImpl private constructor(
}
val (frozenReference, fileCreated) = configuration.openRealm(this@RealmImpl)
realmFileCreated = assetFileCopied || fileCreated
versionTracker.trackAndCloseExpiredReferences(frozenReference)
_realmReference.value = frozenReference
versionTracker.trackReference(frozenReference)
initialRealmReference.value = frozenReference
configuration.initializeRealmData(this@RealmImpl, realmFileCreated)
}

Expand Down Expand Up @@ -226,38 +231,48 @@ public class RealmImpl private constructor(
return notifier.registerObserver(t)
}

public fun realmReference(): FrozenRealmReference {
realmReferenceLock.withLock {
val value1 = _realmReference.value
// We don't consider advancing the version if is is already closed.
value1?.let {
if (it.isClosed()) return it
}

// Consider versions of current realm, notifier and writer to identify if we should
// advance the user facing realms version to a newer frozen snapshot.
val version = value1?.version()
val notifierSnapshot = notifier.version
val writerSnapshot = writer.version
/**
* Removes the local reference to start relying on the notifier - writer for snapshots.
*/
private fun removeInitialRealmReference() {
clementetb marked this conversation as resolved.
Show resolved Hide resolved
clementetb marked this conversation as resolved.
Show resolved Hide resolved
// Ensure only a single thread can work on the version tracker.
val skipClosingReferences = _skipClosingReferences.getAndSet(true)
log.trace("SKIP REMOVING INITIAL VERSION = $skipClosingReferences")
if (!skipClosingReferences) {
initialRealmReference.value = null
// Closing this reference might be done by the GC:
// https://github.com/realm/realm-kotlin/issues/1527
val emptyTracker = versionTracker.closeExpiredReferences()
log.trace("EMPTY INITIAL TRACKER = $emptyTracker")
_skipClosingReferences.value = emptyTracker
}
}

var newest: LiveRealmHolder<LiveRealm>? = null
if (notifierSnapshot != null && version != null && notifierSnapshot > version) {
newest = notifier
}
@Suppress("ComplexCondition")
if (writerSnapshot != null && version != null && ((writerSnapshot > version) || (notifierSnapshot != null && writerSnapshot > notifierSnapshot))) {
newest = writer
}
if (newest != null) {
_realmReference.value = newest.snapshot
log.debug("$this ADVANCING $version -> ${_realmReference.value?.version()}")
}
public fun realmReference(): FrozenRealmReference {
// We don't require to return the latest snapshot to the user but the closest the best.
// `initialRealmReference` is accessed from different threads, grab a copy to safely operate on it.
return initialRealmReference.value.let { localReference ->
localReference ?: run {
// Find whether the notifier or writer has the latest snapshot.
val notifierVersion: VersionId? = notifier.version
val writerVersion: VersionId? = writer.version
val newest: LiveRealmHolder<LiveRealm> =
if (writerVersion != null && (notifierVersion == null || writerVersion > notifierVersion))
writer
else
notifier
newest.snapshot
} ?: sdkError("Accessing realmReference before realm has been opened")
}
return _realmReference.value ?: sdkError("Accessing realmReference before realm has been opened")
}

public fun activeVersions(): VersionInfo {
val mainVersions: VersionData? = _realmReference.value?.let { VersionData(it.uncheckedVersion(), versionTracker.versions()) }
val mainVersions: VersionData? = initialRealmReference.value?.let {
VersionData(
it.uncheckedVersion(),
versionTracker.versions()
)
}
return VersionInfo(mainVersions, notifier.versions(), writer.versions())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import kotlinx.coroutines.withContext
internal class SuspendableNotifier(
private val owner: RealmImpl,
private val scheduler: LiveRealmContext,
private val onSnapshotAvailable: () -> Unit,
) : LiveRealmHolder<LiveRealm>() {
// Flow used to emit events when the version of the live realm is updated
// Adding extra buffer capacity as we are otherwise never able to emit anything
Expand All @@ -53,7 +54,8 @@ internal class SuspendableNotifier(
private inner class NotifierRealm : LiveRealm(
owner = owner,
configuration = owner.configuration,
scheduler = scheduler
scheduler = scheduler,
onSnapshotAvailable = onSnapshotAvailable,
) {
// This is guaranteed to be triggered before any other notifications for the same
// update as we get all callbacks on the same single thread dispatcher
Expand All @@ -74,9 +76,13 @@ internal class SuspendableNotifier(
}
}

override val realmInitializer = lazy<LiveRealm> { NotifierRealm() }
// Must only be accessed from the dispatchers thread
override val realm: LiveRealm by realmInitializer
override val realm: LiveRealm by lazy<LiveRealm> {
NotifierRealm().also {
isInitialized.value = true
onSnapshotAvailable()
}
}

/**
* Listen to changes to a Realm.
Expand All @@ -87,7 +93,7 @@ internal class SuspendableNotifier(
internal suspend fun realmChanged(): Flow<VersionId> {
// Touching realm will open the underlying realm and register change listeners, but must
// happen on the dispatcher as the realm can only be touched on the dispatcher's thread.
if (!realmInitializer.isInitialized()) {
if (!isInitialized.value) {
withContext(dispatcher) { realm }
}
return _realmChanged.asSharedFlow()
Expand Down Expand Up @@ -139,7 +145,7 @@ internal class SuspendableNotifier(
runBlocking(dispatcher) {
// Calling close on a non initialized Realm is wasteful since before calling RealmInterop.close
// The Realm will be first opened (RealmInterop.open) and an instance created in vain.
if (realmInitializer.isInitialized()) {
if (isInitialized.value) {
realm.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import kotlin.reflect.KClass
internal class SuspendableWriter(
private val owner: RealmImpl,
private val scheduler: LiveRealmContext,
private val onSnapshotAvailable: () -> Unit = {},
) :
LiveRealmHolder<SuspendableWriter.WriterRealm>() {
private val tid: ULong
Expand All @@ -59,7 +60,8 @@ internal class SuspendableWriter(
LiveRealm(
owner = owner,
configuration = owner.configuration,
scheduler = scheduler
scheduler = scheduler,
onSnapshotAvailable = onSnapshotAvailable,
),
InternalMutableRealm,
InternalTypedRealm,
Expand All @@ -79,12 +81,14 @@ internal class SuspendableWriter(
override fun cancelWrite() { super.cancelWrite() }
}

override val realmInitializer: Lazy<WriterRealm> = lazy {
WriterRealm()
// Must only be accessed from the dispatchers thread
override val realm: WriterRealm by lazy {
WriterRealm().also {
isInitialized.value = true
onSnapshotAvailable()
}
}

// Must only be accessed from the dispatchers thread
override val realm: WriterRealm by realmInitializer
private val shouldClose = kotlinx.atomicfu.atomic<Boolean>(false)
private val transactionMutex = Mutex(false)

Expand Down Expand Up @@ -208,7 +212,7 @@ internal class SuspendableWriter(
withContext(dispatcher) {
// Calling close on a non initialized Realm is wasteful since before calling RealmInterop.close
// The Realm will be first opened (RealmInterop.open) and an instance created in vain.
if (realmInitializer.isInitialized()) {
if (isInitialized.value) {
realm.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package io.realm.kotlin.internal

import io.realm.kotlin.VersionId
import io.realm.kotlin.internal.interop.NativePointer
import io.realm.kotlin.internal.interop.RealmInterop
import io.realm.kotlin.internal.interop.RealmPointer
import io.realm.kotlin.internal.interop.RealmT
import io.realm.kotlin.internal.platform.WeakReference
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic

internal typealias IntermediateReference = Pair<NativePointer<out RealmT>, WeakReference<RealmReference>>
clementetb marked this conversation as resolved.
Show resolved Hide resolved
/**
* Bookkeeping of intermediate versions that needs to be closed when no longer referenced or when
* explicitly closing a realm.
Expand All @@ -32,14 +34,33 @@ import kotlinx.atomicfu.atomic
internal class VersionTracker(private val owner: BaseRealmImpl, private val log: ContextLogger) {
// Set of currently open realms. Storing the native pointer explicitly to enable us to close
// the realm when the RealmReference is no longer referenced anymore.
private val intermediateReferences: AtomicRef<Set<Pair<RealmPointer, WeakReference<RealmReference>>>> = atomic(mutableSetOf())
private val intermediateReferences: AtomicRef<MutableSet<IntermediateReference>> =
clementetb marked this conversation as resolved.
Show resolved Hide resolved
atomic(mutableSetOf())

fun trackAndCloseExpiredReferences(realmReference: FrozenRealmReference? = null) {
val references = mutableSetOf<Pair<RealmPointer, WeakReference<RealmReference>>>()
realmReference?.let {
fun trackReference(realmReference: FrozenRealmReference) {
// We need a new object to update the atomic reference
val references = mutableSetOf<IntermediateReference>().apply {
addAll(intermediateReferences.value)
}

realmReference.let {
log.trace("$owner TRACK-VERSION ${realmReference.version()}")
references.add(Pair(realmReference.dbPointer, WeakReference(it)))
}

intermediateReferences.value = references
}
/**
* Closes any realm reference that has been reclaimed by the GC.
*
* @return false if there is no reference left to clean.
*/
// Closing expired references might be done by the GC:
// https://github.com/realm/realm-kotlin/issues/1527
fun closeExpiredReferences(): Boolean {
// We need a new object to update the atomic reference
val references = mutableSetOf<IntermediateReference>()

intermediateReferences.value.forEach { entry ->
val (pointer, ref) = entry
if (ref.get() == null) {
Expand All @@ -49,7 +70,9 @@ internal class VersionTracker(private val owner: BaseRealmImpl, private val log:
references.add(entry)
}
}

intermediateReferences.value = references
return references.isEmpty()
}

fun versions(): Set<VersionId> =
Expand Down
Loading