Skip to content

Commit

Permalink
Platform networking (#1528)
Browse files Browse the repository at this point in the history
* Using platform networking for Sync Websocket

---------

Co-authored-by: Clemente <[email protected]>
Co-authored-by: Claus Rørbech <[email protected]>
  • Loading branch information
3 people authored Dec 12, 2023
1 parent d94e70c commit 445a6a9
Show file tree
Hide file tree
Showing 32 changed files with 1,165 additions and 34 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* None.

### Enhancements
* None.
* [Sync] Added option to use managed WebSockets via OkHttp instead of Realm's built-in WebSocket client for Sync traffic (Only Android and JVM targets for now). Managed WebSockets offer improved support for proxies and firewalls that require authentication. This feature is currently opt-in and can be enabled by using `AppConfiguration.usePlatformNetworking()`. Managed WebSockets will become the default in a future version. (PR [#1528](https://github.com/realm/realm-kotlin/pull/1528)).

### Fixed
* Cache notification callback JNI references at startup to ensure that symbols can be resolved in core callbacks. (Issue [#1577](https://github.com/realm/realm-kotlin/issues/1577))
Expand All @@ -25,7 +25,8 @@
* Minimum R8: 8.0.34.

### Internal
* None.
* Update to Ktor 2.3.4.
* Updated to CMake 3.27.7


## 1.13.1-SNAPSHOT (YYYY-MM-DD)
Expand Down Expand Up @@ -119,7 +120,7 @@ This release upgrades the Sync metadata in a way that is not compatible with old
* Fix error in `RealmAny.equals` that would sometimes return `true` when comparing RealmAnys wrapping same type but different values. (Issue [#1523](https://github.com/realm/realm-kotlin/pull/1523))
* [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517))
* [Sync] Manual client reset on Windows would not trigger correctly when run inside `onManualResetFallback`. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515))
* [Sync] `ClientResetRequiredException.executeClientReset()` now returns a boolean indicating if the manual reset fully succeded or not. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515))
* [Sync] `ClientResetRequiredException.executeClientReset()` now returns a boolean indicating if the manual reset fully succeeded or not. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515))
* [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for
GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517))
* [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517))
Expand Down
6 changes: 3 additions & 3 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pipeline {
"integrationtest",
{
forwardAdbPorts()
testAndCollect("packages", "cleanAllTests -PincludeSdkModules=false connectedAndroidTest")
testAndCollect("packages", "cleanAllTests -PsyncUsePlatformNetworking=true -PincludeSdkModules=false connectedAndroidTest")
}
)
}
Expand All @@ -215,7 +215,7 @@ pipeline {
steps {
testWithServer([
{
testAndCollect("packages", 'cleanAllTests jvmTest -PincludeSdkModules=false ')
testAndCollect("packages", 'cleanAllTests jvmTest -PsyncUsePlatformNetworking=true -PincludeSdkModules=false ')
}
])
}
Expand All @@ -235,7 +235,7 @@ pipeline {
steps {
testWithServer([
{
testAndCollect("packages", 'cleanAllTests :test-sync:connectedAndroidtest -PincludeSdkModules=false -PtestBuildType=debugMinified')
testAndCollect("packages", 'cleanAllTests :test-sync:connectedAndroidtest -PsyncUsePlatformNetworking=true -PincludeSdkModules=false -PtestBuildType=debugMinified')
}
])
sh 'rm mapping.zip || true'
Expand Down
4 changes: 2 additions & 2 deletions buildSrc/src/main/kotlin/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ object Versions {
const val buildkonfig = "0.13.3" // https://github.com/yshrsmz/BuildKonfig
// Not currently used, so mostly here for documentation. Core requires minimum 3.15, but 3.18.1 is available through the Android SDK.
// Build also tested successfully with 3.21.4 (latest release).
const val cmake = "3.22.1"
const val cmake = "3.27.7"
const val coroutines = "1.7.0" // https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core
const val datetime = "0.4.0" // https://github.com/Kotlin/kotlinx-datetime
const val detektPlugin = "1.22.0-RC2" // https://github.com/detekt/detekt
Expand All @@ -131,7 +131,7 @@ object Versions {
const val latestKotlin = "1.9.20" // https://kotlinlang.org/docs/eap.html#build-details
const val kotlinCompileTesting = "1.5.0" // https://github.com/tschuchortdev/kotlin-compile-testing
const val ktlint = "0.45.2" // https://github.com/pinterest/ktlint
const val ktor = "2.1.2" // https://github.com/ktorio/ktor
const val ktor = "2.3.4" // https://github.com/ktorio/ktor
const val multidex = "2.0.1" // https://developer.android.com/jetpack/androidx/releases/multidex
const val nexusPublishPlugin = "1.1.0" // https://github.com/gradle-nexus/publish-plugin
const val okio = "3.2.0" // https://square.github.io/okio/#releases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import io.realm.kotlin.internal.interop.sync.NetworkTransport
import io.realm.kotlin.internal.interop.sync.ProgressDirection
import io.realm.kotlin.internal.interop.sync.SyncSessionResyncMode
import io.realm.kotlin.internal.interop.sync.SyncUserIdentity
import io.realm.kotlin.internal.interop.sync.WebSocketTransport
import io.realm.kotlin.internal.interop.sync.WebsocketCallbackResult
import io.realm.kotlin.internal.interop.sync.WebsocketErrorCode
import kotlinx.coroutines.CoroutineDispatcher
import org.mongodb.kbson.ObjectId
import kotlin.jvm.JvmInline
Expand Down Expand Up @@ -103,9 +106,15 @@ interface RealmUserT : CapiT
interface RealmNetworkTransportT : CapiT
interface RealmSyncSessionT : CapiT
interface RealmSubscriptionT : CapiT
interface RealmSyncSocketObserverPointerT : CapiT
interface RealmSyncSocketCallbackPointerT : CapiT

interface RealmBaseSubscriptionSet : CapiT
interface RealmSyncSocket : CapiT
interface RealmSubscriptionSetT : RealmBaseSubscriptionSet
interface RealmMutableSubscriptionSetT : RealmBaseSubscriptionSet
interface RealmSyncSocketT : RealmSyncSocket

// Public type aliases binding to internal verbose type safe type definitions. This should allow us
// to easily change implementation details later on.
typealias RealmAsyncOpenTaskPointer = NativePointer<RealmAsyncOpenTaskT>
Expand All @@ -121,7 +130,11 @@ typealias RealmSubscriptionPointer = NativePointer<RealmSubscriptionT>
typealias RealmBaseSubscriptionSetPointer = NativePointer<out RealmBaseSubscriptionSet>
typealias RealmSubscriptionSetPointer = NativePointer<RealmSubscriptionSetT>
typealias RealmMutableSubscriptionSetPointer = NativePointer<RealmMutableSubscriptionSetT>

typealias RealmSyncSocketPointer = NativePointer<RealmSyncSocketT>
typealias RealmSyncSocketObserverPointer = NativePointer<RealmSyncSocketObserverPointerT>
typealias RealmSyncSocketCallbackPointer = NativePointer<RealmSyncSocketCallbackPointerT>
typealias RealmWebsocketHandlerCallbackPointer = NativePointer<CapiT>
typealias RealmWebsocketProviderPointer = NativePointer<CapiT>
/**
* Class for grouping and normalizing values we want to send as part of
* logging in Sync Users.
Expand Down Expand Up @@ -494,7 +507,6 @@ expect object RealmInterop {
fun realm_app_link_credentials(app: RealmAppPointer, user: RealmUserPointer, credentials: RealmCredentialsPointer, callback: AppCallback<RealmUserPointer>)
fun realm_clear_cached_apps()
fun realm_app_sync_client_get_default_file_path_for_realm(
app: RealmAppPointer,
syncConfig: RealmSyncConfigurationPointer,
overriddenName: String?
): String
Expand Down Expand Up @@ -799,4 +811,22 @@ expect object RealmInterop {
fun realm_sync_subscriptionset_commit(
mutableSubscriptionSet: RealmMutableSubscriptionSetPointer
): RealmSubscriptionSetPointer

fun realm_sync_set_websocket_transport(
syncClientConfig: RealmSyncClientConfigurationPointer,
webSocketTransport: WebSocketTransport
)

fun realm_sync_socket_callback_complete(nativePointer: RealmWebsocketHandlerCallbackPointer, cancelled: Boolean = false, status: WebsocketCallbackResult = WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS, reason: String = "")

fun realm_sync_socket_websocket_connected(nativePointer: RealmWebsocketProviderPointer, protocol: String)

fun realm_sync_socket_websocket_error(nativePointer: RealmWebsocketProviderPointer)

fun realm_sync_socket_websocket_message(
nativePointer: RealmWebsocketProviderPointer,
data: ByteArray
): Boolean

fun realm_sync_socket_websocket_closed(nativePointer: RealmWebsocketProviderPointer, wasClean: Boolean, errorCode: WebsocketErrorCode, reason: String = "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ expect enum class WebsocketErrorCode : CodeDescription {
RLM_ERR_WEBSOCKET_FATAL_ERROR;

companion object {
internal fun of(nativeValue: Int): WebsocketErrorCode?
fun of(nativeValue: Int): WebsocketErrorCode?
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2022 Realm Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.realm.kotlin.internal.interop.sync

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2023 Realm Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.realm.kotlin.internal.interop.sync

import io.realm.kotlin.internal.interop.RealmInterop
import io.realm.kotlin.internal.interop.RealmWebsocketHandlerCallbackPointer
import io.realm.kotlin.internal.interop.RealmWebsocketProviderPointer
import kotlinx.coroutines.Job

/**
* Interface to be implemented by the websocket provider. This helps un-bundle the implementation
* from Core to leverage the platform capabilities (Proxy, firewall, vpn etc.).
*/
interface WebSocketTransport {
/**
* Submit a handler function to be executed by the event loop.
*/
fun post(handlerCallback: RealmWebsocketHandlerCallbackPointer)

/**
* Create and register a new timer whose handler function will be posted
* to the event loop when the provided delay expires.
* @return [CancellableTimer] to be called if the timer is to be cancelled before the delay.
*/
fun createTimer(
delayInMilliseconds: Long,
handlerCallback: RealmWebsocketHandlerCallbackPointer,
): CancellableTimer

/**
* Create a new websocket pointed to the server indicated by endpoint and
* connect to the server. Any events that occur during the execution of the
* websocket will call directly to the handlers provided by the observer (new messages, error, close events)
*
* @return [WebSocketClient] instance to be used by Core to send data, and signal a close session.
*/
@Suppress("LongParameterList")
fun connect(
observer: WebSocketObserver,
path: String,
address: String,
port: Long,
isSsl: Boolean,
numProtocols: Long,
supportedSyncProtocols: String
): WebSocketClient

/**
* Writes to the previously created websocket in [connect] the binary data. The provided [handlerCallback] needs
* to run in the event loop after a successful write or in case of an error.
*/
fun write(
webSocketClient: WebSocketClient,
data: ByteArray,
length: Long,
handlerCallback: RealmWebsocketHandlerCallbackPointer
)

/**
* This helper function run the provided function pointer. It needs to be called within the same event loop context (thread)
* as the rest of the other functions.
*/
fun runCallback(
handlerCallback: RealmWebsocketHandlerCallbackPointer,
cancelled: Boolean = false,
status: WebsocketCallbackResult = WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS,
reason: String = ""
) {
RealmInterop.realm_sync_socket_callback_complete(
handlerCallback, cancelled, status, reason
)
}

/**
* Core signal the transport, all websockets previously created with [connect] would have been closed at this point
* this is useful to do any resource cleanup like shutting down the engine or closing coroutine dispatcher.
*/
fun close()
}

/**
* Cancel a previously scheduled timer created via [WebSocketTransport.createTimer].
*/
class CancellableTimer(
private val job: Job,
private val cancelCallback: () -> Unit
) {
fun cancel() {
job.cancel()
cancelCallback()
}
}

/**
* Define an interface to interact with the websocket created via [WebSocketTransport.connect].
* This will be called from Core.
*/
interface WebSocketClient {
/**
* Send a binary Frame to the remote peer.
*/
fun send(message: ByteArray, handlerCallback: RealmWebsocketHandlerCallbackPointer)

/**
* Close the websocket.
*/
fun close()
}

/**
* Defines an abstraction of the underlying Http engine used to create the websocket.
* This abstraction is needed in order to deterministically create and shutdown the engine at the transport level.
* All websocket within the same App share the same transport and by definition the same engine.
*/
interface WebsocketEngine {
fun shutdown()
fun <T> getInstance(): T
}

/**
* Wrapper around Core callback pointer (observer). This will delegate calls for all incoming messages from the remote peer.
*/
class WebSocketObserver(private val webSocketObserverPointer: RealmWebsocketProviderPointer) {
/**
* Communicate the negotiated Sync protocol.
*/
fun onConnected(protocol: String) {
RealmInterop.realm_sync_socket_websocket_connected(webSocketObserverPointer, protocol)
}

/**
* Notify an error.
*/
fun onError() {
RealmInterop.realm_sync_socket_websocket_error(webSocketObserverPointer)
}

/**
* Forward received message to Core.
*/
fun onNewMessage(data: ByteArray): Boolean {
return RealmInterop.realm_sync_socket_websocket_message(webSocketObserverPointer, data)
}

/**
* Notify closure message.
*/
fun onClose(wasClean: Boolean, errorCode: WebsocketErrorCode, reason: String) {
RealmInterop.realm_sync_socket_websocket_closed(
webSocketObserverPointer, wasClean, errorCode, reason
)
}
}
2 changes: 1 addition & 1 deletion packages/cinterop/src/jvm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ if (CMAKE_SYSTEM_NAME MATCHES "^Windows")

elseif (CMAKE_SYSTEM_NAME MATCHES "^Android")
MESSAGE("Building JNI for Android")
set(CAPI_BUILD "${CMAKE_SOURCE_DIR}/../../../external/core}/build-android-${ANDROID_ABI}-${CMAKE_BUILD_TYPE}")
set(CAPI_BUILD "${CMAKE_SOURCE_DIR}/../../../external/core/build-android-${ANDROID_ABI}-${CMAKE_BUILD_TYPE}")
set(REALM_INCLUDE_DIRS ${CAPI_BUILD}/src ${CINTEROP_JNI} ${SWIG_JNI_GENERATED} ${SWIG_JNI_HELPERS})
set(REALM_TARGET_LINK_LIBS log android RealmFFIStatic Realm::ObjectStore)

Expand Down
12 changes: 12 additions & 0 deletions packages/cinterop/src/jvm/jni/java_class_global_def.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class JavaClassGlobalDef {
, m_io_realm_kotlin_internal_interop_app_callback(env, "io/realm/kotlin/internal/interop/AppCallback", false)
, m_io_realm_kotlin_internal_interop_connection_state_change_callback(env, "io/realm/kotlin/internal/interop/ConnectionStateChangeCallback", false)
, m_io_realm_kotlin_internal_interop_sync_thread_observer(env, "io/realm/kotlin/internal/interop/SyncThreadObserver", false)
, m_io_realm_kotlin_internal_interop_sync_websocket_transport(env, "io/realm/kotlin/internal/interop/sync/WebSocketTransport", false)
, m_io_realm_kotlin_internal_interop_sync_websocket_client(env, "io/realm/kotlin/internal/interop/sync/WebSocketClient", false)
, m_io_realm_kotlin_internal_interop_notification_callback(env, "io/realm/kotlin/internal/interop/NotificationCallback", false)
{
}
Expand Down Expand Up @@ -93,6 +95,8 @@ class JavaClassGlobalDef {
jni_util::JavaClass m_io_realm_kotlin_internal_interop_app_callback;
jni_util::JavaClass m_io_realm_kotlin_internal_interop_connection_state_change_callback;
jni_util::JavaClass m_io_realm_kotlin_internal_interop_sync_thread_observer;
jni_util::JavaClass m_io_realm_kotlin_internal_interop_sync_websocket_transport;
jni_util::JavaClass m_io_realm_kotlin_internal_interop_sync_websocket_client;
jni_util::JavaClass m_io_realm_kotlin_internal_interop_notification_callback;

inline static std::unique_ptr<JavaClassGlobalDef>& instance()
Expand Down Expand Up @@ -235,6 +239,14 @@ class JavaClassGlobalDef {
return jni_util::JavaMethod(env, instance()->m_kotlin_jvm_functions_function1, "invoke",
"(Ljava/lang/Object;)Ljava/lang/Object;");
}

inline static const jni_util::JavaClass& sync_websocket_transport() {
return instance()->m_io_realm_kotlin_internal_interop_sync_websocket_transport;
}

inline static const jni_util::JavaClass& sync_websocket_client() {
return instance()->m_io_realm_kotlin_internal_interop_sync_websocket_client;
}
};

} // namespace realm
Expand Down
Loading

0 comments on commit 445a6a9

Please sign in to comment.