Skip to content

Commit

Permalink
Add sendRawFrameAndMaybeAwaitReceipt
Browse files Browse the repository at this point in the history
Resolves: #507
  • Loading branch information
joffrey-bion committed Oct 6, 2024
1 parent 5f9f263 commit 2432dfe
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 113 deletions.
32 changes: 27 additions & 5 deletions krossbow-stomp-core/api/krossbow-stomp-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,31 @@ public final class org/hildan/krossbow/stomp/StompReceipt {
}

public abstract interface class org/hildan/krossbow/stomp/StompSession {
public abstract fun abort (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun ack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun abort (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun abort$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun ack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun ack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public abstract fun begin (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun commit (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun ack$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun begin (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun begin$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun commit (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun commit$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun disconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun nack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun nack (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun nack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun nack$suspendImpl (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun send (Lorg/hildan/krossbow/stomp/headers/StompSendHeaders;Lorg/hildan/krossbow/stomp/frame/FrameBody;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendRawFrameAndMaybeAwaitReceipt (Lorg/hildan/krossbow/stomp/frame/StompFrame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun subscribe (Lorg/hildan/krossbow/stomp/headers/StompSubscribeHeaders;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class org/hildan/krossbow/stomp/StompSession$DefaultImpls {
public static fun abort (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun ack (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun ack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static fun begin (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun commit (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun nack (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun nack$default (Lorg/hildan/krossbow/stomp/StompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

Expand All @@ -93,6 +104,9 @@ public final class org/hildan/krossbow/stomp/StompWsExtensionsKt {
public static synthetic fun stomp$default (Lorg/hildan/krossbow/websocket/WebSocketConnection;Lorg/hildan/krossbow/stomp/config/StompConfig;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface annotation class org/hildan/krossbow/stomp/UnsafeStompSessionApi : java/lang/annotation/Annotation {
}

public final class org/hildan/krossbow/stomp/WebSocketClosedUnexpectedly : java/lang/Exception {
public fun <init> (ILjava/lang/String;)V
public final fun getCode ()I
Expand Down Expand Up @@ -172,6 +186,14 @@ public abstract interface class org/hildan/krossbow/stomp/conversions/TypedStomp
public abstract fun subscribe (Lorg/hildan/krossbow/stomp/headers/StompSubscribeHeaders;Lorg/hildan/krossbow/stomp/conversions/KTypeRef;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class org/hildan/krossbow/stomp/conversions/TypedStompSession$DefaultImpls {
public static fun abort (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun ack (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun begin (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun commit (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun nack (Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class org/hildan/krossbow/stomp/conversions/text/TextConversionsKt {
public static final fun withTextConversions (Lorg/hildan/krossbow/stomp/StompSession;Lorg/hildan/krossbow/stomp/conversions/text/TextMessageConverter;)Lorg/hildan/krossbow/stomp/conversions/TypedStompSession;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,13 @@ internal class BaseStompSession(
return prepareHeadersAndSendFrame(StompFrame.Send(headers, body))
}

@OptIn(UnsafeStompSessionApi::class)
private suspend fun prepareHeadersAndSendFrame(frame: StompFrame): StompReceipt? {
val effectiveFrame = frame.copyWithHeaders {
maybeSetContentLength(frame.body)
maybeSetAutoReceipt()
}
val receiptId = effectiveFrame.headers.receipt
if (receiptId == null) {
sendStompFrame(effectiveFrame)
return null
}
sendAndWaitForReceipt(receiptId, effectiveFrame)
return StompReceipt(receiptId)
return sendRawFrameAndMaybeAwaitReceipt(effectiveFrame)
}

private fun StompHeadersBuilder.maybeSetContentLength(frameBody: FrameBody?) {
Expand All @@ -143,22 +138,6 @@ internal class BaseStompSession(
}
}

private suspend fun sendAndWaitForReceipt(receiptId: String, frame: StompFrame) {
withTimeoutOrNull(frame.receiptTimeout) {
sharedStompEvents
.onSubscription {
sendStompFrame(frame)
}
.dematerializeErrorsAndCompletion()
.filterIsInstance<StompFrame.Receipt>()
.firstOrNull { it.headers.receiptId == receiptId }
?: throw SessionDisconnectedException("The STOMP frames flow completed unexpectedly while waiting for RECEIPT frame with id='$receiptId'")
} ?: throw LostReceiptException(receiptId, frame.receiptTimeout, frame)
}

private val StompFrame.receiptTimeout: Duration
get() = if (command == StompCommand.DISCONNECT) config.disconnectTimeout else config.receiptTimeout

override suspend fun subscribe(headers: StompSubscribeHeaders): Flow<StompFrame.Message> {
return startSubscription(headers)
.consumeAsFlow()
Expand All @@ -185,30 +164,37 @@ internal class BaseStompSession(
}

private suspend fun unsubscribe(subscriptionId: String) {
sendStompFrame(StompFrame.Unsubscribe(StompUnsubscribeHeaders(id = subscriptionId)))
}

override suspend fun ack(ackId: String, transactionId: String?) {
sendStompFrame(StompFrame.Ack(StompAckHeaders(ackId) { transaction = transactionId }))
}

override suspend fun nack(ackId: String, transactionId: String?) {
sendStompFrame(StompFrame.Nack(StompNackHeaders(ackId) { transaction = transactionId }))
sendRawStompFrame(StompFrame.Unsubscribe(StompUnsubscribeHeaders(id = subscriptionId)))
}

override suspend fun begin(transactionId: String) {
sendStompFrame(StompFrame.Begin(StompBeginHeaders(transactionId)))
@UnsafeStompSessionApi
override suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt? {
val receiptId = frame.headers.receipt
if (receiptId == null) {
sendRawStompFrame(frame)
return null
}
sendRawFrameAndAwaitReceipt(receiptId, frame)
return StompReceipt(receiptId)
}

override suspend fun commit(transactionId: String) {
sendStompFrame(StompFrame.Commit(StompCommitHeaders(transactionId)))
private suspend fun sendRawFrameAndAwaitReceipt(receiptId: String, frame: StompFrame) {
withTimeoutOrNull(frame.receiptTimeout) {
sharedStompEvents
.onSubscription {
sendRawStompFrame(frame)
}
.dematerializeErrorsAndCompletion()
.filterIsInstance<StompFrame.Receipt>()
.firstOrNull { it.headers.receiptId == receiptId }
?: throw SessionDisconnectedException("The STOMP frames flow completed unexpectedly while waiting for RECEIPT frame with id='$receiptId'")
} ?: throw LostReceiptException(receiptId, frame.receiptTimeout, frame)
}

override suspend fun abort(transactionId: String) {
sendStompFrame(StompFrame.Abort(StompAbortHeaders(transactionId)))
}
private val StompFrame.receiptTimeout: Duration
get() = if (command == StompCommand.DISCONNECT) config.disconnectTimeout else config.receiptTimeout

private suspend fun sendStompFrame(frame: StompFrame) {
private suspend fun sendRawStompFrame(frame: StompFrame) {
stompSocket.sendStompFrame(frame)
heartBeater?.notifyMsgSent()
}
Expand All @@ -221,11 +207,11 @@ internal class BaseStompSession(
sharedStompEvents.emit(StompEvent.Close)
}

@OptIn(UnsafeStompSessionApi::class)
private suspend fun sendDisconnectFrameAndWaitForReceipt() {
try {
val receiptId = generateUuid()
val disconnectFrame = StompFrame.Disconnect(StompDisconnectHeaders { receipt = receiptId })
sendAndWaitForReceipt(receiptId, disconnectFrame)
val disconnectFrame = StompFrame.Disconnect(StompDisconnectHeaders { receipt = generateUuid() })
sendRawFrameAndMaybeAwaitReceipt(disconnectFrame)
} catch (e: LostReceiptException) {
// Sometimes the server closes the connection too quickly to send a RECEIPT, which is not really an error
// http://stomp.github.io/stomp-specification-1.2.html#Connection_Lingering
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package org.hildan.krossbow.stomp

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.*
import kotlinx.io.bytestring.*
import org.hildan.krossbow.stomp.config.StompConfig
import org.hildan.krossbow.stomp.frame.FrameBody
import org.hildan.krossbow.stomp.frame.StompFrame
import org.hildan.krossbow.stomp.headers.StompSendHeaders
import org.hildan.krossbow.stomp.headers.StompSubscribeHeaders
import org.hildan.krossbow.stomp.utils.generateUuid
import org.hildan.krossbow.stomp.config.*
import org.hildan.krossbow.stomp.frame.*
import org.hildan.krossbow.stomp.headers.*
import org.hildan.krossbow.stomp.utils.*

@RequiresOptIn("This API is unsafe. Make sure you read the documentation and understand the consequences")
@Retention(AnnotationRetention.BINARY)
annotation class UnsafeStompSessionApi

/**
* A coroutine-based STOMP session. This interface defines interactions with a STOMP server.
Expand Down Expand Up @@ -117,10 +118,9 @@ interface StompSession {
* If auto-receipt is disabled and no `receipt` header is provided, this method doesn't wait for a RECEIPT frame
* and never throws [LostReceiptException].
* Instead, it returns immediately after sending the SUBSCRIBE frame.
* This means that, in this case, there is no real guarantee that the subscription actually happened when this
* method returns.
* In this case, there is no real guarantee that the subscription actually happened when this method returns.
*
* The unsubscription happens by sending an UNSUBSCRIBE frame when the flow collector's coroutine is cancelled, or
* The unsubscription happens by sending an `UNSUBSCRIBE` frame when the flow collector's coroutine is canceled, or
* when a terminal operator such as [Flow.first][kotlinx.coroutines.flow.first] completes the flow from the
* consumer's side.
*
Expand All @@ -137,36 +137,75 @@ interface StompSession {
* The provided [ackId] must match the `ack` header of the message to acknowledge.
* If this acknowledgement is part of a transaction, the [transactionId] should be provided.
*/
suspend fun ack(ackId: String, transactionId: String? = null)
@OptIn(UnsafeStompSessionApi::class)
suspend fun ack(ackId: String, transactionId: String? = null) {
sendRawFrameAndMaybeAwaitReceipt(StompFrame.Ack(StompAckHeaders(ackId) { transaction = transactionId }))
}

/**
* Sends a NACK frame with the given [ackId].
*
* The provided [ackId] must match the `ack` header of the message to refuse.
* If this acknowledgement is part of a transaction, the [transactionId] should be provided.
*/
suspend fun nack(ackId: String, transactionId: String? = null)
@OptIn(UnsafeStompSessionApi::class)
suspend fun nack(ackId: String, transactionId: String? = null) {
sendRawFrameAndMaybeAwaitReceipt(StompFrame.Nack(StompNackHeaders(ackId) { transaction = transactionId }))
}

/**
* Sends a BEGIN frame with the given [transactionId].
*
* @see withTransaction
*/
suspend fun begin(transactionId: String)
@OptIn(UnsafeStompSessionApi::class)
suspend fun begin(transactionId: String) {
sendRawFrameAndMaybeAwaitReceipt(StompFrame.Begin(StompBeginHeaders(transactionId)))
}

/**
* Sends a COMMIT frame with the given [transactionId].
*
* @see withTransaction
*/
suspend fun commit(transactionId: String)
@OptIn(UnsafeStompSessionApi::class)
suspend fun commit(transactionId: String) {
sendRawFrameAndMaybeAwaitReceipt(StompFrame.Commit(StompCommitHeaders(transactionId)))
}

/**
* Sends an ABORT frame with the given [transactionId].
*
* @see withTransaction
*/
suspend fun abort(transactionId: String)
@OptIn(UnsafeStompSessionApi::class)
suspend fun abort(transactionId: String) {
sendRawFrameAndMaybeAwaitReceipt(StompFrame.Abort(StompAbortHeaders(transactionId)))
}

/**
* Sends the given [frame] as-is, without modifications to its headers, regardless of the configuration.
*
* This means that:
* * no `receipt` header is added even if [autoReceipt][StompConfig.autoReceipt] is enabled
* * no `content-length` header is added even if [autoContentLength][StompConfig.autoContentLength] is enabled
*
* If a `receipt` header is present in the given [frame], this method suspends until the corresponding `RECEIPT`
* frame is received from the server.
* If no `RECEIPT` frame is received in the configured [time limit][StompConfig.receiptTimeout], a
* [LostReceiptException] is thrown.
*
* If no `receipt` header is present in the given [frame], this method doesn't wait for a `RECEIPT` frame
* and never throws [LostReceiptException].
* Instead, it returns immediately after sending the frame.
* In this case, there is no guarantee that the server received the frame when this method returns.
*
* WARNING: Prefer using higher-level APIs over this function.
* Sending raw frames may break some invariants or the state of this session.
* In particular, using subscription-related frames could be a problem.
*/
@UnsafeStompSessionApi
suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt?

/**
* If [graceful disconnect][StompConfig.gracefulDisconnect] is enabled (which is the default), sends a DISCONNECT
Expand All @@ -176,7 +215,7 @@ interface StompSession {
* If a RECEIPT frame is not received within the [configured time][StompConfig.disconnectTimeout], it may
* be because the server closed the connection too quickly to send a RECEIPT frame, which is
* [not considered an error](http://stomp.github.io/stomp-specification-1.2.html#Connection_Lingering).
* That's why this function doesn't throw an exception in this case, it just returns normally.
* That's why this function doesn't throw an exception in this case; it just returns normally.
*/
suspend fun disconnect()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package org.hildan.krossbow.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import org.hildan.krossbow.stomp.StompReceipt
import org.hildan.krossbow.stomp.StompSession
import org.hildan.krossbow.stomp.*
import org.hildan.krossbow.stomp.frame.FrameBody
import org.hildan.krossbow.stomp.frame.StompFrame
import org.hildan.krossbow.stomp.headers.StompSendHeaders
Expand All @@ -15,21 +14,8 @@ open class NoopStompSession : StompSession {

override suspend fun subscribe(headers: StompSubscribeHeaders): Flow<StompFrame.Message> = emptyFlow()

override suspend fun ack(ackId: String, transactionId: String?) {
}
@UnsafeStompSessionApi
override suspend fun sendRawFrameAndMaybeAwaitReceipt(frame: StompFrame): StompReceipt? = null

override suspend fun nack(ackId: String, transactionId: String?) {
}

override suspend fun begin(transactionId: String) {
}

override suspend fun commit(transactionId: String) {
}

override suspend fun abort(transactionId: String) {
}

override suspend fun disconnect() {
}
override suspend fun disconnect() = Unit
}
Loading

0 comments on commit 2432dfe

Please sign in to comment.