From 21907144c037ddb00439202053d9f53fcb8c7a12 Mon Sep 17 00:00:00 2001 From: Ermal Kaleci Date: Wed, 1 Nov 2023 19:21:38 +0100 Subject: [PATCH] bring back tokio::spawn which was removed on #122. I thought it was unnecessary but I was wrong. Subscription request needs to complete with Ok(()) while background task will keep sending updates. --- .../subscriptions/merge_subscription.rs | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/middlewares/subscriptions/merge_subscription.rs b/src/middlewares/subscriptions/merge_subscription.rs index 6d7e52d..4780361 100644 --- a/src/middlewares/subscriptions/merge_subscription.rs +++ b/src/middlewares/subscriptions/merge_subscription.rs @@ -230,31 +230,34 @@ impl Middleware> for MergeSubscript .await?; // broadcast new values - let mut stream = subscribe(); - - loop { - tokio::select! { - resp = stream.recv() => { - match resp { - Ok(new_value) => { - if let Err(e) = sink.send(new_value).await { - tracing::trace!("subscription sink closed {e:?}"); - break; + tokio::spawn(async move { + // create receiver inside task to avoid msg been broadcast before stream.recv() is hit + let mut stream = subscribe(); + + loop { + tokio::select! { + resp = stream.recv() => { + match resp { + Ok(new_value) => { + if let Err(e) = sink.send(new_value).await { + tracing::trace!("subscription sink closed {e:?}"); + break; + } + } + Err(e) => { + // this should never happen + tracing::error!("subscription stream error {e:?}"); + unreachable!("subscription stream error {e:?}"); } - } - Err(e) => { - // this should never happen - tracing::error!("subscription stream error {e:?}"); - unreachable!("subscription stream error {e:?}"); } } - } - _ = sink.closed() => { - tracing::trace!("subscription sink closed"); - break; + _ = sink.closed() => { + tracing::trace!("subscription sink closed"); + break; + } } } - } + }); Ok(()) }