diff --git a/src/middlewares/subscriptions/merge_subscription.rs b/src/middlewares/subscriptions/merge_subscription.rs index 6542172..6d7e52d 100644 --- a/src/middlewares/subscriptions/merge_subscription.rs +++ b/src/middlewares/subscriptions/merge_subscription.rs @@ -31,7 +31,7 @@ fn merge_storage_changes(current_value: JsonValue, new_value: JsonValue) -> Resu let mut current = serde_json::from_value::(current_value)?; let StorageChanges { block, changes } = serde_json::from_value::(new_value)?; - let changed_keys = changes.clone().into_iter().map(|(key, _)| key).collect::>(); + let changed_keys = changes.iter().map(|(key, _)| key).collect::>(); // replace block hash current.block = block; @@ -230,34 +230,31 @@ impl Middleware> for MergeSubscript .await?; // broadcast new values - tokio::spawn(async move { - // create receiver inside task to avoid msg been broadcast before stream.recv() is hit - let mut stream = subscribe(); - - loop { - tokio::select! { - resp = stream.recv() => { - match resp { - Ok(new_value) => { - if let Err(e) = sink.send(new_value).await { - tracing::trace!("subscription sink closed {e:?}"); - break; - } - } - Err(e) => { - // this should never happen - tracing::error!("subscription stream error {e:?}"); - unreachable!("subscription stream error {e:?}"); + let mut stream = subscribe(); + + loop { + tokio::select! { + resp = stream.recv() => { + match resp { + Ok(new_value) => { + if let Err(e) = sink.send(new_value).await { + tracing::trace!("subscription sink closed {e:?}"); + break; } } - } - _ = sink.closed() => { - tracing::trace!("subscription sink closed"); - break; + Err(e) => { + // this should never happen + tracing::error!("subscription stream error {e:?}"); + unreachable!("subscription stream error {e:?}"); + } } } + _ = sink.closed() => { + tracing::trace!("subscription sink closed"); + break; + } } - }); + } Ok(()) } diff --git a/src/middlewares/subscriptions/upstream.rs b/src/middlewares/subscriptions/upstream.rs index 48b2afa..c7a74ef 100644 --- a/src/middlewares/subscriptions/upstream.rs +++ b/src/middlewares/subscriptions/upstream.rs @@ -52,26 +52,42 @@ impl Middleware for UpstreamMiddleware let sink = sink.accept().await?; - while let Some(resp) = sub.next().await { - let resp = match resp { - Ok(resp) => resp, - Err(e) => { - tracing::error!("Subscription error: {}", e); - continue; + loop { + tokio::select! { + msg = sub.next() => { + match msg { + Some(resp) => { + let resp = match resp { + Ok(resp) => resp, + Err(e) => { + tracing::error!("Subscription error: {}", e); + continue; + } + }; + let resp = match SubscriptionMessage::from_json(&resp) { + Ok(resp) => resp, + Err(e) => { + tracing::error!("Failed to serialize subscription response: {}", e); + continue; + } + }; + if let Err(e) = sink.send(resp).await { + tracing::error!("Failed to send subscription response: {}", e); + break; + } + } + None => break, + } } - }; - let resp = match SubscriptionMessage::from_json(&resp) { - Ok(resp) => resp, - Err(e) => { - tracing::error!("Failed to serialize subscription response: {}", e); - continue; - } - }; - if let Err(e) = sink.send(resp).await { - tracing::info!("Failed to send subscription response: {}", e); - break; + _ = sink.closed() => { + if let Err(err) = sub.unsubscribe().await { + tracing::error!("Failed to unsubscribe: {}", err); + } + break + }, } } + Ok(()) } }