From 6b36279d6eb3a0e0325956431482bc8a5dcd9b99 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Mon, 9 Sep 2024 21:50:00 +0300 Subject: [PATCH 1/2] Do not retry requests that have been dropped by the user. https://github.com/valkey-io/valkey-glide/issues/2138 --- redis/src/cluster_async/mod.rs | 4 ++- redis/tests/test_cluster_async.rs | 48 ++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 2225062b7..965a05cf8 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -835,7 +835,9 @@ impl Future for Request { fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll { let mut this = self.as_mut().project(); - if this.request.is_none() { + // If the sender is closed, the caller is no longer waiting for the reply, and it is ambiguous + // whether they expect the side-effect of the request to happen or not. + if this.request.is_none() || this.request.as_ref().unwrap().sender.is_closed() { return Poll::Ready(Next::Done); } let future = match this.future.as_mut().project() { diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 4d0883d47..b690ed87b 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -16,7 +16,7 @@ mod cluster_async { }; use futures::prelude::*; - use futures_time::task::sleep; + use futures_time::{future::FutureExt, task::sleep}; use once_cell::sync::Lazy; use std::ops::Add; @@ -4142,6 +4142,52 @@ mod cluster_async { .unwrap(); } + #[test] + fn test_async_cluster_do_not_retry_when_receiver_was_dropped() { + let name = "test_async_cluster_do_not_retry_when_receiver_was_dropped"; + let cmd = cmd("FAKE_COMMAND"); + let packed_cmd = cmd.get_packed_command(); + let request_counter = Arc::new(AtomicU32::new(0)); + let cloned_req_counter = request_counter.clone(); + let MockEnv { + runtime, + async_connection: mut connection, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .retries(5) + .max_retry_wait(2) + .min_retry_wait(2), + name, + move |received_cmd: &[u8], _| { + respond_startup(name, received_cmd)?; + + if received_cmd == packed_cmd { + cloned_req_counter.fetch_add(1, Ordering::Relaxed); + return Err(Err((ErrorKind::TryAgain, "seriously, try again").into())); + } + + Err(Ok(Value::Okay)) + }, + ); + + runtime.block_on(async move { + let err = cmd + .query_async::<_, Value>(&mut connection) + .timeout(futures_time::time::Duration::from_millis(1)) + .await + .unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::TimedOut); + + // we sleep here, to allow the cluster connection time to retry. We expect it won't, but without this + // sleep the test will complete before the the runtime gave the connection time to retry, which would've made the + // test pass regardless of whether the connection tries retrying or not. + sleep(Duration::from_millis(10).into()).await; + }); + + assert_eq!(request_counter.load(Ordering::Relaxed), 1); + } + #[cfg(feature = "tls-rustls")] mod mtls_test { use crate::support::mtls_test::create_cluster_client_from_cluster; From 64ce5971673abb344c80e6474c077ab5f4ca8250 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Tue, 10 Sep 2024 20:49:27 +0300 Subject: [PATCH 2/2] fix lint --- redis/benches/bench_basic.rs | 4 ++-- redis/benches/bench_cluster_async.rs | 4 ++-- redis/tests/support/mod.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/redis/benches/bench_basic.rs b/redis/benches/bench_basic.rs index 638417b64..356f74217 100644 --- a/redis/benches/bench_basic.rs +++ b/redis/benches/bench_basic.rs @@ -28,13 +28,13 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) { runtime .block_on(async { let key = "test_key"; - redis::cmd("SET") + () = redis::cmd("SET") .arg(key) .arg(42) .query_async(&mut con) .await?; let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?; - redis::cmd("DEL").arg(key).query_async(&mut con).await?; + () = redis::cmd("DEL").arg(key).query_async(&mut con).await?; Ok::<_, RedisError>(()) }) .unwrap() diff --git a/redis/benches/bench_cluster_async.rs b/redis/benches/bench_cluster_async.rs index 347908f9e..28c3b83c8 100644 --- a/redis/benches/bench_cluster_async.rs +++ b/redis/benches/bench_cluster_async.rs @@ -21,9 +21,9 @@ fn bench_cluster_async( runtime .block_on(async { let key = "test_key"; - redis::cmd("SET").arg(key).arg(42).query_async(con).await?; + () = redis::cmd("SET").arg(key).arg(42).query_async(con).await?; let _: isize = redis::cmd("GET").arg(key).query_async(con).await?; - redis::cmd("DEL").arg(key).query_async(con).await?; + () = redis::cmd("DEL").arg(key).query_async(con).await?; Ok::<_, RedisError>(()) }) diff --git a/redis/tests/support/mod.rs b/redis/tests/support/mod.rs index 8169ee94b..335cd045d 100644 --- a/redis/tests/support/mod.rs +++ b/redis/tests/support/mod.rs @@ -653,7 +653,7 @@ pub fn build_keys_and_certs_for_tls(tempdir: &TempDir) -> TlsFilePaths { .arg("genrsa") .arg("-out") .arg(name) - .arg(&format!("{size}")) + .arg(format!("{size}")) .stdout(process::Stdio::null()) .stderr(process::Stdio::null()) .spawn()