Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Jul 24, 2023
1 parent 4431f77 commit 744ab15
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 88 deletions.
139 changes: 66 additions & 73 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,22 +879,22 @@ fn calculate_hash<T: Hash>(t: &T) -> u64 {
}

pub(crate) fn calculate_topology(
topology_results: Vec<Result<Value, RedisError>>,
retries: Option<Arc<atomic::AtomicUsize>>,
topology_views: Vec<Value>,
retries: Option<Arc<atomic::AtomicUsize>>, // TODO: change to usize
tls_mode: Option<TlsMode>,
read_from_replicas: bool,
num_of_queried_nodes: usize,
) -> Result<SlotMap, RedisError> {
assert!(!topology_results.is_empty());
if topology_views.is_empty() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error: All CLUSTER SLOTS results are errors",
)));
}
const MIN_ACCURACY_RATE: f32 = 0.2;
let num_of_nodes = topology_results.len();
let mut hash_view_map = HashMap::new();
for view_result in topology_results {
let view = match view_result {
Ok(view) => view,
Err(_) => {
continue;
}
};
let mut new_slots = SlotMap::new();
for view in topology_views {
let hash_value = calculate_hash(&view);
let topology_entry = hash_view_map.entry(hash_value).or_insert(TopologyView {
hash_value,
Expand All @@ -903,34 +903,25 @@ pub(crate) fn calculate_topology(
});
topology_entry.nodes_count += 1;
}
if hash_view_map.is_empty() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error: All CLUSTER SLOTS results are errors",
)));
}
let mut most_frequent_topology: Option<&TopologyView> = None;
let mut has_more_than_a_single_max = false;
let vec_iter = hash_view_map.iter().map(|(_, view)| view);
let mut new_slots = SlotMap::new();
// Find the most frequent topology view
for curr_view in vec_iter {
if most_frequent_topology.is_none() {
most_frequent_topology = Some(curr_view);
continue;
}
let max_view = match most_frequent_topology {
Some(view) => view,
None => {
most_frequent_topology = Some(curr_view);
continue;
}
};
if max_view.nodes_count < curr_view.nodes_count {
most_frequent_topology = Some(curr_view);
has_more_than_a_single_max = false;
} else if max_view.nodes_count == curr_view.nodes_count {
has_more_than_a_single_max = true;
match max_view.cmp(curr_view) {
std::cmp::Ordering::Less => {
most_frequent_topology = Some(curr_view);
has_more_than_a_single_max = false;
}
std::cmp::Ordering::Equal => has_more_than_a_single_max = true,
std::cmp::Ordering::Greater => continue,
}
}
let most_frequent_topology = match most_frequent_topology {
Expand All @@ -940,7 +931,7 @@ pub(crate) fn calculate_topology(
if has_more_than_a_single_max {
// More than a single most frequent view was found
if (retries.is_some() && retries.unwrap().fetch_sub(1, atomic::Ordering::SeqCst) == 1)
|| num_of_nodes < 3
|| num_of_queried_nodes < 3
{
// If it's the last retry, or if we it's a 2-nodes cluster, we'll return all found topologies to be checked by the caller
for (idx, topology_view) in hash_view_map.iter() {
Expand All @@ -966,16 +957,17 @@ pub(crate) fn calculate_topology(
"Slot refresh error: Couldn't get a majority in topology views",
)));
}
let accuracy_num = most_frequent_topology.nodes_count as f32 / num_of_nodes as f32;
if accuracy_num >= MIN_ACCURACY_RATE {
// Calculates the accuracy of the topology view by checking how many nodes share this view out of the total number queried
let accuracy_rate = most_frequent_topology.nodes_count as f32 / num_of_queried_nodes as f32;
if accuracy_rate >= MIN_ACCURACY_RATE {
parse_slots(&most_frequent_topology.topology_value, tls_mode)
.and_then(|v| build_slot_map(&mut new_slots, v, read_from_replicas))?;
return Ok(new_slots);
Ok(new_slots)
} else {
return Err(RedisError::from((
Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error: The accuracy of the topology view is too low",
)));
)))
}
}

Expand Down Expand Up @@ -1028,106 +1020,107 @@ mod tests {

#[test]
fn test_topology_calculator() {
let err = RedisError::from((ErrorKind::ResponseError, "parse error"));
let single_node_view = Value::Bulk(vec![Value::Bulk(vec![
Value::Int(0 as i64),
Value::Int(16383 as i64),
Value::Int(0_i64),
Value::Int(16383_i64),
Value::Bulk(vec![
Value::Data("node1".as_bytes().to_vec()),
Value::Int(6379 as i64),
Value::Int(6379_i64),
]),
])]);
let single_node_missing_slots_view = Value::Bulk(vec![Value::Bulk(vec![
Value::Int(0 as i64),
Value::Int(4000 as i64),
Value::Int(0_i64),
Value::Int(4000_i64),
Value::Bulk(vec![
Value::Data("node1".as_bytes().to_vec()),
Value::Int(6379 as i64),
Value::Int(6379_i64),
]),
])]);
let two_nodes_full_coverage_view = Value::Bulk(vec![
Value::Bulk(vec![
Value::Int(0 as i64),
Value::Int(4000 as i64),
Value::Int(0_i64),
Value::Int(4000_i64),
Value::Bulk(vec![
Value::Data("node1".as_bytes().to_vec()),
Value::Int(6379 as i64),
Value::Int(6379_i64),
]),
]),
Value::Bulk(vec![
Value::Int(4001 as i64),
Value::Int(16383 as i64),
Value::Int(4001_i64),
Value::Int(16383_i64),
Value::Bulk(vec![
Value::Data("node2".as_bytes().to_vec()),
Value::Int(6380 as i64),
Value::Int(6380_i64),
]),
]),
]);
let two_nodes_missing_slots_view = Value::Bulk(vec![
Value::Bulk(vec![
Value::Int(0 as i64),
Value::Int(3000 as i64),
Value::Int(0_i64),
Value::Int(3000_i64),
Value::Bulk(vec![
Value::Data("node3".as_bytes().to_vec()),
Value::Int(6381 as i64),
Value::Int(6381_i64),
]),
]),
Value::Bulk(vec![
Value::Int(4001 as i64),
Value::Int(16383 as i64),
Value::Int(4001_i64),
Value::Int(16383_i64),
Value::Bulk(vec![
Value::Data("node4".as_bytes().to_vec()),
Value::Int(6382 as i64),
Value::Int(6382_i64),
]),
]),
]);

// Has a majority, single_node_view should be chosen
// 4 nodes queried (1 error): Has a majority, single_node_view should be chosen
let mut queried_nodes: usize = 4;
let topology_results = vec![
Ok(single_node_view.clone()),
Ok(single_node_view.clone()),
Ok(two_nodes_full_coverage_view.clone()),
Err(err),
single_node_view.clone(),
single_node_view.clone(),
two_nodes_full_coverage_view.clone(),
];
let node1_addr = SlotAddrs::new("node1:6379".to_string(), None);
let node2_addr = SlotAddrs::new("node2:6380".to_string(), None);
let topology_view = calculate_topology(topology_results, None, None, false).unwrap();
let topology_view =
calculate_topology(topology_results, None, None, false, queried_nodes).unwrap();
let res: Vec<_> = topology_view.values().collect();
let excepted = vec![&node1_addr];
assert_eq!(res, excepted);

// no majority, should return an error
// 3 nodes queried: No majority, should return an error
queried_nodes = 3;
let topology_results = vec![
Ok(single_node_view.clone()),
Ok(two_nodes_full_coverage_view.clone()),
Ok(two_nodes_missing_slots_view.clone()),
single_node_view,
two_nodes_full_coverage_view.clone(),
two_nodes_missing_slots_view.clone(),
];
let topology_view = calculate_topology(topology_results, None, None, false);
let topology_view = calculate_topology(topology_results, None, None, false, queried_nodes);
assert!(topology_view.is_err());

// no majority, last retry, should get the view that has a full slot coverage
// 3 nodes queried:: No majority, last retry, should get the view that has a full slot coverage
let topology_results = vec![
Ok(single_node_missing_slots_view.clone()),
Ok(two_nodes_full_coverage_view.clone()),
Ok(two_nodes_missing_slots_view.clone()),
single_node_missing_slots_view,
two_nodes_full_coverage_view.clone(),
two_nodes_missing_slots_view.clone(),
];
let topology_view = calculate_topology(
topology_results,
Some(Arc::new(AtomicUsize::new(1))),
None,
false,
queried_nodes,
)
.unwrap();
let res: Vec<_> = topology_view.values().collect();
let excepted: Vec<&SlotAddrs> = vec![&node1_addr, &node2_addr];
assert_eq!(res, excepted);

// 2 nodes no majority, should get the view that has a full slot coverage
let topology_results = vec![
Ok(two_nodes_full_coverage_view.clone()),
Ok(two_nodes_missing_slots_view.clone()),
];
let topology_view = calculate_topology(topology_results, None, None, false).unwrap();
// 2 nodes queried: No majority, should get the view that has a full slot coverage
queried_nodes = 2;
let topology_results = vec![two_nodes_full_coverage_view, two_nodes_missing_slots_view];
let topology_view =
calculate_topology(topology_results, None, None, false, queried_nodes).unwrap();
let res: Vec<_> = topology_view.values().collect();
assert_eq!(res, excepted);
}
Expand Down
27 changes: 12 additions & 15 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,26 +471,23 @@ where
let mut rng = thread_rng();
read_guard.0.values().choose_multiple(&mut rng, amount)
};
let mut cluster_slot_futures = Vec::new();
for conn in requested_nodes.iter_mut() {
let mut conn: C = conn.clone().await;
let slots_future = async move { conn.req_packed_command(&slot_cmd()).await };
#[cfg(feature = "tokio-comp")]
cluster_slot_futures.push(tokio::spawn(slots_future));
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
cluster_slot_futures.push(AsyncStd::spawn(slots_future));
}
let mut topology_join_results = futures::future::join_all(cluster_slot_futures).await;
let mut topology_results: Vec<_> = topology_join_results
let mut topology_join_results =
futures::future::join_all(requested_nodes.iter_mut().map(|conn| async move {
let mut conn: C = conn.clone().await;
conn.req_packed_command(&slot_cmd()).await
}))
.await;
let mut topology_values: Vec<_> = topology_join_results
.drain(..)
.filter_map(|r| r.ok())
.collect();
topology_results.shuffle(&mut thread_rng());
topology_values.shuffle(&mut thread_rng());
let new_slots = calculate_topology(
topology_results,
topology_values,
retries.clone(),
inner.cluster_params.tls,
inner.cluster_params.read_from_replicas,
num_of_nodes,
)?;

let connections: &ConnectionMap<C> = &read_guard.0;
Expand Down Expand Up @@ -520,8 +517,8 @@ where

drop(read_guard);
let mut write_guard = inner.conn_lock.write().await;
let _ = mem::replace(&mut write_guard.1, new_slots);
let _ = mem::replace(&mut write_guard.0, new_connections);
write_guard.1 = new_slots;
write_guard.0 = new_connections;
Ok(())
}
}
Expand Down

0 comments on commit 744ab15

Please sign in to comment.