diff --git a/src/bundle/modules.rs b/src/bundle/modules.rs index 56e97b0..90fa666 100644 --- a/src/bundle/modules.rs +++ b/src/bundle/modules.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; -use std::process::{Child, Command, Stdio}; +use std::process::{Child, Command}; #[derive(Debug, Serialize, Deserialize, Default)] struct Watchers { @@ -43,7 +43,7 @@ impl ExternalWatcher { } debug!("Starting an external watcher {}", self.name()); - let command = Command::new(&self.path).stdout(Stdio::null()).spawn(); + let command = Command::new(&self.path).spawn(); match command { Ok(handle) => { diff --git a/watchers/src/report_client.rs b/watchers/src/report_client.rs index 4016695..1d4a8e6 100644 --- a/watchers/src/report_client.rs +++ b/watchers/src/report_client.rs @@ -3,6 +3,7 @@ use anyhow::Context; use aw_client_rust::{AwClient, Event as AwEvent}; use chrono::{DateTime, Duration, Utc}; use serde_json::{Map, Value}; +use std::future::Future; pub struct ReportClient { pub client: AwClient, @@ -31,6 +32,32 @@ impl ReportClient { }) } + async fn run_with_retries(f: F) -> Result + where + F: Fn() -> Fut, + Fut: Future>, + E: std::error::Error + Send + Sync + 'static, + { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + let mut attempts = 0; + loop { + match f().await { + Ok(val) => return Ok(val), + Err(e) + if attempts < 3 + && e.to_string() + .contains("tcp connect error: Connection refused") => + { + warn!("Failed to connect, retrying: {}", e); + + attempts += 1; + interval.tick().await; + } + Err(e) => return Err(e), + } + } + } + pub async fn ping( &self, is_idle: bool, @@ -55,8 +82,13 @@ impl ReportClient { } let pulsetime = (self.config.idle_timeout + self.config.poll_time_idle).as_secs_f64(); - self.client - .heartbeat(&self.idle_bucket_name, &event, pulsetime) + + let request = || { + self.client + .heartbeat(&self.idle_bucket_name, &event, pulsetime) + }; + + Self::run_with_retries(request) .await .with_context(|| "Failed to send heartbeat") } @@ -96,8 +128,12 @@ impl ReportClient { } let interval_margin = self.config.poll_time_window.as_secs_f64() + 1.0; - self.client - .heartbeat(&self.active_window_bucket_name, &event, interval_margin) + let request = || { + self.client + .heartbeat(&self.active_window_bucket_name, &event, interval_margin) + }; + + Self::run_with_retries(request) .await .with_context(|| "Failed to send heartbeat for active window") } @@ -107,8 +143,9 @@ impl ReportClient { bucket_name: &str, bucket_type: &str, ) -> anyhow::Result<()> { - client - .create_bucket_simple(bucket_name, bucket_type) + let request = || client.create_bucket_simple(bucket_name, bucket_type); + + Self::run_with_retries(request) .await .with_context(|| format!("Failed to create bucket {bucket_name}")) }