diff --git a/src/extensions/api/eth.rs b/src/extensions/api/eth.rs index 9b9ea9b..dfb57d2 100644 --- a/src/extensions/api/eth.rs +++ b/src/extensions/api/eth.rs @@ -114,7 +114,11 @@ impl EthApi { } _ = interval.tick() => { tracing::warn!("No new blocks for {stale_timeout:?} seconds, rotating endpoint"); - client.rotate_endpoint().await.expect("Failed to rotate endpoint"); + client.rotate_endpoint().await; + break; + } + _ = client.on_rotation() => { + // endpoint is rotated, break the loop and restart subscription break; } } @@ -140,18 +144,31 @@ impl EthApi { .subscribe("eth_subscribe", ["newFinalizedHeads".into()].into(), "eth_unsubscribe") .await?; - while let Some(Ok(val)) = sub.next().await { - let number = super::get_number(&val)?; - let hash = super::get_hash(&val)?; + loop { + tokio::select! { + val = sub.next() => { + if let Some(Ok(val)) = val { + let number = super::get_number(&val)?; + let hash = super::get_hash(&val)?; + + if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) + { + tracing::error!("Error in background task: {e}"); + client.rotate_endpoint().await; + break; + } - if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) { - tracing::error!("Error in background task: {e}"); - client.rotate_endpoint().await.expect("Failed to rotate endpoint"); - break; + tracing::debug!("New finalized head: {number} {hash}"); + finalized_head_tx.send_replace(Some((hash, number))); + } else { + break; + } + } + _ = client.on_rotation() => { + // endpoint is rotated, break the loop and restart subscription + break; + } } - - tracing::debug!("New finalized head: {number} {hash}"); - finalized_head_tx.send_replace(Some((hash, number))); } Ok::<(), anyhow::Error>(()) diff --git a/src/extensions/api/substrate.rs b/src/extensions/api/substrate.rs index 7b1d2f4..710e786 100644 --- a/src/extensions/api/substrate.rs +++ b/src/extensions/api/substrate.rs @@ -101,7 +101,11 @@ impl SubstrateApi { } _ = interval.tick() => { tracing::warn!("No new blocks for {stale_timeout:?} seconds, rotating endpoint"); - client.rotate_endpoint().await.expect("Failed to rotate endpoint"); + client.rotate_endpoint().await; + break; + } + _ = client.on_rotation() => { + // endpoint is rotated, break the loop and restart subscription break; } } @@ -122,7 +126,7 @@ impl SubstrateApi { tokio::spawn(async move { loop { let run = async { - let sub = client + let mut sub = client .subscribe( "chain_subscribeFinalizedHeads", [].into(), @@ -130,20 +134,34 @@ impl SubstrateApi { ) .await?; - let mut sub = sub; - while let Some(Ok(val)) = sub.next().await { - let number = super::get_number(&val)?; + loop { + tokio::select! { + val = sub.next() => { + if let Some(Ok(val)) = val { + let number = super::get_number(&val)?; - let hash = client.request("chain_getBlockHash", vec![number.into()]).await?; + let hash = client + .request("chain_getBlockHash", vec![number.into()]) + .await?; - if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) { - tracing::error!("Error in background task: {e}"); - client.rotate_endpoint().await.expect("Failed to rotate endpoint"); - break; - } + if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) + { + tracing::error!("Error in background task: {e}"); + client.rotate_endpoint().await; + break; + } - tracing::debug!("New finalized head: {number} {hash}"); - finalized_head_tx.send_replace(Some((hash, number))); + tracing::debug!("New finalized head: {number} {hash}"); + finalized_head_tx.send_replace(Some((hash, number))); + } else { + break; + } + } + _ = client.on_rotation() => { + // endpoint is rotated, break the loop and restart subscription + break; + } + } } Ok::<(), anyhow::Error>(()) diff --git a/src/extensions/api/tests.rs b/src/extensions/api/tests.rs index d5e6726..0651b4f 100644 --- a/src/extensions/api/tests.rs +++ b/src/extensions/api/tests.rs @@ -241,8 +241,7 @@ async fn rotate_endpoint_on_head_mismatch() { let client = Client::new([format!("ws://{addr1}"), format!("ws://{addr2}")]).unwrap(); let client = Arc::new(client); - // TODO: investigate why it takes a while to connect to another endpoint - let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(5_000)); + let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(100)); let head = api.get_head(); let finalized_head = api.get_finalized_head(); @@ -312,6 +311,12 @@ async fn rotate_endpoint_on_head_mismatch() { tx.send(json!("0xaa")).unwrap(); } + // wait a bit to process tasks + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + + assert!(head_sink.is_closed()); + assert!(finalized_head_sink.is_closed()); + // current finalized head is still 2 assert_eq!(api.get_finalized_head().read().await, (json!("0xbb"), 0x02)); diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 929ad1a..207554c 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -20,6 +20,7 @@ use jsonrpsee::{ use opentelemetry::trace::FutureExt; use rand::{seq::SliceRandom, thread_rng}; use serde::Deserialize; +use tokio::sync::Notify; use crate::{ extension::Extension, @@ -36,6 +37,7 @@ const TRACER: utils::telemetry::Tracer = utils::telemetry::Tracer::new("client") pub struct Client { sender: tokio::sync::mpsc::Sender, + rotation_notify: Arc, } #[derive(Deserialize, Debug)] @@ -96,6 +98,9 @@ impl Client { let (disconnect_tx, mut disconnect_rx) = tokio::sync::mpsc::channel::<()>(10); + let rotation_notify = Arc::new(Notify::new()); + let rotating = rotation_notify.clone(); + tokio::spawn(async move { let tx = tx2; @@ -292,6 +297,7 @@ impl Client { tracing::trace!("Received message {message:?}"); match message { Some(Message::RotateEndpoint) => { + rotating.notify_waiters(); tracing::info!("Rotate endpoint"); ws = build_ws().await; } @@ -306,7 +312,10 @@ impl Client { } }); - Ok(Self { sender: tx }) + Ok(Self { + sender: tx, + rotation_notify, + }) } pub async fn request(&self, method: &str, params: Vec) -> Result { @@ -349,8 +358,16 @@ impl Client { rx.with_context(cx).await.map_err(errors::failed)? } - pub async fn rotate_endpoint(&self) -> Result<(), ()> { - self.sender.send(Message::RotateEndpoint).await.map_err(|_| ()) + pub async fn rotate_endpoint(&self) { + self.sender + .send(Message::RotateEndpoint) + .await + .expect("Failed to rotate endpoint"); + } + + /// Returns a future that resolves when the endpoint is rotated. + pub async fn on_rotation(&self) { + self.rotation_notify.notified().await } } diff --git a/src/extensions/client/tests.rs b/src/extensions/client/tests.rs index fcdd273..31637d5 100644 --- a/src/extensions/client/tests.rs +++ b/src/extensions/client/tests.rs @@ -90,7 +90,7 @@ async fn multiple_endpoints() { assert_eq!(result.to_string(), "2"); - client.rotate_endpoint().await.unwrap(); + client.rotate_endpoint().await; tokio::time::sleep(Duration::from_millis(100)).await;