diff --git a/redis/Cargo.toml b/redis/Cargo.toml
index 4752296e1..bde5b404d 100644
--- a/redis/Cargo.toml
+++ b/redis/Cargo.toml
@@ -54,10 +54,16 @@ r2d2 = { version = "0.8.8", optional = true }
 # Only needed for cluster
 crc16 = { version = "0.4", optional = true }
 rand = { version = "0.8", optional = true }
+derivative = { version = "2.2.0", optional = true }
+
 # Only needed for async_std support
 async-std = { version = "1.8.0", optional = true}
 async-trait = { version = "0.1.24", optional = true }
-derivative = { version = "2.2.0", optional = true }
+# To avoid conflicts, backoff-std-async.version != backoff-tokio.version, so that tests can run with --all-features
+backoff-std-async = { package = "backoff", version = "0.3.0", optional = true, features = ["async-std"] }
+
+# Only needed for tokio support
+backoff-tokio = { package = "backoff", version = "0.4.0", optional = true, features = ["tokio"] }
 
 # Only needed for native tls
 native-tls = { version = "0.2", optional = true }
@@ -93,10 +99,10 @@ tls-native-tls = ["native-tls"]
 tls-rustls = ["rustls", "rustls-native-certs"]
 tls-rustls-insecure = ["tls-rustls", "rustls/dangerous_configuration"]
 tls-rustls-webpki-roots = ["tls-rustls", "webpki-roots"]
-async-std-comp = ["aio", "async-std"]
+async-std-comp = ["aio", "async-std", "backoff-std-async"]
 async-std-native-tls-comp = ["async-std-comp", "async-native-tls", "tls-native-tls"]
 async-std-rustls-comp = ["async-std-comp", "futures-rustls", "tls-rustls"]
-tokio-comp = ["aio", "tokio", "tokio/net"]
+tokio-comp = ["aio", "tokio", "tokio/net", "backoff-tokio"]
 tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"]
 tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"]
 connection-manager = ["arc-swap", "futures", "aio", "tokio-retry"]
diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs
index 19fc8cd4a..b8510118d 100644
--- a/redis/src/cluster_async/mod.rs
+++ b/redis/src/cluster_async/mod.rs
@@ -28,7 +28,10 @@ use std::{
     marker::Unpin,
     mem,
     pin::Pin,
-    sync::{atomic, Arc, Mutex},
+    sync::{
+        atomic::{self, AtomicUsize},
+        Arc, Mutex,
+    },
     task::{self, Poll},
 };
 
@@ -40,13 +43,26 @@ use crate::{
         MultipleNodeRoutingInfo, Redirect, ResponsePolicy, Route, RoutingInfo,
         SingleNodeRoutingInfo,
     },
-    cluster_topology::{calculate_topology, SlotMap},
+    cluster_topology::{
+        calculate_topology, SlotMap, DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
+        DEFAULT_REFRESH_SLOTS_RETRY_TIMEOUT,
+    },
     Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisFuture, RedisResult,
     Value,
 };
 
 #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
 use crate::aio::{async_std::AsyncStd, RedisRuntime};
+#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
+use backoff_std_async::future::retry;
+#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
+use backoff_std_async::{Error, ExponentialBackoff};
+
+#[cfg(feature = "tokio-comp")]
+use backoff_tokio::future::retry;
+#[cfg(feature = "tokio-comp")]
+use backoff_tokio::{Error, ExponentialBackoff};
+
 use futures::{
     future::{self, BoxFuture},
     prelude::*,
@@ -458,8 +474,7 @@ where
             refresh_error: None,
             state: ConnectionState::PollComplete,
         };
-        // TODO: add retries
-        connection.refresh_slots().await?;
+        connection.refresh_slots_with_retries().await?;
         Ok(connection)
     }
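Cargo allows only one version of a package per dependency name, so the manifest above pulls in two `backoff` versions under distinct aliases and lets the `tokio-comp`/`async-std-comp` features select between them. For readers unfamiliar with the crate, here is a minimal, self-contained sketch of the retry API those aliases expose (assuming `backoff` 0.4 with its `tokio` feature; `fetch_topology` is a made-up stand-in, not part of this PR):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use backoff::{future::retry, Error, ExponentialBackoff};

// Hypothetical fallible operation standing in for a CLUSTER SLOTS refresh.
async fn fetch_topology(attempt: usize) -> Result<String, Error<std::io::Error>> {
    if attempt < 3 {
        // Transient errors are retried after the next backoff interval;
        // Error::permanent(..) would abort the retry loop instead.
        Err(Error::transient(std::io::Error::new(
            std::io::ErrorKind::Other,
            "no topology majority yet",
        )))
    } else {
        Ok("slot map".to_string())
    }
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let strategy = ExponentialBackoff {
        initial_interval: Duration::from_millis(100),
        max_interval: Duration::from_secs(1),
        ..Default::default()
    };
    let attempts = AtomicUsize::new(0);
    // retry() keeps invoking the closure until it returns Ok or a permanent
    // error, or until the strategy's max_elapsed_time runs out.
    let topology = retry(strategy, || async {
        fetch_topology(attempts.fetch_add(1, Ordering::Relaxed) + 1).await
    })
    .await?;
    println!("refreshed: {topology}");
    Ok(())
}
```

Note that `impl From<E> for Error<E>` wraps an error as transient, which is the semantics the `map_err(Error::from)` call in the connection code below relies on.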
@@ -525,80 +540,6 @@ where
         }
     }
 
-    // Query a node to discover slot-> master mappings.
-    fn refresh_slots(&mut self) -> impl Future<Output = RedisResult<()>> {
-        self.refresh_slots_with_retries(None)
-    }
-
-    // Query a node to discover slot-> master mappings with retries
-    fn refresh_slots_with_retries(
-        &mut self,
-        retries: Option<Arc<atomic::AtomicUsize>>,
-    ) -> impl Future<Output = RedisResult<()>> {
-        let inner = self.inner.clone();
-
-        async move {
-            let read_guard = inner.conn_lock.read().await;
-            let num_of_nodes = read_guard.0.len();
-            const MAX_REQUESTED_NODES: usize = 50;
-            let num_of_nodes_to_query = std::cmp::min(num_of_nodes, MAX_REQUESTED_NODES);
-            let mut requested_nodes = {
-                let mut rng = thread_rng();
-                read_guard
-                    .0
-                    .values()
-                    .choose_multiple(&mut rng, num_of_nodes_to_query)
-            };
-            let topology_join_results =
-                futures::future::join_all(requested_nodes.iter_mut().map(|conn| async move {
-                    let mut conn: C = conn.clone().await;
-                    conn.req_packed_command(&slot_cmd()).await
-                }))
-                .await;
-            let topology_values: Vec<_> = topology_join_results
-                .into_iter()
-                .filter_map(|r| r.ok())
-                .collect();
-            let new_slots = calculate_topology(
-                topology_values,
-                retries.clone(),
-                inner.cluster_params.tls,
-                inner.cluster_params.read_from_replicas,
-                num_of_nodes_to_query,
-            )?;
-
-            let connections: &ConnectionMap<C> = &read_guard.0;
-            let mut nodes = new_slots.values().flatten().collect::<Vec<_>>();
-            nodes.sort_unstable();
-            nodes.dedup();
-            let nodes_len = nodes.len();
-            let addresses_and_connections_iter = nodes
-                .into_iter()
-                .map(|addr| (addr, connections.get(addr).cloned()));
-            let new_connections: HashMap<String, ConnectionFuture<C>> =
-                stream::iter(addresses_and_connections_iter)
-                    .fold(
-                        HashMap::with_capacity(nodes_len),
-                        |mut connections, (addr, connection)| async {
-                            let conn =
-                                Self::get_or_create_conn(addr, connection, &inner.cluster_params)
-                                    .await;
-                            if let Ok(conn) = conn {
-                                connections
-                                    .insert(addr.to_string(), async { conn }.boxed().shared());
-                            }
-                            connections
-                        },
-                    )
-                    .await;
-
-            drop(read_guard);
-            let mut write_guard = inner.conn_lock.write().await;
-            write_guard.1 = new_slots;
-            write_guard.0 = new_connections;
-            Ok(())
-        }
-    }
-
     async fn aggregate_results(
         receivers: Vec<(String, oneshot::Receiver<RedisResult<Response>>)>,
         routing: &MultipleNodeRoutingInfo,
@@ -682,6 +623,90 @@ where
         }
     }
 
+    // Query a node to discover slot -> master mappings, with retries
+    fn refresh_slots_with_retries(&mut self) -> impl Future<Output = RedisResult<()>> {
+        let inner = self.inner.clone();
+        async move {
+            let retry_strategy = ExponentialBackoff {
+                initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
+                max_interval: DEFAULT_REFRESH_SLOTS_RETRY_TIMEOUT,
+                ..Default::default()
+            };
+            let retries_counter = AtomicUsize::new(0);
+            retry(retry_strategy, || {
+                retries_counter.fetch_add(1, atomic::Ordering::Relaxed);
+                Self::refresh_slots(
+                    inner.clone(),
+                    retries_counter.load(atomic::Ordering::Relaxed),
+                )
+                .map_err(Error::from)
+            })
+            .await?;
+            Ok(())
+        }
+    }
+
+    // Query a node to discover slot -> master mappings
+    async fn refresh_slots(inner: Arc<InnerCore<C>>, curr_retry: usize) -> RedisResult<()> {
+        let read_guard = inner.conn_lock.read().await;
+        let num_of_nodes = read_guard.0.len();
+        const MAX_REQUESTED_NODES: usize = 50;
+        let num_of_nodes_to_query = std::cmp::min(num_of_nodes, MAX_REQUESTED_NODES);
+        let mut requested_nodes = {
+            let mut rng = thread_rng();
+            read_guard
+                .0
+                .values()
+                .choose_multiple(&mut rng, num_of_nodes_to_query)
+        };
+        let topology_join_results =
+            futures::future::join_all(requested_nodes.iter_mut().map(|conn| async move {
+                let mut conn: C = conn.clone().await;
+                conn.req_packed_command(&slot_cmd()).await
+            }))
+            .await;
+        let topology_values: Vec<_> = topology_join_results
+            .into_iter()
+            .filter_map(|r| r.ok())
+            .collect();
+        let new_slots = calculate_topology(
+            topology_values,
+            curr_retry,
+            inner.cluster_params.tls,
+            inner.cluster_params.read_from_replicas,
+            num_of_nodes_to_query,
+        )?;
+
+        let connections: &ConnectionMap<C> = &read_guard.0;
+        let mut nodes = new_slots.values().flatten().collect::<Vec<_>>();
+        nodes.sort_unstable();
+        nodes.dedup();
+        let nodes_len = nodes.len();
+        let addresses_and_connections_iter = nodes
+            .into_iter()
+            .map(|addr| (addr, connections.get(addr).cloned()));
+        let new_connections: HashMap<String, ConnectionFuture<C>> =
+            stream::iter(addresses_and_connections_iter)
+                .fold(
+                    HashMap::with_capacity(nodes_len),
+                    |mut connections, (addr, connection)| async {
+                        let conn =
+                            Self::get_or_create_conn(addr, connection, &inner.cluster_params).await;
+                        if let Ok(conn) = conn {
+                            connections.insert(addr.to_string(), async { conn }.boxed().shared());
+                        }
+                        connections
+                    },
+                )
+                .await;
+
+        drop(read_guard);
+        let mut write_guard = inner.conn_lock.write().await;
+        write_guard.1 = new_slots;
+        write_guard.0 = new_connections;
+        Ok(())
+    }
+
     async fn execute_on_multiple_nodes<'a>(
         cmd: &'a Arc<Cmd>,
         routing: &'a MultipleNodeRoutingInfo,
@@ -902,7 +927,7 @@ where
                 }
                 Poll::Ready(Err(err)) => {
                     self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
-                        self.refresh_slots(),
+                        self.refresh_slots_with_retries(),
                    )));
                     Poll::Ready(Err(err))
                 }
@@ -1142,7 +1167,7 @@ where
             PollFlushAction::None => return Poll::Ready(Ok(())),
             PollFlushAction::RebuildSlots => {
                 self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(
-                    Box::pin(self.refresh_slots()),
+                    Box::pin(self.refresh_slots_with_retries()),
                 ));
             }
             PollFlushAction::Reconnect(addrs) => {
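`refresh_slots` fans out `CLUSTER SLOTS` to at most `MAX_REQUESTED_NODES` randomly sampled nodes and keeps only the replies that succeeded. A reduced sketch of that sample, join, and filter shape (standalone; `query_node` is a hypothetical stand-in for `req_packed_command`):

```rust
use futures::future::join_all;
use rand::seq::IteratorRandom;
use rand::thread_rng;

// Hypothetical stand-in for sending CLUSTER SLOTS over one connection.
async fn query_node(addr: &str) -> Result<String, &'static str> {
    if addr.ends_with(":6380") {
        Err("connection reset")
    } else {
        Ok(format!("view reported by {addr}"))
    }
}

#[tokio::main]
async fn main() {
    let nodes = ["node1:6379", "node2:6380", "node3:6379"];
    const MAX_REQUESTED_NODES: usize = 50;
    let num_to_query = std::cmp::min(nodes.len(), MAX_REQUESTED_NODES);
    // Sample without replacement, like choose_multiple over the connection map.
    let requested = {
        let mut rng = thread_rng();
        nodes.iter().choose_multiple(&mut rng, num_to_query)
    };
    // Query all sampled nodes concurrently; unreachable nodes simply
    // contribute no view rather than failing the whole refresh.
    let results = join_all(requested.into_iter().map(|addr| query_node(addr))).await;
    let views: Vec<String> = results.into_iter().filter_map(|r| r.ok()).collect();
    assert_eq!(views.len(), 2);
}
```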
diff --git a/redis/src/cluster_topology.rs b/redis/src/cluster_topology.rs
index 997464151..35332a19e 100644
--- a/redis/src/cluster_topology.rs
+++ b/redis/src/cluster_topology.rs
@@ -1,3 +1,5 @@
+//! This module provides the functionality to refresh and calculate the cluster topology for Redis Cluster.
+
 use crate::cluster::get_connection_addr;
 use crate::cluster_routing::MultipleNodeRoutingInfo;
 use crate::cluster_routing::Route;
@@ -8,10 +10,17 @@ use derivative::Derivative;
 use log::trace;
 use std::collections::hash_map::DefaultHasher;
 use std::collections::BTreeMap;
+use std::collections::HashMap;
 use std::collections::HashSet;
 use std::hash::{Hash, Hasher};
-use std::sync::Arc;
-use std::{collections::HashMap, sync::atomic};
+use std::time::Duration;
+
+/// The default number of refresh topology retries
+pub const DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES: usize = 3;
+/// The default timeout for retrying topology refresh
+pub const DEFAULT_REFRESH_SLOTS_RETRY_TIMEOUT: Duration = Duration::from_secs(1);
+/// The default initial interval for retrying topology refresh
+pub const DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL: Duration = Duration::from_millis(100);
 
 pub(crate) const SLOT_SIZE: u16 = 16384;
 
@@ -229,7 +238,7 @@ fn calculate_hash<T: Hash>(t: &T) -> u64 {
 
 pub(crate) fn calculate_topology(
     topology_views: Vec<Value>,
-    retries: Option<Arc<atomic::AtomicUsize>>, // TODO: change to usize
+    curr_retry: usize,
     tls_mode: Option<TlsMode>,
     read_from_replicas: bool,
     num_of_queried_nodes: usize,
@@ -278,10 +287,8 @@ pub(crate) fn calculate_topology(
     };
     if has_more_than_a_single_max {
         // More than a single most frequent view was found
-        // If it's the last retry, or if we it's a 2-nodes cluster, we'll return all found topologies to be checked by the caller
-        if (retries.is_some() && retries.unwrap().fetch_sub(1, atomic::Ordering::SeqCst) == 1)
-            || num_of_queried_nodes < 3
-        {
+        // If we reached the last retry, or if it's a 2-node cluster, return all found topologies to be checked by the caller
+        if curr_retry >= DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES || num_of_queried_nodes < 3 {
             for (idx, topology_view) in hash_view_map.values().enumerate() {
                 match parse_slots(&topology_view.topology_value, tls_mode)
                     .and_then(|v| build_slot_map(&mut new_slots, v, read_from_replicas))
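`calculate_topology` decides between competing views by hashing each `CLUSTER SLOTS` reply and counting duplicates; a tie between most-frequent views is what triggers the retry path above. A sketch of that counting logic, with plain `String`s standing in for parsed replies (illustrative only, not the module's exact types):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

fn calculate_hash<T: Hash>(t: &T) -> u64 {
    let mut s = DefaultHasher::new();
    t.hash(&mut s);
    s.finish()
}

// Count identical views by hash; report the winner and whether the
// maximum was shared, i.e. the "has_more_than_a_single_max" case.
fn most_frequent_view(views: &[String]) -> (Option<&String>, bool) {
    let mut counts: HashMap<u64, (&String, usize)> = HashMap::new();
    for view in views {
        counts.entry(calculate_hash(view)).or_insert((view, 0)).1 += 1;
    }
    let Some(max) = counts.values().map(|&(_, n)| n).max() else {
        return (None, false);
    };
    let mut at_max = counts.values().filter(|&&(_, n)| n == max);
    let winner = at_max.next().map(|&(v, _)| v);
    let tie = at_max.next().is_some();
    (winner, tie)
}

fn main() {
    let views = vec!["view-a".to_string(), "view-a".to_string(), "view-b".to_string()];
    let (winner, tie) = most_frequent_view(&views);
    assert_eq!(winner.map(String::as_str), Some("view-a"));
    // On a tie the caller retries, up to DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES.
    assert!(!tie);
}
```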
@@ -329,7 +336,6 @@ pub(crate) fn calculate_topology(
 mod tests {
     use super::*;
     use crate::cluster_routing::SlotAddrs;
-    use std::sync::atomic::AtomicUsize;
 
     #[test]
     fn test_get_hashtag() {
@@ -415,7 +421,7 @@ mod tests {
             get_view(&ViewType::TwoNodesViewFullCoverage),
         ];
         let topology_view =
-            calculate_topology(topology_results, None, None, false, queried_nodes).unwrap();
+            calculate_topology(topology_results, 1, None, false, queried_nodes).unwrap();
         let res: Vec<_> = topology_view.values().collect();
         let node_1 = get_node_addr("node1", 6379);
         let expected: Vec<&SlotAddrs> = vec![&node_1];
@@ -431,7 +437,7 @@ mod tests {
             get_view(&ViewType::TwoNodesViewFullCoverage),
             get_view(&ViewType::TwoNodesViewMissingSlots),
         ];
-        let topology_view = calculate_topology(topology_results, None, None, false, queried_nodes);
+        let topology_view = calculate_topology(topology_results, 1, None, false, queried_nodes);
         assert!(topology_view.is_err());
     }
 
@@ -444,14 +450,8 @@ mod tests {
             get_view(&ViewType::TwoNodesViewFullCoverage),
             get_view(&ViewType::TwoNodesViewMissingSlots),
         ];
-        let topology_view = calculate_topology(
-            topology_results,
-            Some(Arc::new(AtomicUsize::new(1))),
-            None,
-            false,
-            queried_nodes,
-        )
-        .unwrap();
+        let topology_view =
+            calculate_topology(topology_results, 3, None, false, queried_nodes).unwrap();
         let res: Vec<_> = topology_view.values().collect();
         let node_1 = get_node_addr("node1", 6379);
         let node_2 = get_node_addr("node2", 6380);
@@ -468,7 +468,7 @@ mod tests {
             get_view(&ViewType::TwoNodesViewMissingSlots),
         ];
         let topology_view =
-            calculate_topology(topology_results, None, None, false, queried_nodes).unwrap();
+            calculate_topology(topology_results, 1, None, false, queried_nodes).unwrap();
         let res: Vec<_> = topology_view.values().collect();
         let node_1 = get_node_addr("node1", 6379);
         let node_2 = get_node_addr("node2", 6380);
@@ -484,8 +484,7 @@ mod tests {
             get_view(&ViewType::SingleNodeViewMissingSlots),
             get_view(&ViewType::TwoNodesViewMissingSlots),
         ];
-        let topology_view_res =
-            calculate_topology(topology_results, None, None, false, queried_nodes);
+        let topology_view_res = calculate_topology(topology_results, 1, None, false, queried_nodes);
         assert!(topology_view_res.is_err());
     }
 }
diff --git a/redis/src/lib.rs b/redis/src/lib.rs
index 212213c17..ae53a8ba2 100644
--- a/redis/src/lib.rs
+++ b/redis/src/lib.rs
@@ -448,7 +448,8 @@ mod cluster_pipeline;
 pub mod cluster_routing;
 
 #[cfg(feature = "cluster")]
-mod cluster_topology;
+#[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
+pub mod cluster_topology;
 
 #[cfg(feature = "r2d2")]
 #[cfg_attr(docsrs, doc(cfg(feature = "r2d2")))]
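Making `cluster_topology` public (with a `docsrs` cfg hint so the docs show its feature gate) is what lets the integration tests below, and in principle downstream crates, import the retry constant directly. For example (hypothetical downstream usage; requires the redis crate's `cluster` feature):

```rust
// Hypothetical downstream import, enabled by the module becoming `pub`.
use redis::cluster_topology::DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES;

fn main() {
    assert_eq!(DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES, 3);
}
```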
diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs
index 8c6b8da60..13e626437 100644
--- a/redis/tests/test_cluster_async.rs
+++ b/redis/tests/test_cluster_async.rs
@@ -14,6 +14,7 @@ use redis::{
     cluster::ClusterClient,
     cluster_async::Connect,
     cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo},
+    cluster_topology::DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
     cmd, parse_redis_value, AsyncCommands, Cmd, ErrorKind, InfoDict, IntoConnectionInfo,
     RedisError, RedisFuture, RedisResult, Script, Value,
 };
@@ -394,11 +395,8 @@ fn test_async_cluster_tryagain_exhaust_retries() {
     assert_eq!(requests.load(atomic::Ordering::SeqCst), 3);
 }
 
-fn get_node_view_and_port_index(
-    num_of_view: usize,
-    ports: &Vec<u16>,
-    called_port: u16,
-) -> (usize, usize) {
+// Obtain the view index associated with the node listening on [called_port]
+fn get_node_view_index(num_of_views: usize, ports: &Vec<u16>, called_port: u16) -> usize {
     let port_index = ports
         .iter()
         .position(|&p| p == called_port)
         .unwrap_or_else(|| {
             panic!(
             )
         });
     // If we have less views than nodes, use the last view
-    if port_index < num_of_view {
-        (port_index, port_index)
+    if port_index < num_of_views {
+        port_index
     } else {
-        (num_of_view - 1, port_index)
+        num_of_views - 1
     }
 }
 
 #[test]
@@ -487,19 +485,18 @@ fn test_async_cluster_move_error_when_new_node_is_added() {
     assert_eq!(value, Ok(Some(123)));
 }
 
-fn test_cluster_refresh_topology_after_moved_error_get_succeed(
+fn test_cluster_refresh_topology_after_moved_assert_get_succeed_and_expected_retries(
     slots_config_vec: Vec<Vec<MockSlotRange>>,
     ports: Vec<u16>,
     has_a_majority: bool,
 ) {
     assert!(!ports.is_empty() && !slots_config_vec.is_empty());
     let name = "refresh_topology_moved";
+    let num_of_nodes = ports.len();
     let requests = atomic::AtomicUsize::new(0);
     let started = atomic::AtomicBool::new(false);
-    let refreshed: Vec<_> = ports
-        .iter()
-        .map(|_| atomic::AtomicBool::new(false))
-        .collect();
+    let refresh_calls = Arc::new(atomic::AtomicUsize::new(0));
+    let refresh_calls_cloned = refresh_calls.clone();
     let MockEnv {
         runtime,
         async_connection: mut connection,
@@ -530,16 +527,8 @@ fn test_cluster_refresh_topology_after_moved_error_get_succeed(
         match requests.fetch_add(1, atomic::Ordering::SeqCst) {
             0..=4 => Err(parse_redis_value(
                 format!("-MOVED 123 {name}:6380\r\n").as_bytes(),
             )),
             _ => {
                 if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
-                    let (view_index, port_index) =
-                        get_node_view_and_port_index(slots_config_vec.len(), &ports, port);
-                    if has_a_majority {
-                        // We should be able to refresh the topology in the first try if we have a majority in
-                        // the topology views, so CLUSTER SLOTS should be called only once for each node
-                        assert!(!refreshed
-                            .get(port_index)
-                            .unwrap()
-                            .swap(true, Ordering::SeqCst));
-                    }
+                    refresh_calls_cloned.fetch_add(1, atomic::Ordering::SeqCst);
+                    let view_index = get_node_view_index(slots_config_vec.len(), &ports, port);
                     Err(Ok(create_topology_from_config(
                         name,
                         slots_config_vec[view_index].clone(),
                     )))
                 } else {
@@ -552,14 +541,32 @@ fn test_cluster_refresh_topology_after_moved_error_get_succeed(
             }
         }
     });
-
-    let value = runtime.block_on(
-        cmd("GET")
-            .arg("test")
-            .query_async::<_, Option<i32>>(&mut connection),
-    );
-
-    assert_eq!(value, Ok(Some(123)));
+    runtime.block_on(async move {
+        let res = cmd("GET")
+            .arg("test")
+            .query_async::<_, Option<i32>>(&mut connection)
+            .await;
+        assert_eq!(res, Ok(Some(123)));
+        // If there is a majority in the topology views, or if it's a 2-node cluster, we should be able to
+        // calculate the topology on the first try, so each node will be queried only once with CLUSTER SLOTS.
+        // Otherwise, without a majority, we expect refresh_slots to be called the maximum number of retries.
+        let expected_calls = if has_a_majority || num_of_nodes == 2 {
+            num_of_nodes
+        } else {
+            DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES * num_of_nodes
+        };
+        let mut refreshed_calls = 0;
+        for _ in 0..100 {
+            refreshed_calls = refresh_calls.load(atomic::Ordering::Relaxed);
+            if refreshed_calls == expected_calls {
+                return;
+            } else {
+                let sleep_duration = core::time::Duration::from_millis(100);
+                #[cfg(feature = "tokio-comp")]
+                tokio::time::sleep(sleep_duration).await;
+
+                #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
+                async_std::task::sleep(sleep_duration).await;
+            }
+        }
+        panic!("Failed to reach the expected number of topology refresh calls. Found={refreshed_calls}, Expected={expected_calls}")
+    });
 }
 
 fn test_cluster_refresh_topology_in_client_init_get_succeed(
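To make the expected-call accounting concrete: with 3 nodes and no majority, each of the `DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES = 3` attempts queries all 3 nodes, so the mock should observe 3 x 3 = 9 `CLUSTER SLOTS` calls, while an agreeing 3-node cluster resolves in a single pass of 3 calls. The same computation as a standalone sketch:

```rust
// Mirrors the test's expected_calls computation; the constant matches this PR's default.
fn expected_refresh_calls(num_of_nodes: usize, has_a_majority: bool) -> usize {
    const DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES: usize = 3;
    if has_a_majority || num_of_nodes == 2 {
        num_of_nodes
    } else {
        DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES * num_of_nodes
    }
}

fn main() {
    assert_eq!(expected_refresh_calls(3, true), 3); // majority: one CLUSTER SLOTS per node
    assert_eq!(expected_refresh_calls(3, false), 9); // no majority: 3 retries x 3 nodes
    assert_eq!(expected_refresh_calls(2, false), 2); // 2-node clusters resolve on the first try
}
```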
@@ -588,8 +595,7 @@ fn test_cluster_refresh_topology_in_client_init_get_succeed(
         if contains_slice(cmd, b"PING") {
             return Err(Ok(Value::Status("OK".into())));
         } else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
-            let (view_index, _) =
-                get_node_view_and_port_index(slots_config_vec.len(), &ports, port);
+            let view_index = get_node_view_index(slots_config_vec.len(), &ports, port);
             return Err(Ok(create_topology_from_config(
                 name,
                 slots_config_vec[view_index].clone(),
             )));
@@ -666,9 +672,9 @@ fn get_topology_with_majority(ports: &Vec<u16>) -> Vec<Vec<MockSlotRange>> {
 }
 
 #[test]
-fn test_async_cluster_move_error_refresh_topology_all_nodes_agree() {
+fn test_cluster_refresh_topology_after_moved_error_all_nodes_agree_get_succeed() {
     let ports = get_ports(3);
-    test_cluster_refresh_topology_after_moved_error_get_succeed(
+    test_cluster_refresh_topology_after_moved_assert_get_succeed_and_expected_retries(
         get_topology_with_majority(&ports),
         ports,
         true,
     );
 }
 
@@ -676,7 +682,7 @@ fn test_async_cluster_move_error_refresh_topology_all_nodes_agree() {
 #[test]
-fn test_async_cluster_client_initilization_refresh_topology_all_nodes_agree() {
+fn test_cluster_refresh_topology_in_client_init_all_nodes_agree_get_succeed() {
     let ports = get_ports(3);
     test_cluster_refresh_topology_in_client_init_get_succeed(
         get_topology_with_majority(&ports),
         ports,
     );
 }
 
@@ -685,10 +691,10 @@ fn test_async_cluster_client_initilization_refresh_topology_all_nodes_agree() {
 #[test]
-fn test_async_cluster_move_error_refresh_topology_no_majority() {
+fn test_cluster_refresh_topology_after_moved_error_with_no_majority_get_succeed() {
     for num_of_nodes in 2..4 {
         let ports = get_ports(num_of_nodes);
-        test_cluster_refresh_topology_after_moved_error_get_succeed(
+        test_cluster_refresh_topology_after_moved_assert_get_succeed_and_expected_retries(
             get_no_majority_topology_view(&ports),
             ports,
             false,
         );
     }
 }
 
@@ -696,6 +702,17 @@ fn test_async_cluster_move_error_refresh_topology_no_majority() {
+#[test]
+fn test_cluster_refresh_topology_in_client_init_with_no_majority_get_succeed() {
+    for num_of_nodes in 2..4 {
+        let ports = get_ports(num_of_nodes);
+        test_cluster_refresh_topology_in_client_init_get_succeed(
+            get_no_majority_topology_view(&ports),
+            ports,
+        );
+    }
+}
+
 #[test]
 fn test_async_cluster_ask_redirect() {
     let name = "node";
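From the user's perspective none of this machinery is visible; connecting simply becomes more resilient, since the initial slot refresh now retries before giving up. A usage sketch (not part of this PR; assumes the crate's `cluster-async` plus `tokio-comp` features and a cluster listening on the placeholder address):

```rust
use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::cluster::ClusterClient::new(vec!["redis://127.0.0.1:6379/"])?;
    // Connecting runs refresh_slots_with_retries(): transient disagreement between
    // the nodes' CLUSTER SLOTS views no longer fails the connection outright.
    let mut conn = client.get_async_connection().await?;
    let _: () = conn.set("test", 123).await?;
    let value: Option<i32> = conn.get("test").await?;
    assert_eq!(value, Some(123));
    Ok(())
}
```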