Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RKOTLIN-1079] Add support for transfer progress estimate #1575

Merged
merged 34 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
10db36d
Use new c-api progress notifier callback signature
clementetb Nov 15, 2023
9d9d4c2
Allow progress listeners on Flexible sync configurations
clementetb Nov 15, 2023
363da96
Add flexible sync tests
clementetb Nov 16, 2023
6e66152
Remove flexible progress listener tests
clementetb Nov 16, 2023
2fe9522
Undo changes
clementetb Nov 16, 2023
dc40f59
Merge branch 'main' into ct/progress-estimates
clementetb May 15, 2024
ce82589
Reenable tests
clementetb May 15, 2024
d82a491
Update docs
clementetb May 15, 2024
377701d
Enable progress listeners on flx
clementetb May 16, 2024
2914b62
Limit test size
clementetb May 16, 2024
57a6e9d
Linting
clementetb May 16, 2024
36c83fe
Upload latest changes
clementetb May 17, 2024
c07b0b0
Fix FLX progress listener tests
rorbech May 17, 2024
ec52867
Add CHANGELOG entry
rorbech May 17, 2024
0ce46e1
Fix FLX progress listener tests
rorbech May 17, 2024
1bf32d0
Merge branch 'main' into ct/progress-estimates
rorbech May 17, 2024
c7f7907
Clean up docs
rorbech May 17, 2024
535807f
Update packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/…
rorbech May 21, 2024
1d2250d
Merge branch 'main' into ct/progress-estimates
clementetb May 21, 2024
b7d1ce5
Merge branch 'main' into ct/progress-estimates
clementetb May 21, 2024
49c690c
Merge branch 'main' into ct/progress-estimates
clementetb May 21, 2024
d730c92
Reduce test size
clementetb May 22, 2024
81edc2a
Fix test cases
clementetb May 22, 2024
0e07c6d
Merge branch 'main' into ct/progress-estimates
clementetb May 22, 2024
291974d
Fix worksAfterExceptions test case
clementetb May 22, 2024
2929d42
Remove buffer
clementetb May 22, 2024
af0f30a
Try fix for PBS tests
clementetb May 23, 2024
e5e0638
Add debug traces
clementetb May 23, 2024
2eb34da
Reduce binary size
clementetb May 23, 2024
b4b6bcc
Remove printouts
clementetb May 23, 2024
bae6fc3
Merge branch 'main' into ct/progress-estimates
rorbech May 23, 2024
5fd5705
Merge branch 'main' into ct/progress-estimates
clementetb May 23, 2024
2baa1f9
Reenable tests
clementetb May 23, 2024
67ded77
Add copyright.
clementetb May 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
This release will bump the Realm file format from version 23 to 24. Opening a file with an older format will automatically upgrade it from file format v10. If you want to upgrade from an earlier file format version you will have to use Realm Kotlin v1.13.1 or earlier. Downgrading to a previous file format is not possible.

### Breaking changes
* None.
* Sync progress updates no longer report `transferredBytes` and `totalBytes`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the new values are the enhancement, but I think I would have combined both into one breaking change entry and put emphasis on the new better precision ... And support for flx-sync, right?


### Enhancements
* Support for RealmLists and RealmDictionaries in `RealmAny`. (Issue [#1434](https://github.com/realm/realm-kotlin/issues/1434))
* Optimized `RealmList.indexOf()` and `RealmList.contains()` using Core implementation of operations instead of iterating elements and comparing them in Kotlin. (Issue [#1625](https://github.com/realm/realm-kotlin/pull/1666) [RKOTLIN-995](https://jira.mongodb.org/browse/RKOTLIN-995)).
* Sync progress updates now report an estimate ranged from `0.0` to `1.0` with `Progress.estimate`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)).

### Fixed
* None.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fun interface AsyncOpenCallback {
}

fun interface ProgressCallback {
fun onChange(transferredBytes: Long, totalBytes: Long)
fun onChange(progressEstimate: Double)
}

fun interface ConnectionStateChangeCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import kotlinx.cinterop.CPointerVar
import kotlinx.cinterop.CPointerVarOf
import kotlinx.cinterop.CValue
import kotlinx.cinterop.CVariable
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.LongVar
import kotlinx.cinterop.MemScope
import kotlinx.cinterop.StableRef
Expand Down Expand Up @@ -268,6 +269,7 @@ fun String.toRString(memScope: MemScope) = cValue<realm_string_t> {
set(memScope, this@toRString)
}

@OptIn(ExperimentalForeignApi::class)
@Suppress("LargeClass", "FunctionNaming")
actual object RealmInterop {

Expand Down Expand Up @@ -2792,10 +2794,9 @@ actual object RealmInterop {
return CPointerWrapper(
realm_wrapper.realm_sync_session_register_progress_notifier(
syncSession.cptr(),
staticCFunction<COpaquePointer?, ULong, ULong, Double, Unit> { userData, transferred_bytes, total_bytes, _ ->
staticCFunction<COpaquePointer?, ULong, ULong, Double, Unit> { userData, _, _, progress_estimate ->
safeUserData<ProgressCallback>(userData).run {
// TODO Progress ignored until https://github.com/realm/realm-kotlin/pull/1575
onChange(transferred_bytes.toLong(), total_bytes.toLong())
onChange(progress_estimate)
}
},
direction.nativeValue,
Expand Down
7 changes: 3 additions & 4 deletions packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1244,14 +1244,13 @@ sync_after_client_reset_handler(realm_sync_config_t* config, jobject after_handl
}

void
realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes, double progress) {
realm_sync_session_progress_notifier_callback(void *userdata, uint64_t, uint64_t, double progress_estimate) {
auto env = get_env(true);

// TODO Progress ignored until https://github.com/realm/realm-kotlin/pull/1575
static JavaMethod java_callback_method(env, JavaClassGlobalDef::progress_callback(), "onChange", "(JJ)V");
static JavaMethod java_callback_method(env, JavaClassGlobalDef::progress_callback(), "onChange", "(D)V");

jni_check_exception(env);
env->CallVoidMethod(static_cast<jobject>(userdata), java_callback_method, jlong(transferred_bytes), jlong(total_bytes));
env->CallVoidMethod(static_cast<jobject>(userdata), java_callback_method, jdouble(progress_estimate));
jni_check_exception(env);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/jni-swig-stub/src/main/jni/realm_api_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void
sync_after_client_reset_handler(realm_sync_config_t* config, jobject after_handler);

void
realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes, double progress);
realm_sync_session_progress_notifier_callback(void *userdata, uint64_t, uint64_t, double progress_estimate);

void
realm_sync_session_connection_state_change_callback(void *userdata, realm_sync_connection_state_e old_state, realm_sync_connection_state_e new_state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ internal open class SyncSessionImpl(
Direction.UPLOAD -> ProgressDirection.RLM_SYNC_PROGRESS_DIRECTION_UPLOAD
},
progressMode == ProgressMode.INDEFINITELY
) { transferredBytes: Long, totalBytes: Long ->
val progress = Progress(transferredBytes.toULong(), totalBytes.toULong())
) { progressEstimate: Double ->
val progress = Progress(progressEstimate)
trySendWithBufferOverflowCheck(progress)
if (progressMode == ProgressMode.CURRENT_CHANGES && progress.isTransferComplete) {
close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@ package io.realm.kotlin.mongodb.sync
*/
public data class Progress(
/**
* Total number of bytes that has been transferred by the [SyncSession].
* Transfer progress estimation ranged from 0.0 to 1.0.
*/
val transferredBytes: ULong,
/**
* Total number of transferable bytes (bytes that have been transferred + pending bytes not
* yet transferred).
*/
val transferableBytes: ULong
val estimate: Double,
) {
/**
* Property indicating if all pending bytes have been transferred.
Expand All @@ -40,5 +35,5 @@ public data class Progress(
* flow can continue to emit events with `isTransferComplete = false` for subsequent events
* after returning a progress indicator with `isTransferComplete = true`.
*/
public val isTransferComplete: Boolean = transferredBytes >= transferableBytes
public val isTransferComplete: Boolean = estimate == 1.0
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.map
Expand Down Expand Up @@ -104,12 +101,17 @@ class ProgressListenerTests {
// We are not sure when the realm actually knows of the remote changes and consider
// them current, so wait a bit
delay(10.seconds)
realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES)
.run {
withTimeout(TIMEOUT) {
assertTrue(last().isTransferComplete)
realm.syncSession.progressAsFlow(
Direction.DOWNLOAD,
ProgressMode.CURRENT_CHANGES
).run {
withTimeout(TIMEOUT) {
last().let { progress: Progress ->
assertTrue(progress.isTransferComplete)
assertEquals(1.0, progress.estimate)
}
}
}
// Progress.isTransferComplete does not guarantee that changes are integrated and
// visible in the realm
realm.syncSession.downloadAllServerChanges(TIMEOUT)
Expand All @@ -128,10 +130,12 @@ class ProgressListenerTests {
uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT)

Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm ->
val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY)
val flow = realm.syncSession
.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY)
.completionCounter()

withTimeout(TIMEOUT) {
flow.takeWhile { completed -> completed < 3 }
flow.takeWhile { completed -> println(completed); completed < 3 }
clementetb marked this conversation as resolved.
Show resolved Hide resolved
.collect { completed ->
uploadRealm.writeSampleData(
TEST_SIZE,
Expand All @@ -149,11 +153,15 @@ class ProgressListenerTests {
Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm ->
for (i in 0..3) {
realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT)
realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES).run {
withTimeout(TIMEOUT) {
assertTrue(last().isTransferComplete)
realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES)
.run {
withTimeout(TIMEOUT) {
last().let {
assertTrue(it.isTransferComplete)
assertEquals(1.0, it.estimate)
}
}
}
}
}
}
}
Expand Down Expand Up @@ -301,25 +309,17 @@ class ProgressListenerTests {
}
}

// Operator that will return a flow that emits an incresing integer on each completion event
// Operator that will return a flow that emits an increasing integer on each completion event
private fun Flow<Progress>.completionCounter(): Flow<Int> =
filter { it.isTransferComplete }
.distinctUntilChanged()
// Increment completed count if we are done transferring and the amount of bytes has
// increased
.scan(0UL to 0) { (bytes, completed), progress ->
if (progress.isTransferComplete && progress.transferableBytes > bytes) {
(progress.transferredBytes to completed + 1)
} else {
(bytes to completed)
}
}
.drop(1)
.map { (_, completed) -> completed }
map {
it.estimate.toInt()
}.scan(0) { accumulator, _ ->
accumulator + 1
}

private fun createSyncConfig(
user: User,
partitionValue: String = getTestPartitionValue()
partitionValue: String = getTestPartitionValue(),
): SyncConfiguration {
return SyncConfiguration.Builder(user, partitionValue, PARTITION_BASED_SCHEMA)
.build()
Expand Down
Loading