diff --git a/CHANGELOG.md b/CHANGELOG.md index 917ec1c80f..9b8f417b7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ * None. ### Enhancements -* None. +* [Sync] Added option to use managed WebSockets via OkHttp instead of Realm's built-in WebSocket client for Sync traffic (Only Android and JVM targets for now). Managed WebSockets offer improved support for proxies and firewalls that require authentication. This feature is currently opt-in and can be enabled by using `AppConfiguration.usePlatformNetworking()`. Managed WebSockets will become the default in a future version. (PR [#1528](https://github.com/realm/realm-kotlin/pull/1528)). ### Fixed * Cache notification callback JNI references at startup to ensure that symbols can be resolved in core callbacks. (Issue [#1577](https://github.com/realm/realm-kotlin/issues/1577)) @@ -25,7 +25,8 @@ * Minimum R8: 8.0.34. ### Internal -* None. +* Update to Ktor 2.3.4. +* Updated to CMake 3.27.7 ## 1.13.1-SNAPSHOT (YYYY-MM-DD) @@ -119,7 +120,7 @@ This release upgrades the Sync metadata in a way that is not compatible with old * Fix error in `RealmAny.equals` that would sometimes return `true` when comparing RealmAnys wrapping same type but different values. (Issue [#1523](https://github.com/realm/realm-kotlin/pull/1523)) * [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517)) * [Sync] Manual client reset on Windows would not trigger correctly when run inside `onManualResetFallback`. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515)) -* [Sync] `ClientResetRequiredException.executeClientReset()` now returns a boolean indicating if the manual reset fully succeded or not. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515)) +* [Sync] `ClientResetRequiredException.executeClientReset()` now returns a boolean indicating if the manual reset fully succeeded or not. (Issue [#1515](https://github.com/realm/realm-kotlin/pull/1515)) * [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517)) * [Sync] If calling a function on App Services that resulted in a redirect, it would only redirect for GET requests. (Issue [#1517](https://github.com/realm/realm-kotlin/pull/1517)) diff --git a/Jenkinsfile b/Jenkinsfile index e39f7d87b0..674ff50a36 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -191,7 +191,7 @@ pipeline { "integrationtest", { forwardAdbPorts() - testAndCollect("packages", "cleanAllTests -PincludeSdkModules=false connectedAndroidTest") + testAndCollect("packages", "cleanAllTests -PsyncUsePlatformNetworking=true -PincludeSdkModules=false connectedAndroidTest") } ) } @@ -215,7 +215,7 @@ pipeline { steps { testWithServer([ { - testAndCollect("packages", 'cleanAllTests jvmTest -PincludeSdkModules=false ') + testAndCollect("packages", 'cleanAllTests jvmTest -PsyncUsePlatformNetworking=true -PincludeSdkModules=false ') } ]) } @@ -235,7 +235,7 @@ pipeline { steps { testWithServer([ { - testAndCollect("packages", 'cleanAllTests :test-sync:connectedAndroidtest -PincludeSdkModules=false -PtestBuildType=debugMinified') + testAndCollect("packages", 'cleanAllTests :test-sync:connectedAndroidtest -PsyncUsePlatformNetworking=true -PincludeSdkModules=false -PtestBuildType=debugMinified') } ]) sh 'rm mapping.zip || true' diff --git a/buildSrc/src/main/kotlin/Config.kt b/buildSrc/src/main/kotlin/Config.kt index b71f9e4fb8..bcb85872dd 100644 --- a/buildSrc/src/main/kotlin/Config.kt +++ b/buildSrc/src/main/kotlin/Config.kt @@ -115,7 +115,7 @@ object Versions { const val buildkonfig = "0.13.3" // https://github.com/yshrsmz/BuildKonfig // Not currently used, so mostly here for documentation. Core requires minimum 3.15, but 3.18.1 is available through the Android SDK. // Build also tested successfully with 3.21.4 (latest release). - const val cmake = "3.22.1" + const val cmake = "3.27.7" const val coroutines = "1.7.0" // https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core const val datetime = "0.4.0" // https://github.com/Kotlin/kotlinx-datetime const val detektPlugin = "1.22.0-RC2" // https://github.com/detekt/detekt @@ -131,7 +131,7 @@ object Versions { const val latestKotlin = "1.9.20" // https://kotlinlang.org/docs/eap.html#build-details const val kotlinCompileTesting = "1.5.0" // https://github.com/tschuchortdev/kotlin-compile-testing const val ktlint = "0.45.2" // https://github.com/pinterest/ktlint - const val ktor = "2.1.2" // https://github.com/ktorio/ktor + const val ktor = "2.3.4" // https://github.com/ktorio/ktor const val multidex = "2.0.1" // https://developer.android.com/jetpack/androidx/releases/multidex const val nexusPublishPlugin = "1.1.0" // https://github.com/gradle-nexus/publish-plugin const val okio = "3.2.0" // https://square.github.io/okio/#releases diff --git a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt index 7b49fc5f8b..6a846f05f0 100644 --- a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt +++ b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt @@ -29,6 +29,9 @@ import io.realm.kotlin.internal.interop.sync.NetworkTransport import io.realm.kotlin.internal.interop.sync.ProgressDirection import io.realm.kotlin.internal.interop.sync.SyncSessionResyncMode import io.realm.kotlin.internal.interop.sync.SyncUserIdentity +import io.realm.kotlin.internal.interop.sync.WebSocketTransport +import io.realm.kotlin.internal.interop.sync.WebsocketCallbackResult +import io.realm.kotlin.internal.interop.sync.WebsocketErrorCode import kotlinx.coroutines.CoroutineDispatcher import org.mongodb.kbson.ObjectId import kotlin.jvm.JvmInline @@ -103,9 +106,15 @@ interface RealmUserT : CapiT interface RealmNetworkTransportT : CapiT interface RealmSyncSessionT : CapiT interface RealmSubscriptionT : CapiT +interface RealmSyncSocketObserverPointerT : CapiT +interface RealmSyncSocketCallbackPointerT : CapiT + interface RealmBaseSubscriptionSet : CapiT +interface RealmSyncSocket : CapiT interface RealmSubscriptionSetT : RealmBaseSubscriptionSet interface RealmMutableSubscriptionSetT : RealmBaseSubscriptionSet +interface RealmSyncSocketT : RealmSyncSocket + // Public type aliases binding to internal verbose type safe type definitions. This should allow us // to easily change implementation details later on. typealias RealmAsyncOpenTaskPointer = NativePointer @@ -121,7 +130,11 @@ typealias RealmSubscriptionPointer = NativePointer typealias RealmBaseSubscriptionSetPointer = NativePointer typealias RealmSubscriptionSetPointer = NativePointer typealias RealmMutableSubscriptionSetPointer = NativePointer - +typealias RealmSyncSocketPointer = NativePointer +typealias RealmSyncSocketObserverPointer = NativePointer +typealias RealmSyncSocketCallbackPointer = NativePointer +typealias RealmWebsocketHandlerCallbackPointer = NativePointer +typealias RealmWebsocketProviderPointer = NativePointer /** * Class for grouping and normalizing values we want to send as part of * logging in Sync Users. @@ -494,7 +507,6 @@ expect object RealmInterop { fun realm_app_link_credentials(app: RealmAppPointer, user: RealmUserPointer, credentials: RealmCredentialsPointer, callback: AppCallback) fun realm_clear_cached_apps() fun realm_app_sync_client_get_default_file_path_for_realm( - app: RealmAppPointer, syncConfig: RealmSyncConfigurationPointer, overriddenName: String? ): String @@ -799,4 +811,22 @@ expect object RealmInterop { fun realm_sync_subscriptionset_commit( mutableSubscriptionSet: RealmMutableSubscriptionSetPointer ): RealmSubscriptionSetPointer + + fun realm_sync_set_websocket_transport( + syncClientConfig: RealmSyncClientConfigurationPointer, + webSocketTransport: WebSocketTransport + ) + + fun realm_sync_socket_callback_complete(nativePointer: RealmWebsocketHandlerCallbackPointer, cancelled: Boolean = false, status: WebsocketCallbackResult = WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS, reason: String = "") + + fun realm_sync_socket_websocket_connected(nativePointer: RealmWebsocketProviderPointer, protocol: String) + + fun realm_sync_socket_websocket_error(nativePointer: RealmWebsocketProviderPointer) + + fun realm_sync_socket_websocket_message( + nativePointer: RealmWebsocketProviderPointer, + data: ByteArray + ): Boolean + + fun realm_sync_socket_websocket_closed(nativePointer: RealmWebsocketProviderPointer, wasClean: Boolean, errorCode: WebsocketErrorCode, reason: String = "") } diff --git a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt index dae3a68dd8..a182d610a8 100644 --- a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt +++ b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt @@ -121,7 +121,7 @@ expect enum class WebsocketErrorCode : CodeDescription { RLM_ERR_WEBSOCKET_FATAL_ERROR; companion object { - internal fun of(nativeValue: Int): WebsocketErrorCode? + fun of(nativeValue: Int): WebsocketErrorCode? } } diff --git a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/SyncUserIdentity.kt b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/SyncUserIdentity.kt index c45b3b9916..3fbca71c76 100644 --- a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/SyncUserIdentity.kt +++ b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/SyncUserIdentity.kt @@ -1,3 +1,19 @@ +/* + * Copyright 2022 Realm Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.realm.kotlin.internal.interop.sync /** diff --git a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/WebSocketTransport.kt b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/WebSocketTransport.kt new file mode 100644 index 0000000000..136a535c05 --- /dev/null +++ b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/sync/WebSocketTransport.kt @@ -0,0 +1,166 @@ +/* + * Copyright 2023 Realm Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.realm.kotlin.internal.interop.sync + +import io.realm.kotlin.internal.interop.RealmInterop +import io.realm.kotlin.internal.interop.RealmWebsocketHandlerCallbackPointer +import io.realm.kotlin.internal.interop.RealmWebsocketProviderPointer +import kotlinx.coroutines.Job + +/** + * Interface to be implemented by the websocket provider. This helps un-bundle the implementation + * from Core to leverage the platform capabilities (Proxy, firewall, vpn etc.). + */ +interface WebSocketTransport { + /** + * Submit a handler function to be executed by the event loop. + */ + fun post(handlerCallback: RealmWebsocketHandlerCallbackPointer) + + /** + * Create and register a new timer whose handler function will be posted + * to the event loop when the provided delay expires. + * @return [CancellableTimer] to be called if the timer is to be cancelled before the delay. + */ + fun createTimer( + delayInMilliseconds: Long, + handlerCallback: RealmWebsocketHandlerCallbackPointer, + ): CancellableTimer + + /** + * Create a new websocket pointed to the server indicated by endpoint and + * connect to the server. Any events that occur during the execution of the + * websocket will call directly to the handlers provided by the observer (new messages, error, close events) + * + * @return [WebSocketClient] instance to be used by Core to send data, and signal a close session. + */ + @Suppress("LongParameterList") + fun connect( + observer: WebSocketObserver, + path: String, + address: String, + port: Long, + isSsl: Boolean, + numProtocols: Long, + supportedSyncProtocols: String + ): WebSocketClient + + /** + * Writes to the previously created websocket in [connect] the binary data. The provided [handlerCallback] needs + * to run in the event loop after a successful write or in case of an error. + */ + fun write( + webSocketClient: WebSocketClient, + data: ByteArray, + length: Long, + handlerCallback: RealmWebsocketHandlerCallbackPointer + ) + + /** + * This helper function run the provided function pointer. It needs to be called within the same event loop context (thread) + * as the rest of the other functions. + */ + fun runCallback( + handlerCallback: RealmWebsocketHandlerCallbackPointer, + cancelled: Boolean = false, + status: WebsocketCallbackResult = WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS, + reason: String = "" + ) { + RealmInterop.realm_sync_socket_callback_complete( + handlerCallback, cancelled, status, reason + ) + } + + /** + * Core signal the transport, all websockets previously created with [connect] would have been closed at this point + * this is useful to do any resource cleanup like shutting down the engine or closing coroutine dispatcher. + */ + fun close() +} + +/** + * Cancel a previously scheduled timer created via [WebSocketTransport.createTimer]. + */ +class CancellableTimer( + private val job: Job, + private val cancelCallback: () -> Unit +) { + fun cancel() { + job.cancel() + cancelCallback() + } +} + +/** + * Define an interface to interact with the websocket created via [WebSocketTransport.connect]. + * This will be called from Core. + */ +interface WebSocketClient { + /** + * Send a binary Frame to the remote peer. + */ + fun send(message: ByteArray, handlerCallback: RealmWebsocketHandlerCallbackPointer) + + /** + * Close the websocket. + */ + fun close() +} + +/** + * Defines an abstraction of the underlying Http engine used to create the websocket. + * This abstraction is needed in order to deterministically create and shutdown the engine at the transport level. + * All websocket within the same App share the same transport and by definition the same engine. + */ +interface WebsocketEngine { + fun shutdown() + fun getInstance(): T +} + +/** + * Wrapper around Core callback pointer (observer). This will delegate calls for all incoming messages from the remote peer. + */ +class WebSocketObserver(private val webSocketObserverPointer: RealmWebsocketProviderPointer) { + /** + * Communicate the negotiated Sync protocol. + */ + fun onConnected(protocol: String) { + RealmInterop.realm_sync_socket_websocket_connected(webSocketObserverPointer, protocol) + } + + /** + * Notify an error. + */ + fun onError() { + RealmInterop.realm_sync_socket_websocket_error(webSocketObserverPointer) + } + + /** + * Forward received message to Core. + */ + fun onNewMessage(data: ByteArray): Boolean { + return RealmInterop.realm_sync_socket_websocket_message(webSocketObserverPointer, data) + } + + /** + * Notify closure message. + */ + fun onClose(wasClean: Boolean, errorCode: WebsocketErrorCode, reason: String) { + RealmInterop.realm_sync_socket_websocket_closed( + webSocketObserverPointer, wasClean, errorCode, reason + ) + } +} diff --git a/packages/cinterop/src/jvm/CMakeLists.txt b/packages/cinterop/src/jvm/CMakeLists.txt index 28087655d3..874f7226d8 100644 --- a/packages/cinterop/src/jvm/CMakeLists.txt +++ b/packages/cinterop/src/jvm/CMakeLists.txt @@ -15,7 +15,7 @@ if (CMAKE_SYSTEM_NAME MATCHES "^Windows") elseif (CMAKE_SYSTEM_NAME MATCHES "^Android") MESSAGE("Building JNI for Android") - set(CAPI_BUILD "${CMAKE_SOURCE_DIR}/../../../external/core}/build-android-${ANDROID_ABI}-${CMAKE_BUILD_TYPE}") + set(CAPI_BUILD "${CMAKE_SOURCE_DIR}/../../../external/core/build-android-${ANDROID_ABI}-${CMAKE_BUILD_TYPE}") set(REALM_INCLUDE_DIRS ${CAPI_BUILD}/src ${CINTEROP_JNI} ${SWIG_JNI_GENERATED} ${SWIG_JNI_HELPERS}) set(REALM_TARGET_LINK_LIBS log android RealmFFIStatic Realm::ObjectStore) diff --git a/packages/cinterop/src/jvm/jni/java_class_global_def.hpp b/packages/cinterop/src/jvm/jni/java_class_global_def.hpp index c35f41f903..3979de66a1 100644 --- a/packages/cinterop/src/jvm/jni/java_class_global_def.hpp +++ b/packages/cinterop/src/jvm/jni/java_class_global_def.hpp @@ -66,6 +66,8 @@ class JavaClassGlobalDef { , m_io_realm_kotlin_internal_interop_app_callback(env, "io/realm/kotlin/internal/interop/AppCallback", false) , m_io_realm_kotlin_internal_interop_connection_state_change_callback(env, "io/realm/kotlin/internal/interop/ConnectionStateChangeCallback", false) , m_io_realm_kotlin_internal_interop_sync_thread_observer(env, "io/realm/kotlin/internal/interop/SyncThreadObserver", false) + , m_io_realm_kotlin_internal_interop_sync_websocket_transport(env, "io/realm/kotlin/internal/interop/sync/WebSocketTransport", false) + , m_io_realm_kotlin_internal_interop_sync_websocket_client(env, "io/realm/kotlin/internal/interop/sync/WebSocketClient", false) , m_io_realm_kotlin_internal_interop_notification_callback(env, "io/realm/kotlin/internal/interop/NotificationCallback", false) { } @@ -93,6 +95,8 @@ class JavaClassGlobalDef { jni_util::JavaClass m_io_realm_kotlin_internal_interop_app_callback; jni_util::JavaClass m_io_realm_kotlin_internal_interop_connection_state_change_callback; jni_util::JavaClass m_io_realm_kotlin_internal_interop_sync_thread_observer; + jni_util::JavaClass m_io_realm_kotlin_internal_interop_sync_websocket_transport; + jni_util::JavaClass m_io_realm_kotlin_internal_interop_sync_websocket_client; jni_util::JavaClass m_io_realm_kotlin_internal_interop_notification_callback; inline static std::unique_ptr& instance() @@ -235,6 +239,14 @@ class JavaClassGlobalDef { return jni_util::JavaMethod(env, instance()->m_kotlin_jvm_functions_function1, "invoke", "(Ljava/lang/Object;)Ljava/lang/Object;"); } + + inline static const jni_util::JavaClass& sync_websocket_transport() { + return instance()->m_io_realm_kotlin_internal_interop_sync_websocket_transport; + } + + inline static const jni_util::JavaClass& sync_websocket_client() { + return instance()->m_io_realm_kotlin_internal_interop_sync_websocket_client; + } }; } // namespace realm diff --git a/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt b/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt index 20f8b0f1fb..b79c41f4f4 100644 --- a/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt +++ b/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt @@ -30,6 +30,9 @@ import io.realm.kotlin.internal.interop.sync.NetworkTransport import io.realm.kotlin.internal.interop.sync.ProgressDirection import io.realm.kotlin.internal.interop.sync.SyncSessionResyncMode import io.realm.kotlin.internal.interop.sync.SyncUserIdentity +import io.realm.kotlin.internal.interop.sync.WebSocketTransport +import io.realm.kotlin.internal.interop.sync.WebsocketCallbackResult +import io.realm.kotlin.internal.interop.sync.WebsocketErrorCode import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch @@ -1216,7 +1219,6 @@ actual object RealmInterop { } actual fun realm_app_sync_client_get_default_file_path_for_realm( - app: RealmAppPointer, syncConfig: RealmSyncConfigurationPointer, overriddenName: String? ): String { @@ -2056,6 +2058,36 @@ actual object RealmInterop { return LongPointerWrapper(realmc.realm_sync_subscription_set_commit(mutableSubscriptionSet.cptr())) } + actual fun realm_sync_set_websocket_transport( + syncClientConfig: RealmSyncClientConfigurationPointer, + webSocketTransport: WebSocketTransport + ) { + realmc.realm_sync_websocket_new(syncClientConfig.cptr(), webSocketTransport) + } + + actual fun realm_sync_socket_callback_complete(nativePointer: RealmWebsocketHandlerCallbackPointer, cancelled: Boolean, status: WebsocketCallbackResult, reason: String) { + realmc.realm_sync_websocket_callback_complete(cancelled, nativePointer.cptr(), status.nativeValue, reason) + } + + actual fun realm_sync_socket_websocket_connected(nativePointer: RealmWebsocketProviderPointer, protocol: String) { + realmc.realm_sync_websocket_connected(nativePointer.cptr(), protocol) + } + + actual fun realm_sync_socket_websocket_error(nativePointer: RealmWebsocketProviderPointer) { + realmc.realm_sync_websocket_error(nativePointer.cptr()) + } + + actual fun realm_sync_socket_websocket_message( + nativePointer: RealmWebsocketProviderPointer, + data: ByteArray + ): Boolean { + return realmc.realm_sync_websocket_message(nativePointer.cptr(), data, data.size.toLong()) + } + + actual fun realm_sync_socket_websocket_closed(nativePointer: RealmWebsocketProviderPointer, wasClean: Boolean, errorCode: WebsocketErrorCode, reason: String) { + realmc.realm_sync_websocket_closed(nativePointer.cptr(), wasClean, errorCode.nativeValue, reason) + } + fun NativePointer.cptr(): Long { return (this as LongPointerWrapper).ptr } diff --git a/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt b/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt index d0fa2de33e..c01be29fc1 100644 --- a/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt +++ b/packages/cinterop/src/jvm/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt @@ -130,7 +130,7 @@ actual enum class WebsocketErrorCode( RLM_ERR_WEBSOCKET_FATAL_ERROR("FatalError", realm_web_socket_errno_e.RLM_ERR_WEBSOCKET_FATAL_ERROR); actual companion object { - internal actual fun of(nativeValue: Int): WebsocketErrorCode? = + actual fun of(nativeValue: Int): WebsocketErrorCode? = values().firstOrNull { value -> value.nativeValue == nativeValue } diff --git a/packages/cinterop/src/native/realm.def b/packages/cinterop/src/native/realm.def index ec72bbd95a..01f59930d9 100644 --- a/packages/cinterop/src/native/realm.def +++ b/packages/cinterop/src/native/realm.def @@ -12,6 +12,10 @@ headerFilter = realm.h realm/error_codes.h // libraryPaths.ios_x64 = ../external/core/build-macos_x64/src/realm/object-store/c_api ../external/core/build-macos_x64/src/realm ../external/core/build-macos_x64/src/realm/parser ../external/core/build-macos_x64/src/realm/object-store/ linkerOpts = -lcompression -lz -framework Foundation -framework CoreFoundation -framework Security strictEnums = realm_errno realm_error_category realm_sync_errno_client realm_sync_errno_connection realm_sync_errno_session realm_web_socket_errno realm_sync_socket_callback_result + +// We don't want to convert Websocket binary data to String +noStringConversion = realm_sync_socket_websocket_message + --- #include #include diff --git a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/ErrorCode.kt b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/ErrorCode.kt index aadd8db464..07d2f18a88 100644 --- a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/ErrorCode.kt +++ b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/ErrorCode.kt @@ -20,7 +20,7 @@ import realm_wrapper.realm_errno actual enum class ErrorCode( override val description: String, - private val nativeError: realm_errno + nativeError: realm_errno ) : CodeDescription { RLM_ERR_NONE("None", realm_errno.RLM_ERR_NONE), RLM_ERR_RUNTIME("Runtime", realm_errno.RLM_ERR_RUNTIME), diff --git a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt index 6363425be5..cb73c2398f 100644 --- a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt +++ b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt @@ -22,6 +22,7 @@ import io.realm.kotlin.internal.interop.Constants.ENCRYPTION_KEY_LENGTH import io.realm.kotlin.internal.interop.sync.ApiKeyWrapper import io.realm.kotlin.internal.interop.sync.AppError import io.realm.kotlin.internal.interop.sync.AuthProvider +import io.realm.kotlin.internal.interop.sync.CancellableTimer import io.realm.kotlin.internal.interop.sync.CoreCompensatingWriteInfo import io.realm.kotlin.internal.interop.sync.CoreConnectionState import io.realm.kotlin.internal.interop.sync.CoreSubscriptionSetState @@ -34,6 +35,11 @@ import io.realm.kotlin.internal.interop.sync.Response import io.realm.kotlin.internal.interop.sync.SyncError import io.realm.kotlin.internal.interop.sync.SyncSessionResyncMode import io.realm.kotlin.internal.interop.sync.SyncUserIdentity +import io.realm.kotlin.internal.interop.sync.WebSocketClient +import io.realm.kotlin.internal.interop.sync.WebSocketObserver +import io.realm.kotlin.internal.interop.sync.WebSocketTransport +import io.realm.kotlin.internal.interop.sync.WebsocketCallbackResult +import io.realm.kotlin.internal.interop.sync.WebsocketErrorCode import kotlinx.atomicfu.AtomicBoolean import kotlinx.atomicfu.atomic import kotlinx.cinterop.AutofreeScope @@ -72,6 +78,7 @@ import kotlinx.cinterop.refTo import kotlinx.cinterop.set import kotlinx.cinterop.staticCFunction import kotlinx.cinterop.toCStringArray +import kotlinx.cinterop.toCValues import kotlinx.cinterop.toKString import kotlinx.cinterop.useContents import kotlinx.cinterop.usePinned @@ -120,6 +127,12 @@ import realm_wrapper.realm_sync_client_metadata_mode import realm_wrapper.realm_sync_session_resync_mode import realm_wrapper.realm_sync_session_state_e import realm_wrapper.realm_sync_session_stop_policy_e +import realm_wrapper.realm_sync_socket_post_callback_t +import realm_wrapper.realm_sync_socket_t +import realm_wrapper.realm_sync_socket_timer_callback_t +import realm_wrapper.realm_sync_socket_timer_t +import realm_wrapper.realm_sync_socket_websocket_t +import realm_wrapper.realm_sync_socket_write_callback_t import realm_wrapper.realm_t import realm_wrapper.realm_user_identity import realm_wrapper.realm_user_t @@ -2279,7 +2292,6 @@ actual object RealmInterop { } actual fun realm_app_sync_client_get_default_file_path_for_realm( - app: RealmAppPointer, syncConfig: RealmSyncConfigurationPointer, overriddenName: String? ): String { @@ -2756,6 +2768,149 @@ actual object RealmInterop { ) } + actual fun realm_sync_set_websocket_transport( + syncClientConfig: RealmSyncClientConfigurationPointer, + webSocketTransport: WebSocketTransport + ) { + val realmSyncSocketNew: CPointer = + checkedPointerResult( + realm_wrapper.realm_sync_socket_new( + userdata = StableRef.create(webSocketTransport).asCPointer(), + userdata_free = staticCFunction { userdata: CPointer? -> + safeUserData(userdata).close() + disposeUserData(userdata) + }, + post_func = staticCFunction { userdata: CPointer?, syncSocketCallback: CPointer? -> + val callback: WebsocketFunctionHandlerCallback = { cancelled, _, _ -> + realm_wrapper.realm_sync_socket_post_complete( + syncSocketCallback, + if (cancelled) WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_OPERATION_ABORTED.asNativeEnum else WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS.asNativeEnum, + "" + ) + } + + safeUserData(userdata).post( + CPointerWrapper(StableRef.create(callback).asCPointer()) + ) + }, + create_timer_func = staticCFunction { userdata: CPointer?, delayInMilliseconds: uint64_t, syncSocketCallback: CPointer? -> + val callback: WebsocketFunctionHandlerCallback = { cancelled, _, _ -> + if (cancelled) { + realm_wrapper.realm_sync_socket_timer_canceled(syncSocketCallback) + } else { + realm_wrapper.realm_sync_socket_timer_complete( + syncSocketCallback, + WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS.asNativeEnum, + "" + ) + } + } + + safeUserData(userdata).let { ws -> + val job: CancellableTimer = ws.createTimer( + delayInMilliseconds.toLong(), + CPointerWrapper(StableRef.create(callback).asCPointer()) + ) + StableRef.create(job).asCPointer() + } + }, + cancel_timer_func = staticCFunction { userdata: CPointer?, timer: realm_sync_socket_timer_t? -> + safeUserData(timer).cancel() + }, + free_timer_func = staticCFunction { userdata: CPointer?, timer: realm_sync_socket_timer_t? -> + disposeUserData(timer) + }, + websocket_connect_func = staticCFunction { userdata: CPointer?, endpoint: CValue, observer: CPointer? -> + safeUserData(userdata).let { websocketTransport -> + endpoint.useContents { + val managedObserver = WebSocketObserver(CPointerWrapper(observer)) + + val supportedProtocols = mutableListOf() + for (i in 0 until this.num_protocols.toInt()) { + val protocol: CPointer>? = + this.protocols?.get(i) + supportedProtocols.add(protocol.safeKString()) + } + val webSocketClient: WebSocketClient = websocketTransport.connect( + managedObserver, + this.path.safeKString(), + this.address.safeKString(), + this.port.toLong(), + this.is_ssl, + this.num_protocols.toLong(), + supportedProtocols.joinToString(", ") + ) + StableRef.create(webSocketClient).asCPointer() + } + } + }, + websocket_write_func = staticCFunction { userdata: CPointer?, websocket: realm_sync_socket_websocket_t?, data: CPointer?, length: size_t, callback: CPointer? -> + val postWriteCallback: WebsocketFunctionHandlerCallback = + { cancelled, status, reason -> + realm_wrapper.realm_sync_socket_write_complete( + callback, + if (cancelled) WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_OPERATION_ABORTED.asNativeEnum else WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS.asNativeEnum, + reason + ) + } + + safeUserData(userdata).let { websocketTransport -> + safeUserData(websocket).let { webSocketClient -> + data?.readBytes(length.toInt())?.run { + websocketTransport.write( + webSocketClient, + this, + length.toLong(), + CPointerWrapper( + StableRef.create(postWriteCallback).asCPointer() + ) + ) + } + } + } + Unit + }, + websocket_free_func = staticCFunction { userdata: CPointer?, websocket: realm_sync_socket_websocket_t? -> + safeUserData(websocket).close() + disposeUserData(websocket) + } + ) + ) ?: error("Couldn't create Sync Socket") + realm_wrapper.realm_sync_client_config_set_sync_socket( + syncClientConfig.cptr(), + realmSyncSocketNew + ) + realm_release(realmSyncSocketNew) + } + + actual fun realm_sync_socket_callback_complete(nativePointer: RealmWebsocketHandlerCallbackPointer, cancelled: Boolean, status: WebsocketCallbackResult, reason: String) { + safeUserData(nativePointer.cptr())(cancelled, status, reason) + disposeUserData(nativePointer.cptr()) + } + + actual fun realm_sync_socket_websocket_connected(nativePointer: RealmWebsocketProviderPointer, protocol: String) { + realm_wrapper.realm_sync_socket_websocket_connected(nativePointer.cptr(), protocol) + } + + actual fun realm_sync_socket_websocket_error(nativePointer: RealmWebsocketProviderPointer) { + realm_wrapper.realm_sync_socket_websocket_error(nativePointer.cptr()) + } + + actual fun realm_sync_socket_websocket_message( + nativePointer: RealmWebsocketProviderPointer, + data: ByteArray + ): Boolean { + return realm_wrapper.realm_sync_socket_websocket_message( + nativePointer.cptr(), + data.toCValues(), + data.size.toULong() + ) + } + + actual fun realm_sync_socket_websocket_closed(nativePointer: RealmWebsocketProviderPointer, wasClean: Boolean, errorCode: WebsocketErrorCode, reason: String) { + realm_wrapper.realm_sync_socket_websocket_closed(nativePointer.cptr(), wasClean, errorCode.asNativeEnum, reason) + } + @Suppress("LongParameterList") actual fun realm_app_config_new( appId: String, @@ -3472,6 +3627,8 @@ actual object RealmInterop { } } +private typealias WebsocketFunctionHandlerCallback = (Boolean, WebsocketCallbackResult, String) -> Unit + fun realm_value_t.asByteArray(): ByteArray { if (this.type != realm_value_type.RLM_TYPE_BINARY) { error("Value is not of type ByteArray: $this.type") diff --git a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt index 818028eeb2..805184bc5c 100644 --- a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt +++ b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/sync/ProtocolErrorCode.kt @@ -132,8 +132,10 @@ actual enum class WebsocketErrorCode( override val nativeValue: Int = errorCode.value.toInt() + val asNativeEnum: realm_web_socket_errno = errorCode + actual companion object { - internal actual fun of(nativeValue: Int): WebsocketErrorCode? = + actual fun of(nativeValue: Int): WebsocketErrorCode? = values().firstOrNull { value -> value.nativeValue == nativeValue } diff --git a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp index dbb7531306..9c123c91db 100644 --- a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp +++ b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp @@ -312,7 +312,7 @@ class CustomJVMScheduler { void notify(realm_work_queue_t* work_queue) { // There is currently no signaling of creation/tear down of the core notifier thread, so we // just attach it as a daemon thread here on first notification to allow the JVM to - // shutdown propertly. See https://github.com/realm/realm-core/issues/6429 + // shutdown property. See https://github.com/realm/realm-core/issues/6429 auto jenv = get_env(true, true, "core-notifier"); jni_check_exception(jenv); jenv->CallVoidMethod(m_jvm_dispatch_scheduler, m_notify_method, @@ -742,6 +742,226 @@ realm_http_transport_t* realm_network_transport_new(jobject network_transport) { }); } +// *** BEGIN - WebSocket Client (Platform Networking) *** // + +using WebsocketFunctionHandlerCallback = std::function; + +static void websocket_post_func(realm_userdata_t userdata, + realm_sync_socket_post_callback_t* realm_callback) { + // Some calls to 'post' happens from the external commit helper which is not necessarily attached yet to a JVM thread + auto jenv = get_env(true, true); // attach as daemon thread + + WebsocketFunctionHandlerCallback* lambda = new WebsocketFunctionHandlerCallback([realm_callback=std::move(realm_callback)](bool cancelled, int status, const char* reason) { + realm_sync_socket_post_complete(realm_callback, + cancelled ? realm_sync_socket_callback_result::RLM_ERR_SYNC_SOCKET_OPERATION_ABORTED : realm_sync_socket_callback_result::RLM_ERR_SYNC_SOCKET_SUCCESS, + ""); + }); + jobject lambda_callback_pointer_wrapper = wrap_pointer(jenv,reinterpret_cast(lambda)); + + static JavaMethod post_method(jenv, JavaClassGlobalDef::sync_websocket_transport(), "post", + "(Lio/realm/kotlin/internal/interop/NativePointer;)V"); + jobject websocket_transport = static_cast(userdata); + jenv->CallVoidMethod(websocket_transport, post_method, lambda_callback_pointer_wrapper); + jni_check_exception(jenv); + + jenv->DeleteLocalRef(lambda_callback_pointer_wrapper); +} + +static realm_sync_socket_timer_t websocket_create_timer_func( + realm_userdata_t userdata, uint64_t delay_ms, + realm_sync_socket_timer_callback_t *realm_callback) { + // called from main thread/event loop which should be already attached to JVM + auto jenv = get_env(false); + + WebsocketFunctionHandlerCallback *lambda = new WebsocketFunctionHandlerCallback( + [realm_callback = std::move(realm_callback)](bool cancel, int status, + const char *reason) { + if (cancel) { + realm_sync_socket_timer_canceled(realm_callback); + } else { + realm_sync_socket_timer_complete(realm_callback, + realm_sync_socket_callback_result::RLM_ERR_SYNC_SOCKET_SUCCESS, + ""); + } + }); + jobject lambda_callback_pointer_wrapper = wrap_pointer(jenv,reinterpret_cast(lambda)); + + static JavaMethod create_timer_method (jenv, JavaClassGlobalDef::sync_websocket_transport(), "createTimer", + "(JLio/realm/kotlin/internal/interop/NativePointer;)Lio/realm/kotlin/internal/interop/sync/CancellableTimer;"); + jobject websocket_transport = static_cast(userdata); + jobject cancellable_timer = jenv->CallObjectMethod(websocket_transport, create_timer_method, jlong(delay_ms), lambda_callback_pointer_wrapper); + jni_check_exception(jenv); + + jenv->DeleteLocalRef(lambda_callback_pointer_wrapper); + return reinterpret_cast(jenv->NewGlobalRef(cancellable_timer)); +} + +static void websocket_cancel_timer_func(realm_userdata_t userdata, + realm_sync_socket_timer_t timer_userdata) { + if (timer_userdata != nullptr) { + auto jenv = get_env(false); + jobject cancellable_timer = static_cast(timer_userdata); + + static JavaClass cancellable_timer_class(jenv, "io/realm/kotlin/internal/interop/sync/CancellableTimer"); + static JavaMethod cancel_method(jenv, cancellable_timer_class, "cancel", "()V"); + jenv->CallVoidMethod(cancellable_timer, cancel_method); + jni_check_exception(jenv); + + jenv->DeleteGlobalRef(cancellable_timer); + } +} + +static realm_sync_socket_websocket_t websocket_connect_func( + realm_userdata_t userdata, realm_websocket_endpoint_t endpoint, + realm_websocket_observer_t* realm_websocket_observer) { + + auto jenv = get_env(false); + + jobject observer_pointer = wrap_pointer(jenv,reinterpret_cast(realm_websocket_observer)); + + static JavaClass websocket_observer_class(jenv, "io/realm/kotlin/internal/interop/sync/WebSocketObserver"); + static JavaMethod websocket_observer_constructor(jenv, websocket_observer_class, "", + "(Lio/realm/kotlin/internal/interop/NativePointer;)V"); + jobject websocket_observer = jenv->NewObject(websocket_observer_class, websocket_observer_constructor, observer_pointer); + + static JavaMethod connect_method(jenv, JavaClassGlobalDef::sync_websocket_transport(), "connect", + "(Lio/realm/kotlin/internal/interop/sync/WebSocketObserver;Ljava/lang/String;Ljava/lang/String;JZJLjava/lang/String;)Lio/realm/kotlin/internal/interop/sync/WebSocketClient;"); + jobject websocket_transport = static_cast(userdata); + + std::ostringstream supported_protocol; + for (size_t i = 0; i < endpoint.num_protocols; ++i) { + supported_protocol << endpoint.protocols[i] << ", "; + } + + jobject websocket_client = jenv->CallObjectMethod(websocket_transport, connect_method, + websocket_observer, + to_jstring(jenv, endpoint.path), + to_jstring(jenv, endpoint.address), + jlong(endpoint.port), + endpoint.is_ssl, + jlong(endpoint.num_protocols), + to_jstring(jenv, supported_protocol.str().c_str())); + jni_check_exception(jenv); + + realm_sync_socket_websocket_t global_websocket_ref = reinterpret_cast(jenv->NewGlobalRef(websocket_client)); + + jenv->DeleteLocalRef(websocket_observer); + jenv->DeleteLocalRef(observer_pointer); + + return global_websocket_ref; +} + +static void websocket_async_write_func(realm_userdata_t userdata, + realm_sync_socket_websocket_t websocket_userdata, + const char* data, size_t size, + realm_sync_socket_write_callback_t* realm_callback) { + auto jenv = get_env(false); + + WebsocketFunctionHandlerCallback* lambda = new WebsocketFunctionHandlerCallback([realm_callback=std::move(realm_callback)](bool cancelled, int status, const char* reason) { + realm_sync_socket_write_complete(realm_callback, + cancelled ? realm_sync_socket_callback_result::RLM_ERR_SYNC_SOCKET_OPERATION_ABORTED: realm_sync_socket_callback_result::RLM_ERR_SYNC_SOCKET_SUCCESS, + ""); + }); + jobject lambda_callback_pointer_wrapper = wrap_pointer(jenv,reinterpret_cast(lambda)); + + static jmethodID write_method = jenv->GetMethodID(JavaClassGlobalDef::sync_websocket_transport(), "write", + "(Lio/realm/kotlin/internal/interop/sync/WebSocketClient;[BJLio/realm/kotlin/internal/interop/NativePointer;)V"); + jobject websocket_transport = static_cast(userdata); + + jbyteArray byteArray = jenv->NewByteArray(size); + jenv->SetByteArrayRegion(byteArray, 0, size, reinterpret_cast(data)); + + jenv->CallVoidMethod(websocket_transport, write_method, + static_cast(websocket_userdata), + byteArray, + jlong(size), + lambda_callback_pointer_wrapper); + jni_check_exception(jenv); + + jenv->DeleteLocalRef(byteArray); + jenv->DeleteLocalRef(lambda_callback_pointer_wrapper); + +} + +static void realm_sync_websocket_free(realm_userdata_t userdata, + realm_sync_socket_websocket_t websocket_userdata) { + if (websocket_userdata != nullptr) { + auto jenv = get_env(false); + static jmethodID close_method = jenv->GetMethodID(JavaClassGlobalDef::sync_websocket_client(), "close", "()V"); + + jobject websocket_client = static_cast(websocket_userdata); + jenv->CallVoidMethod(websocket_client, close_method); + jni_check_exception(jenv); + + jenv->DeleteGlobalRef(websocket_client); + } +} + +static void realm_sync_userdata_free(realm_userdata_t userdata) { + if (userdata != nullptr) { + auto jenv = get_env(false); + + static jmethodID close_method = jenv->GetMethodID(JavaClassGlobalDef::sync_websocket_transport(), "close", "()V"); + + jobject websocket_transport = static_cast(userdata); + jenv->CallVoidMethod(websocket_transport, close_method); + jni_check_exception(jenv); + + jenv->DeleteGlobalRef(websocket_transport); + } +} + +// This should run in the context of CoroutineScope +void realm_sync_websocket_callback_complete(bool cancelled, int64_t lambda_ptr, int status, const char* reason) { + WebsocketFunctionHandlerCallback* callback = reinterpret_cast(lambda_ptr); + (*callback)(cancelled, status, reason); + delete callback; +} + +void realm_sync_websocket_connected(int64_t observer_ptr, const char* protocol) { + realm_sync_socket_websocket_connected(reinterpret_cast(observer_ptr), protocol); +} + +void realm_sync_websocket_error(int64_t observer_ptr) { + realm_sync_socket_websocket_error(reinterpret_cast(observer_ptr)); +} + +bool realm_sync_websocket_message(int64_t observer_ptr, jbyteArray data, size_t size) { + auto jenv = get_env(false); + jbyte* byteData = jenv->GetByteArrayElements(data, NULL); + std::unique_ptr charData(new char[size]); // not null terminated (used in util::Span with size parameter) + std::memcpy(charData.get(), byteData, size); + bool close_websocket = !realm_sync_socket_websocket_message(reinterpret_cast(observer_ptr), charData.get(), size); + jenv->ReleaseByteArrayElements(data, byteData, JNI_ABORT); + return close_websocket; +} + +void realm_sync_websocket_closed(int64_t observer_ptr, bool was_clean, int error_code, const char* reason) { + realm_sync_socket_websocket_closed(reinterpret_cast(observer_ptr), was_clean, static_cast(error_code), reason); +} + +realm_sync_socket_t* realm_sync_websocket_new(int64_t sync_client_config_ptr, jobject websocket_transport) { + auto jenv = get_env(false); // Always called from JVM + realm_sync_socket_t* socket_provider = realm_sync_socket_new(jenv->NewGlobalRef(websocket_transport), /*userdata*/ + realm_sync_userdata_free/*userdata_free*/, + websocket_post_func/*post_func*/, + websocket_create_timer_func/*create_timer_func*/, + websocket_cancel_timer_func/*cancel_timer_func*/, + [](realm_userdata_t userdata, + realm_sync_socket_timer_t timer_userdata){ + }/*free_timer_func*/, + websocket_connect_func/*websocket_connect_func*/, + websocket_async_write_func/*websocket_write_func*/, + realm_sync_websocket_free/*websocket_free_func*/); + jni_check_exception(jenv); + + realm_sync_client_config_set_sync_socket(reinterpret_cast(sync_client_config_ptr)/*config*/, socket_provider); + realm_release(socket_provider); + return socket_provider; +} + +// *** END - WebSocket Client (Platform Networking) *** // + void set_log_callback(jint j_log_level, jobject log_callback) { auto jenv = get_env(false); auto log_level = static_cast(j_log_level); diff --git a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h index 4f466beef2..84caf586dc 100644 --- a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h +++ b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h @@ -149,4 +149,16 @@ realm_property_info_t_cleanup(realm_property_info_t* value); void realm_class_info_t_cleanup(realm_class_info_t * value); +realm_sync_socket_t* realm_sync_websocket_new(int64_t sync_client_config_ptr, jobject websocket_transport); + +void realm_sync_websocket_callback_complete(bool cancelled, int64_t lambda_ptr, int status, const char* reason); + +void realm_sync_websocket_connected(int64_t observer_ptr, const char* protocol); + +void realm_sync_websocket_error(int64_t observer_ptr); + +bool realm_sync_websocket_message(int64_t observer_ptr, jbyteArray data, size_t size); + +void realm_sync_websocket_closed(int64_t observer_ptr, bool was_clean, int error_code, const char* reason); + #endif //TEST_REALM_API_HELPERS_H diff --git a/packages/library-base/proguard-rules-consumer-common.pro b/packages/library-base/proguard-rules-consumer-common.pro index 3eb86435de..3d73db1d9a 100644 --- a/packages/library-base/proguard-rules-consumer-common.pro +++ b/packages/library-base/proguard-rules-consumer-common.pro @@ -127,6 +127,20 @@ *; } +# Platform networking callback +-keep class io.realm.kotlin.internal.interop.sync.WebSocketTransport { + *; +} +-keep class io.realm.kotlin.internal.interop.sync.CancellableTimer { + *; +} +-keep class io.realm.kotlin.internal.interop.sync.WebSocketClient { + *; +} +-keep class io.realm.kotlin.internal.interop.sync.WebSocketObserver { + *; +} + # Un-comment for debugging #-printconfiguration /tmp/full-r8-config.txt #-keepattributes LineNumberTable,SourceFile diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmListInternal.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmListInternal.kt index 8f1e60a0a7..b0ee54ac8d 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmListInternal.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmListInternal.kt @@ -214,7 +214,7 @@ internal interface ListOperator : CollectionOperator { fun get(index: Int): E - // TODO OPTIMIZE We technically don't need update policy and cache for primitie lists but right now RealmObjectHelper.assign doesn't know how to differentiate the calls to the operator + // TODO OPTIMIZE We technically don't need update policy and cache for primitive lists but right now RealmObjectHelper.assign doesn't know how to differentiate the calls to the operator fun insert( index: Int, element: E, diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/platform/CoroutineUtils.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/platform/CoroutineUtils.kt index c7e2e17246..270198220c 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/platform/CoroutineUtils.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/platform/CoroutineUtils.kt @@ -15,7 +15,7 @@ import kotlin.coroutines.EmptyCoroutineContext public expect fun singleThreadDispatcher(id: String): CloseableCoroutineDispatcher /** - * Returns a default multithread dispatcher used by Sync. + * Returns a default multithreaded dispatcher used by Sync. * TODO https://github.com/realm/realm-kotlin/issues/501 compute size based on number of cores */ public expect fun multiThreadDispatcher(size: Int = 3): CloseableCoroutineDispatcher diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/util/CoroutineDispatcherFactory.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/util/CoroutineDispatcherFactory.kt index 370af4d31f..c9124324fc 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/util/CoroutineDispatcherFactory.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/util/CoroutineDispatcherFactory.kt @@ -63,7 +63,7 @@ public fun interface CoroutineDispatcherFactory { /** * Returns the dispatcher from the factory configuration, creating it if needed. - * If a dispatcher is created, calling this method multiple times wille create a + * If a dispatcher is created, calling this method multiple times will create a * new dispatcher for each call. */ public fun create(): DispatcherHolder diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/AppConfiguration.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/AppConfiguration.kt index c638e25317..0c3cf46938 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/AppConfiguration.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/AppConfiguration.kt @@ -22,6 +22,7 @@ import io.realm.kotlin.annotations.ExperimentalRealmSerializerApi import io.realm.kotlin.internal.ContextLogger import io.realm.kotlin.internal.interop.sync.MetadataMode import io.realm.kotlin.internal.interop.sync.NetworkTransport +import io.realm.kotlin.internal.interop.sync.WebSocketTransport import io.realm.kotlin.internal.platform.appFilesDirectory import io.realm.kotlin.internal.platform.canWrite import io.realm.kotlin.internal.platform.directoryExists @@ -38,6 +39,7 @@ import io.realm.kotlin.mongodb.ext.profile import io.realm.kotlin.mongodb.internal.AppConfigurationImpl import io.realm.kotlin.mongodb.internal.KtorNetworkTransport import io.realm.kotlin.mongodb.internal.LogObfuscatorImpl +import io.realm.kotlin.mongodb.internal.RealmWebSocketTransport import io.realm.kotlin.mongodb.sync.SyncConfiguration import io.realm.kotlin.mongodb.sync.SyncTimeoutOptions import io.realm.kotlin.mongodb.sync.SyncTimeoutOptionsBuilder @@ -160,6 +162,7 @@ public interface AppConfiguration { private var syncRootDirectory: String = appFilesDirectory() private var userLoggers: List = listOf() private var networkTransport: NetworkTransport? = null + private var websocketTransport: WebSocketTransport? = null private var appName: String? = null private var appVersion: String? = null @@ -170,6 +173,7 @@ public interface AppConfiguration { private var authorizationHeaderName: String = DEFAULT_AUTHORIZATION_HEADER_NAME private var enableSessionMultiplexing: Boolean = false private var syncTimeoutOptions: SyncTimeoutOptions = SyncTimeoutOptionsBuilder().build() + private var usePlatformNetworking: Boolean = false /** * Sets the encryption key used to encrypt the user metadata Realm only. Individual @@ -373,6 +377,17 @@ public interface AppConfiguration { return this } + /** + * Platform Networking offer improved support for proxies and firewalls that require authentication, + * instead of Realm's built-in WebSocket client for Sync traffic. This will become the default in a future version. + * + * Note: Only Android and JVM targets are supported so far. + */ + public fun usePlatformNetworking(enable: Boolean = true): Builder = + apply { + this.usePlatformNetworking = enable + } + /** * Allows defining a custom network transport. It is used by some tests that require simulating * network responses. @@ -441,6 +456,13 @@ public interface AppConfiguration { ) } + val websocketTransport: WebSocketTransport? = + if (usePlatformNetworking) { + websocketTransport ?: RealmWebSocketTransport( + timeoutMs = 60000 + ) + } else null + return AppConfigurationImpl( appId = appId, baseUrl = baseUrl, @@ -450,6 +472,7 @@ public interface AppConfiguration { else MetadataMode.RLM_SYNC_CLIENT_METADATA_MODE_ENCRYPTED, appNetworkDispatcherFactory = appNetworkDispatcherFactory, networkTransportFactory = networkTransport, + websocketTransport = websocketTransport, syncRootDirectory = syncRootDirectory, logger = logConfig, appName = appName, diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppConfigurationImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppConfigurationImpl.kt index f209ac2796..08a6b9285d 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppConfigurationImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppConfigurationImpl.kt @@ -24,6 +24,7 @@ import io.realm.kotlin.internal.interop.RealmSyncClientConfigurationPointer import io.realm.kotlin.internal.interop.SyncConnectionParams import io.realm.kotlin.internal.interop.sync.MetadataMode import io.realm.kotlin.internal.interop.sync.NetworkTransport +import io.realm.kotlin.internal.interop.sync.WebSocketTransport import io.realm.kotlin.internal.platform.DEVICE_MANUFACTURER import io.realm.kotlin.internal.platform.DEVICE_MODEL import io.realm.kotlin.internal.platform.OS_VERSION @@ -47,6 +48,7 @@ public class AppConfigurationImpl @OptIn(ExperimentalKBsonSerializerApi::class) override val encryptionKey: ByteArray?, private val appNetworkDispatcherFactory: CoroutineDispatcherFactory, internal val networkTransportFactory: (dispatcher: DispatcherHolder) -> NetworkTransport, + private val websocketTransport: WebSocketTransport?, override val metadataMode: MetadataMode, override val syncRootDirectory: String, public val logger: LogConfiguration?, @@ -88,12 +90,15 @@ public class AppConfigurationImpl @OptIn(ExperimentalKBsonSerializerApi::class) } val sdkInfo = "RealmKotlin/$SDK_VERSION" val synClientConfig: RealmSyncClientConfigurationPointer = initializeSyncClientConfig( + websocketTransport, sdkInfo, applicationInfo.toString() ) - return Triple( + + return AppResources( appDispatcher, networkTransport, + websocketTransport, RealmInterop.realm_app_get( appConfigPointer, synClientConfig, @@ -143,7 +148,11 @@ public class AppConfigurationImpl @OptIn(ExperimentalKBsonSerializerApi::class) ) } - private fun initializeSyncClientConfig(sdkInfo: String?, applicationInfo: String?): RealmSyncClientConfigurationPointer = + private fun initializeSyncClientConfig( + webSocketTransport: WebSocketTransport?, + sdkInfo: String?, + applicationInfo: String? + ): RealmSyncClientConfigurationPointer = RealmInterop.realm_sync_client_config_new() .also { syncClientConfig -> // Initialize client configuration first @@ -202,6 +211,11 @@ public class AppConfigurationImpl @OptIn(ExperimentalKBsonSerializerApi::class) syncClientConfig, syncTimeoutOptions.fastReconnectLimit.inWholeMilliseconds.toULong() ) + + // Use platform networking for Sync client WebSockets if provided + webSocketTransport?.let { + RealmInterop.realm_sync_set_websocket_transport(syncClientConfig, it) + } } internal companion object { diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppImpl.kt index ee1a151d80..ffd161b575 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/AppImpl.kt @@ -20,6 +20,7 @@ import io.realm.kotlin.internal.interop.RealmAppPointer import io.realm.kotlin.internal.interop.RealmInterop import io.realm.kotlin.internal.interop.RealmUserPointer import io.realm.kotlin.internal.interop.sync.NetworkTransport +import io.realm.kotlin.internal.interop.sync.WebSocketTransport import io.realm.kotlin.internal.toDuration import io.realm.kotlin.internal.util.DispatcherHolder import io.realm.kotlin.internal.util.Validation @@ -40,7 +41,12 @@ import kotlinx.coroutines.flow.MutableSharedFlow import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -internal typealias AppResources = Triple +public data class AppResources( + val dispatcherHolder: DispatcherHolder, + val networkTransport: NetworkTransport, + val websocketTransport: WebSocketTransport?, + val realmAppPointer: RealmAppPointer +) // TODO Public due to being a transitive dependency to UserImpl public class AppImpl( @@ -50,6 +56,7 @@ public class AppImpl( internal val nativePointer: RealmAppPointer internal val appNetworkDispatcher: DispatcherHolder private val networkTransport: NetworkTransport + private val websocketTransport: WebSocketTransport? private var lastOnlineStateReported: Duration? = null private var lastConnectedState: Boolean? = null // null = unknown, true = connected, false = disconnected @@ -95,9 +102,10 @@ public class AppImpl( init { val appResources: AppResources = configuration.createNativeApp() - appNetworkDispatcher = appResources.first - networkTransport = appResources.second - nativePointer = appResources.third + appNetworkDispatcher = appResources.dispatcherHolder + networkTransport = appResources.networkTransport + websocketTransport = appResources.websocketTransport + nativePointer = appResources.realmAppPointer NetworkStateObserver.addListener(connectionListener) } diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/RealmWebSocketTransport.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/RealmWebSocketTransport.kt new file mode 100644 index 0000000000..2f30169e2e --- /dev/null +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/RealmWebSocketTransport.kt @@ -0,0 +1,123 @@ +package io.realm.kotlin.mongodb.internal + +import io.realm.kotlin.internal.interop.RealmWebsocketHandlerCallbackPointer +import io.realm.kotlin.internal.interop.sync.CancellableTimer +import io.realm.kotlin.internal.interop.sync.WebSocketClient +import io.realm.kotlin.internal.interop.sync.WebSocketObserver +import io.realm.kotlin.internal.interop.sync.WebSocketTransport +import io.realm.kotlin.internal.interop.sync.WebsocketEngine +import io.realm.kotlin.internal.util.CoroutineDispatcherFactory +import io.realm.kotlin.internal.util.DispatcherHolder +import kotlinx.atomicfu.AtomicRef +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +public class RealmWebSocketTransport( + public val timeoutMs: Long +) : WebSocketTransport { + // we need a single thread dispatcher to act like an event loop + private val dispatcherHolder: DispatcherHolder = CoroutineDispatcherFactory.managed("RealmWebsocketTransport").create() + public val scope: CoroutineScope = CoroutineScope(dispatcherHolder.dispatcher) + + public val engine: WebsocketEngine by lazy { websocketEngine(timeoutMs) } + + private val websocketClients = mutableListOf() + + override fun post(handlerCallback: RealmWebsocketHandlerCallbackPointer) { + scope.launch { + (this as Job).invokeOnCompletion { completionHandler: Throwable? -> + when (completionHandler) { + // Only run the callback successfully if it was not cancelled in the meantime + null -> runCallback(handlerCallback) + else -> runCallback( + handlerCallback, cancelled = true + ) + } + } + } + } + + override fun createTimer( + delayInMilliseconds: Long, + handlerCallback: RealmWebsocketHandlerCallbackPointer + ): CancellableTimer { + val atomicCallback: AtomicRef = + atomic(handlerCallback) + return CancellableTimer( + scope.launch { + delay(delayInMilliseconds) + atomicCallback.getAndSet(null)?.run { // this -> callback pointer + runCallback(this) + } + } + ) { + scope.launch { + atomicCallback.getAndSet(null)?.run { // this -> callback pointer + runCallback(this, cancelled = true) + } + } + } + } + + override fun connect( + observer: WebSocketObserver, + path: String, + address: String, + port: Long, + isSsl: Boolean, + numProtocols: Long, + supportedSyncProtocols: String, + ): WebSocketClient = platformWebsocketClient( + observer, path, address, port, isSsl, supportedSyncProtocols, this + ).also { + websocketClients.add(it) + } + + override fun write( + webSocketClient: WebSocketClient, + data: ByteArray, + length: Long, + handlerCallback: RealmWebsocketHandlerCallbackPointer + ) { + webSocketClient.send(data, handlerCallback) + } + + override fun close() { + // Notify clients that the transport is tearing down + websocketClients.forEach { it.close() } + + // Shutdown Websocket Engine shared by same App + if (websocketClients.isNotEmpty()) { + // Shutdown is an event that should be posted to the event loop, otherwise + // premature closing of the websocket could occur. Even if the transport is tearing down + // we still want to close gracefully the connection. + scope.launch { + engine.shutdown() + } + } + + // Closing the coroutine dispatcher should also be done via the event loop, to avoid prematurely + // closing the thread executor of a running coroutine (this will throw a `InterruptedException`) + scope.launch { + dispatcherHolder.close() + cancel() + } + websocketClients.clear() + } +} + +public expect fun websocketEngine(timeoutMs: Long): WebsocketEngine +@Suppress("LongParameterList") +public expect fun platformWebsocketClient( + observer: WebSocketObserver, + path: String, + address: String, + port: Long, + isSsl: Boolean, + supportedSyncProtocols: String, + transport: RealmWebSocketTransport +): WebSocketClient diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncConfiguration.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncConfiguration.kt index 675fbf08a2..6224e89df0 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncConfiguration.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncConfiguration.kt @@ -599,7 +599,6 @@ public interface SyncConfiguration : Configuration { ) }.let { auxSyncConfig -> RealmInterop.realm_app_sync_client_get_default_file_path_for_realm( - (user as UserImpl).app.nativePointer, auxSyncConfig, name ) diff --git a/packages/library-sync/src/jvm/kotlin/io/realm/kotlin/mongodb/internal/OkHttpWebsocketClient.kt b/packages/library-sync/src/jvm/kotlin/io/realm/kotlin/mongodb/internal/OkHttpWebsocketClient.kt new file mode 100644 index 0000000000..0d0b52a402 --- /dev/null +++ b/packages/library-sync/src/jvm/kotlin/io/realm/kotlin/mongodb/internal/OkHttpWebsocketClient.kt @@ -0,0 +1,273 @@ +package io.realm.kotlin.mongodb.internal + +import io.realm.kotlin.internal.ContextLogger +import io.realm.kotlin.internal.interop.RealmWebsocketHandlerCallbackPointer +import io.realm.kotlin.internal.interop.sync.WebSocketClient +import io.realm.kotlin.internal.interop.sync.WebSocketObserver +import io.realm.kotlin.internal.interop.sync.WebsocketCallbackResult +import io.realm.kotlin.internal.interop.sync.WebsocketEngine +import io.realm.kotlin.internal.interop.sync.WebsocketErrorCode +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.Response +import okhttp3.WebSocket +import okhttp3.WebSocketListener +import okio.ByteString +import okio.ByteString.Companion.toByteString +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.random.Random + +@Suppress("LongParameterList") +public class OkHttpWebsocketClient( + private val observer: WebSocketObserver, + path: String, + address: String, + port: Long, + isSsl: Boolean, + supportedSyncProtocols: String, + websocketEngine: WebsocketEngine, + /** + * We use this single threaded scope as an event loop to run all events on the same thread, + * in the order they were queued in. This include callback to Core via [observer] as well as calls + * from Core to send a Frame or close the Websocket. + */ + private val scope: CoroutineScope, + private val runCallback: ( + handlerCallback: RealmWebsocketHandlerCallbackPointer, + cancelled: Boolean, + status: WebsocketCallbackResult, + reason: String + ) -> Unit +) : WebSocketClient, WebSocketListener() { + + private val logger = ContextLogger("Websocket-${Random.nextInt()}") + + /** + * [WebsocketEngine] responsible of establishing the connection, sending and receiving websocket Frames. + */ + private val okHttpClient: OkHttpClient = websocketEngine.getInstance() + + private lateinit var webSocket: WebSocket + + /** + * Indicates that the websocket is in the process of being closed by Core. + * We can still send enqueued Frames like 'unbind' but we should not communicate back any incoming messages to + * Core via the [observer]. + */ + private val observerIsClosed: AtomicBoolean = AtomicBoolean(false) + + /** + * Indicates that the websocket is effectively closed. No message should be sent or received after this. + */ + private val isClosed: AtomicBoolean = AtomicBoolean(false) + + private val protocolSelectionHeader = "Sec-WebSocket-Protocol" + + init { + val websocketURL = "${if (isSsl) "wss" else "ws"}://$address:$port$path" + val request: Request = Request.Builder().url(websocketURL) + .addHeader(protocolSelectionHeader, supportedSyncProtocols) + .build() + + scope.launch { + okHttpClient.newWebSocket(request, this@OkHttpWebsocketClient) + } + } + + override fun onOpen(webSocket: WebSocket, response: Response) { + super.onOpen(webSocket, response) + logger.debug("onOpen websocket ${webSocket.request().url}") + + this.webSocket = webSocket + + response.header(protocolSelectionHeader)?.let { selectedProtocol -> + runIfObserverNotClosed { + observer.onConnected(selectedProtocol) + } + } + } + + override fun onMessage(webSocket: WebSocket, bytes: ByteString) { + super.onMessage(webSocket, bytes) + logger.trace("onMessage: ${bytes.toByteArray().decodeToString()} isClosed = ${isClosed.get()} observerIsClosed = ${observerIsClosed.get()}") + + runIfObserverNotClosed { + val shouldClose: Boolean = observer.onNewMessage(bytes.toByteArray()) + if (shouldClose) { + webSocket.close( + WebsocketErrorCode.RLM_ERR_WEBSOCKET_OK.nativeValue, + "websocket should be closed after last message received" + ) + } + } + } + + override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { + super.onClosing(webSocket, code, reason) + logger.debug("onClosing code = $code reason = $reason isClosed = ${isClosed.get()} observerIsClosed = ${observerIsClosed.get()}") + } + + override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { + super.onClosed(webSocket, code, reason) + logger.debug("onClosed code = $code reason = $reason isClosed = ${isClosed.get()} observerIsClosed = ${observerIsClosed.get()}") + + isClosed.set(true) + + runIfObserverNotClosed { + // It's important to rely properly the error code from the server. + // The server will report auth errors (and a few other error types) + // as websocket application-level errors after establishing the socket, rather than failing at the HTTP layer. + // since the websocket spec does not allow the HTTP status code from the response to be + // passed back to the client from the websocket implementation (example instruct a refresh token + // via a 401 HTTP response is not possible) see https://jira.mongodb.org/browse/BAAS-10531. + // In order to provide a reasonable response that the Sync Client can react upon, the private range of websocket close status codes + // 4000-4999, can be used to return a more specific error. + WebsocketErrorCode.of(code)?.let { errorCode -> + observer.onClose( + true, errorCode, reason + ) + } ?: run { + observer.onClose( + true, + WebsocketErrorCode.RLM_ERR_WEBSOCKET_FATAL_ERROR, + "Unknown error code $code. original reason $reason" + ) + } + } + } + + override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + super.onFailure(webSocket, t, response) + logger.debug("onFailure throwable '${t.message}' isClosed = ${isClosed.get()} observerIsClosed = ${observerIsClosed.get()}") + + runIfObserverNotClosed { + observer.onError() + } + } + + override fun send(message: ByteArray, handlerCallback: RealmWebsocketHandlerCallbackPointer) { + logger.trace("send: ${message.decodeToString()} isClosed = ${isClosed.get()} observerIsClosed = ${observerIsClosed.get()}") + + // send any queued Frames even if the Core observer is closed, but only if the websocket is still open, this can be a message like 'unbind' + // which instruct the Sync server to terminate the Sync Session (server will respond by 'unbound'). + if (!isClosed.get()) { + scope.launch { + try { + if (!isClosed.get()) { // double check that the websocket is still open before sending. + webSocket.send(message.toByteString()) + runCallback( + handlerCallback, + observerIsClosed.get(), // if the Core observer is closed we run this callback as cancelled (to free underlying resources) + WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_SUCCESS, + "" + ) + } else { + runCallback( + handlerCallback, + observerIsClosed.get(), // if the Core observer is closed we run this callback as cancelled (to free underlying resources) + WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_CONNECTION_CLOSED, + "Connection already closed" + ) + } + } catch (e: Exception) { + runCallback( + handlerCallback, + observerIsClosed.get(), // if the Core observer is closed we run this callback as cancelled (to free underlying resources) + WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_RUNTIME, + "Sending Frame exception: ${e.message}" + ) + } + } + } else { + scope.launch { + runCallback( + handlerCallback, + observerIsClosed.get(), // if the Core observer is closed we run this callback as cancelled (to free underlying resources) + WebsocketCallbackResult.RLM_ERR_SYNC_SOCKET_CONNECTION_CLOSED, + "Connection already closed" + ) + } + } + } + + override fun close() { + logger.debug("close") + observerIsClosed.set(true) + + if (::webSocket.isInitialized) { + scope.launch { + if (!isClosed.get()) { + webSocket.close( + WebsocketErrorCode.RLM_ERR_WEBSOCKET_OK.nativeValue, "client closed websocket" + ) + } + } + } + } + + /** + * Runs the [block] inside the transport [scope] only if Core didn't initiate the Websocket closure. + */ + private fun runIfObserverNotClosed(block: () -> Unit) { + if (!observerIsClosed.get()) { // if Core has already closed the websocket there's no point in scheduling this coroutine. + scope.launch { + // The session could have been paused/closed in the meantime which will cause the WebSocket to be destroyed, as well as the 'observer', + // so avoid invoking any Core observer callback on a deleted 'CAPIWebSocketObserver'. + if (!observerIsClosed.get()) { // only run if Core observer is still valid (i.e Core didn't close the websocket yet) + block() + } + } + } + } +} + +private class OkHttpEngine(timeoutMs: Long) : WebsocketEngine { + private var engine: OkHttpClient = + OkHttpClient.Builder() + .connectTimeout(timeoutMs, TimeUnit.MILLISECONDS) + .readTimeout(timeoutMs, TimeUnit.MILLISECONDS) + .callTimeout(timeoutMs, TimeUnit.MILLISECONDS) + .writeTimeout(timeoutMs, TimeUnit.MILLISECONDS) + .build() + + override fun shutdown() { + engine.dispatcher.executorService.shutdown() + } + + override fun getInstance(): T { + @Suppress("UNCHECKED_CAST") return engine as T + } +} + +public actual fun websocketEngine(timeoutMs: Long): WebsocketEngine { + return OkHttpEngine(timeoutMs) +} + +@Suppress("LongParameterList") +public actual fun platformWebsocketClient( + observer: WebSocketObserver, + path: String, + address: String, + port: Long, + isSsl: Boolean, + supportedSyncProtocols: String, + transport: RealmWebSocketTransport +): WebSocketClient { + return OkHttpWebsocketClient( + observer, + path, + address, + port, + isSsl, + supportedSyncProtocols, + transport.engine, + transport.scope + ) { handlerCallback: RealmWebsocketHandlerCallbackPointer, cancelled: Boolean, status: WebsocketCallbackResult, reason: String -> + transport.scope.launch { + transport.runCallback(handlerCallback, cancelled, status, reason) + } + } +} diff --git a/packages/library-sync/src/nativeDarwin/kotlin/io/realm/kotlin/mongodb/internal/NetworkStateObserver.kt b/packages/library-sync/src/nativeDarwin/kotlin/io/realm/kotlin/mongodb/internal/NetworkStateObserver.kt index 75cb4b9d68..48ac84215b 100644 --- a/packages/library-sync/src/nativeDarwin/kotlin/io/realm/kotlin/mongodb/internal/NetworkStateObserver.kt +++ b/packages/library-sync/src/nativeDarwin/kotlin/io/realm/kotlin/mongodb/internal/NetworkStateObserver.kt @@ -1,8 +1,24 @@ package io.realm.kotlin.mongodb.internal +import io.realm.kotlin.internal.interop.sync.WebSocketClient +import io.realm.kotlin.internal.interop.sync.WebSocketObserver +import io.realm.kotlin.internal.interop.sync.WebsocketEngine + internal actual fun registerSystemNetworkObserver() { // This is handled automatically by Realm Core which will also call `Sync.reconnect()` // automatically. So on iOS/macOS we do not do anything. // See https://github.com/realm/realm-core/blob/a678c36a85cf299f745f68f8b5ceff364d714181/src/realm/object-store/sync/impl/sync_client.hpp#L82C3-L82C3 // for further details. } + +public actual fun platformWebsocketClient( + observer: WebSocketObserver, + path: String, + address: String, + port: Long, + isSsl: Boolean, + supportedSyncProtocols: String, + transport: RealmWebSocketTransport +): WebSocketClient = TODO() + +public actual fun websocketEngine(timeoutMs: Long): WebsocketEngine = TODO() diff --git a/packages/test-sync/build.gradle.kts b/packages/test-sync/build.gradle.kts index d04f38661f..07f49567ce 100644 --- a/packages/test-sync/build.gradle.kts +++ b/packages/test-sync/build.gradle.kts @@ -313,6 +313,7 @@ buildkonfig { buildConfigField(Type.STRING, "privateApiKey", "") } buildConfigField(Type.STRING, "clusterName", getPropertyValue("syncTestClusterName") ?: "") + buildConfigField(Type.BOOLEAN, "usePlatformNetworking", getPropertyValue("syncUsePlatformNetworking") ?: "false") } } diff --git a/packages/test-sync/src/commonMain/kotlin/io/realm/kotlin/test/mongodb/TestApp.kt b/packages/test-sync/src/commonMain/kotlin/io/realm/kotlin/test/mongodb/TestApp.kt index b34dbf1794..91b7ff6aa2 100644 --- a/packages/test-sync/src/commonMain/kotlin/io/realm/kotlin/test/mongodb/TestApp.kt +++ b/packages/test-sync/src/commonMain/kotlin/io/realm/kotlin/test/mongodb/TestApp.kt @@ -45,7 +45,7 @@ import kotlinx.coroutines.CoroutineDispatcher import org.mongodb.kbson.ExperimentalKBsonSerializerApi import org.mongodb.kbson.serialization.EJson -val TEST_APP_PARTITION = syncServerAppName("pbs") // With Partion-based Sync +val TEST_APP_PARTITION = syncServerAppName("pbs") // With Partition-based Sync val TEST_APP_FLEX = syncServerAppName("flx") // With Flexible Sync val TEST_APP_CLUSTER_NAME = SyncServerConfig.clusterName @@ -148,10 +148,13 @@ open class TestApp private constructor( } } + app.close() + + // Tearing down the SyncSession still relies on the the event loop (powered by the coroutines) of the platform networking + // to post Function Handler, so we need to close it after we close the App if (dispatcher is CloseableCoroutineDispatcher) { dispatcher.close() } - app.close() // Close network client resources closeClient() @@ -192,7 +195,7 @@ open class TestApp private constructor( } @Suppress("invisible_member", "invisible_reference") - var config = AppConfiguration.Builder(appAdmin.clientAppId) + val config = AppConfiguration.Builder(appAdmin.clientAppId) .baseUrl(TEST_SERVER_BASE_URL) .networkTransport(networkTransport) .ejson(ejson) @@ -204,6 +207,9 @@ open class TestApp private constructor( else listOf(customLogger) ) } + if (SyncServerConfig.usePlatformNetworking) { + usePlatformNetworking() + } } val app = App.create( diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleSyncConfigurationTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleSyncConfigurationTests.kt index 841def7936..be13d1f51f 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleSyncConfigurationTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FlexibleSyncConfigurationTests.kt @@ -129,7 +129,6 @@ class FlexibleSyncConfigurationTests { // val config: SyncConfiguration = SyncConfiguration.defaultConfig(user) // assertFailsWith { config.partitionValue } // } - @Test fun defaultPath() { val user: User = app.asTestApp.createUserAndLogin() diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt index b33e0ed05f..87532a9e6a 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/SyncedRealmTests.kt @@ -947,10 +947,12 @@ class SyncedRealmTests { val (email1, password1) = randomEmail() to "password1234" val user = flexApp.createUserAndLogIn(email1, password1) val localConfig = createWriteCopyLocalConfig("local.realm") - val syncConfig = createPartitionSyncConfig( + val syncConfig = createFlexibleSyncConfig( user = user, name = "sync.realm", - partitionValue = partitionValue, + initialSubscriptions = { + it.query().subscribe(name = "parentSubscription") + } ) Realm.open(syncConfig).use { flexSyncRealm: Realm -> flexSyncRealm.writeBlocking { @@ -960,6 +962,7 @@ class SyncedRealmTests { } ) } + // Copy to local Realm flexSyncRealm.writeCopyTo(localConfig) }