Skip to content

Commit

Permalink
Add retries for reqwest connect errors
Browse files Browse the repository at this point in the history
  • Loading branch information
2e3s committed Nov 26, 2023
1 parent c6a5df5 commit 70001c3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/bundle/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use serde::{Deserialize, Serialize};
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::process::{Child, Command};

#[derive(Debug, Serialize, Deserialize, Default)]
struct Watchers {
Expand Down Expand Up @@ -43,7 +43,7 @@ impl ExternalWatcher {
}
debug!("Starting an external watcher {}", self.name());

let command = Command::new(&self.path).stdout(Stdio::null()).spawn();
let command = Command::new(&self.path).spawn();

match command {
Ok(handle) => {
Expand Down
49 changes: 43 additions & 6 deletions watchers/src/report_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use anyhow::Context;
use aw_client_rust::{AwClient, Event as AwEvent};
use chrono::{DateTime, Duration, Utc};
use serde_json::{Map, Value};
use std::future::Future;

pub struct ReportClient {
pub client: AwClient,
Expand Down Expand Up @@ -31,6 +32,32 @@ impl ReportClient {
})
}

async fn run_with_retries<F, Fut, T, E>(f: F) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::error::Error + Send + Sync + 'static,
{
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let mut attempts = 0;
loop {
match f().await {
Ok(val) => return Ok(val),
Err(e)
if attempts < 3
&& e.to_string()
.contains("tcp connect error: Connection refused") =>
{
warn!("Failed to connect, retrying: {}", e);

attempts += 1;
interval.tick().await;
}
Err(e) => return Err(e),
}
}
}

pub async fn ping(
&self,
is_idle: bool,
Expand All @@ -55,8 +82,13 @@ impl ReportClient {
}

let pulsetime = (self.config.idle_timeout + self.config.poll_time_idle).as_secs_f64();
self.client
.heartbeat(&self.idle_bucket_name, &event, pulsetime)

let request = || {
self.client
.heartbeat(&self.idle_bucket_name, &event, pulsetime)
};

Self::run_with_retries(request)
.await
.with_context(|| "Failed to send heartbeat")
}
Expand Down Expand Up @@ -96,8 +128,12 @@ impl ReportClient {
}

let interval_margin = self.config.poll_time_window.as_secs_f64() + 1.0;
self.client
.heartbeat(&self.active_window_bucket_name, &event, interval_margin)
let request = || {
self.client
.heartbeat(&self.active_window_bucket_name, &event, interval_margin)
};

Self::run_with_retries(request)
.await
.with_context(|| "Failed to send heartbeat for active window")
}
Expand All @@ -107,8 +143,9 @@ impl ReportClient {
bucket_name: &str,
bucket_type: &str,
) -> anyhow::Result<()> {
client
.create_bucket_simple(bucket_name, bucket_type)
let request = || client.create_bucket_simple(bucket_name, bucket_type);

Self::run_with_retries(request)
.await
.with_context(|| format!("Failed to create bucket {bucket_name}"))
}
Expand Down

0 comments on commit 70001c3

Please sign in to comment.