From a6c29723fce157d6a20d82351699659b656506c9 Mon Sep 17 00:00:00 2001 From: Tsiry Sandratraina Date: Sun, 14 May 2023 02:04:15 +0300 Subject: [PATCH] fix(log_engine): wrap LogEngine under Mutex --- src/server/logging.rs | 7 ++++--- src/server/mod.rs | 2 +- src/superviseur/core.rs | 6 +++--- src/superviseur/dependencies.rs | 4 ++-- src/superviseur/drivers/docker/driver.rs | 10 ++++++---- src/superviseur/drivers/exec/driver.rs | 8 +++++--- src/superviseur/drivers/flox/driver.rs | 9 ++++++--- src/superviseur/drivers/nix/driver.rs | 8 +++++--- 8 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/server/logging.rs b/src/server/logging.rs index f1c5b92..9b81a1a 100644 --- a/src/server/logging.rs +++ b/src/server/logging.rs @@ -23,7 +23,7 @@ pub struct Logging { processes: Arc>>, config_map: Arc>>, project_map: Arc>>, - log_engine: LogEngine, + log_engine: Arc>, } impl Logging { @@ -32,7 +32,7 @@ impl Logging { processes: Arc>>, config_map: Arc>>, project_map: Arc>>, - log_engine: LogEngine, + log_engine: Arc>, ) -> Self { Self { superviseur, @@ -221,7 +221,8 @@ impl LoggingService for Logging { let config = config_map.get(&project_id).unwrap(); let query = format!("{} AND {} AND {}", config.project, service, term); - let result = self.log_engine.search_in_service(&query).map_err(|e| { + let log_engine = self.log_engine.lock().unwrap(); + let result = log_engine.search_in_service(&query).map_err(|e| { tonic::Status::internal(format!("Error searching in service: {}", e.to_string())) })?; let response = SearchResponse { diff --git a/src/server/mod.rs b/src/server/mod.rs index 496b205..50f03f3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -45,7 +45,7 @@ pub async fn exec(port: u16, serve: bool) -> Result<(), Error> { let cmd_rx = Arc::new(Mutex::new(cmd_rx)); let service_graph = Arc::new(Mutex::new(vec![] as Vec<(DependencyGraph, String)>)); let service_map = Arc::new(Mutex::new(vec![] as Vec<(HashMap, String)>)); - let log_engine = LogEngine::new(); + let log_engine = Arc::new(Mutex::new(LogEngine::new())); let superviseur = Superviseur::new( cmd_rx, diff --git a/src/superviseur/core.rs b/src/superviseur/core.rs index 84bdba8..9d6937a 100644 --- a/src/superviseur/core.rs +++ b/src/superviseur/core.rs @@ -48,7 +48,7 @@ impl Superviseur { config_map: Arc>>, service_graph: Arc>>, service_map: Arc, String)>>>, - log_engine: LogEngine, + log_engine: Arc>, ) -> Self { let childs = Arc::new(Mutex::new(HashMap::new())); thread::spawn(move || { @@ -111,7 +111,7 @@ struct SuperviseurInternal { config_map: Arc>>, service_graph: Arc>>, service_map: Arc, String)>>>, - log_engine: LogEngine, + log_engine: Arc>, } impl SuperviseurInternal { @@ -125,7 +125,7 @@ impl SuperviseurInternal { config_map: Arc>>, service_graph: Arc>>, service_map: Arc, String)>>>, - log_engine: LogEngine, + log_engine: Arc>, ) -> Self { let config_map = Arc::new(Mutex::new( config_map diff --git a/src/superviseur/dependencies.rs b/src/superviseur/dependencies.rs index b7ae606..5f83ceb 100644 --- a/src/superviseur/dependencies.rs +++ b/src/superviseur/dependencies.rs @@ -103,7 +103,7 @@ pub enum GraphCommand { Arc>>, Arc>>, mpsc::UnboundedSender, - LogEngine, + Arc>, ), AddEdge(usize, usize), StartService(Service, bool), @@ -200,7 +200,7 @@ impl DependencyGraph { processes: Arc>>, childs: Arc>>, event_tx: mpsc::UnboundedSender, - log_engine: LogEngine, + log_engine: Arc>, ) -> usize { let mut vertex = Vertex::from(service); diff --git a/src/superviseur/drivers/docker/driver.rs b/src/superviseur/drivers/docker/driver.rs index f38ca86..a053923 100644 --- a/src/superviseur/drivers/docker/driver.rs +++ b/src/superviseur/drivers/docker/driver.rs @@ -44,7 +44,7 @@ pub struct Driver { processes: Arc>>, childs: Arc>>, event_tx: mpsc::UnboundedSender, - log_engine: logs::LogEngine, + log_engine: Arc>, config: Option, } @@ -59,7 +59,7 @@ impl Default for Driver { processes: Arc::new(Mutex::new(Vec::new())), childs: Arc::new(Mutex::new(HashMap::new())), event_tx, - log_engine: logs::LogEngine::new(), + log_engine: Arc::new(Mutex::new(logs::LogEngine::new())), config: None, } } @@ -73,7 +73,7 @@ impl Driver { processes: Arc>>, event_tx: mpsc::UnboundedSender, childs: Arc>>, - log_engine: LogEngine, + log_engine: Arc>, ) -> Self { let config = service .r#use @@ -465,7 +465,7 @@ impl DriverPlugin for Driver { pub async fn write_logs( service: Service, - log_engine: LogEngine, + log_engine: Arc>, project: String, mut stream: impl Stream> + Unpin, ) { @@ -499,6 +499,7 @@ pub async fn write_logs( .timestamp(), ), }; + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => { @@ -538,6 +539,7 @@ pub async fn write_logs( .timestamp(), ), }; + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => { diff --git a/src/superviseur/drivers/exec/driver.rs b/src/superviseur/drivers/exec/driver.rs index c78390c..d55aa56 100644 --- a/src/superviseur/drivers/exec/driver.rs +++ b/src/superviseur/drivers/exec/driver.rs @@ -36,7 +36,7 @@ pub struct Driver { processes: Arc>>, childs: Arc>>, event_tx: mpsc::UnboundedSender, - log_engine: logs::LogEngine, + log_engine: Arc>, } impl Default for Driver { @@ -48,7 +48,7 @@ impl Default for Driver { processes: Arc::new(Mutex::new(Vec::new())), childs: Arc::new(Mutex::new(HashMap::new())), event_tx, - log_engine: logs::LogEngine::new(), + log_engine: Arc::new(Mutex::new(logs::LogEngine::new())), } } } @@ -60,7 +60,7 @@ impl Driver { processes: Arc>>, event_tx: mpsc::UnboundedSender, childs: Arc>>, - log_engine: logs::LogEngine, + log_engine: Arc>, ) -> Self { Self { project, @@ -95,6 +95,7 @@ impl Driver { output: String::from("stdout"), date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()), }; + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => { @@ -128,6 +129,7 @@ impl Driver { output: String::from("stderr"), date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()), }; + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => { diff --git a/src/superviseur/drivers/flox/driver.rs b/src/superviseur/drivers/flox/driver.rs index 1468a61..6e3ad0d 100644 --- a/src/superviseur/drivers/flox/driver.rs +++ b/src/superviseur/drivers/flox/driver.rs @@ -40,7 +40,7 @@ pub struct Driver { processes: Arc>>, childs: Arc>>, event_tx: mpsc::UnboundedSender, - log_engine: LogEngine, + log_engine: Arc>, } impl Default for Driver { @@ -52,7 +52,7 @@ impl Default for Driver { processes: Arc::new(Mutex::new(Vec::new())), childs: Arc::new(Mutex::new(HashMap::new())), event_tx, - log_engine: LogEngine::new(), + log_engine: Arc::new(Mutex::new(LogEngine::new())), } } } @@ -64,7 +64,7 @@ impl Driver { processes: Arc>>, event_tx: mpsc::UnboundedSender, childs: Arc>>, - log_engine: LogEngine, + log_engine: Arc>, ) -> Self { Self { project, @@ -123,6 +123,8 @@ impl Driver { output: String::from("stdout"), date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()), }; + + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => { @@ -156,6 +158,7 @@ impl Driver { output: String::from("stderr"), date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()), }; + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => { diff --git a/src/superviseur/drivers/nix/driver.rs b/src/superviseur/drivers/nix/driver.rs index 83e3dbd..8facd97 100644 --- a/src/superviseur/drivers/nix/driver.rs +++ b/src/superviseur/drivers/nix/driver.rs @@ -36,7 +36,7 @@ pub struct Driver { processes: Arc>>, childs: Arc>>, event_tx: mpsc::UnboundedSender, - log_engine: LogEngine, + log_engine: Arc>, } impl Default for Driver { @@ -48,7 +48,7 @@ impl Default for Driver { processes: Arc::new(Mutex::new(Vec::new())), childs: Arc::new(Mutex::new(HashMap::new())), event_tx, - log_engine: LogEngine::new(), + log_engine: Arc::new(Mutex::new(LogEngine::new())), } } } @@ -60,7 +60,7 @@ impl Driver { processes: Arc>>, event_tx: mpsc::UnboundedSender, childs: Arc>>, - log_engine: LogEngine, + log_engine: Arc>, ) -> Self { Self { project, @@ -95,6 +95,7 @@ impl Driver { output: String::from("stdout"), date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()), }; + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => { @@ -128,6 +129,7 @@ impl Driver { output: String::from("stderr"), date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()), }; + let log_engine = log_engine.lock().unwrap(); match log_engine.insert(&log) { Ok(_) => {} Err(e) => {