From 2432dfeec43a5b5c82b296df81572113aa849eab Mon Sep 17 00:00:00 2001 From: Joffrey Bion Date: Mon, 7 Oct 2024 01:15:39 +0200 Subject: [PATCH] Add sendRawFrameAndMaybeAwaitReceipt Resolves: https://github.com/joffrey-bion/krossbow/issues/507 --- .../api/krossbow-stomp-core.api | 32 ++++++-- .../hildan/krossbow/stomp/BaseStompSession.kt | 72 ++++++++---------- .../org/hildan/krossbow/stomp/StompSession.kt | 73 ++++++++++++++----- .../hildan/krossbow/test/NoopStompSession.kt | 22 +----- .../conversions/jackson/MockStompSession.kt | 24 +++--- .../api/krossbow-stomp-kxserialization.api | 5 ++ .../conversions/moshi/MockStompSession.kt | 24 +++--- 7 files changed, 139 insertions(+), 113 deletions(-) diff --git a/krossbow-stomp-core/api/krossbow-stomp-core.api b/krossbow-stomp-core/api/krossbow-stomp-core.api index 6860a97e6..c6f7d2e50 100644 --- a/krossbow-stomp-core/api/krossbow-stomp-core.api +++ b/krossbow-stomp-core/api/krossbow-stomp-core.api @@ -60,20 +60,31 @@ public final class org/hildan/krossbow/stomp/StompReceipt { } public abstract interface class org/hildan/krossbow/stomp/StompSession { - public abstract fun abort (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun ack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun abort (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun abort$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun ack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun ack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; - public abstract fun begin (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun commit (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun ack$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun begin (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun begin$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun commit (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun commit$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun nack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun nack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun nack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun nack$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun send (Lorg/hildan/krossbow/stomp/headers/StompSendHeaders;Lorg/hildan/krossbow/stomp/frame/FrameBody;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun sendRawFrameAndMaybeAwaitReceipt (Lorg/hildan/krossbow/stomp/frame/StompFrame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun subscribe (Lorg/hildan/krossbow/stomp/headers/StompSubscribeHeaders;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class org/hildan/krossbow/stomp/StompSession$DefaultImpls { + public static fun abort (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun ack (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun ack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static fun begin (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun commit (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun nack (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun nack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } @@ -93,6 +104,9 @@ public final class org/hildan/krossbow/stomp/StompWsExtensionsKt { public static synthetic fun stomp$default (Lorg/hildan/krossbow/websocket/WebSocketConnection;Lorg/hildan/krossbow/stomp/config/StompConfig;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } +public abstract interface annotation class org/hildan/krossbow/stomp/UnsafeStompSessionApi : java/lang/annotation/Annotation { +} + public final class org/hildan/krossbow/stomp/WebSocketClosedUnexpectedly : java/lang/Exception { public fun (ILjava/lang/String;)V public final fun getCode ()I @@ -172,6 +186,14 @@ public abstract interface class org/hildan/krossbow/stomp/conversions/TypedStomp public abstract fun subscribe (Lorg/hildan/krossbow/stomp/headers/StompSubscribeHeaders;Lorg/hildan/krossbow/stomp/conversions/KTypeRef;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class org/hildan/krossbow/stomp/conversions/TypedStompSession$DefaultImpls { + public static fun abort (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun ack (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun begin (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun commit (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun nack (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public final class org/hildan/krossbow/stomp/conversions/text/TextConversionsKt { public static final fun withTextConversions (Lorg/hildan/krossbow/stomp/StompSession;Lorg/hildan/krossbow/stomp/conversions/text/TextMessageConverter;)Lorg/hildan/krossbow/stomp/conversions/TypedStompSession; } diff --git a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt index c29e3d8b6..71e1c4363 100644 --- a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt +++ b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt @@ -117,18 +117,13 @@ internal class BaseStompSession( return prepareHeadersAndSendFrame(StompFrame.Send(headers, body)) } + @OptIn(UnsafeStompSessionApi::class) private suspend fun prepareHeadersAndSendFrame(frame: StompFrame): StompReceipt? { val effectiveFrame = frame.copyWithHeaders { maybeSetContentLength(frame.body) maybeSetAutoReceipt() } - val receiptId = effectiveFrame.headers.receipt - if (receiptId == null) { - sendStompFrame(effectiveFrame) - return null - } - sendAndWaitForReceipt(receiptId, effectiveFrame) - return StompReceipt(receiptId) + return sendRawFrameAndMaybeAwaitReceipt(effectiveFrame) } private fun StompHeadersBuilder.maybeSetContentLength(frameBody: FrameBody?) { @@ -143,22 +138,6 @@ internal class BaseStompSession( } } - private suspend fun sendAndWaitForReceipt(receiptId: String, frame: StompFrame) { - withTimeoutOrNull(frame.receiptTimeout) { - sharedStompEvents - .onSubscription { - sendStompFrame(frame) - } - .dematerializeErrorsAndCompletion() - .filterIsInstance() - .firstOrNull { it.headers.receiptId == receiptId } - ?: throw SessionDisconnectedException("The STOMP frames flow completed unexpectedly while waiting for RECEIPT frame with id='$receiptId'") - } ?: throw LostReceiptException(receiptId, frame.receiptTimeout, frame) - } - - private val StompFrame.receiptTimeout: Duration - get() = if (command == StompCommand.DISCONNECT) config.disconnectTimeout else config.receiptTimeout - override suspend fun subscribe(headers: StompSubscribeHeaders): Flow { return startSubscription(headers) .consumeAsFlow() @@ -185,30 +164,37 @@ internal class BaseStompSession( } private suspend fun unsubscribe(subscriptionId: String) { - sendStompFrame(StompFrame.Unsubscribe(StompUnsubscribeHeaders(id = subscriptionId))) - } - - override suspend fun ack(ackId: String, transactionId: String?) { - sendStompFrame(StompFrame.Ack(StompAckHeaders(ackId) { transaction = transactionId })) - } - - override suspend fun nack(ackId: String, transactionId: String?) { - sendStompFrame(StompFrame.Nack(StompNackHeaders(ackId) { transaction = transactionId })) + sendRawStompFrame(StompFrame.Unsubscribe(StompUnsubscribeHeaders(id = subscriptionId))) } - override suspend fun begin(transactionId: String) { - sendStompFrame(StompFrame.Begin(StompBeginHeaders(transactionId))) + @UnsafeStompSessionApi + override suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt? { + val receiptId = frame.headers.receipt + if (receiptId == null) { + sendRawStompFrame(frame) + return null + } + sendRawFrameAndAwaitReceipt(receiptId, frame) + return StompReceipt(receiptId) } - override suspend fun commit(transactionId: String) { - sendStompFrame(StompFrame.Commit(StompCommitHeaders(transactionId))) + private suspend fun sendRawFrameAndAwaitReceipt(receiptId: String, frame: StompFrame) { + withTimeoutOrNull(frame.receiptTimeout) { + sharedStompEvents + .onSubscription { + sendRawStompFrame(frame) + } + .dematerializeErrorsAndCompletion() + .filterIsInstance() + .firstOrNull { it.headers.receiptId == receiptId } + ?: throw SessionDisconnectedException("The STOMP frames flow completed unexpectedly while waiting for RECEIPT frame with id='$receiptId'") + } ?: throw LostReceiptException(receiptId, frame.receiptTimeout, frame) } - override suspend fun abort(transactionId: String) { - sendStompFrame(StompFrame.Abort(StompAbortHeaders(transactionId))) - } + private val StompFrame.receiptTimeout: Duration + get() = if (command == StompCommand.DISCONNECT) config.disconnectTimeout else config.receiptTimeout - private suspend fun sendStompFrame(frame: StompFrame) { + private suspend fun sendRawStompFrame(frame: StompFrame) { stompSocket.sendStompFrame(frame) heartBeater?.notifyMsgSent() } @@ -221,11 +207,11 @@ internal class BaseStompSession( sharedStompEvents.emit(StompEvent.Close) } + @OptIn(UnsafeStompSessionApi::class) private suspend fun sendDisconnectFrameAndWaitForReceipt() { try { - val receiptId = generateUuid() - val disconnectFrame = StompFrame.Disconnect(StompDisconnectHeaders { receipt = receiptId }) - sendAndWaitForReceipt(receiptId, disconnectFrame) + val disconnectFrame = StompFrame.Disconnect(StompDisconnectHeaders { receipt = generateUuid() }) + sendRawFrameAndMaybeAwaitReceipt(disconnectFrame) } catch (e: LostReceiptException) { // Sometimes the server closes the connection too quickly to send a RECEIPT, which is not really an error // http://stomp.github.io/stomp-specification-1.2.html#Connection_Lingering diff --git a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompSession.kt b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompSession.kt index e5c14611c..1f238e3ac 100644 --- a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompSession.kt +++ b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompSession.kt @@ -1,14 +1,15 @@ package org.hildan.krossbow.stomp -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.* import kotlinx.io.bytestring.* -import org.hildan.krossbow.stomp.config.StompConfig -import org.hildan.krossbow.stomp.frame.FrameBody -import org.hildan.krossbow.stomp.frame.StompFrame -import org.hildan.krossbow.stomp.headers.StompSendHeaders -import org.hildan.krossbow.stomp.headers.StompSubscribeHeaders -import org.hildan.krossbow.stomp.utils.generateUuid +import org.hildan.krossbow.stomp.config.* +import org.hildan.krossbow.stomp.frame.* +import org.hildan.krossbow.stomp.headers.* +import org.hildan.krossbow.stomp.utils.* + +@RequiresOptIn("This API is unsafe. Make sure you read the documentation and understand the consequences") +@Retention(AnnotationRetention.BINARY) +annotation class UnsafeStompSessionApi /** * A coroutine-based STOMP session. This interface defines interactions with a STOMP server. @@ -117,10 +118,9 @@ interface StompSession { * If auto-receipt is disabled and no `receipt` header is provided, this method doesn't wait for a RECEIPT frame * and never throws [LostReceiptException]. * Instead, it returns immediately after sending the SUBSCRIBE frame. - * This means that, in this case, there is no real guarantee that the subscription actually happened when this - * method returns. + * In this case, there is no real guarantee that the subscription actually happened when this method returns. * - * The unsubscription happens by sending an UNSUBSCRIBE frame when the flow collector's coroutine is cancelled, or + * The unsubscription happens by sending an `UNSUBSCRIBE` frame when the flow collector's coroutine is canceled, or * when a terminal operator such as [Flow.first][kotlinx.coroutines.flow.first] completes the flow from the * consumer's side. * @@ -137,7 +137,10 @@ interface StompSession { * The provided [ackId] must match the `ack` header of the message to acknowledge. * If this acknowledgement is part of a transaction, the [transactionId] should be provided. */ - suspend fun ack(ackId: String, transactionId: String? = null) + @OptIn(UnsafeStompSessionApi::class) + suspend fun ack(ackId: String, transactionId: String? = null) { + sendRawFrameAndMaybeAwaitReceipt(StompFrame.Ack(StompAckHeaders(ackId) { transaction = transactionId })) + } /** * Sends a NACK frame with the given [ackId]. @@ -145,28 +148,64 @@ interface StompSession { * The provided [ackId] must match the `ack` header of the message to refuse. * If this acknowledgement is part of a transaction, the [transactionId] should be provided. */ - suspend fun nack(ackId: String, transactionId: String? = null) + @OptIn(UnsafeStompSessionApi::class) + suspend fun nack(ackId: String, transactionId: String? = null) { + sendRawFrameAndMaybeAwaitReceipt(StompFrame.Nack(StompNackHeaders(ackId) { transaction = transactionId })) + } /** * Sends a BEGIN frame with the given [transactionId]. * * @see withTransaction */ - suspend fun begin(transactionId: String) + @OptIn(UnsafeStompSessionApi::class) + suspend fun begin(transactionId: String) { + sendRawFrameAndMaybeAwaitReceipt(StompFrame.Begin(StompBeginHeaders(transactionId))) + } /** * Sends a COMMIT frame with the given [transactionId]. * * @see withTransaction */ - suspend fun commit(transactionId: String) + @OptIn(UnsafeStompSessionApi::class) + suspend fun commit(transactionId: String) { + sendRawFrameAndMaybeAwaitReceipt(StompFrame.Commit(StompCommitHeaders(transactionId))) + } /** * Sends an ABORT frame with the given [transactionId]. * * @see withTransaction */ - suspend fun abort(transactionId: String) + @OptIn(UnsafeStompSessionApi::class) + suspend fun abort(transactionId: String) { + sendRawFrameAndMaybeAwaitReceipt(StompFrame.Abort(StompAbortHeaders(transactionId))) + } + + /** + * Sends the given [frame] as-is, without modifications to its headers, regardless of the configuration. + * + * This means that: + * * no `receipt` header is added even if [autoReceipt][StompConfig.autoReceipt] is enabled + * * no `content-length` header is added even if [autoContentLength][StompConfig.autoContentLength] is enabled + * + * If a `receipt` header is present in the given [frame], this method suspends until the corresponding `RECEIPT` + * frame is received from the server. + * If no `RECEIPT` frame is received in the configured [time limit][StompConfig.receiptTimeout], a + * [LostReceiptException] is thrown. + * + * If no `receipt` header is present in the given [frame], this method doesn't wait for a `RECEIPT` frame + * and never throws [LostReceiptException]. + * Instead, it returns immediately after sending the frame. + * In this case, there is no guarantee that the server received the frame when this method returns. + * + * WARNING: Prefer using higher-level APIs over this function. + * Sending raw frames may break some invariants or the state of this session. + * In particular, using subscription-related frames could be a problem. + */ + @UnsafeStompSessionApi + suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt? /** * If [graceful disconnect][StompConfig.gracefulDisconnect] is enabled (which is the default), sends a DISCONNECT @@ -176,7 +215,7 @@ interface StompSession { * If a RECEIPT frame is not received within the [configured time][StompConfig.disconnectTimeout], it may * be because the server closed the connection too quickly to send a RECEIPT frame, which is * [not considered an error](http://stomp.github.io/stomp-specification-1.2.html#Connection_Lingering). - * That's why this function doesn't throw an exception in this case, it just returns normally. + * That's why this function doesn't throw an exception in this case; it just returns normally. */ suspend fun disconnect() } diff --git a/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/test/NoopStompSession.kt b/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/test/NoopStompSession.kt index c7692aa19..9847e06d3 100644 --- a/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/test/NoopStompSession.kt +++ b/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/test/NoopStompSession.kt @@ -2,8 +2,7 @@ package org.hildan.krossbow.test import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emptyFlow -import org.hildan.krossbow.stomp.StompReceipt -import org.hildan.krossbow.stomp.StompSession +import org.hildan.krossbow.stomp.* import org.hildan.krossbow.stomp.frame.FrameBody import org.hildan.krossbow.stomp.frame.StompFrame import org.hildan.krossbow.stomp.headers.StompSendHeaders @@ -15,21 +14,8 @@ open class NoopStompSession : StompSession { override suspend fun subscribe(headers: StompSubscribeHeaders): Flow = emptyFlow() - override suspend fun ack(ackId: String, transactionId: String?) { - } + @UnsafeStompSessionApi + override suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt? = null - override suspend fun nack(ackId: String, transactionId: String?) { - } - - override suspend fun begin(transactionId: String) { - } - - override suspend fun commit(transactionId: String) { - } - - override suspend fun abort(transactionId: String) { - } - - override suspend fun disconnect() { - } + override suspend fun disconnect() = Unit } diff --git a/krossbow-stomp-jackson/src/test/kotlin/org/hildan/krossbow/stomp/conversions/jackson/MockStompSession.kt b/krossbow-stomp-jackson/src/test/kotlin/org/hildan/krossbow/stomp/conversions/jackson/MockStompSession.kt index ec8bfb192..888d024d1 100644 --- a/krossbow-stomp-jackson/src/test/kotlin/org/hildan/krossbow/stomp/conversions/jackson/MockStompSession.kt +++ b/krossbow-stomp-jackson/src/test/kotlin/org/hildan/krossbow/stomp/conversions/jackson/MockStompSession.kt @@ -3,8 +3,7 @@ package org.hildan.krossbow.stomp.conversions.jackson import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow -import org.hildan.krossbow.stomp.StompReceipt -import org.hildan.krossbow.stomp.StompSession +import org.hildan.krossbow.stomp.* import org.hildan.krossbow.stomp.frame.FrameBody import org.hildan.krossbow.stomp.frame.StompFrame import org.hildan.krossbow.stomp.headers.StompMessageHeaders @@ -24,23 +23,18 @@ class MockStompSession : StompSession { incomingFrames.send(frame) } - override suspend fun send(headers: StompSendHeaders, body: FrameBody?): StompReceipt? { - sentFrames.send(StompFrame.Send(headers, body)) - return null - } + @OptIn(UnsafeStompSessionApi::class) + override suspend fun send(headers: StompSendHeaders, body: FrameBody?): StompReceipt? = + sendRawFrameAndMaybeAwaitReceipt(StompFrame.Send(headers, body)) override suspend fun subscribe(headers: StompSubscribeHeaders): Flow = incomingFrames.consumeAsFlow() - override suspend fun ack(ackId: String, transactionId: String?) = error("This mock doesn't support this method") - - override suspend fun nack(ackId: String, transactionId: String?) = error("This mock doesn't support this method") - - override suspend fun begin(transactionId: String) = error("This mock doesn't support this method") - - override suspend fun commit(transactionId: String) = error("This mock doesn't support this method") - - override suspend fun abort(transactionId: String) = error("This mock doesn't support this method") + @UnsafeStompSessionApi + override suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt? { + sentFrames.send(frame) + return frame.headers.receipt?.let { StompReceipt(it) } + } override suspend fun disconnect() = error("This mock doesn't support this method") } diff --git a/krossbow-stomp-kxserialization/api/krossbow-stomp-kxserialization.api b/krossbow-stomp-kxserialization/api/krossbow-stomp-kxserialization.api index 4214eefc4..e8f3822d9 100644 --- a/krossbow-stomp-kxserialization/api/krossbow-stomp-kxserialization.api +++ b/krossbow-stomp-kxserialization/api/krossbow-stomp-kxserialization.api @@ -12,7 +12,12 @@ public abstract interface class org/hildan/krossbow/stomp/conversions/kxserializ } public final class org/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerialization$DefaultImpls { + public static fun abort (Lorg/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerialization;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun ack (Lorg/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerialization;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun begin (Lorg/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerialization;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun commit (Lorg/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerialization;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun convertAndSend$default (Lorg/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerialization;Lorg/hildan/krossbow/stomp/headers/StompSendHeaders;Ljava/lang/Object;Lkotlinx/serialization/SerializationStrategy;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static fun nack (Lorg/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerialization;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class org/hildan/krossbow/stomp/conversions/kxserialization/StompSessionWithKxSerializationKt { diff --git a/krossbow-stomp-moshi/src/test/kotlin/org/hildan/krossbow/stomp/conversions/moshi/MockStompSession.kt b/krossbow-stomp-moshi/src/test/kotlin/org/hildan/krossbow/stomp/conversions/moshi/MockStompSession.kt index c76b6da5f..073ef3e4f 100644 --- a/krossbow-stomp-moshi/src/test/kotlin/org/hildan/krossbow/stomp/conversions/moshi/MockStompSession.kt +++ b/krossbow-stomp-moshi/src/test/kotlin/org/hildan/krossbow/stomp/conversions/moshi/MockStompSession.kt @@ -3,8 +3,7 @@ package org.hildan.krossbow.stomp.conversions.moshi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow -import org.hildan.krossbow.stomp.StompReceipt -import org.hildan.krossbow.stomp.StompSession +import org.hildan.krossbow.stomp.* import org.hildan.krossbow.stomp.frame.FrameBody import org.hildan.krossbow.stomp.frame.StompFrame import org.hildan.krossbow.stomp.headers.StompMessageHeaders @@ -24,23 +23,18 @@ class MockStompSession : StompSession { incomingFrames.send(frame) } - override suspend fun send(headers: StompSendHeaders, body: FrameBody?): StompReceipt? { - sentFrames.send(StompFrame.Send(headers, body)) - return null - } + @OptIn(UnsafeStompSessionApi::class) + override suspend fun send(headers: StompSendHeaders, body: FrameBody?): StompReceipt? = + sendRawFrameAndMaybeAwaitReceipt(StompFrame.Send(headers, body)) override suspend fun subscribe(headers: StompSubscribeHeaders): Flow = incomingFrames.consumeAsFlow() - override suspend fun ack(ackId: String, transactionId: String?) = error("This mock doesn't support this method") - - override suspend fun nack(ackId: String, transactionId: String?) = error("This mock doesn't support this method") - - override suspend fun begin(transactionId: String) = error("This mock doesn't support this method") - - override suspend fun commit(transactionId: String) = error("This mock doesn't support this method") - - override suspend fun abort(transactionId: String) = error("This mock doesn't support this method") + @UnsafeStompSessionApi + override suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt? { + sentFrames.send(frame) + return frame.headers.receipt?.let { StompReceipt(it) } + } override suspend fun disconnect() = error("This mock doesn't support this method") }