Skip to content

Commit

Permalink
notify tasks on endpoint rotation (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci authored Oct 3, 2023
1 parent e7f9778 commit ba32a53
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 30 deletions.
39 changes: 28 additions & 11 deletions src/extensions/api/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ impl EthApi {
}
_ = interval.tick() => {
tracing::warn!("No new blocks for {stale_timeout:?} seconds, rotating endpoint");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
client.rotate_endpoint().await;
break;
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}
Expand All @@ -140,18 +144,31 @@ impl EthApi {
.subscribe("eth_subscribe", ["newFinalizedHeads".into()].into(), "eth_unsubscribe")
.await?;

while let Some(Ok(val)) = sub.next().await {
let number = super::get_number(&val)?;
let hash = super::get_hash(&val)?;
loop {
tokio::select! {
val = sub.next() => {
if let Some(Ok(val)) = val {
let number = super::get_number(&val)?;
let hash = super::get_hash(&val)?;

if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash)
{
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await;
break;
}

if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) {
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
break;
tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
} else {
break;
}
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}

tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
}

Ok::<(), anyhow::Error>(())
Expand Down
44 changes: 31 additions & 13 deletions src/extensions/api/substrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ impl SubstrateApi {
}
_ = interval.tick() => {
tracing::warn!("No new blocks for {stale_timeout:?} seconds, rotating endpoint");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
client.rotate_endpoint().await;
break;
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}
Expand All @@ -122,28 +126,42 @@ impl SubstrateApi {
tokio::spawn(async move {
loop {
let run = async {
let sub = client
let mut sub = client
.subscribe(
"chain_subscribeFinalizedHeads",
[].into(),
"chain_unsubscribeFinalizedHeads",
)
.await?;

let mut sub = sub;
while let Some(Ok(val)) = sub.next().await {
let number = super::get_number(&val)?;
loop {
tokio::select! {
val = sub.next() => {
if let Some(Ok(val)) = val {
let number = super::get_number(&val)?;

let hash = client.request("chain_getBlockHash", vec![number.into()]).await?;
let hash = client
.request("chain_getBlockHash", vec![number.into()])
.await?;

if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) {
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
break;
}
if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash)
{
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await;
break;
}

tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
} else {
break;
}
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}
}

Ok::<(), anyhow::Error>(())
Expand Down
9 changes: 7 additions & 2 deletions src/extensions/api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ async fn rotate_endpoint_on_head_mismatch() {
let client = Client::new([format!("ws://{addr1}"), format!("ws://{addr2}")]).unwrap();

let client = Arc::new(client);
// TODO: investigate why it takes a while to connect to another endpoint
let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(5_000));
let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(100));

let head = api.get_head();
let finalized_head = api.get_finalized_head();
Expand Down Expand Up @@ -312,6 +311,12 @@ async fn rotate_endpoint_on_head_mismatch() {
tx.send(json!("0xaa")).unwrap();
}

// wait a bit to process tasks
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

assert!(head_sink.is_closed());
assert!(finalized_head_sink.is_closed());

// current finalized head is still 2
assert_eq!(api.get_finalized_head().read().await, (json!("0xbb"), 0x02));

Expand Down
23 changes: 20 additions & 3 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use jsonrpsee::{
use opentelemetry::trace::FutureExt;
use rand::{seq::SliceRandom, thread_rng};
use serde::Deserialize;
use tokio::sync::Notify;

use crate::{
extension::Extension,
Expand All @@ -36,6 +37,7 @@ const TRACER: utils::telemetry::Tracer = utils::telemetry::Tracer::new("client")

pub struct Client {
sender: tokio::sync::mpsc::Sender<Message>,
rotation_notify: Arc<Notify>,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -96,6 +98,9 @@ impl Client {

let (disconnect_tx, mut disconnect_rx) = tokio::sync::mpsc::channel::<()>(10);

let rotation_notify = Arc::new(Notify::new());
let rotating = rotation_notify.clone();

tokio::spawn(async move {
let tx = tx2;

Expand Down Expand Up @@ -292,6 +297,7 @@ impl Client {
tracing::trace!("Received message {message:?}");
match message {
Some(Message::RotateEndpoint) => {
rotating.notify_waiters();
tracing::info!("Rotate endpoint");
ws = build_ws().await;
}
Expand All @@ -306,7 +312,10 @@ impl Client {
}
});

Ok(Self { sender: tx })
Ok(Self {
sender: tx,
rotation_notify,
})
}

pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> Result<JsonValue, ErrorObjectOwned> {
Expand Down Expand Up @@ -349,8 +358,16 @@ impl Client {
rx.with_context(cx).await.map_err(errors::failed)?
}

pub async fn rotate_endpoint(&self) -> Result<(), ()> {
self.sender.send(Message::RotateEndpoint).await.map_err(|_| ())
pub async fn rotate_endpoint(&self) {
self.sender
.send(Message::RotateEndpoint)
.await
.expect("Failed to rotate endpoint");
}

/// Returns a future that resolves when the endpoint is rotated.
pub async fn on_rotation(&self) {
self.rotation_notify.notified().await
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/extensions/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn multiple_endpoints() {

assert_eq!(result.to_string(), "2");

client.rotate_endpoint().await.unwrap();
client.rotate_endpoint().await;

tokio::time::sleep(Duration::from_millis(100)).await;

Expand Down

0 comments on commit ba32a53

Please sign in to comment.