From 51cd9b4a3746fc9fc456cc90275d7447808ebff7 Mon Sep 17 00:00:00 2001 From: Iulian Barbu Date: Thu, 29 Aug 2024 12:31:35 +0300 Subject: [PATCH] cumulus/client: added external rpc connection retry logic Signed-off-by: Iulian Barbu --- .../src/reconnecting_ws_client.rs | 39 +++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs index 48d35dd3a55ee..f030fc7dee4f2 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs @@ -34,7 +34,7 @@ use jsonrpsee::{ use sc_rpc_api::chain::ChainApiClient; use schnellru::{ByLength, LruMap}; use sp_runtime::generic::SignedBlock; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::{ channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender, }; @@ -43,6 +43,9 @@ use url::Url; use crate::rpc_client::{distribute_header, RpcDispatcherMessage}; const LOG_TARGET: &str = "reconnecting-websocket-client"; +const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5; +const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1000; +const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2; /// Worker that should be used in combination with [`RelayChainRpcClient`]. /// @@ -93,16 +96,45 @@ struct RelayChainSubscriptions { best_subscription: Subscription, } -/// Try to find a new RPC server to connect to. +/// Try to find a new RPC server to connect to. Uses a naive retry +/// logic that does an exponential backoff in between iterations +/// through all URLs from the list. It uses a constant to tell how +/// many iterations of connection attempts to all URLs we allow. We +/// return early when a connection is made. async fn connect_next_available_rpc_server( urls: &Vec, starting_position: usize, ) -> Result<(usize, Arc), ()> { tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server."); - for (counter, url) in urls.iter().cycle().skip(starting_position).take(urls.len()).enumerate() { + + let mut prev_iteration: u32 = 0; + for (counter, url) in urls + .iter() + .cycle() + .skip(starting_position) + .take(urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES) + .enumerate() + { + // If we reached the end of the urls list, backoff before retrying + // connections to the entire list once more. + let Ok(current_iteration) = (counter / urls.len()).try_into() else { + tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting..."); + break; + }; + if current_iteration > prev_iteration { + // Safe conversion given we convert positive i32s which are lower than u64::MAX. + tokio::time::sleep(Duration::from_millis( + DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES * + DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(prev_iteration) as u64, + )) + .await; + prev_iteration = current_iteration; + } + let index = (starting_position + counter) % urls.len(); tracing::info!( target: LOG_TARGET, + current_iteration, index, url, "Trying to connect to next external relaychain node.", @@ -112,6 +144,7 @@ async fn connect_next_available_rpc_server( Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."), }; } + Err(()) }