From 032ae58b6135927a59884491212aa0dd56db7b76 Mon Sep 17 00:00:00 2001 From: shohame Date: Sun, 28 Jul 2024 11:17:18 +0000 Subject: [PATCH 1/8] Handle PubSub commands routing --- redis/src/cluster.rs | 7 ++ redis/src/cluster_async/mod.rs | 5 ++ redis/src/cluster_routing.rs | 121 +++++++++++++++++++++++++++++---- 3 files changed, 121 insertions(+), 12 deletions(-) diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index 85582cae0..5c0702d85 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -661,6 +661,13 @@ where _ => crate::cluster_routing::combine_array_results(results), } } + Some(ResponsePolicy::CombineMaps) => { + let results = results + .into_iter() + .map(|res| res.map(|(_, val)| val)) + .collect::>>()?; + crate::cluster_routing::combine_map_results(results) + } Some(ResponsePolicy::Special) | None => { // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user. // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes. diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 833fc01ce..0f8f20dff 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1288,6 +1288,11 @@ where _ => crate::cluster_routing::combine_array_results(results), }) } + Some(ResponsePolicy::CombineMaps) => { + future::try_join_all(receivers.into_iter().map(get_receiver)) + .await + .and_then(|results| crate::cluster_routing::combine_map_results(results)) + } Some(ResponsePolicy::Special) | None => { // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user. // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes. diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 73116c0bc..58e959349 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -48,6 +48,8 @@ pub enum ResponsePolicy { CombineArrays, /// Handling is not defined by the Redis standard. Will receive a special case Special, + /// Combines multiple map responses into a single map. + CombineMaps, } /// Defines whether a request should be routed to a single node, or multiple ones. @@ -187,8 +189,44 @@ pub fn logical_aggregate(values: Vec, op: LogicalAggregateOp) -> RedisRes .collect(), )) } +/// Aggregate array responses into a single map. +pub fn combine_map_results(values: Vec) -> RedisResult { + let mut map = HashMap::new(); -/// Aggreagte arrau responses into a single array. + for value in values { + match value { + Value::Array(elements) => { + let mut iter = elements.into_iter(); + + while let Some(key) = iter.next() { + if let Value::BulkString(key_bytes) = key { + if let Some(Value::Int(value)) = iter.next() { + let key_str = String::from_utf8_lossy(&key_bytes); + + *map.entry(key_str.to_string()).or_insert(0) += value; + } else { + return Err((ErrorKind::TypeError, "expected integer value").into()); + } + } else { + return Err((ErrorKind::TypeError, "expected string key").into()); + } + } + } + _ => { + return Err((ErrorKind::TypeError, "expected array of values as response").into()); + } + } + } + + let result_vec: Vec<(Value, Value)> = map + .into_iter() + .map(|(k, v)| (Value::BulkString(k.into_bytes()), Value::Int(v))) + .collect(); + + Ok(Value::Map(result_vec)) +} + +/// Aggregate array responses into a single array. pub fn combine_array_results(values: Vec) -> RedisResult { let mut results = Vec::new(); @@ -302,7 +340,9 @@ impl ResponsePolicy { b"SCRIPT EXISTS" => Some(ResponsePolicy::AggregateLogical(LogicalAggregateOp::And)), b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK" - | b"LATENCY RESET" => Some(ResponsePolicy::Aggregate(AggregateOp::Sum)), + | b"LATENCY RESET" | b"PUBSUB NUMPAT" => { + Some(ResponsePolicy::Aggregate(AggregateOp::Sum)) + } b"WAIT" => Some(ResponsePolicy::Aggregate(AggregateOp::Min)), @@ -314,7 +354,10 @@ impl ResponsePolicy { Some(ResponsePolicy::AllSucceeded) } - b"KEYS" | b"MGET" | b"SLOWLOG GET" => Some(ResponsePolicy::CombineArrays), + b"KEYS" | b"MGET" | b"SLOWLOG GET" | b"PUBSUB CHANNELS" | b"PUBSUB SHARDCHANNELS" => { + Some(ResponsePolicy::CombineArrays) + } + b"PUBSUB NUMSUB" | b"PUBSUB SHARDNUMSUB" => Some(ResponsePolicy::CombineMaps), b"FUNCTION KILL" | b"SCRIPT KILL" => Some(ResponsePolicy::OneSucceeded), @@ -354,11 +397,30 @@ enum RouteBy { fn base_routing(cmd: &[u8]) -> RouteBy { match cmd { - b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME" | b"CLIENT SETINFO" - | b"SLOWLOG GET" | b"SLOWLOG LEN" | b"SLOWLOG RESET" | b"CONFIG SET" - | b"CONFIG RESETSTAT" | b"CONFIG REWRITE" | b"SCRIPT FLUSH" | b"SCRIPT LOAD" - | b"LATENCY RESET" | b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" - | b"LATENCY DOCTOR" | b"LATENCY LATEST" => RouteBy::AllNodes, + b"ACL SETUSER" + | b"ACL DELUSER" + | b"ACL SAVE" + | b"CLIENT SETNAME" + | b"CLIENT SETINFO" + | b"SLOWLOG GET" + | b"SLOWLOG LEN" + | b"SLOWLOG RESET" + | b"CONFIG SET" + | b"CONFIG RESETSTAT" + | b"CONFIG REWRITE" + | b"SCRIPT FLUSH" + | b"SCRIPT LOAD" + | b"LATENCY RESET" + | b"LATENCY GRAPH" + | b"LATENCY HISTOGRAM" + | b"LATENCY HISTORY" + | b"LATENCY DOCTOR" + | b"LATENCY LATEST" + | b"PUBSUB NUMPAT" + | b"PUBSUB CHANNELS" + | b"PUBSUB NUMSUB" + | b"PUBSUB SHARDCHANNELS" + | b"PUBSUB SHARDNUMSUB" => RouteBy::AllNodes, b"DBSIZE" | b"FLUSHALL" @@ -463,10 +525,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy { | b"MODULE LOAD" | b"MODULE LOADEX" | b"MODULE UNLOAD" - | b"PUBSUB CHANNELS" - | b"PUBSUB NUMPAT" - | b"PUBSUB NUMSUB" - | b"PUBSUB SHARDCHANNELS" | b"READONLY" | b"READWRITE" | b"SAVE" @@ -1233,4 +1291,43 @@ mod tests { ]) ); } + + #[test] + fn test_combine_map_results() { + let input = vec![]; + let result = combine_map_results(input).unwrap(); + assert_eq!(result, Value::Map(vec![])); + + let input = vec![ + Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::Int(5), + Value::BulkString(b"key2".to_vec()), + Value::Int(10), + ]), + Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::Int(3), + Value::BulkString(b"key3".to_vec()), + Value::Int(15), + ]), + ]; + let result = combine_map_results(input).unwrap(); + let mut expected = vec![ + (Value::BulkString(b"key1".to_vec()), Value::Int(8)), + (Value::BulkString(b"key2".to_vec()), Value::Int(10)), + (Value::BulkString(b"key3".to_vec()), Value::Int(15)), + ]; + expected.sort_by(|a, b| a.0.cmp(&b.0)); + let mut result_vec = match result { + Value::Map(v) => v, + _ => panic!("Expected Map"), + }; + result_vec.sort_by(|a, b| a.0.cmp(&b.0)); + assert_eq!(result_vec, expected); + + let input = vec![Value::Int(5)]; + let result = combine_map_results(input); + assert!(result.is_err()); + } } From b8c4d50453966217b4a39b2bdf40787ab71081f5 Mon Sep 17 00:00:00 2001 From: shohame Date: Sun, 28 Jul 2024 11:25:19 +0000 Subject: [PATCH 2/8] remove cmp --- redis/src/cluster_routing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 58e959349..28992aaf3 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1323,7 +1323,7 @@ mod tests { Value::Map(v) => v, _ => panic!("Expected Map"), }; - result_vec.sort_by(|a, b| a.0.cmp(&b.0)); + // result_vec.sort_by(|a, b| a.0.cmp(&b.0)); assert_eq!(result_vec, expected); let input = vec![Value::Int(5)]; From e2fb4bad910e17af3577422ae8051b87fb2c5213 Mon Sep 17 00:00:00 2001 From: shohame Date: Sun, 28 Jul 2024 11:27:57 +0000 Subject: [PATCH 3/8] remove cmp --- redis/src/cluster_routing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 28992aaf3..fbf4fd343 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1318,7 +1318,7 @@ mod tests { (Value::BulkString(b"key2".to_vec()), Value::Int(10)), (Value::BulkString(b"key3".to_vec()), Value::Int(15)), ]; - expected.sort_by(|a, b| a.0.cmp(&b.0)); + //expected.sort_by(|a, b| a.0.cmp(&b.0)); let mut result_vec = match result { Value::Map(v) => v, _ => panic!("Expected Map"), From 8fab4d0afe8c8bec655fb86f1065983fefa007d8 Mon Sep 17 00:00:00 2001 From: shohame Date: Sun, 28 Jul 2024 11:32:56 +0000 Subject: [PATCH 4/8] fix use of function --- redis/src/cluster_routing.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index fbf4fd343..1bfe798b5 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1295,7 +1295,7 @@ mod tests { #[test] fn test_combine_map_results() { let input = vec![]; - let result = combine_map_results(input).unwrap(); + let result = super::combine_map_results(input).unwrap(); assert_eq!(result, Value::Map(vec![])); let input = vec![ @@ -1312,7 +1312,7 @@ mod tests { Value::Int(15), ]), ]; - let result = combine_map_results(input).unwrap(); + let result = super::combine_map_results(input).unwrap(); let mut expected = vec![ (Value::BulkString(b"key1".to_vec()), Value::Int(8)), (Value::BulkString(b"key2".to_vec()), Value::Int(10)), @@ -1327,7 +1327,7 @@ mod tests { assert_eq!(result_vec, expected); let input = vec![Value::Int(5)]; - let result = combine_map_results(input); + let result = super::combine_map_results(input); assert!(result.is_err()); } } From 4f0ac148e5c56f389ba43ac6119d6c30c1497ace Mon Sep 17 00:00:00 2001 From: shohame Date: Sun, 28 Jul 2024 11:39:10 +0000 Subject: [PATCH 5/8] fix linter --- redis/src/cluster_routing.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 1bfe798b5..76bcc09ea 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1313,13 +1313,13 @@ mod tests { ]), ]; let result = super::combine_map_results(input).unwrap(); - let mut expected = vec![ + let expected = vec![ (Value::BulkString(b"key1".to_vec()), Value::Int(8)), (Value::BulkString(b"key2".to_vec()), Value::Int(10)), (Value::BulkString(b"key3".to_vec()), Value::Int(15)), ]; //expected.sort_by(|a, b| a.0.cmp(&b.0)); - let mut result_vec = match result { + let result_vec = match result { Value::Map(v) => v, _ => panic!("Expected Map"), }; From 26d2e459e19d521b930ca80c447c6916b8d75a01 Mon Sep 17 00:00:00 2001 From: shohame Date: Sun, 28 Jul 2024 11:48:20 +0000 Subject: [PATCH 6/8] new way to compare --- redis/src/cluster_routing.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 76bcc09ea..2207bdbe2 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1313,17 +1313,23 @@ mod tests { ]), ]; let result = super::combine_map_results(input).unwrap(); - let expected = vec![ + let mut expected = vec![ (Value::BulkString(b"key1".to_vec()), Value::Int(8)), (Value::BulkString(b"key2".to_vec()), Value::Int(10)), (Value::BulkString(b"key3".to_vec()), Value::Int(15)), ]; - //expected.sort_by(|a, b| a.0.cmp(&b.0)); - let result_vec = match result { + expected.sort_unstable_by(|a, b| match (&a.0, &b.0) { + (Value::BulkString(a_bytes), Value::BulkString(b_bytes)) => a_bytes.cmp(b_bytes), + _ => std::cmp::Ordering::Equal, + }); + let mut result_vec = match result { Value::Map(v) => v, _ => panic!("Expected Map"), }; - // result_vec.sort_by(|a, b| a.0.cmp(&b.0)); + result_vec.sort_unstable_by(|a, b| match (&a.0, &b.0) { + (Value::BulkString(a_bytes), Value::BulkString(b_bytes)) => a_bytes.cmp(b_bytes), + _ => std::cmp::Ordering::Equal, + }); assert_eq!(result_vec, expected); let input = vec![Value::Int(5)]; From 017d6cf36d2fa70d53466f52f1b375f8a9865bef Mon Sep 17 00:00:00 2001 From: shohame Date: Sun, 28 Jul 2024 12:38:53 +0000 Subject: [PATCH 7/8] linter fix --- redis/src/cluster_async/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 0f8f20dff..e0479d55a 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1291,7 +1291,7 @@ where Some(ResponsePolicy::CombineMaps) => { future::try_join_all(receivers.into_iter().map(get_receiver)) .await - .and_then(|results| crate::cluster_routing::combine_map_results(results)) + .and_then(crate::cluster_routing::combine_map_results) } Some(ResponsePolicy::Special) | None => { // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user. From 9495b97c13807bff555faa35b465a94c478bfde5 Mon Sep 17 00:00:00 2001 From: shohame Date: Mon, 29 Jul 2024 10:21:27 +0000 Subject: [PATCH 8/8] hash map of u8 Signed-off-by: shohame --- redis/src/cluster_routing.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 2207bdbe2..a37b875be 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -191,7 +191,7 @@ pub fn logical_aggregate(values: Vec, op: LogicalAggregateOp) -> RedisRes } /// Aggregate array responses into a single map. pub fn combine_map_results(values: Vec) -> RedisResult { - let mut map = HashMap::new(); + let mut map: HashMap, i64> = HashMap::new(); for value in values { match value { @@ -201,9 +201,7 @@ pub fn combine_map_results(values: Vec) -> RedisResult { while let Some(key) = iter.next() { if let Value::BulkString(key_bytes) = key { if let Some(Value::Int(value)) = iter.next() { - let key_str = String::from_utf8_lossy(&key_bytes); - - *map.entry(key_str.to_string()).or_insert(0) += value; + *map.entry(key_bytes).or_insert(0) += value; } else { return Err((ErrorKind::TypeError, "expected integer value").into()); } @@ -220,7 +218,7 @@ pub fn combine_map_results(values: Vec) -> RedisResult { let result_vec: Vec<(Value, Value)> = map .into_iter() - .map(|(k, v)| (Value::BulkString(k.into_bytes()), Value::Int(v))) + .map(|(k, v)| (Value::BulkString(k), Value::Int(v))) .collect(); Ok(Value::Map(result_vec))