Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not retry requests that have been dropped by the user. #189

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions redis/benches/bench_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) {
runtime
.block_on(async {
let key = "test_key";
redis::cmd("SET")
() = redis::cmd("SET")
.arg(key)
.arg(42)
.query_async(&mut con)
.await?;
let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?;
redis::cmd("DEL").arg(key).query_async(&mut con).await?;
() = redis::cmd("DEL").arg(key).query_async(&mut con).await?;
Ok::<_, RedisError>(())
})
.unwrap()
Expand Down
4 changes: 2 additions & 2 deletions redis/benches/bench_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ fn bench_cluster_async(
runtime
.block_on(async {
let key = "test_key";
redis::cmd("SET").arg(key).arg(42).query_async(con).await?;
() = redis::cmd("SET").arg(key).arg(42).query_async(con).await?;
let _: isize = redis::cmd("GET").arg(key).query_async(con).await?;
redis::cmd("DEL").arg(key).query_async(con).await?;
() = redis::cmd("DEL").arg(key).query_async(con).await?;

Ok::<_, RedisError>(())
})
Expand Down
4 changes: 3 additions & 1 deletion redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,9 @@ impl<C> Future for Request<C> {

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
if this.request.is_none() {
// If the sender is closed, the caller is no longer waiting for the reply, and it is ambiguous
// whether they expect the side-effect of the request to happen or not.
if this.request.is_none() || this.request.as_ref().unwrap().sender.is_closed() {
return Poll::Ready(Next::Done);
}
let future = match this.future.as_mut().project() {
Expand Down
2 changes: 1 addition & 1 deletion redis/tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ pub fn build_keys_and_certs_for_tls(tempdir: &TempDir) -> TlsFilePaths {
.arg("genrsa")
.arg("-out")
.arg(name)
.arg(&format!("{size}"))
.arg(format!("{size}"))
.stdout(process::Stdio::null())
.stderr(process::Stdio::null())
.spawn()
Expand Down
48 changes: 47 additions & 1 deletion redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod cluster_async {
};

use futures::prelude::*;
use futures_time::task::sleep;
use futures_time::{future::FutureExt, task::sleep};
use once_cell::sync::Lazy;
use std::ops::Add;

Expand Down Expand Up @@ -4142,6 +4142,52 @@ mod cluster_async {
.unwrap();
}

#[test]
fn test_async_cluster_do_not_retry_when_receiver_was_dropped() {
let name = "test_async_cluster_do_not_retry_when_receiver_was_dropped";
let cmd = cmd("FAKE_COMMAND");
let packed_cmd = cmd.get_packed_command();
let request_counter = Arc::new(AtomicU32::new(0));
let cloned_req_counter = request_counter.clone();
let MockEnv {
runtime,
async_connection: mut connection,
..
} = MockEnv::with_client_builder(
ClusterClient::builder(vec![&*format!("redis://{name}")])
.retries(5)
.max_retry_wait(2)
.min_retry_wait(2),
name,
move |received_cmd: &[u8], _| {
respond_startup(name, received_cmd)?;

if received_cmd == packed_cmd {
cloned_req_counter.fetch_add(1, Ordering::Relaxed);
return Err(Err((ErrorKind::TryAgain, "seriously, try again").into()));
}

Err(Ok(Value::Okay))
},
);

runtime.block_on(async move {
let err = cmd
.query_async::<_, Value>(&mut connection)
.timeout(futures_time::time::Duration::from_millis(1))
.await
.unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::TimedOut);

// we sleep here, to allow the cluster connection time to retry. We expect it won't, but without this
// sleep the test will complete before the the runtime gave the connection time to retry, which would've made the
// test pass regardless of whether the connection tries retrying or not.
sleep(Duration::from_millis(10).into()).await;
});

assert_eq!(request_counter.load(Ordering::Relaxed), 1);
}

#[cfg(feature = "tls-rustls")]
mod mtls_test {
use crate::support::mtls_test::create_cluster_client_from_cluster;
Expand Down