Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform networking #1528

Merged
merged 53 commits into from
Dec 12, 2023
Merged

Platform networking #1528

merged 53 commits into from
Dec 12, 2023

Conversation

nhachicha
Copy link
Collaborator

@nhachicha nhachicha commented Sep 26, 2023

Add the option to use Ktor as the Websocket client instead of the built-in C++ WebSocket client for Sync.

TODO

  • Changelog entry
  • Switch to a dedicated single-threaded worker?

Waiting

@nhachicha nhachicha marked this pull request as ready for review October 11, 2023 11:34
@nhachicha nhachicha requested review from cmelchior, clementetb and rorbech and removed request for cmelchior October 11, 2023 11:34
Copy link
Contributor

@cmelchior cmelchior left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just dropping an initial review of auxiliary files before diving into the WebSocket code.

Mostly clarifying questions, but also looks like we should update Core on main so you can merge that into this branch.

CHANGELOG.md Outdated
@@ -16,6 +16,7 @@
* [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for
GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517))
* [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517))
* [Sync] Added option to use managed WebSockets via Ktor instead of Realm's built-in WebSocket client for Sync traffic. Managed WebSockets offer improved support for proxies and firewalls that require authentication. This feature is currently opt-in and can be enabled by using `AppConfiguration.usePlatformNetworking()`. Managed WebSockets will become the default in a future version. (PR [#1528](https://github.com/realm/realm-kotlin/pull/1528)). Currently only JVM and Android platforms are supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great description 💯

buildSrc/src/main/kotlin/Config.kt Show resolved Hide resolved
Jenkinsfile Outdated
@@ -188,7 +188,7 @@ pipeline {
"integrationtest",
{
forwardAdbPorts()
testAndCollect("packages", "cleanAllTests -PincludeSdkModules=false connectedAndroidTest")
testAndCollect("packages", "cleanAllTests -PREALM_USE_PLATFORM_NETWORKING=true -PincludeSdkModules=false connectedAndroidTest")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct that we no longer run full unit tests on the old implementation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is running with the new implementation for Android/JVM, we can duplicate the job to run also on legacy impl

// AuthenticationProvider is not removed once user is logged out
assertEquals(AuthenticationProvider.EMAIL_PASSWORD, emailUser.provider)
}
// @Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mistake?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is part of updating Core realm_user_get_auth_provider is no longer available

@@ -1260,7 +1260,8 @@ class SyncedRealmTests {
}
assertTrue(customLogger.logs.isNotEmpty())
assertTrue(customLogger.logs.any { it.contains("Connection[1]: Negotiated protocol version:") }, "Missing Connection[1]")
assertTrue(customLogger.logs.any { it.contains("MyCustomApp/1.0.0") }, "Missing MyCustomApp/1.0.0")
// user_agent is not populated in the new socket provider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a bug or intended?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

expect enum class WebsocketCallbackResult : CodeDescription {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests for this mapping should be added to SyncEnumTests

@@ -1115,7 +1118,8 @@ actual object RealmInterop {
}

actual fun realm_user_get_auth_provider(user: RealmUserPointer): AuthProvider {
return AuthProvider.of(realmc.realm_user_get_auth_provider(user.cptr()))
TODO("No longer valid")
// return AuthProvider.of(realmc.realm_user_get_auth_provider(user.cptr()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably upgrade Core in a separate PR from this one...there is quite a few things in here only related to that.

retryOnConnectionFailure(true)
}
}
// Revert to OkHttp when https://youtrack.jetbrains.com/issue/KTOR-6266 is fixed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably mention this in the internal changelog

@@ -48,4 +48,5 @@ internal expect class HttpClientCache(timeoutMs: Long, customLogger: Logger? = n
fun close() // Close any resources stored in the cache.
}

// TODO use a la private val clientCache: HttpClientCache = HttpClientCache(timeoutMs, logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure what this means?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left over

@@ -19,5 +20,8 @@ internal actual class HttpClientCache actual constructor(timeoutMs: Long, custom
}

public actual fun createPlatformClient(block: HttpClientConfig<*>.() -> Unit): HttpClient {
return HttpClient(Darwin, block)
return HttpClient(Darwin) {
install(WebSockets)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we installing it here if Darwin isn't supported yet?

Also it seems to already being installed here: https://github.com/realm/realm-kotlin/pull/1528/files#diff-2270f40a8e4fabaacf4f2791bbe75dee7428db4c09f84fc606ab90aaa99a78e1R277

Copy link
Contributor

@clementetb clementetb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some minor comments I am still reviewing the Transport implementation

* Platform Networking offer improved support for proxies and firewalls that require authentication,
* instead of Realm's built-in WebSocket client for Sync traffic. This will become the default in a future version.
*/
public fun usePlatformNetworking(enable: Boolean = true): Builder =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we annotate it as experimental? It will become a breaking change when it becomes stable and we remove it.

Copy link
Collaborator Author

@nhachicha nhachicha Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I think though we use experimental annotations like ExperimentalAsymmetricSyncApi & ExperimentalFlexibleSyncApi to advertise the ability to change the API without necessary relying on pruning / semantic versioning, which is not the case here, the API will be removed on a next major not evolve over time...

packages/test-sync/build.gradle.kts Outdated Show resolved Hide resolved
@@ -132,8 +132,10 @@ actual enum class WebsocketErrorCode(

override val nativeValue: Int = errorCode.value.toInt()

val asNativeEnum: realm_web_socket_errno = errorCode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't just elevate errorCode to be a class property?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not shared/used for WebsocketErrorCode JVM actual implementation

@@ -137,4 +137,16 @@ realm_sync_thread_error(realm_userdata_t userdata, const char* error);
realm_scheduler_t*
realm_create_generic_scheduler();

realm_sync_socket_t* realm_sync_websocket_new(int64_t sync_client_config_ptr, jobject websocket_transport);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come pointers are int64_t and not void*?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're already using int64_t for pointers. What would be the difference? anyway it will hold the value of jlong which should be fine (even on 32 bits arch I believe)

@@ -0,0 +1,322 @@
package io.realm.kotlin.mongodb.internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

©

@@ -393,6 +406,14 @@ public interface AppConfiguration {
)
}

val websocketTransport: ((DispatcherHolder) -> WebSocketTransport)? =
if (usePlatformNetworking) { dispatcherHolder ->
websocketTransport ?: KtorWebSocketTransport(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any logs we like to forward?

val websocketTransport: ((DispatcherHolder) -> WebSocketTransport)? =
if (usePlatformNetworking) { dispatcherHolder ->
websocketTransport ?: KtorWebSocketTransport(
timeoutMs = 60000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might like to update #408 to add another config option for this timeout.

Copy link
Contributor

@clementetb clementetb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The web socket logic might need some refactoring to ease reasoning on it. Logic could even be reduced by using scope jobs.

Otherwise, I am happy when CI is happy, it is a experimental feature 🧪

}
}

override fun connect(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Claus, the connect method is way too long with many implementation details.

It would be easier to reason if, at least, the WebSocketClient class was outside of the connect method, that way we would have a clear distinction between the Core API and the web sockets API.


// Writing messages to WebSocket
scope.launch {
writeChannel.consumeEach {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we are not using jobs instead of this channel write? We could benefit of the existing scope job handling to do this instead of iterating over the channel.

On each core-post we would post a job to the scope to the actual write.

session.cancel() // Terminate the WebSocket session, connect needs to be called again.
}
// Collect unprocessed writes and cancel them (mainly to avoid leaking the FunctionHandler).
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following up, by using the scope-job way, here by canceling the scope we would cancel any pending writes. The runcallback could be invoked automatically if these scope-jobs are configured with invokeOnCompletion.

Copy link
Contributor

@rorbech rorbech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Order of magnitudes easier to understand and follow the logic 💪😎 Could use some comments on the classes from WebSocketTransport.kt to give an faster entry point, but quite easy to follow when you dive into the implementation.

It feels a bit odd to schedule all websockets on the same thread, but can't really judge if I feel it is worth reworking at the moment 🤔

@@ -943,10 +943,12 @@ class SyncedRealmTests {
val (email1, password1) = randomEmail() to "password1234"
val user = flexApp.createUserAndLogIn(email1, password1)
val localConfig = createWriteCopyLocalConfig("local.realm")
val syncConfig = createPartitionSyncConfig(
val syncConfig = createFlexibleSyncConfig(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙈 ... 😅

@@ -373,6 +377,17 @@ public interface AppConfiguration {
return this
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it obvious what Platform networking is actually referring to? Should be a bit more explicit as in the CHANGELOG?

override fun post(handlerCallback: RealmWebsocketHandlerCallbackPointer) {
scope.launch {
(this as Job).invokeOnCompletion { completionHandler: Throwable? ->
// Only run the callback if it was not cancelled in the meantime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit pick. The comment seems a bit off? You are actually calling runCallback in each case. Also feels a bit weird that cancelled = true when completionHandler == null. Maybe just a matter of naming 🤪

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the wording is weird. The point is to "run successfully" the callback if there was no Cancellation error. cancelled = true should be set if coroutine was cancelled (i.e completionHandler contains an error or a CancellationException)

public val timeoutMs: Long
) : WebSocketTransport {
// we need a single thread dispatcher to act like an event loop
private val dispatcherHolder: DispatcherHolder = CoroutineDispatcherFactory.managed("RealmWebsocketTransport").create()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that it is wrong level of parallelism here. Seems weird that we have the options to selection multiplexing or not, but multiple websocket will be operation on the same thread. I don't know if this would have any actual effect though 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a dispatcher per websocket will only benefit if we have a full-duplex apps running in parallel which is a rare use case. Most use cases are the same app sharing the same transport, which is more inlined with multiplexing becoming the default...
We can revisit these approximations in case of a performance issue.

Copy link
Contributor

@cmelchior cmelchior left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great 🚀 ... I only have a few minor comments/questions.

CHANGELOG.md Outdated Show resolved Hide resolved
@@ -41,7 +43,11 @@ class RealmTests {
val app = TestApp("cleanupAllRealmThreadsOnClose")
val user = app.login(Credentials.anonymous())
val configuration = SyncConfiguration.create(user, TestHelper.randomPartitionValue(), setOf(ParentPk::class, ChildPk::class))
Realm.open(configuration).close()
Realm.open(configuration).use {
// we make sure Schema is exchanged correctly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you encounter some kind of error if this didn't happen?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the beginning... although it was using the old Ktor implementation, I'll revert

packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp Outdated Show resolved Hide resolved
isSsl: Boolean,
supportedSyncProtocols: String,
transport: RealmWebSocketTransport
): WebSocketClient = TODO()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe create an issue specifically tracking support for this to Darwin? And then link it here

private val protocolSelectionHeader = "Sec-WebSocket-Protocol"

init {
val websocketURL = "${if (isSsl) "wss" else "ws"}://$address:$port$path"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this URL was set by the Location request in Core? Should we built it up ourselves?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Core provides this endpoint.

@nhachicha nhachicha merged commit 445a6a9 into main Dec 12, 2023
2 checks passed
@nhachicha nhachicha deleted the nh/platform_networking branch December 12, 2023 13:43
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Mar 14, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants