diff --git a/src/middlewares/subscriptions/merge_subscription.rs b/src/middlewares/subscriptions/merge_subscription.rs index 6d7e52d..4780361 100644 --- a/src/middlewares/subscriptions/merge_subscription.rs +++ b/src/middlewares/subscriptions/merge_subscription.rs @@ -230,31 +230,34 @@ impl Middleware> for MergeSubscript .await?; // broadcast new values - let mut stream = subscribe(); - - loop { - tokio::select! { - resp = stream.recv() => { - match resp { - Ok(new_value) => { - if let Err(e) = sink.send(new_value).await { - tracing::trace!("subscription sink closed {e:?}"); - break; + tokio::spawn(async move { + // create receiver inside task to avoid msg been broadcast before stream.recv() is hit + let mut stream = subscribe(); + + loop { + tokio::select! { + resp = stream.recv() => { + match resp { + Ok(new_value) => { + if let Err(e) = sink.send(new_value).await { + tracing::trace!("subscription sink closed {e:?}"); + break; + } + } + Err(e) => { + // this should never happen + tracing::error!("subscription stream error {e:?}"); + unreachable!("subscription stream error {e:?}"); } - } - Err(e) => { - // this should never happen - tracing::error!("subscription stream error {e:?}"); - unreachable!("subscription stream error {e:?}"); } } - } - _ = sink.closed() => { - tracing::trace!("subscription sink closed"); - break; + _ = sink.closed() => { + tracing::trace!("subscription sink closed"); + break; + } } } - } + }); Ok(()) }