Skip to content

Commit

Permalink
Move read_from_replica filtering to usage time.
Browse files Browse the repository at this point in the history
Slot addresses should be filled with replicas, regardless of whether
the cluster uses read_from_replica. This affects which nodes are queried
during slot refresh, or AllNodes operations.
  • Loading branch information
nihohit authored and barshaul committed Aug 6, 2023
1 parent dbc0c4e commit 2143290
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 89 deletions.
12 changes: 6 additions & 6 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ where
)));
for conn in samples.iter_mut() {
let value = conn.req_command(&slot_cmd())?;
match parse_slots(&value, self.cluster_params.tls).and_then(|v| {
build_slot_map(&mut new_slots, v, self.cluster_params.read_from_replicas)
}) {
match parse_slots(&value, self.cluster_params.tls)
.and_then(|v| build_slot_map(&mut new_slots, v))
{
Ok(_) => {
result = Ok(new_slots);
break;
Expand Down Expand Up @@ -330,7 +330,7 @@ where
route: &Route,
) -> RedisResult<(String, &'a mut C)> {
let slots = self.slots.borrow();
if let Some(addr) = slots.slot_addr_for_route(route) {
if let Some(addr) = slots.slot_addr_for_route(route, self.read_from_replicas) {
Ok((
addr.to_string(),
self.get_connection_by_addr(connections, addr)?,
Expand Down Expand Up @@ -362,7 +362,7 @@ where

let addr_for_slot = |route: Route| -> RedisResult<String> {
let slot_addr = slots
.slot_addr_for_route(&route)
.slot_addr_for_route(&route, self.read_from_replicas)
.ok_or((ErrorKind::ClusterDown, "Missing slot coverage"))?;
Ok(slot_addr.to_string())
};
Expand Down Expand Up @@ -415,7 +415,7 @@ where
let mut results = HashMap::new();

// TODO: reconnect and shit
let addresses = slots.addresses_for_multi_routing(&routing);
let addresses = slots.addresses_for_multi_routing(&routing, self.read_from_replicas);
for addr in addresses {
let addr = addr.to_string();
if let Some(connection) = connections.get_mut(&addr) {
Expand Down
9 changes: 6 additions & 3 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,6 @@ where
topology_values,
curr_retry,
inner.cluster_params.tls,
inner.cluster_params.read_from_replicas,
num_of_nodes_to_query,
)?;

Expand Down Expand Up @@ -716,7 +715,7 @@ where
let read_guard = core.conn_lock.read().await;
let (receivers, requests): (Vec<_>, Vec<_>) = read_guard
.1
.addresses_for_multi_routing(routing)
.addresses_for_multi_routing(routing, core.cluster_params.read_from_replicas)
.into_iter()
.enumerate()
.filter_map(|(index, addr)| {
Expand Down Expand Up @@ -875,7 +874,11 @@ where
Some(Redirect::Ask(ask_addr)) => Some(ask_addr),
None => route
.as_ref()
.and_then(|route| read_guard.1.slot_addr_for_route(route))
.and_then(|route| {
read_guard
.1
.slot_addr_for_route(route, core.cluster_params.read_from_replicas)
})
.map(|addr| addr.to_string()),
}
.map(|addr| {
Expand Down
93 changes: 42 additions & 51 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,14 +443,6 @@ impl Slot {
pub fn end(&self) -> u16 {
self.end
}

pub fn master(&self) -> &str {
&self.master
}

pub fn replicas(&self) -> &Vec<String> {
&self.replicas
}
}

/// What type of node should a request be routed to.
Expand All @@ -475,26 +467,22 @@ impl SlotAddrs {
Self([master_node, replica])
}

pub(crate) fn slot_addr(&self, slot_addr: &SlotAddr) -> &str {
match slot_addr {
SlotAddr::Master => &self.0[0],
SlotAddr::Replica => &self.0[1],
pub(crate) fn slot_addr(&self, slot_addr: SlotAddr, allow_replica: bool) -> &str {
if allow_replica && slot_addr == SlotAddr::Replica {
self.0[1].as_str()
} else {
self.0[0].as_str()
}
}

pub(crate) fn from_slot(slot: &Slot, read_from_replicas: bool) -> Self {
let replica = if !read_from_replicas || slot.replicas().is_empty() {
pub(crate) fn from_slot(slot: Slot) -> Self {
let replica = if slot.replicas.is_empty() {
None
} else {
Some(
slot.replicas()
.choose(&mut thread_rng())
.unwrap()
.to_string(),
)
Some(slot.replicas.choose(&mut thread_rng()).unwrap().to_string())
};

SlotAddrs::new(slot.master().to_string(), replica)
SlotAddrs::new(slot.master, replica)
}
}

Expand Down Expand Up @@ -522,8 +510,8 @@ impl Route {
self.0
}

pub(crate) fn slot_addr(&self) -> &SlotAddr {
&self.1
pub(crate) fn slot_addr(&self) -> SlotAddr {
self.1
}
}

Expand All @@ -539,11 +527,11 @@ mod tests {
parser::parse_redis_value,
};

fn from_slots(slots: &[Slot], read_from_replicas: bool) -> SlotMap {
fn from_slots(slots: Vec<Slot>) -> SlotMap {
SlotMap(
slots
.iter()
.map(|slot| (slot.end(), SlotAddrs::from_slot(slot, read_from_replicas)))
.into_iter()
.map(|slot| (slot.end(), SlotAddrs::from_slot(slot)))
.collect(),
)
}
Expand Down Expand Up @@ -815,68 +803,71 @@ mod tests {

#[test]
fn test_slot_map() {
let slot_map = from_slots(
&[
Slot {
start: 1,
end: 1000,
master: "node1:6379".to_owned(),
replicas: vec!["replica1:6379".to_owned()],
},
Slot {
start: 1001,
end: 2000,
master: "node2:6379".to_owned(),
replicas: vec!["replica2:6379".to_owned()],
},
],
true,
);
let slot_map = from_slots(vec![
Slot {
start: 1,
end: 1000,
master: "node1:6379".to_owned(),
replicas: vec!["replica1:6379".to_owned()],
},
Slot {
start: 1001,
end: 2000,
master: "node2:6379".to_owned(),
replicas: vec!["replica2:6379".to_owned()],
},
]);

assert_eq!(
"node1:6379",
slot_map
.slot_addr_for_route(&Route::new(1, SlotAddr::Master))
.slot_addr_for_route(&Route::new(1, SlotAddr::Master), false)
.unwrap()
);
assert_eq!(
"node1:6379",
slot_map
.slot_addr_for_route(&Route::new(500, SlotAddr::Master))
.slot_addr_for_route(&Route::new(500, SlotAddr::Master), false)
.unwrap()
);
assert_eq!(
"node1:6379",
slot_map
.slot_addr_for_route(&Route::new(1000, SlotAddr::Master))
.slot_addr_for_route(&Route::new(1000, SlotAddr::Master), false)
.unwrap()
);
assert_eq!(
"replica1:6379",
slot_map
.slot_addr_for_route(&Route::new(1000, SlotAddr::Replica))
.slot_addr_for_route(&Route::new(1000, SlotAddr::Replica), true)
.unwrap()
);
assert_eq!(
"node1:6379",
slot_map
.slot_addr_for_route(&Route::new(1000, SlotAddr::Replica), false)
.unwrap()
);
assert_eq!(
"node2:6379",
slot_map
.slot_addr_for_route(&Route::new(1001, SlotAddr::Master))
.slot_addr_for_route(&Route::new(1001, SlotAddr::Master), false)
.unwrap()
);
assert_eq!(
"node2:6379",
slot_map
.slot_addr_for_route(&Route::new(1500, SlotAddr::Master))
.slot_addr_for_route(&Route::new(1500, SlotAddr::Master), false)
.unwrap()
);
assert_eq!(
"node2:6379",
slot_map
.slot_addr_for_route(&Route::new(2000, SlotAddr::Master))
.slot_addr_for_route(&Route::new(2000, SlotAddr::Master), false)
.unwrap()
);
assert!(slot_map
.slot_addr_for_route(&Route::new(2001, SlotAddr::Master))
.slot_addr_for_route(&Route::new(2001, SlotAddr::Master), false)
.is_none());
}
}
Loading

0 comments on commit 2143290

Please sign in to comment.