Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task processing timeout #121

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 128 additions & 99 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ impl Client {
let tx = message_tx_bg.clone();
let request_backoff_counter = request_backoff_counter.clone();

// total timeout for a request
let task_timeout = request_timeout
.unwrap_or(Duration::from_secs(30))
// buffer 5 seconds for the request to be processed
.saturating_add(Duration::from_secs(5));

tokio::spawn(async move {
match message {
Message::Request {
Expand All @@ -176,61 +182,71 @@ impl Client {
return;
}

let result = ws.request(&method, params.clone()).await;
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
if let Ok(result) =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this background task code is slowing getting out of control... we should seek ways to refactor this gigantic handler to something more manageable later

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

tokio::time::timeout(task_timeout, ws.request(&method, params.clone())).await
{
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(result);
}
let _ = response.send(result);
}
Err(err) => {
tracing::debug!("Request failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
Err(err) => {
tracing::debug!("Request failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
}

tx.send(Message::Request {
method,
params,
response,
retries,
})
.await
.expect("Failed to send request message");
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}

tx.send(Message::Request {
method,
params,
response,
retries,
})
.await
.expect("Failed to send request message");
}
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}
}
}
} else {
tracing::error!("request timed out method: {} params: {:?}", method, params);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(Err(Error::RequestTimeout));
}
}
Message::Subscribe {
Expand All @@ -242,62 +258,75 @@ impl Client {
} => {
retries = retries.saturating_sub(1);

let result = ws.subscribe(&subscribe, params.clone(), &unsubscribe).await;
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
if let Ok(result) = tokio::time::timeout(
task_timeout,
ws.subscribe(&subscribe, params.clone(), &unsubscribe),
)
.await
{
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(result);
}
let _ = response.send(result);
}
Err(err) => {
tracing::debug!("Subscribe failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
Err(err) => {
tracing::debug!("Subscribe failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
}

tx.send(Message::Subscribe {
subscribe,
params,
unsubscribe,
response,
retries,
})
.await
.expect("Failed to send subscribe message")
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}

tx.send(Message::Subscribe {
subscribe,
params,
unsubscribe,
response,
retries,
})
.await
.expect("Failed to send subscribe message")
}
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}
}
}
} else {
tracing::error!("subscribe timed out subscribe: {} params: {:?}", subscribe, params);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(Err(Error::RequestTimeout));
}
}
Message::RotateEndpoint => {
Expand Down