Skip to content

Commit

Permalink
added binary support
Browse files Browse the repository at this point in the history
  • Loading branch information
avifenesh committed Jul 7, 2024
1 parent 30350e6 commit 25060da
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 39 deletions.
74 changes: 67 additions & 7 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
cluster_topology::SLOT_SIZE,
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict,
FromRedisValue, InfoDict, ToRedisArgs,
};

use crate::{
Expand Down Expand Up @@ -135,16 +135,16 @@ where
})
}

// Special handling for `SCAN` command, using cluster_scan
/// Special handling for `SCAN` command, using cluster_scan.
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `match_pattern` - An optional match pattern of requested keys.
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
Expand All @@ -170,7 +170,7 @@ where
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan(scan_state_rc, None, None, None).await.unwrap();
/// connection.cluster_scan(scan_state_rc, None, None).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
Expand All @@ -187,13 +187,73 @@ where
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
match_pattern: Option<&str>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type);
self.route_cluster_scan(cluster_scan_args).await
}

/// Special handling for `SCAN` command, using cluster_scan_with_pattern.
/// It is a special case of [`cluster_scan`], with an additional match pattern.
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `match_pattern` - A match pattern of requested keys.
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
///
/// # Returns
///
/// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan.
/// structure of returned value:
/// `Ok((ScanStateRC, Vec<Value>))`
///
/// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`.
///
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan_with_pattern(scan_state_rc, None, b"my_key", None).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
/// .map(|v| from_redis_value(&v).unwrap())
/// .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
/// keys.append(&mut scan_keys);
/// if scan_state_rc.is_finished() {
/// break;
/// }
/// }
/// keys
/// }
/// ```
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
&mut self,
scan_state_rc: ScanStateRC,
match_pattern: K,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
match_pattern.map(|s| s.to_string()),
Some(match_pattern.to_redis_args().concat()),
count,
object_type,
);
Expand Down
16 changes: 8 additions & 8 deletions redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::cluster_async::{
};
use crate::cluster_routing::SlotAddr;
use crate::cluster_topology::SLOT_SIZE;
use crate::{cmd, from_redis_value, Cmd, ErrorKind, RedisError, RedisResult, Value};
use crate::{cmd, from_redis_value, Cmd, ErrorKind, RedisError, RedisResult, ToRedisArgs, Value};
use async_trait::async_trait;
use std::sync::Arc;
use strum_macros::Display;
Expand All @@ -31,10 +31,10 @@ const BITS_ARRAY_SIZE: usize = NUM_OF_SLOTS / BITS_PER_U64;
const END_OF_SCAN: u16 = NUM_OF_SLOTS as u16 + 1;
type SlotsBitsArray = [u64; BITS_ARRAY_SIZE];

#[derive(Debug, Clone)]
#[derive(Clone)]
pub(crate) struct ClusterScanArgs {
pub(crate) scan_state_cursor: ScanStateRC,
match_pattern: Option<String>,
match_pattern: Option<Vec<u8>>,
count: Option<usize>,
object_type: Option<ObjectType>,
}
Expand All @@ -59,7 +59,7 @@ pub enum ObjectType {
impl ClusterScanArgs {
pub(crate) fn new(
scan_state_cursor: ScanStateRC,
match_pattern: Option<String>,
match_pattern: Option<Vec<u8>>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> Self {
Expand Down Expand Up @@ -484,10 +484,10 @@ where
}

// Send the scan command to the address in the scan_state
async fn send_scan<C>(
async fn send_scan<C, K: ToRedisArgs + Clone>(
scan_state: &ScanState,
core: &C,
match_pattern: Option<String>,
match_pattern: Option<K>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<Value>
Expand Down Expand Up @@ -515,10 +515,10 @@ where
// if all slots are not covered by the cluster we will return an error indicating that the cluster is not well configured.
// if all slots are covered by cluster but we failed to get a new address to scan we will return an error indicating that we failed to get a new address to scan.
// if we got a new address to scan but the scan command failed to route to the address we will return an error indicating that we failed to route the command.
async fn retry_scan<C>(
async fn retry_scan<C, K: ToRedisArgs + Clone>(
scan_state: &ScanState,
core: &C,
match_pattern: Option<String>,
match_pattern: Option<K>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(RedisResult<Value>, ScanState)>
Expand Down
43 changes: 19 additions & 24 deletions redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -112,7 +112,7 @@ mod test_cluster_scan_async {
loop {
count += 1;
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -183,9 +183,8 @@ mod test_cluster_scan_async {
let mut result: RedisResult<Value> = Ok(Value::Nil);
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
let (next_cursor, scan_keys) = match scan_response {
Ok((cursor, keys)) => (cursor, keys),
Err(e) => {
Expand Down Expand Up @@ -256,9 +255,8 @@ mod test_cluster_scan_async {
let mut count = 0;
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand Down Expand Up @@ -427,9 +425,8 @@ mod test_cluster_scan_async {
let mut count = 0;
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand Down Expand Up @@ -497,7 +494,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -557,7 +554,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, None)
.cluster_scan(scan_state_rc, None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -625,7 +622,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, Some("key:pattern:*"), None, None)
.cluster_scan_with_pattern(scan_state_rc, "key:pattern:*", None, None)
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -683,7 +680,7 @@ mod test_cluster_scan_async {
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, Some(ObjectType::Set))
.cluster_scan(scan_state_rc, None, Some(ObjectType::Set))
.await
.unwrap();
scan_state_rc = next_cursor;
Expand Down Expand Up @@ -736,11 +733,11 @@ mod test_cluster_scan_async {
let mut comparing_times = 0;
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc.clone(), None, Some(100), None)
.cluster_scan(scan_state_rc.clone(), Some(100), None)
.await
.unwrap();
let (_, scan_without_count_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, Some(100), None)
.cluster_scan(scan_state_rc, Some(100), None)
.await
.unwrap();
if !scan_keys.is_empty() && !scan_without_count_keys.is_empty() {
Expand Down Expand Up @@ -795,9 +792,8 @@ mod test_cluster_scan_async {
let mut count = 0;
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand All @@ -810,7 +806,7 @@ mod test_cluster_scan_async {
if count == 5 {
drop(cluster);
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc.clone(), None, None, None)
.cluster_scan(scan_state_rc.clone(), None, None)
.await;
assert!(scan_response.is_err());
break;
Expand All @@ -819,9 +815,8 @@ mod test_cluster_scan_async {
cluster = TestClusterContext::new(3, 0);
connection = cluster.async_connection(None).await;
loop {
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
connection.cluster_scan(scan_state_rc, None, None).await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
Expand Down

0 comments on commit 25060da

Please sign in to comment.