diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index d9d7cf4..4cf479b 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -161,6 +161,12 @@ impl Client { let tx = message_tx_bg.clone(); let request_backoff_counter = request_backoff_counter.clone(); + // total timeout for a request + let task_timeout = request_timeout + .unwrap_or(Duration::from_secs(30)) + // buffer 5 seconds for the request to be processed + .saturating_add(Duration::from_secs(5)); + tokio::spawn(async move { match message { Message::Request { @@ -176,61 +182,71 @@ impl Client { return; } - let result = ws.request(&method, params.clone()).await; - match result { - result @ Ok(_) => { - request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed); - // make sure it's still connected - if response.is_closed() { - return; + if let Ok(result) = + tokio::time::timeout(task_timeout, ws.request(&method, params.clone())).await + { + match result { + result @ Ok(_) => { + request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed); + // make sure it's still connected + if response.is_closed() { + return; + } + let _ = response.send(result); } - let _ = response.send(result); - } - Err(err) => { - tracing::debug!("Request failed: {:?}", err); - match err { - Error::RequestTimeout - | Error::Transport(_) - | Error::RestartNeeded(_) - | Error::MaxSlotsExceeded => { - tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await; - - // make sure it's still connected - if response.is_closed() { - return; - } - - // make sure we still have retries left - if retries == 0 { - let _ = response.send(Err(Error::RequestTimeout)); - return; + Err(err) => { + tracing::debug!("Request failed: {:?}", err); + match err { + Error::RequestTimeout + | Error::Transport(_) + | Error::RestartNeeded(_) + | Error::MaxSlotsExceeded => { + tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await; + + // make sure it's still connected + if response.is_closed() { + return; + } + + // make sure we still have retries left + if retries == 0 { + let _ = response.send(Err(Error::RequestTimeout)); + return; + } + + if matches!(err, Error::RequestTimeout) { + tx.send(Message::RotateEndpoint) + .await + .expect("Failed to send rotate message"); + } + + tx.send(Message::Request { + method, + params, + response, + retries, + }) + .await + .expect("Failed to send request message"); } - - if matches!(err, Error::RequestTimeout) { - tx.send(Message::RotateEndpoint) - .await - .expect("Failed to send rotate message"); + err => { + // make sure it's still connected + if response.is_closed() { + return; + } + // not something we can handle, send it back to the caller + let _ = response.send(Err(err)); } - - tx.send(Message::Request { - method, - params, - response, - retries, - }) - .await - .expect("Failed to send request message"); - } - err => { - // make sure it's still connected - if response.is_closed() { - return; - } - // not something we can handle, send it back to the caller - let _ = response.send(Err(err)); } } } + } else { + tracing::error!("request timed out method: {} params: {:?}", method, params); + // make sure it's still connected + if response.is_closed() { + return; + } + let _ = response.send(Err(Error::RequestTimeout)); } } Message::Subscribe { @@ -242,62 +258,75 @@ impl Client { } => { retries = retries.saturating_sub(1); - let result = ws.subscribe(&subscribe, params.clone(), &unsubscribe).await; - match result { - result @ Ok(_) => { - request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed); - // make sure it's still connected - if response.is_closed() { - return; + if let Ok(result) = tokio::time::timeout( + task_timeout, + ws.subscribe(&subscribe, params.clone(), &unsubscribe), + ) + .await + { + match result { + result @ Ok(_) => { + request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed); + // make sure it's still connected + if response.is_closed() { + return; + } + let _ = response.send(result); } - let _ = response.send(result); - } - Err(err) => { - tracing::debug!("Subscribe failed: {:?}", err); - match err { - Error::RequestTimeout - | Error::Transport(_) - | Error::RestartNeeded(_) - | Error::MaxSlotsExceeded => { - tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await; - - // make sure it's still connected - if response.is_closed() { - return; + Err(err) => { + tracing::debug!("Subscribe failed: {:?}", err); + match err { + Error::RequestTimeout + | Error::Transport(_) + | Error::RestartNeeded(_) + | Error::MaxSlotsExceeded => { + tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await; + + // make sure it's still connected + if response.is_closed() { + return; + } + + // make sure we still have retries left + if retries == 0 { + let _ = response.send(Err(Error::RequestTimeout)); + return; + } + + if matches!(err, Error::RequestTimeout) { + tx.send(Message::RotateEndpoint) + .await + .expect("Failed to send rotate message"); + } + + tx.send(Message::Subscribe { + subscribe, + params, + unsubscribe, + response, + retries, + }) + .await + .expect("Failed to send subscribe message") } - - // make sure we still have retries left - if retries == 0 { - let _ = response.send(Err(Error::RequestTimeout)); - return; - } - - if matches!(err, Error::RequestTimeout) { - tx.send(Message::RotateEndpoint) - .await - .expect("Failed to send rotate message"); + err => { + // make sure it's still connected + if response.is_closed() { + return; + } + // not something we can handle, send it back to the caller + let _ = response.send(Err(err)); } - - tx.send(Message::Subscribe { - subscribe, - params, - unsubscribe, - response, - retries, - }) - .await - .expect("Failed to send subscribe message") - } - err => { - // make sure it's still connected - if response.is_closed() { - return; - } - // not something we can handle, send it back to the caller - let _ = response.send(Err(err)); } } } + } else { + tracing::error!("subscribe timed out subscribe: {} params: {:?}", subscribe, params); + // make sure it's still connected + if response.is_closed() { + return; + } + let _ = response.send(Err(Error::RequestTimeout)); } } Message::RotateEndpoint => {