Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Aug 14, 2023
1 parent e83c173 commit 916cbc1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
27 changes: 14 additions & 13 deletions redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,22 @@ pub(crate) struct SlotMap {
read_from_replica: ReadFromReplicaStrategy,
}

fn get_address_from_slot<'a>(
slot: &'a SlotMapValue,
read_from_replica: &'a ReadFromReplicaStrategy,
fn get_address_from_slot(
slot: &SlotMapValue,
read_from_replica: ReadFromReplicaStrategy,
slot_addr: SlotAddr,
) -> &'a str {
if slot_addr == SlotAddr::Master || slot.replicas.is_empty() {
return slot.primary.as_str();
) -> &str {
if slot_addr == SlotAddr::Master || slot.addrs.replicas.is_empty() {
return slot.addrs.primary.as_str();
}
match read_from_replica {
ReadFromReplicaStrategy::AlwaysFromPrimary => slot.primary.as_str(),
ReadFromReplicaStrategy::AlwaysFromPrimary => slot.addrs.primary.as_str(),
ReadFromReplicaStrategy::RoundRobin => {
let index = s;
latest_read_replica_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
% slot.replicas.len();
slot.replicas[index].as_str()
let index = slot
.latest_used_replica
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
% slot.addrs.replicas.len();
slot.addrs.replicas[index].as_str()
}
}
}
Expand Down Expand Up @@ -132,8 +133,8 @@ impl SlotMap {
.and_then(|(end, slot_value)| {
if slot <= *end && slot_value.start <= slot {
Some(get_address_from_slot(
&slot_value.addrs,
&self.read_from_replica,
slot_value,
self.read_from_replica,
route.slot_addr(),
))
} else {
Expand Down
10 changes: 5 additions & 5 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1680,16 +1680,16 @@ fn test_async_cluster_round_robin_read_from_replica() {
.query_async::<_, ()>(&mut connection)
.await
.unwrap();
cmd("GET")
.arg("foo")
.query_async::<_, ()>(&mut connection)
.await
.unwrap();
cmd("GET")
.arg("bar")
.query_async::<_, ()>(&mut connection)
.await
.unwrap();
cmd("GET")
.arg("foo")
.query_async::<_, ()>(&mut connection)
.await
.unwrap();
cmd("GET")
.arg("bar")
.query_async::<_, ()>(&mut connection)
Expand Down

0 comments on commit 916cbc1

Please sign in to comment.