From 10db36dc82c043521905e1c283a731197b674ecb Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 15 Nov 2023 11:52:51 +0100 Subject: [PATCH 01/26] Use new c-api progress notifier callback signature --- .../kotlin/io/realm/kotlin/internal/interop/Callback.kt | 2 +- .../io/realm/kotlin/internal/interop/RealmInterop.kt | 4 ++-- packages/external/core | 2 +- packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp | 6 +++--- packages/jni-swig-stub/src/main/jni/realm_api_helpers.h | 2 +- .../io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt | 4 ++-- .../kotlin/io/realm/kotlin/mongodb/sync/Progress.kt | 7 ++++++- 7 files changed, 16 insertions(+), 11 deletions(-) diff --git a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt index 4d262f206d..8f04d806d8 100644 --- a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt +++ b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt @@ -85,7 +85,7 @@ fun interface AsyncOpenCallback { } fun interface ProgressCallback { - fun onChange(transferredBytes: Long, totalBytes: Long) + fun onChange(transferredBytes: Long, totalBytes: Long, progressEstimate: Double) } fun interface ConnectionStateChangeCallback { diff --git a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt index 213be632f4..2d001e0945 100644 --- a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt +++ b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt @@ -2657,9 +2657,9 @@ actual object RealmInterop { return CPointerWrapper( realm_wrapper.realm_sync_session_register_progress_notifier( syncSession.cptr(), - staticCFunction { userData, transferred_bytes, total_bytes -> + staticCFunction { userData, transferred_bytes, total_bytes, progress_estimate -> safeUserData(userData).run { - onChange(transferred_bytes.toLong(), total_bytes.toLong()) + onChange(transferred_bytes.toLong(), total_bytes.toLong(), progress_estimate) } }, direction.nativeValue, diff --git a/packages/external/core b/packages/external/core index 7556b535aa..88756b3a8c 160000 --- a/packages/external/core +++ b/packages/external/core @@ -1 +1 @@ -Subproject commit 7556b535aa7b27d49c13444894f7e9db778b3203 +Subproject commit 88756b3a8c0e3ea628415d94f92c8f538ba09349 diff --git a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp index 03783aaf80..8daafff2d6 100644 --- a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp +++ b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp @@ -948,13 +948,13 @@ sync_after_client_reset_handler(realm_sync_config_t* config, jobject after_handl } void -realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes) { +realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes, double progress_estimate) { auto env = get_env(true); - static JavaMethod java_callback_method(env, JavaClassGlobalDef::progress_callback(), "onChange", "(JJ)V"); + static JavaMethod java_callback_method(env, JavaClassGlobalDef::progress_callback(), "onChange", "(JJD)V"); jni_check_exception(env); - env->CallVoidMethod(static_cast(userdata), java_callback_method, jlong(transferred_bytes), jlong(total_bytes)); + env->CallVoidMethod(static_cast(userdata), java_callback_method, jlong(transferred_bytes), jlong(total_bytes), jdouble(progress_estimate)); jni_check_exception(env); } diff --git a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h index 022bca21a7..0d1702efbf 100644 --- a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h +++ b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h @@ -101,7 +101,7 @@ void sync_after_client_reset_handler(realm_sync_config_t* config, jobject after_handler); void -realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes); +realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes, double progress_estimate); void realm_sync_session_connection_state_change_callback(void *userdata, realm_sync_connection_state_e old_state, realm_sync_connection_state_e new_state); diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt index 7a9d3416f6..f875b4c0bf 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt @@ -124,8 +124,8 @@ internal open class SyncSessionImpl( Direction.UPLOAD -> ProgressDirection.RLM_SYNC_PROGRESS_DIRECTION_UPLOAD }, progressMode == ProgressMode.INDEFINITELY - ) { transferredBytes: Long, totalBytes: Long -> - val progress = Progress(transferredBytes.toULong(), totalBytes.toULong()) + ) { transferredBytes: Long, totalBytes: Long, progressEstimate: Double -> + val progress = Progress(transferredBytes.toULong(), totalBytes.toULong(), progressEstimate) trySendWithBufferOverflowCheck(progress) if (progressMode == ProgressMode.CURRENT_CHANGES && progress.isTransferComplete) { close() diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt index fb1ee49d43..1dbb1e4a3f 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt @@ -28,7 +28,12 @@ public data class Progress( * Total number of transferable bytes (bytes that have been transferred + pending bytes not * yet transferred). */ - val transferableBytes: ULong + val transferableBytes: ULong, + + /** + * Transfer progress estimation ranged from 0.0 to 1.0. + */ + val progressEstimate: Double, ) { /** * Property indicating if all pending bytes have been transferred. From 9d9d4c2a512adc5e7195dbd6179c25fa2cc8d930 Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 15 Nov 2023 16:13:34 +0100 Subject: [PATCH 02/26] Allow progress listeners on Flexible sync configurations --- .../kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt index f875b4c0bf..d405279daf 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt @@ -109,9 +109,6 @@ internal open class SyncSessionImpl( direction: Direction, progressMode: ProgressMode, ): Flow { - if ((configuration as InternalConfiguration).isFlexibleSyncConfiguration) { - throw UnsupportedOperationException("Progress listeners are not supported for Flexible Sync.") - } return realm.scopedFlow { callbackFlow { val token: AtomicRef = From 363da9602124620092bd4f8f790378b72e416ece Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 16 Nov 2023 12:55:23 +0100 Subject: [PATCH 03/26] Add flexible sync tests --- .../mongodb/internal/SyncSessionImpl.kt | 3 + .../common/FlexibleProgressListenerTests.kt | 307 ++++++++++++++++++ .../mongodb/common/ProgressListenerTests.kt | 31 +- 3 files changed, 330 insertions(+), 11 deletions(-) create mode 100644 packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt index d405279daf..f875b4c0bf 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt @@ -109,6 +109,9 @@ internal open class SyncSessionImpl( direction: Direction, progressMode: ProgressMode, ): Flow { + if ((configuration as InternalConfiguration).isFlexibleSyncConfiguration) { + throw UnsupportedOperationException("Progress listeners are not supported for Flexible Sync.") + } return realm.scopedFlow { callbackFlow { val token: AtomicRef = diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt new file mode 100644 index 0000000000..2ad829841d --- /dev/null +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt @@ -0,0 +1,307 @@ +/* + * Copyright 2023 Realm Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.realm.kotlin.test.mongodb.common + +import io.realm.kotlin.Realm +import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes +import io.realm.kotlin.ext.query +import io.realm.kotlin.internal.platform.runBlocking +import io.realm.kotlin.mongodb.User +import io.realm.kotlin.mongodb.sync.Direction +import io.realm.kotlin.mongodb.sync.Progress +import io.realm.kotlin.mongodb.sync.ProgressMode +import io.realm.kotlin.mongodb.sync.SyncConfiguration +import io.realm.kotlin.mongodb.syncSession +import io.realm.kotlin.test.mongodb.TEST_APP_FLEX +import io.realm.kotlin.test.mongodb.TEST_APP_PARTITION +import io.realm.kotlin.test.mongodb.TestApp +import io.realm.kotlin.test.mongodb.createUserAndLogIn +import io.realm.kotlin.test.mongodb.use +import io.realm.kotlin.test.util.use +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.last +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.scan +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.withTimeout +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +@Ignore // To be enabled when flexible sync supports progress listeners +class FlexibleProgressListenerTests { + + private lateinit var app: TestApp + + @BeforeTest + fun setup() { + app = TestApp(this::class.simpleName, appName = TEST_APP_FLEX) + } + + @AfterTest + fun tearDown() { + if (this::app.isInitialized) { + app.close() + } + } + + @Test + fun downloadProgressListener_changesOnly() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> + // Verify that we: + // - get a "transferComplete" event + // - complete the flow, and + // - that all objects are available afterwards + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + // Ensure that we can do consecutive CURRENT_CHANGES registrations + for (i in 0 until 3) { + uploadRealm.writeSampleData( + TEST_SIZE, + idOffset = TEST_SIZE * i, + timeout = TIMEOUT + ) + // We are not sure when the realm actually knows of the remote changes and consider + // them current, so wait a bit + delay(10.seconds) + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.CURRENT_CHANGES + ).run { + withTimeout(TIMEOUT) { + last().let { progress: Progress -> + assertTrue(progress.isTransferComplete) + assertEquals(1.0, progress.progressEstimate) + } + } + } + // Progress.isTransferComplete does not guarantee that changes are integrated and + // visible in the realm + realm.syncSession.downloadAllServerChanges(TIMEOUT) + assertEquals( + TEST_SIZE * (i + 1), + realm.query().find().size + ) + } + } + } + } + + @Test + fun downloadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> + uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> completed < 3 } + .collect { completed -> + uploadRealm.writeSampleData( + TEST_SIZE, + idOffset = (completed + 1) * TEST_SIZE, + timeout = TIMEOUT + ) + } + } + } + } + } + + @Test + fun uploadProgressListener_changesOnly() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + for (i in 0..3) { + realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT) + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .run { + withTimeout(TIMEOUT) { + last().let { + assertTrue(it.isTransferComplete) + assertEquals(1.0, it.progressEstimate) + } + } + } + } + } + } + + @Test + fun uploadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> completed < 3 } + .collect { completed -> + realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) + realm.syncSession.uploadAllLocalChanges() + } + } + } + } + + @Test + fun worksAfterExceptions() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + assertFailsWith { + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .collect { + @Suppress("TooGenericExceptionThrown") + throw RuntimeException("Crashing progress flow") + } + } + + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } + } + } + } + + @Test + fun worksAfterCancel() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + // Setup a flow that we are just going to cancel + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + supervisorScope { + val mutex = Mutex(true) + val task = async { + flow.collect { + mutex.unlock() + } + } + // Await the flow actually being active + mutex.lock() + task.cancel() + } + + // Verify that progress listeners still work + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).run { + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } + } + } + } + } + + @Test + fun triggerImmediatelyWhenRegistered() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + withTimeout(10.seconds) { + // Ensure that all data is already synced + assertTrue { realm.syncSession.uploadAllLocalChanges() } + assertTrue { realm.syncSession.downloadAllServerChanges() } + // Ensure that progress listeners are triggered at least one time even though there + // is no data + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) + .first() + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .first() + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .first() + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .first() + } + } + } + + @Test + fun completesOnClose() = runBlocking { + TestApp("completesOnClose", TEST_APP_PARTITION).use { app -> + val user = app.createUserAndLogIn() + val realm = Realm.open(createSyncConfig(user)) + try { + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + val job = async { + withTimeout(10.seconds) { + flow.collect { } + } + } + realm.close() + job.await() + } finally { + if (!realm.isClosed()) { + realm.close() + } + } + } + } + + private suspend fun Realm.writeSampleData(count: Int, idOffset: Int = 0, timeout: Duration? = null) { + write { + for (i in idOffset until count + idOffset) { + copyToRealm(SyncObjectWithAllTypes().apply { stringField = "Object $i" }) + } + } + timeout?.let { + assertTrue { syncSession.uploadAllLocalChanges(timeout) } + } + } + + // Operator that will return a flow that emits an increasing integer on each completion event + private fun Flow.completionCounter(): Flow = + filter { it.isTransferComplete } + .distinctUntilChanged() + // Increment completed count if we are done transferring and the amount of bytes has + // increased + .scan(0UL to 0) { (bytes, completed), progress -> + if (progress.isTransferComplete && progress.transferableBytes > bytes) { + (progress.transferredBytes to completed + 1) + } else { + (bytes to completed) + } + } + .drop(1) + .map { (_, completed) -> completed } + + private fun createSyncConfig( + user: User + ): SyncConfiguration { + return SyncConfiguration.Builder(user, FLX_SYNC_SCHEMA) + .initialSubscriptions { + add(it.query()) + } + .build() + } +} diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt index a3310c5494..e3b686af65 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt @@ -58,8 +58,8 @@ import kotlin.test.fail import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -private const val TEST_SIZE = 500 -private val TIMEOUT = 30.seconds +internal const val TEST_SIZE = 500 +internal val TIMEOUT = 30.seconds class ProgressListenerTests { @@ -97,12 +97,17 @@ class ProgressListenerTests { // We are not sure when the realm actually knows of the remote changes and consider // them current, so wait a bit delay(10.seconds) - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) - .run { - withTimeout(TIMEOUT) { - assertTrue(last().isTransferComplete) + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.CURRENT_CHANGES + ).run { + withTimeout(TIMEOUT) { + last().let { progress: Progress -> + assertTrue(progress.isTransferComplete) + assertEquals(1.0, progress.progressEstimate) } } + } // Progress.isTransferComplete does not guarantee that changes are integrated and // visible in the realm realm.syncSession.downloadAllServerChanges(TIMEOUT) @@ -142,11 +147,15 @@ class ProgressListenerTests { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> for (i in 0..3) { realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT) - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES).run { - withTimeout(TIMEOUT) { - assertTrue(last().isTransferComplete) + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .run { + withTimeout(TIMEOUT) { + last().let { + assertTrue(it.isTransferComplete) + assertEquals(1.0, it.progressEstimate) + } + } } - } } } } @@ -284,7 +293,7 @@ class ProgressListenerTests { } } - // Operator that will return a flow that emits an incresing integer on each completion event + // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = filter { it.isTransferComplete } .distinctUntilChanged() From 6e66152e382a2e6948502a81e48a796416ed1055 Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 16 Nov 2023 13:40:42 +0100 Subject: [PATCH 04/26] Remove flexible progress listener tests --- .../common/FlexibleProgressListenerTests.kt | 307 ------------------ 1 file changed, 307 deletions(-) delete mode 100644 packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt deleted file mode 100644 index 2ad829841d..0000000000 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleProgressListenerTests.kt +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Copyright 2023 Realm Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.realm.kotlin.test.mongodb.common - -import io.realm.kotlin.Realm -import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes -import io.realm.kotlin.ext.query -import io.realm.kotlin.internal.platform.runBlocking -import io.realm.kotlin.mongodb.User -import io.realm.kotlin.mongodb.sync.Direction -import io.realm.kotlin.mongodb.sync.Progress -import io.realm.kotlin.mongodb.sync.ProgressMode -import io.realm.kotlin.mongodb.sync.SyncConfiguration -import io.realm.kotlin.mongodb.syncSession -import io.realm.kotlin.test.mongodb.TEST_APP_FLEX -import io.realm.kotlin.test.mongodb.TEST_APP_PARTITION -import io.realm.kotlin.test.mongodb.TestApp -import io.realm.kotlin.test.mongodb.createUserAndLogIn -import io.realm.kotlin.test.mongodb.use -import io.realm.kotlin.test.util.use -import kotlinx.coroutines.async -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.drop -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.last -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.scan -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.supervisorScope -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.withTimeout -import kotlin.test.AfterTest -import kotlin.test.BeforeTest -import kotlin.test.Ignore -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFailsWith -import kotlin.test.assertTrue -import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds - -@Ignore // To be enabled when flexible sync supports progress listeners -class FlexibleProgressListenerTests { - - private lateinit var app: TestApp - - @BeforeTest - fun setup() { - app = TestApp(this::class.simpleName, appName = TEST_APP_FLEX) - } - - @AfterTest - fun tearDown() { - if (this::app.isInitialized) { - app.close() - } - } - - @Test - fun downloadProgressListener_changesOnly() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> - // Verify that we: - // - get a "transferComplete" event - // - complete the flow, and - // - that all objects are available afterwards - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> - // Ensure that we can do consecutive CURRENT_CHANGES registrations - for (i in 0 until 3) { - uploadRealm.writeSampleData( - TEST_SIZE, - idOffset = TEST_SIZE * i, - timeout = TIMEOUT - ) - // We are not sure when the realm actually knows of the remote changes and consider - // them current, so wait a bit - delay(10.seconds) - realm.syncSession.progressAsFlow( - Direction.DOWNLOAD, - ProgressMode.CURRENT_CHANGES - ).run { - withTimeout(TIMEOUT) { - last().let { progress: Progress -> - assertTrue(progress.isTransferComplete) - assertEquals(1.0, progress.progressEstimate) - } - } - } - // Progress.isTransferComplete does not guarantee that changes are integrated and - // visible in the realm - realm.syncSession.downloadAllServerChanges(TIMEOUT) - assertEquals( - TEST_SIZE * (i + 1), - realm.query().find().size - ) - } - } - } - } - - @Test - fun downloadProgressListener_indefinitely() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> - uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - val flow = - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - .completionCounter() - withTimeout(TIMEOUT) { - flow.takeWhile { completed -> completed < 3 } - .collect { completed -> - uploadRealm.writeSampleData( - TEST_SIZE, - idOffset = (completed + 1) * TEST_SIZE, - timeout = TIMEOUT - ) - } - } - } - } - } - - @Test - fun uploadProgressListener_changesOnly() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - for (i in 0..3) { - realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT) - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) - .run { - withTimeout(TIMEOUT) { - last().let { - assertTrue(it.isTransferComplete) - assertEquals(1.0, it.progressEstimate) - } - } - } - } - } - } - - @Test - fun uploadProgressListener_indefinitely() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) - .completionCounter() - - withTimeout(TIMEOUT) { - flow.takeWhile { completed -> completed < 3 } - .collect { completed -> - realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) - realm.syncSession.uploadAllLocalChanges() - } - } - } - } - - @Test - fun worksAfterExceptions() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> - realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - } - - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - assertFailsWith { - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - .collect { - @Suppress("TooGenericExceptionThrown") - throw RuntimeException("Crashing progress flow") - } - } - - val flow = - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - withTimeout(TIMEOUT) { - flow.first { it.isTransferComplete } - } - } - } - - @Test - fun worksAfterCancel() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> - realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - } - - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - // Setup a flow that we are just going to cancel - val flow = - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - supervisorScope { - val mutex = Mutex(true) - val task = async { - flow.collect { - mutex.unlock() - } - } - // Await the flow actually being active - mutex.lock() - task.cancel() - } - - // Verify that progress listeners still work - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).run { - withTimeout(TIMEOUT) { - flow.first { it.isTransferComplete } - } - } - } - } - - @Test - fun triggerImmediatelyWhenRegistered() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> - withTimeout(10.seconds) { - // Ensure that all data is already synced - assertTrue { realm.syncSession.uploadAllLocalChanges() } - assertTrue { realm.syncSession.downloadAllServerChanges() } - // Ensure that progress listeners are triggered at least one time even though there - // is no data - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) - .first() - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) - .first() - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - .first() - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) - .first() - } - } - } - - @Test - fun completesOnClose() = runBlocking { - TestApp("completesOnClose", TEST_APP_PARTITION).use { app -> - val user = app.createUserAndLogIn() - val realm = Realm.open(createSyncConfig(user)) - try { - val flow = - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - val job = async { - withTimeout(10.seconds) { - flow.collect { } - } - } - realm.close() - job.await() - } finally { - if (!realm.isClosed()) { - realm.close() - } - } - } - } - - private suspend fun Realm.writeSampleData(count: Int, idOffset: Int = 0, timeout: Duration? = null) { - write { - for (i in idOffset until count + idOffset) { - copyToRealm(SyncObjectWithAllTypes().apply { stringField = "Object $i" }) - } - } - timeout?.let { - assertTrue { syncSession.uploadAllLocalChanges(timeout) } - } - } - - // Operator that will return a flow that emits an increasing integer on each completion event - private fun Flow.completionCounter(): Flow = - filter { it.isTransferComplete } - .distinctUntilChanged() - // Increment completed count if we are done transferring and the amount of bytes has - // increased - .scan(0UL to 0) { (bytes, completed), progress -> - if (progress.isTransferComplete && progress.transferableBytes > bytes) { - (progress.transferredBytes to completed + 1) - } else { - (bytes to completed) - } - } - .drop(1) - .map { (_, completed) -> completed } - - private fun createSyncConfig( - user: User - ): SyncConfiguration { - return SyncConfiguration.Builder(user, FLX_SYNC_SCHEMA) - .initialSubscriptions { - add(it.query()) - } - .build() - } -} From 2fe9522a046450564c71bab1444716e6424344d8 Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 16 Nov 2023 13:44:56 +0100 Subject: [PATCH 05/26] Undo changes --- .../realm/kotlin/test/mongodb/common/ProgressListenerTests.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt index e3b686af65..0c8967ce8d 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt @@ -58,8 +58,8 @@ import kotlin.test.fail import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -internal const val TEST_SIZE = 500 -internal val TIMEOUT = 30.seconds +private const val TEST_SIZE = 500 +private val TIMEOUT = 30.seconds class ProgressListenerTests { From ce82589960ff59af402a38b1ee36979d8fa64dd5 Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 15 May 2024 19:26:49 +0200 Subject: [PATCH 06/26] Reenable tests --- CHANGELOG.md | 3 +- .../io/realm/kotlin/mongodb/sync/Progress.kt | 6 +- .../mongodb/common/ProgressListenerTests.kt | 106 ++++++++---------- 3 files changed, 53 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bd75a5b91..eda6520c01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,12 @@ This release will bump the Realm file format from version 23 to 24. Opening a file with an older format will automatically upgrade it from file format v10. If you want to upgrade from an earlier file format version you will have to use Realm Kotlin v1.13.1 or earlier. Downgrading to a previous file format is not possible. ### Breaking changes -* None. +* Sync progress updates no longer report `transferredBytes` and `totalBytes`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). ### Enhancements * Support for RealmLists and RealmDictionaries in `RealmAny`. (Issue [#1434](https://github.com/realm/realm-kotlin/issues/1434)) * Optimized `RealmList.indexOf()` and `RealmList.contains()` using Core implementation of operations instead of iterating elements and comparing them in Kotlin. (Issue [#1625](https://github.com/realm/realm-kotlin/pull/1666) [RKOTLIN-995](https://jira.mongodb.org/browse/RKOTLIN-995)). +* Sync progress updates now report an estimate ranged from `0.0` to `1.0` with `Progress.estimate`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). ### Fixed * None. diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt index 632e5e9655..1fb4a8013b 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt @@ -23,7 +23,7 @@ public data class Progress( /** * Transfer progress estimation ranged from 0.0 to 1.0. */ - val progressEstimate: Double, + val estimate: Double, ) { /** * Property indicating if all pending bytes have been transferred. @@ -32,8 +32,8 @@ public data class Progress( * the flow will complete when this returns `true`. * * If the [Progress]-flow was created with [ProgressMode.INDEFINITELY] then the - * flow can continue to emit events with `isTransferComplete = false` for subsequent events + * flow can continue to emit events with `isTransferComplete = true` for subsequent events * after returning a progress indicator with `isTransferComplete = true`. */ - public val isTransferComplete: Boolean = progressEstimate == 1.0 + public val isTransferComplete: Boolean = estimate == 1.0 } diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt index f3cf44e1db..381c6e1da0 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt @@ -40,9 +40,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.drop -import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.last import kotlinx.coroutines.flow.map @@ -111,7 +108,7 @@ class ProgressListenerTests { withTimeout(TIMEOUT) { last().let { progress: Progress -> assertTrue(progress.isTransferComplete) - assertEquals(1.0, progress.progressEstimate) + assertEquals(1.0, progress.estimate) } } } @@ -127,28 +124,29 @@ class ProgressListenerTests { } } -// @Test -// fun downloadProgressListener_indefinitely() = runBlocking { -// Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> -// uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) -// -// Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> -// val flow = realm.syncSession -// .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) -// .completionCounter() -// withTimeout(TIMEOUT) { -// flow.takeWhile { completed -> completed < 3 } -// .collect { completed -> -// uploadRealm.writeSampleData( -// TEST_SIZE, -// idOffset = (completed + 1) * TEST_SIZE, -// timeout = TIMEOUT -// ) -// } -// } -// } -// } -// } + @Test + fun downloadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> + uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession + .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> println(completed); completed < 3 } + .collect { completed -> + uploadRealm.writeSampleData( + TEST_SIZE, + idOffset = (completed + 1) * TEST_SIZE, + timeout = TIMEOUT + ) + } + } + } + } + } @Test fun uploadProgressListener_changesOnly() = runBlocking { @@ -160,7 +158,7 @@ class ProgressListenerTests { withTimeout(TIMEOUT) { last().let { assertTrue(it.isTransferComplete) - assertEquals(1.0, it.progressEstimate) + assertEquals(1.0, it.estimate) } } } @@ -168,21 +166,21 @@ class ProgressListenerTests { } } -// @Test -// fun uploadProgressListener_indefinitely() = runBlocking { -// Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> -// val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) -// .completionCounter() -// -// withTimeout(TIMEOUT) { -// flow.takeWhile { completed -> completed < 3 } -// .collect { completed -> -// realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) -// realm.syncSession.uploadAllLocalChangesOrFail() -// } -// } -// } -// } + @Test + fun uploadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> completed < 3 } + .collect { completed -> + realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) + realm.syncSession.uploadAllLocalChangesOrFail() + } + } + } + } @Test @Ignore // https://github.com/realm/realm-core/issues/7627 @@ -311,25 +309,17 @@ class ProgressListenerTests { } } -// // Operator that will return a flow that emits an increasing integer on each completion event -// private fun Flow.completionCounter(): Flow = -// filter { it.isTransferComplete } -// .distinctUntilChanged() -// // Increment completed count if we are done transferring and the amount of bytes has -// // increased -// .scan(0UL to 0) { (bytes, completed), progress -> -// if (progress.isTransferComplete && progress.transferableBytes > bytes) { -// (progress.transferredBytes to completed + 1) -// } else { -// (bytes to completed) -// } -// } -// .drop(1) -// .map { (_, completed) -> completed } + // Operator that will return a flow that emits an increasing integer on each completion event + private fun Flow.completionCounter(): Flow = + map { + it.estimate.toInt() + }.scan(0) { accumulator, _ -> + accumulator + 1 + } private fun createSyncConfig( user: User, - partitionValue: String = getTestPartitionValue() + partitionValue: String = getTestPartitionValue(), ): SyncConfiguration { return SyncConfiguration.Builder(user, partitionValue, PARTITION_BASED_SCHEMA) .build() From d82a491df33ccd484e0317e9c2adafa8b7faedd1 Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 15 May 2024 22:42:27 +0200 Subject: [PATCH 07/26] Update docs --- .../commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt index 1fb4a8013b..7292f00ec5 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt @@ -32,7 +32,7 @@ public data class Progress( * the flow will complete when this returns `true`. * * If the [Progress]-flow was created with [ProgressMode.INDEFINITELY] then the - * flow can continue to emit events with `isTransferComplete = true` for subsequent events + * flow can continue to emit events with `isTransferComplete = false` for subsequent events * after returning a progress indicator with `isTransferComplete = true`. */ public val isTransferComplete: Boolean = estimate == 1.0 From 377701d9d0bb5c86df1faeac11885c01159e9a87 Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 16 May 2024 21:59:58 +0200 Subject: [PATCH 08/26] Enable progress listeners on flx --- .../mongodb/internal/SyncSessionImpl.kt | 4 - .../io/realm/kotlin/mongodb/sync/Progress.kt | 2 +- ...erTests.kt => FLXProgressListenerTests.kt} | 51 +-- .../common/PBSProgressListenerTests.kt | 308 ++++++++++++++++++ 4 files changed, 322 insertions(+), 43 deletions(-) rename packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/{ProgressListenerTests.kt => FLXProgressListenerTests.kt} (87%) create mode 100644 packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt index 5621a28fee..4d6395dd1d 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt @@ -16,7 +16,6 @@ package io.realm.kotlin.mongodb.internal -import io.realm.kotlin.internal.InternalConfiguration import io.realm.kotlin.internal.NotificationToken import io.realm.kotlin.internal.RealmImpl import io.realm.kotlin.internal.interop.CoreError @@ -109,9 +108,6 @@ internal open class SyncSessionImpl( direction: Direction, progressMode: ProgressMode, ): Flow { - if ((configuration as InternalConfiguration).isFlexibleSyncConfiguration) { - throw UnsupportedOperationException("Progress listeners are not supported for Flexible Sync.") - } return realm.scopedFlow { callbackFlow { val token: AtomicRef = diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt index 7292f00ec5..7894b2de76 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt @@ -35,5 +35,5 @@ public data class Progress( * flow can continue to emit events with `isTransferComplete = false` for subsequent events * after returning a progress indicator with `isTransferComplete = true`. */ - public val isTransferComplete: Boolean = estimate == 1.0 + public val isTransferComplete: Boolean = estimate >= 1.0 } diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt similarity index 87% rename from packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt rename to packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index 381c6e1da0..e86023d5e2 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -27,9 +27,7 @@ import io.realm.kotlin.mongodb.sync.ProgressMode import io.realm.kotlin.mongodb.sync.SyncConfiguration import io.realm.kotlin.mongodb.syncSession import io.realm.kotlin.test.mongodb.TEST_APP_FLEX -import io.realm.kotlin.test.mongodb.TEST_APP_PARTITION import io.realm.kotlin.test.mongodb.TestApp -import io.realm.kotlin.test.mongodb.common.utils.assertFailsWithMessage import io.realm.kotlin.test.mongodb.common.utils.uploadAllLocalChangesOrFail import io.realm.kotlin.test.mongodb.createUserAndLogIn import io.realm.kotlin.test.mongodb.use @@ -48,7 +46,7 @@ import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.withTimeout -import org.mongodb.kbson.ObjectId +import kotlin.random.Random import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Ignore @@ -56,22 +54,19 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue -import kotlin.test.fail import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -private const val TEST_SIZE = 500 -private val TIMEOUT = 30.seconds +class FLXProgressListenerTests { -class ProgressListenerTests { + private val TEST_SIZE = 500 + private val TIMEOUT = 30.seconds private lateinit var app: TestApp - private lateinit var partitionValue: String @BeforeTest fun setup() { - app = TestApp(this::class.simpleName, appName = TEST_APP_PARTITION) - partitionValue = ObjectId().toString() + app = TestApp(this::class.simpleName, appName = TEST_APP_FLEX) } @AfterTest @@ -135,7 +130,7 @@ class ProgressListenerTests { .completionCounter() withTimeout(TIMEOUT) { - flow.takeWhile { completed -> println(completed); completed < 3 } + flow.takeWhile { completed -> completed < 3 } .collect { completed -> uploadRealm.writeSampleData( TEST_SIZE, @@ -254,25 +249,10 @@ class ProgressListenerTests { } } - @Test - fun throwsOnFlexibleSync() = runBlocking { - TestApp("throwsOnFlexibleSync", TEST_APP_FLEX).use { - val user = app.createUserAndLogIn() - val configuration: SyncConfiguration = SyncConfiguration.create(user, FLEXIBLE_SYNC_SCHEMA) - Realm.open(configuration).use { realm -> - assertFailsWithMessage( - "Progress listeners are not supported for Flexible Sync" - ) { - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) - } - } - } - } - @Test fun completesOnClose() = runBlocking { val channel = TestChannel(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) - TestApp("completesOnClose", TEST_APP_PARTITION).use { app -> + TestApp("completesOnClose", TEST_APP_FLEX).use { app -> val user = app.createUserAndLogIn() val realm = Realm.open(createSyncConfig(user)) try { @@ -301,11 +281,11 @@ class ProgressListenerTests { private suspend fun Realm.writeSampleData(count: Int, idOffset: Int = 0, timeout: Duration? = null) { write { for (i in idOffset until count + idOffset) { - copyToRealm(SyncObjectWithAllTypes().apply { stringField = "Object $i" }) + copyToRealm(SyncObjectWithAllTypes().apply { binaryField = Random.nextBytes(1_000_000) }) } } timeout?.let { - assertTrue { syncSession.uploadAllLocalChanges(timeout) } + syncSession.uploadAllLocalChanges(timeout) } } @@ -319,16 +299,11 @@ class ProgressListenerTests { private fun createSyncConfig( user: User, - partitionValue: String = getTestPartitionValue(), ): SyncConfiguration { - return SyncConfiguration.Builder(user, partitionValue, PARTITION_BASED_SCHEMA) + return SyncConfiguration.Builder(user, PARTITION_BASED_SCHEMA) + .initialSubscriptions { + add(it.query()) + } .build() } - - private fun getTestPartitionValue(): String { - if (!this::partitionValue.isInitialized) { - fail("Test not setup correctly. Partition value is missing") - } - return partitionValue - } } diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt new file mode 100644 index 0000000000..f1c710d57e --- /dev/null +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -0,0 +1,308 @@ +package io.realm.kotlin.test.mongodb.common + +import io.realm.kotlin.Realm +import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes +import io.realm.kotlin.ext.query +import io.realm.kotlin.internal.platform.runBlocking +import io.realm.kotlin.mongodb.User +import io.realm.kotlin.mongodb.sync.Direction +import io.realm.kotlin.mongodb.sync.Progress +import io.realm.kotlin.mongodb.sync.ProgressMode +import io.realm.kotlin.mongodb.sync.SyncConfiguration +import io.realm.kotlin.mongodb.syncSession +import io.realm.kotlin.test.mongodb.TEST_APP_PARTITION +import io.realm.kotlin.test.mongodb.TestApp +import io.realm.kotlin.test.mongodb.common.utils.uploadAllLocalChangesOrFail +import io.realm.kotlin.test.mongodb.createUserAndLogIn +import io.realm.kotlin.test.mongodb.use +import io.realm.kotlin.test.util.TestChannel +import io.realm.kotlin.test.util.receiveOrFail +import io.realm.kotlin.test.util.use +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.last +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.scan +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.withTimeout +import kotlin.random.Random +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue +import kotlin.test.fail +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class PBSProgressListenerTests { + private val TEST_SIZE = 500 + private val TIMEOUT = 30.seconds + + private lateinit var app: TestApp + private lateinit var partitionValue: String + + @BeforeTest + fun setup() { + app = TestApp(this::class.simpleName, appName = TEST_APP_PARTITION) + partitionValue = org.mongodb.kbson.ObjectId().toString() + } + + @AfterTest + fun tearDown() { + if (this::app.isInitialized) { + app.close() + } + } + + @Test + @Ignore // https://github.com/realm/realm-core/issues/7627 + fun downloadProgressListener_changesOnly() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> + // Verify that we: + // - get a "transferComplete" event + // - complete the flow, and + // - that all objects are available afterwards + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + // Ensure that we can do consecutive CURRENT_CHANGES registrations + for (i in 0 until 3) { + uploadRealm.writeSampleData( + TEST_SIZE, + idOffset = TEST_SIZE * i, + timeout = TIMEOUT + ) + + // We are not sure when the realm actually knows of the remote changes and consider + // them current, so wait a bit + delay(10.seconds) + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.CURRENT_CHANGES + ).run { + withTimeout(TIMEOUT) { + last().let { progress: Progress -> + assertTrue(progress.isTransferComplete) + assertEquals(1.0, progress.estimate) + } + } + } + // Progress.isTransferComplete does not guarantee that changes are integrated and + // visible in the realm + realm.syncSession.downloadAllServerChanges(TIMEOUT) + assertEquals( + TEST_SIZE * (i + 1), + realm.query().find().size + ) + } + } + } + } + + @Test + fun downloadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> + uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession + .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> completed < 3 } + .collect { completed -> + uploadRealm.writeSampleData( + TEST_SIZE, + idOffset = (completed + 1) * TEST_SIZE, + timeout = TIMEOUT + ) + } + } + } + } + } + + @Test + fun uploadProgressListener_changesOnly() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + for (i in 0..3) { + realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT) + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .run { + withTimeout(TIMEOUT) { + last().let { + assertTrue(it.isTransferComplete) + assertEquals(1.0, it.estimate) + } + } + } + } + } + } + + @Test + fun uploadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> completed < 3 } + .collect { completed -> + realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) + realm.syncSession.uploadAllLocalChangesOrFail() + } + } + } + } + + @Test + @Ignore // https://github.com/realm/realm-core/issues/7627 + fun worksAfterExceptions() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + assertFailsWith { + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .collect { + @Suppress("TooGenericExceptionThrown") + throw RuntimeException("Crashing progress flow") + } + } + + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } + } + } + } + + @Test + @Ignore // https://github.com/realm/realm-core/issues/7627 + fun worksAfterCancel() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + // Setup a flow that we are just going to cancel + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + supervisorScope { + val mutex = Mutex(true) + val task = async { + flow.collect { + mutex.unlock() + } + } + // Await the flow actually being active + mutex.lock() + task.cancel() + } + + // Verify that progress listeners still work + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).run { + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } + } + } + } + } + + @Test + @Ignore // https://github.com/realm/realm-core/issues/7627 + fun triggerImmediatelyWhenRegistered() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + withTimeout(10.seconds) { + // Ensure that all data is already synced + realm.syncSession.uploadAllLocalChangesOrFail() + assertTrue { realm.syncSession.downloadAllServerChanges() } + // Ensure that progress listeners are triggered at least one time even though there + // is no data + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) + .first() + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .first() + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .first() + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .first() + } + } + } + + @Test + fun completesOnClose() = runBlocking { + val channel = + TestChannel(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + TestApp("completesOnClose", TEST_APP_PARTITION).use { app -> + val user = app.createUserAndLogIn() + val realm = Realm.open(createSyncConfig(user)) + try { + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + val job = async { + withTimeout(10.seconds) { + flow.collect { + channel.send(true) + } + } + } + // Wait for Flow to start, so we do not close the Realm before + // `flow.collect()` can be called. + channel.receiveOrFail() + realm.close() + job.await() + } finally { + channel.close() + if (!realm.isClosed()) { + realm.close() + } + } + } + } + + private suspend fun Realm.writeSampleData(count: Int, idOffset: Int = 0, timeout: Duration? = null) { + write { + for (i in idOffset until count + idOffset) { + copyToRealm(SyncObjectWithAllTypes().apply { binaryField = Random.nextBytes(1_000_000) }) + } + } + timeout?.let { + assertTrue { syncSession.uploadAllLocalChanges(timeout) } + } + } + + // Operator that will return a flow that emits an increasing integer on each completion event + private fun Flow.completionCounter(): Flow = + map { + it.estimate.toInt() + }.scan(0) { accumulator, _ -> + accumulator + 1 + } + + private fun createSyncConfig( + user: User, + partitionValue: String = getTestPartitionValue(), + ): SyncConfiguration { + return SyncConfiguration.Builder(user, partitionValue, PARTITION_BASED_SCHEMA) + .build() + } + + private fun getTestPartitionValue(): String { + if (!this::partitionValue.isInitialized) { + fail("Test not setup correctly. Partition value is missing") + } + return partitionValue + } +} From 2914b62dd192e882f183e153085cd8a2ef92f151 Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 16 May 2024 22:14:05 +0200 Subject: [PATCH 09/26] Limit test size --- .../kotlin/test/mongodb/common/FLXProgressListenerTests.kt | 4 +++- .../kotlin/test/mongodb/common/PBSProgressListenerTests.kt | 7 ++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index e86023d5e2..4e9c08d4e5 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -20,6 +20,8 @@ import io.realm.kotlin.Realm import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes import io.realm.kotlin.ext.query import io.realm.kotlin.internal.platform.runBlocking +import io.realm.kotlin.log.LogLevel +import io.realm.kotlin.log.RealmLog import io.realm.kotlin.mongodb.User import io.realm.kotlin.mongodb.sync.Direction import io.realm.kotlin.mongodb.sync.Progress @@ -59,7 +61,7 @@ import kotlin.time.Duration.Companion.seconds class FLXProgressListenerTests { - private val TEST_SIZE = 500 + private val TEST_SIZE = 5 private val TIMEOUT = 30.seconds private lateinit var app: TestApp diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index f1c710d57e..52a9725be9 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -43,7 +43,7 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds class PBSProgressListenerTests { - private val TEST_SIZE = 500 + private val TEST_SIZE = 5 private val TIMEOUT = 30.seconds private lateinit var app: TestApp @@ -286,8 +286,9 @@ class PBSProgressListenerTests { // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = map { - it.estimate.toInt() - }.scan(0) { accumulator, _ -> + it.estimate + }.scan(0) { accumulator, value -> + println(value) accumulator + 1 } From 57a6e9d458110fc31b114dd84c7129df4932a0cd Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 16 May 2024 23:11:32 +0200 Subject: [PATCH 10/26] Linting --- .../kotlin/test/mongodb/common/FLXProgressListenerTests.kt | 2 -- .../kotlin/test/mongodb/common/PBSProgressListenerTests.kt | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index 4e9c08d4e5..af60e39112 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -20,8 +20,6 @@ import io.realm.kotlin.Realm import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes import io.realm.kotlin.ext.query import io.realm.kotlin.internal.platform.runBlocking -import io.realm.kotlin.log.LogLevel -import io.realm.kotlin.log.RealmLog import io.realm.kotlin.mongodb.User import io.realm.kotlin.mongodb.sync.Direction import io.realm.kotlin.mongodb.sync.Progress diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index 52a9725be9..ac171e5afb 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -286,9 +286,8 @@ class PBSProgressListenerTests { // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = map { - it.estimate - }.scan(0) { accumulator, value -> - println(value) + it.estimate.toInt() + }.scan(0) { accumulator, _ -> accumulator + 1 } From 36c83fe58cddc0501f66a8a3d565097cbd91dbc8 Mon Sep 17 00:00:00 2001 From: Clemente Date: Fri, 17 May 2024 08:59:29 +0200 Subject: [PATCH 11/26] Upload latest changes --- .../mongodb/internal/SyncSessionImpl.kt | 3 +++ .../common/FLXProgressListenerTests.kt | 23 +++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt index 4d6395dd1d..62c7ed29f3 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt @@ -128,6 +128,9 @@ internal open class SyncSessionImpl( } } ) + // some of our tests need an initial event, core does not always send it, + // we send it for them + trySendWithBufferOverflowCheck(Progress(0.0)) awaitClose { token.value.cancel() } diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index af60e39112..0a285b3494 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -59,7 +59,7 @@ import kotlin.time.Duration.Companion.seconds class FLXProgressListenerTests { - private val TEST_SIZE = 5 + private val TEST_SIZE = 2 private val TIMEOUT = 30.seconds private lateinit var app: TestApp @@ -89,7 +89,6 @@ class FLXProgressListenerTests { for (i in 0 until 3) { uploadRealm.writeSampleData( TEST_SIZE, - idOffset = TEST_SIZE * i, timeout = TIMEOUT ) @@ -131,10 +130,9 @@ class FLXProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } - .collect { completed -> + .collect { _ -> uploadRealm.writeSampleData( TEST_SIZE, - idOffset = (completed + 1) * TEST_SIZE, timeout = TIMEOUT ) } @@ -147,7 +145,7 @@ class FLXProgressListenerTests { fun uploadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> for (i in 0..3) { - realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT) + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) .run { withTimeout(TIMEOUT) { @@ -169,9 +167,9 @@ class FLXProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } - .collect { completed -> - realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) - realm.syncSession.uploadAllLocalChangesOrFail() + .collect { _ -> + realm.writeSampleData(TEST_SIZE) +// realm.syncSession.uploadAllLocalChangesOrFail() } } } @@ -237,7 +235,7 @@ class FLXProgressListenerTests { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> withTimeout(10.seconds) { // Ensure that all data is already synced - realm.syncSession.uploadAllLocalChangesOrFail() +// realm.syncSession.uploadAllLocalChangesOrFail() assertTrue { realm.syncSession.downloadAllServerChanges() } // Ensure that progress listeners are triggered at least one time even though there // is no data @@ -278,14 +276,15 @@ class FLXProgressListenerTests { } } - private suspend fun Realm.writeSampleData(count: Int, idOffset: Int = 0, timeout: Duration? = null) { + private suspend fun Realm.writeSampleData(count: Int, timeout: Duration? = null) { write { - for (i in idOffset until count + idOffset) { + repeat (count) { copyToRealm(SyncObjectWithAllTypes().apply { binaryField = Random.nextBytes(1_000_000) }) } } timeout?.let { - syncSession.uploadAllLocalChanges(timeout) + // Disabling this line helps completing uploadProgressListener_changesOnly +// syncSession.uploadAllLocalChanges(timeout) } } From c07b0b059d4692a18e5ed39b5bce9b04e5719a05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20R=C3=B8rbech?= Date: Fri, 17 May 2024 11:20:23 +0200 Subject: [PATCH 12/26] Fix FLX progress listener tests --- .../kotlin/test/mongodb/common/FLXProgressListenerTests.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index 0a285b3494..5ce2662c2e 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -299,7 +299,7 @@ class FLXProgressListenerTests { private fun createSyncConfig( user: User, ): SyncConfiguration { - return SyncConfiguration.Builder(user, PARTITION_BASED_SCHEMA) + return SyncConfiguration.Builder(user, FLEXIBLE_SYNC_SCHEMA) .initialSubscriptions { add(it.query()) } From ec52867647c7cf253d4c9d4771b4b10a43546964 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20R=C3=B8rbech?= Date: Fri, 17 May 2024 11:20:40 +0200 Subject: [PATCH 13/26] Add CHANGELOG entry --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eda6520c01..2c8472aa86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,12 @@ This release will bump the Realm file format from version 23 to 24. Opening a file with an older format will automatically upgrade it from file format v10. If you want to upgrade from an earlier file format version you will have to use Realm Kotlin v1.13.1 or earlier. Downgrading to a previous file format is not possible. ### Breaking changes -* Sync progress updates no longer report `transferredBytes` and `totalBytes`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). +* [Sync] Sync progress notifications now reports an estimate ranged from `0.0` to `1.0` with `Progress.estimate` instead of `transferredBytes` and `totalBytes`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). ### Enhancements * Support for RealmLists and RealmDictionaries in `RealmAny`. (Issue [#1434](https://github.com/realm/realm-kotlin/issues/1434)) * Optimized `RealmList.indexOf()` and `RealmList.contains()` using Core implementation of operations instead of iterating elements and comparing them in Kotlin. (Issue [#1625](https://github.com/realm/realm-kotlin/pull/1666) [RKOTLIN-995](https://jira.mongodb.org/browse/RKOTLIN-995)). -* Sync progress updates now report an estimate ranged from `0.0` to `1.0` with `Progress.estimate`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). +* [Sync] Sync progress notifications is now also supported for flexible sync configurations. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). ### Fixed * None. From 0ce46e19e26f6b0572f50e9bfc011fd94cb364bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20R=C3=B8rbech?= Date: Fri, 17 May 2024 15:28:14 +0200 Subject: [PATCH 14/26] Fix FLX progress listener tests --- .../mongodb/internal/SyncSessionImpl.kt | 3 - .../common/FLXProgressListenerTests.kt | 169 ++++++++++-------- .../common/PBSProgressListenerTests.kt | 82 ++++----- 3 files changed, 135 insertions(+), 119 deletions(-) diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt index 62c7ed29f3..4d6395dd1d 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt @@ -128,9 +128,6 @@ internal open class SyncSessionImpl( } } ) - // some of our tests need an initial event, core does not always send it, - // we send it for them - trySendWithBufferOverflowCheck(Progress(0.0)) awaitClose { token.value.cancel() } diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index 5ce2662c2e..045e3c8212 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -28,7 +28,6 @@ import io.realm.kotlin.mongodb.sync.SyncConfiguration import io.realm.kotlin.mongodb.syncSession import io.realm.kotlin.test.mongodb.TEST_APP_FLEX import io.realm.kotlin.test.mongodb.TestApp -import io.realm.kotlin.test.mongodb.common.utils.uploadAllLocalChangesOrFail import io.realm.kotlin.test.mongodb.createUserAndLogIn import io.realm.kotlin.test.mongodb.use import io.realm.kotlin.test.util.TestChannel @@ -36,11 +35,11 @@ import io.realm.kotlin.test.util.receiveOrFail import io.realm.kotlin.test.util.use import kotlinx.coroutines.async import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.last -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.supervisorScope @@ -54,6 +53,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue +import kotlin.test.fail import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -63,10 +63,12 @@ class FLXProgressListenerTests { private val TIMEOUT = 30.seconds private lateinit var app: TestApp + private lateinit var partitionValue: String @BeforeTest fun setup() { app = TestApp(this::class.simpleName, appName = TEST_APP_FLEX) + partitionValue = org.mongodb.kbson.ObjectId().toString() } @AfterTest @@ -77,7 +79,6 @@ class FLXProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun downloadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> // Verify that we: @@ -87,25 +88,28 @@ class FLXProgressListenerTests { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> // Ensure that we can do consecutive CURRENT_CHANGES registrations for (i in 0 until 3) { + val transferCompleteJob = async { + // Postpone the progress listener flow so that it is started after the + // following downloadAllServerChanges. This should ensure that we are + // actually downloading stuff. + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.CURRENT_CHANGES + ).run { + withTimeout(TIMEOUT) { + last().let { progress: Progress -> + assertTrue(progress.isTransferComplete) + assertEquals(1.0, progress.estimate) + } + } + } + } uploadRealm.writeSampleData( TEST_SIZE, timeout = TIMEOUT ) + transferCompleteJob.await() - // We are not sure when the realm actually knows of the remote changes and consider - // them current, so wait a bit - delay(10.seconds) - realm.syncSession.progressAsFlow( - Direction.DOWNLOAD, - ProgressMode.CURRENT_CHANGES - ).run { - withTimeout(TIMEOUT) { - last().let { progress: Progress -> - assertTrue(progress.isTransferComplete) - assertEquals(1.0, progress.estimate) - } - } - } // Progress.isTransferComplete does not guarantee that changes are integrated and // visible in the realm realm.syncSession.downloadAllServerChanges(TIMEOUT) @@ -167,70 +171,83 @@ class FLXProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } - .collect { _ -> + .collect { completed -> realm.writeSampleData(TEST_SIZE) -// realm.syncSession.uploadAllLocalChangesOrFail() } } } } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterExceptions() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> - realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - } - - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - assertFailsWith { - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - .collect { - @Suppress("TooGenericExceptionThrown") - throw RuntimeException("Crashing progress flow") + supervisorScope { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { writerRealm -> + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.INDEFINITELY + ) + assertFailsWith { + val task = async { + flow.collect { + @Suppress("TooGenericExceptionThrown") + throw RuntimeException("Crashing progress flow") + } + } + writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + task.getCompletionExceptionOrNull() } - } - - val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - withTimeout(TIMEOUT) { - flow.first { it.isTransferComplete } + withTimeout(TIMEOUT) { + val task = async { + flow.first { + it.isTransferComplete + } + } + writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + task.await() + } + } } } } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterCancel() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> - realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - } + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { writerRealm -> + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + // Setup a flow that we are just going to cancel + val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - // Setup a flow that we are just going to cancel - val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - supervisorScope { - val mutex = Mutex(true) - val task = async { - flow.collect { - mutex.unlock() + supervisorScope { + val mutex = Mutex(true) + val task = async { + flow.collect { + mutex.unlock() + } } + // Await the flow actually being active, this requires actual data transfer as + // we arent guaranteed any initial events. + writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + mutex.lock() + task.cancel() } - // Await the flow actually being active - mutex.lock() - task.cancel() - } - // Verify that progress listeners still work - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).run { + // Verify that progress listeners still work withTimeout(TIMEOUT) { - flow.first { it.isTransferComplete } + val task = async { flow.first { it.isTransferComplete } } + // Trigger data transfer to ensure we get an event at some point + writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + task.await() } } } } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 + // Maybe not a guarantee that we can give for FLX. Monitor outcome of + // https://github.com/realm/realm-core/issues/7627 + @Ignore fun triggerImmediatelyWhenRegistered() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> withTimeout(10.seconds) { @@ -249,19 +266,20 @@ class FLXProgressListenerTests { @Test fun completesOnClose() = runBlocking { - val channel = TestChannel(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + val channel = TestChannel(capacity = 5, onBufferOverflow = BufferOverflow.DROP_OLDEST, failIfBufferIsEmptyOnCancel = false) TestApp("completesOnClose", TEST_APP_FLEX).use { app -> val user = app.createUserAndLogIn() val realm = Realm.open(createSyncConfig(user)) try { - val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) val job = async { withTimeout(10.seconds) { flow.collect { - channel.send(true) + channel.trySend(true) } } } + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) // Wait for Flow to start, so we do not close the Realm before // `flow.collect()` can be called. channel.receiveOrFail() @@ -277,32 +295,43 @@ class FLXProgressListenerTests { } private suspend fun Realm.writeSampleData(count: Int, timeout: Duration? = null) { - write { - repeat (count) { - copyToRealm(SyncObjectWithAllTypes().apply { binaryField = Random.nextBytes(1_000_000) }) + repeat(count) { + write { + copyToRealm( + SyncObjectWithAllTypes().apply { + stringField = getTestPartitionValue() + binaryField = Random.nextBytes(100) + } + ) } } timeout?.let { - // Disabling this line helps completing uploadProgressListener_changesOnly -// syncSession.uploadAllLocalChanges(timeout) + assertTrue { syncSession.uploadAllLocalChanges(timeout) } } } // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = - map { - it.estimate.toInt() - }.scan(0) { accumulator, _ -> - accumulator + 1 - } + buffer(10000) + .filter { it.isTransferComplete } + .scan(0) { accumulator, _ -> + accumulator + 1 + } private fun createSyncConfig( user: User, ): SyncConfiguration { return SyncConfiguration.Builder(user, FLEXIBLE_SYNC_SCHEMA) .initialSubscriptions { - add(it.query()) + add(it.query("stringField = $0", getTestPartitionValue())) } .build() } + + private fun getTestPartitionValue(): String { + if (!this::partitionValue.isInitialized) { + fail("Test not setup correctly. Partition value is missing") + } + return partitionValue + } } diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index ac171e5afb..5468827737 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -20,11 +20,11 @@ import io.realm.kotlin.test.util.receiveOrFail import io.realm.kotlin.test.util.use import kotlinx.coroutines.async import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.last -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.supervisorScope @@ -43,7 +43,7 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds class PBSProgressListenerTests { - private val TEST_SIZE = 5 + private val TEST_SIZE = 10 private val TIMEOUT = 30.seconds private lateinit var app: TestApp @@ -63,7 +63,6 @@ class PBSProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun downloadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> // Verify that we: @@ -73,26 +72,23 @@ class PBSProgressListenerTests { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> // Ensure that we can do consecutive CURRENT_CHANGES registrations for (i in 0 until 3) { - uploadRealm.writeSampleData( - TEST_SIZE, - idOffset = TEST_SIZE * i, - timeout = TIMEOUT - ) - - // We are not sure when the realm actually knows of the remote changes and consider - // them current, so wait a bit - delay(10.seconds) - realm.syncSession.progressAsFlow( - Direction.DOWNLOAD, - ProgressMode.CURRENT_CHANGES - ).run { - withTimeout(TIMEOUT) { - last().let { progress: Progress -> - assertTrue(progress.isTransferComplete) - assertEquals(1.0, progress.estimate) + val transferCompleteJob = async { + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.CURRENT_CHANGES + ).run { + withTimeout(TIMEOUT) { + last().let { progress: Progress -> + assertTrue(progress.isTransferComplete) + assertEquals(1.0, progress.estimate) + } } } } + uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + + transferCompleteJob.await() + // Progress.isTransferComplete does not guarantee that changes are integrated and // visible in the realm realm.syncSession.downloadAllServerChanges(TIMEOUT) @@ -117,10 +113,10 @@ class PBSProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } + .buffer(10000) .collect { completed -> uploadRealm.writeSampleData( TEST_SIZE, - idOffset = (completed + 1) * TEST_SIZE, timeout = TIMEOUT ) } @@ -133,7 +129,7 @@ class PBSProgressListenerTests { fun uploadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> for (i in 0..3) { - realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT) + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) .run { withTimeout(TIMEOUT) { @@ -156,7 +152,7 @@ class PBSProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } .collect { completed -> - realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) + realm.writeSampleData(TEST_SIZE) realm.syncSession.uploadAllLocalChangesOrFail() } } @@ -164,23 +160,20 @@ class PBSProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterExceptions() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) } Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) assertFailsWith { - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - .collect { - @Suppress("TooGenericExceptionThrown") - throw RuntimeException("Crashing progress flow") - } + flow.collect { + @Suppress("TooGenericExceptionThrown") + throw RuntimeException("Crashing progress flow") + } } - val flow = - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) withTimeout(TIMEOUT) { flow.first { it.isTransferComplete } } @@ -188,7 +181,6 @@ class PBSProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterCancel() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) @@ -197,7 +189,7 @@ class PBSProgressListenerTests { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> // Setup a flow that we are just going to cancel val flow = - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) supervisorScope { val mutex = Mutex(true) val task = async { @@ -211,10 +203,8 @@ class PBSProgressListenerTests { } // Verify that progress listeners still work - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).run { - withTimeout(TIMEOUT) { - flow.first { it.isTransferComplete } - } + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } } } } @@ -272,9 +262,9 @@ class PBSProgressListenerTests { } } - private suspend fun Realm.writeSampleData(count: Int, idOffset: Int = 0, timeout: Duration? = null) { - write { - for (i in idOffset until count + idOffset) { + private suspend fun Realm.writeSampleData(count: Int, timeout: Duration? = null) { + repeat(count) { + write { copyToRealm(SyncObjectWithAllTypes().apply { binaryField = Random.nextBytes(1_000_000) }) } } @@ -285,11 +275,11 @@ class PBSProgressListenerTests { // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = - map { - it.estimate.toInt() - }.scan(0) { accumulator, _ -> - accumulator + 1 - } + buffer(10000) + .filter { it.isTransferComplete } + .scan(0) { accumulator, _ -> + accumulator + 1 + } private fun createSyncConfig( user: User, From c7f79077267fbe7c591b21e6041b38484c1715da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20R=C3=B8rbech?= Date: Fri, 17 May 2024 15:31:14 +0200 Subject: [PATCH 15/26] Clean up docs --- .../kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt index c5998d83d0..fa612eb833 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt @@ -136,8 +136,6 @@ public interface SyncSession { * * The flow has an internal buffer of [Channel.BUFFERED] but if the consumer fails to consume the * elements in a timely manner the flow will be completed with an [IllegalStateException]. - * - * @throws UnsupportedOperationException if invoked on a realm with Flexible Sync enabled. */ public fun progressAsFlow( direction: Direction, From 535807f410a69f05c86fc331f88a3a4c615f20ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20R=C3=B8rbech?= Date: Tue, 21 May 2024 09:12:54 +0200 Subject: [PATCH 16/26] Update packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt Co-authored-by: Nabil Hachicha --- .../kotlin/test/mongodb/common/FLXProgressListenerTests.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index 045e3c8212..74e78465d6 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022 Realm Inc. + * Copyright 2024 Realm Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From d730c92abd721e8b02c3ad6c4458024c15d802df Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 22 May 2024 10:53:15 +0200 Subject: [PATCH 17/26] Reduce test size --- .../kotlin/test/mongodb/common/PBSProgressListenerTests.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index 9dba304e02..abbe3ae551 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -42,7 +42,7 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds class PBSProgressListenerTests { - private val TEST_SIZE = 10 + private val TEST_SIZE = 2 private val TIMEOUT = 30.seconds private lateinit var app: TestApp From 81edc2ac22507f4184a213d7273c2e0cd235bbef Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 22 May 2024 15:27:55 +0200 Subject: [PATCH 18/26] Fix test cases --- .../common/FLXProgressListenerTests.kt | 29 +++---------------- .../common/PBSProgressListenerTests.kt | 13 ++++----- 2 files changed, 9 insertions(+), 33 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index 74e78465d6..136a515058 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -48,7 +48,6 @@ import kotlinx.coroutines.withTimeout import kotlin.random.Random import kotlin.test.AfterTest import kotlin.test.BeforeTest -import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -59,7 +58,7 @@ import kotlin.time.Duration.Companion.seconds class FLXProgressListenerTests { - private val TEST_SIZE = 2 + private val TEST_SIZE = 10 private val TIMEOUT = 30.seconds private lateinit var app: TestApp @@ -171,7 +170,7 @@ class FLXProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } - .collect { completed -> + .collect { _ -> realm.writeSampleData(TEST_SIZE) } } @@ -244,26 +243,6 @@ class FLXProgressListenerTests { } } - @Test - // Maybe not a guarantee that we can give for FLX. Monitor outcome of - // https://github.com/realm/realm-core/issues/7627 - @Ignore - fun triggerImmediatelyWhenRegistered() = runBlocking { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> - withTimeout(10.seconds) { - // Ensure that all data is already synced -// realm.syncSession.uploadAllLocalChangesOrFail() - assertTrue { realm.syncSession.downloadAllServerChanges() } - // Ensure that progress listeners are triggered at least one time even though there - // is no data - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).first() - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES).first() - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY).first() - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY).first() - } - } - } - @Test fun completesOnClose() = runBlocking { val channel = TestChannel(capacity = 5, onBufferOverflow = BufferOverflow.DROP_OLDEST, failIfBufferIsEmptyOnCancel = false) @@ -312,8 +291,8 @@ class FLXProgressListenerTests { // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = - buffer(10000) - .filter { it.isTransferComplete } + filter { it.isTransferComplete } + .buffer(5, onBufferOverflow = BufferOverflow.DROP_OLDEST) .scan(0) { accumulator, _ -> accumulator + 1 } diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index abbe3ae551..c41a22fa66 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -42,7 +42,7 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds class PBSProgressListenerTests { - private val TEST_SIZE = 2 + private val TEST_SIZE = 10 private val TIMEOUT = 30.seconds private lateinit var app: TestApp @@ -70,7 +70,7 @@ class PBSProgressListenerTests { // - that all objects are available afterwards Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> // Ensure that we can do consecutive CURRENT_CHANGES registrations - for (i in 0 until 3) { + repeat(3) { iteration -> val transferCompleteJob = async { realm.syncSession.progressAsFlow( Direction.DOWNLOAD, @@ -92,7 +92,7 @@ class PBSProgressListenerTests { // visible in the realm realm.syncSession.downloadAllServerChanges(TIMEOUT) assertEquals( - TEST_SIZE * (i + 1), + TEST_SIZE * (iteration + 1), realm.query().find().size ) } @@ -103,8 +103,6 @@ class PBSProgressListenerTests { @Test fun downloadProgressListener_indefinitely() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> - uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> val flow = realm.syncSession .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) @@ -112,7 +110,6 @@ class PBSProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } - .buffer(10000) .collect { completed -> uploadRealm.writeSampleData( TEST_SIZE, @@ -273,8 +270,8 @@ class PBSProgressListenerTests { // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = - buffer(10000) - .filter { it.isTransferComplete } + filter { it.isTransferComplete } + .buffer(2) .scan(0) { accumulator, _ -> accumulator + 1 } From 291974d071dbe4f1026ccfdf93e8517b6192cf3e Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 22 May 2024 16:27:20 +0200 Subject: [PATCH 19/26] Fix worksAfterExceptions test case --- .../common/FLXProgressListenerTests.kt | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt index 136a515058..9fe4307aa6 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -179,35 +179,22 @@ class FLXProgressListenerTests { @Test fun worksAfterExceptions() = runBlocking { - supervisorScope { - Realm.open(createSyncConfig(app.createUserAndLogIn())).use { writerRealm -> - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - val flow = - realm.syncSession.progressAsFlow( - Direction.DOWNLOAD, - ProgressMode.INDEFINITELY - ) - assertFailsWith { - val task = async { - flow.collect { - @Suppress("TooGenericExceptionThrown") - throw RuntimeException("Crashing progress flow") - } - } - writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - task.getCompletionExceptionOrNull() - } - withTimeout(TIMEOUT) { - val task = async { - flow.first { - it.isTransferComplete - } - } - writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - task.await() - } + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + assertFailsWith { + flow.collect { + @Suppress("TooGenericExceptionThrown") + throw RuntimeException("Crashing progress flow") } } + + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } + } } } From 2929d420278094ea79d5bb80f473b85c3439b445 Mon Sep 17 00:00:00 2001 From: Clemente Date: Wed, 22 May 2024 17:21:16 +0200 Subject: [PATCH 20/26] Remove buffer --- .../realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index c41a22fa66..fd0b0ad197 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -271,7 +271,6 @@ class PBSProgressListenerTests { // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = filter { it.isTransferComplete } - .buffer(2) .scan(0) { accumulator, _ -> accumulator + 1 } From af0f30ab29341c87fea930ed3ec7a3b1ce8467c8 Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 23 May 2024 10:15:46 +0200 Subject: [PATCH 21/26] Try fix for PBS tests --- .../common/PBSProgressListenerTests.kt | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index fd0b0ad197..9231361e31 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -9,6 +9,7 @@ import io.realm.kotlin.mongodb.sync.Direction import io.realm.kotlin.mongodb.sync.Progress import io.realm.kotlin.mongodb.sync.ProgressMode import io.realm.kotlin.mongodb.sync.SyncConfiguration +import io.realm.kotlin.mongodb.sync.SyncSession import io.realm.kotlin.mongodb.syncSession import io.realm.kotlin.test.mongodb.TEST_APP_PARTITION import io.realm.kotlin.test.mongodb.TestApp @@ -21,10 +22,10 @@ import io.realm.kotlin.test.util.use import kotlinx.coroutines.async import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.last +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.supervisorScope @@ -84,7 +85,9 @@ class PBSProgressListenerTests { } } } - uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + realm.syncSession.runWhilePaused { + uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } transferCompleteJob.await() @@ -111,10 +114,12 @@ class PBSProgressListenerTests { withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } .collect { completed -> - uploadRealm.writeSampleData( - TEST_SIZE, - timeout = TIMEOUT - ) + realm.syncSession.runWhilePaused { + uploadRealm.writeSampleData( + TEST_SIZE, + timeout = TIMEOUT + ) + } } } } @@ -124,7 +129,7 @@ class PBSProgressListenerTests { @Test fun uploadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - for (i in 0..3) { + repeat(3) { realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) .run { @@ -142,13 +147,16 @@ class PBSProgressListenerTests { @Test fun uploadProgressListener_indefinitely() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + val flow = realm.syncSession + .progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) .completionCounter() withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } - .collect { completed -> - realm.writeSampleData(TEST_SIZE) + .collect { _ -> + realm.syncSession.runWhilePaused { + realm.writeSampleData(TEST_SIZE) + } realm.syncSession.uploadAllLocalChangesOrFail() } } @@ -162,7 +170,9 @@ class PBSProgressListenerTests { } Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + val flow = realm.syncSession + .progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + assertFailsWith { flow.collect { @Suppress("TooGenericExceptionThrown") @@ -185,7 +195,9 @@ class PBSProgressListenerTests { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> // Setup a flow that we are just going to cancel val flow = - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + realm.syncSession + .progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + supervisorScope { val mutex = Mutex(true) val task = async { @@ -260,9 +272,15 @@ class PBSProgressListenerTests { private suspend fun Realm.writeSampleData(count: Int, timeout: Duration? = null) { repeat(count) { write { - copyToRealm(SyncObjectWithAllTypes().apply { binaryField = Random.nextBytes(1_000_000) }) + copyToRealm( + SyncObjectWithAllTypes() + .apply { + binaryField = Random.nextBytes(1_000_000) + } + ) } } + timeout?.let { assertTrue { syncSession.uploadAllLocalChanges(timeout) } } @@ -270,7 +288,8 @@ class PBSProgressListenerTests { // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = - filter { it.isTransferComplete } + onEach { println(it) } + .filter { it.isTransferComplete } .scan(0) { accumulator, _ -> accumulator + 1 } @@ -289,4 +308,10 @@ class PBSProgressListenerTests { } return partitionValue } + + private suspend fun SyncSession.runWhilePaused(block: suspend () -> Unit) { + pause() + block() + resume() + } } From e5e06388b91fde9f3a6942b6f65df44945c13c9f Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 23 May 2024 12:37:58 +0200 Subject: [PATCH 22/26] Add debug traces --- .../kotlin/test/mongodb/common/PBSProgressListenerTests.kt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index 9231361e31..d99b9bf23a 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -4,6 +4,8 @@ import io.realm.kotlin.Realm import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes import io.realm.kotlin.ext.query import io.realm.kotlin.internal.platform.runBlocking +import io.realm.kotlin.log.LogLevel +import io.realm.kotlin.log.RealmLog import io.realm.kotlin.mongodb.User import io.realm.kotlin.mongodb.sync.Direction import io.realm.kotlin.mongodb.sync.Progress @@ -51,6 +53,7 @@ class PBSProgressListenerTests { @BeforeTest fun setup() { + RealmLog.setLevel(LogLevel.INFO) app = TestApp(this::class.simpleName, appName = TEST_APP_PARTITION) partitionValue = org.mongodb.kbson.ObjectId().toString() } @@ -115,10 +118,12 @@ class PBSProgressListenerTests { flow.takeWhile { completed -> completed < 3 } .collect { completed -> realm.syncSession.runWhilePaused { + println("Writing data") uploadRealm.writeSampleData( TEST_SIZE, timeout = TIMEOUT ) + println("Data written") } } } @@ -282,7 +287,9 @@ class PBSProgressListenerTests { } timeout?.let { + println("Upload all changes") assertTrue { syncSession.uploadAllLocalChanges(timeout) } + println("Uploaded") } } From 2eb34da3d9ca6cb56f0d5d1fd093b6b828d9c685 Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 23 May 2024 13:55:44 +0200 Subject: [PATCH 23/26] Reduce binary size --- .../kotlin/test/mongodb/common/PBSProgressListenerTests.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index d99b9bf23a..92c0ea8548 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -280,7 +280,7 @@ class PBSProgressListenerTests { copyToRealm( SyncObjectWithAllTypes() .apply { - binaryField = Random.nextBytes(1_000_000) + binaryField = Random.nextBytes(100) } ) } From b4b6bcc74808a8bd3cf886c017afea6499e71e8b Mon Sep 17 00:00:00 2001 From: Clemente Date: Thu, 23 May 2024 14:52:57 +0200 Subject: [PATCH 24/26] Remove printouts --- .../test/mongodb/common/PBSProgressListenerTests.kt | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index 92c0ea8548..4bb8bf7ead 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -27,7 +27,6 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.last -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.supervisorScope @@ -118,12 +117,10 @@ class PBSProgressListenerTests { flow.takeWhile { completed -> completed < 3 } .collect { completed -> realm.syncSession.runWhilePaused { - println("Writing data") uploadRealm.writeSampleData( TEST_SIZE, timeout = TIMEOUT ) - println("Data written") } } } @@ -287,16 +284,13 @@ class PBSProgressListenerTests { } timeout?.let { - println("Upload all changes") assertTrue { syncSession.uploadAllLocalChanges(timeout) } - println("Uploaded") } } // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = - onEach { println(it) } - .filter { it.isTransferComplete } + filter { it.isTransferComplete } .scan(0) { accumulator, _ -> accumulator + 1 } From 2baa1f9d05f27c85d73d6cadb2d5ae8c98d09b27 Mon Sep 17 00:00:00 2001 From: Clemente Date: Fri, 24 May 2024 01:19:46 +0200 Subject: [PATCH 25/26] Reenable tests --- .../kotlin/test/mongodb/common/PBSProgressListenerTests.kt | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index 00f61a7136..48b16cf42e 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -35,7 +35,6 @@ import kotlinx.coroutines.withTimeout import kotlin.random.Random import kotlin.test.AfterTest import kotlin.test.BeforeTest -import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -66,7 +65,6 @@ class PBSProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun downloadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> // Verify that we: @@ -168,7 +166,6 @@ class PBSProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterExceptions() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) @@ -192,7 +189,6 @@ class PBSProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterCancel() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) @@ -202,7 +198,7 @@ class PBSProgressListenerTests { // Setup a flow that we are just going to cancel val flow = realm.syncSession - .progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) supervisorScope { val mutex = Mutex(true) @@ -224,7 +220,6 @@ class PBSProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun triggerImmediatelyWhenRegistered() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> withTimeout(10.seconds) { From 67ded77983996148b779ea76a2c5e1b457acdea0 Mon Sep 17 00:00:00 2001 From: Clemente Date: Fri, 24 May 2024 09:37:27 +0200 Subject: [PATCH 26/26] Add copyright. --- .../mongodb/common/PBSProgressListenerTests.kt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index 48b16cf42e..53b93ea5c2 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -1,3 +1,19 @@ +/* + * Copyright 2022 Realm Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.realm.kotlin.test.mongodb.common import io.realm.kotlin.Realm