Skip to content
This repository has been archived by the owner on Jul 2, 2024. It is now read-only.

Commit

Permalink
fix(log_engine): wrap LogEngine under Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
tsirysndr committed May 13, 2023
1 parent 5758c0b commit a6c2972
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 22 deletions.
7 changes: 4 additions & 3 deletions src/server/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct Logging {
processes: Arc<Mutex<Vec<(Process, String)>>>,
config_map: Arc<Mutex<HashMap<String, ConfigurationData>>>,
project_map: Arc<Mutex<HashMap<String, String>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
}

impl Logging {
Expand All @@ -32,7 +32,7 @@ impl Logging {
processes: Arc<Mutex<Vec<(Process, String)>>>,
config_map: Arc<Mutex<HashMap<String, ConfigurationData>>>,
project_map: Arc<Mutex<HashMap<String, String>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
) -> Self {
Self {
superviseur,
Expand Down Expand Up @@ -221,7 +221,8 @@ impl LoggingService for Logging {
let config = config_map.get(&project_id).unwrap();

let query = format!("{} AND {} AND {}", config.project, service, term);
let result = self.log_engine.search_in_service(&query).map_err(|e| {
let log_engine = self.log_engine.lock().unwrap();
let result = log_engine.search_in_service(&query).map_err(|e| {
tonic::Status::internal(format!("Error searching in service: {}", e.to_string()))
})?;
let response = SearchResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn exec(port: u16, serve: bool) -> Result<(), Error> {
let cmd_rx = Arc::new(Mutex::new(cmd_rx));
let service_graph = Arc::new(Mutex::new(vec![] as Vec<(DependencyGraph, String)>));
let service_map = Arc::new(Mutex::new(vec![] as Vec<(HashMap<usize, Service>, String)>));
let log_engine = LogEngine::new();
let log_engine = Arc::new(Mutex::new(LogEngine::new()));

let superviseur = Superviseur::new(
cmd_rx,
Expand Down
6 changes: 3 additions & 3 deletions src/superviseur/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Superviseur {
config_map: Arc<Mutex<HashMap<String, ConfigurationData>>>,
service_graph: Arc<Mutex<Vec<(DependencyGraph, String)>>>,
service_map: Arc<Mutex<Vec<(HashMap<usize, Service>, String)>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
) -> Self {
let childs = Arc::new(Mutex::new(HashMap::new()));
thread::spawn(move || {
Expand Down Expand Up @@ -111,7 +111,7 @@ struct SuperviseurInternal {
config_map: Arc<Mutex<Vec<(ConfigurationData, String)>>>,
service_graph: Arc<Mutex<Vec<(DependencyGraph, String)>>>,
service_map: Arc<Mutex<Vec<(HashMap<usize, Service>, String)>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
}

impl SuperviseurInternal {
Expand All @@ -125,7 +125,7 @@ impl SuperviseurInternal {
config_map: Arc<Mutex<HashMap<String, ConfigurationData>>>,
service_graph: Arc<Mutex<Vec<(DependencyGraph, String)>>>,
service_map: Arc<Mutex<Vec<(HashMap<usize, Service>, String)>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
) -> Self {
let config_map = Arc::new(Mutex::new(
config_map
Expand Down
4 changes: 2 additions & 2 deletions src/superviseur/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum GraphCommand {
Arc<Mutex<Vec<(Process, String)>>>,
Arc<Mutex<HashMap<String, i32>>>,
mpsc::UnboundedSender<ProcessEvent>,
LogEngine,
Arc<Mutex<LogEngine>>,
),
AddEdge(usize, usize),
StartService(Service, bool),
Expand Down Expand Up @@ -200,7 +200,7 @@ impl DependencyGraph {
processes: Arc<Mutex<Vec<(Process, String)>>>,
childs: Arc<Mutex<HashMap<String, i32>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
) -> usize {
let mut vertex = Vertex::from(service);

Expand Down
10 changes: 6 additions & 4 deletions src/superviseur/drivers/docker/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
childs: Arc<Mutex<HashMap<String, i32>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
log_engine: logs::LogEngine,
log_engine: Arc<Mutex<logs::LogEngine>>,
config: Option<DriverConfig>,
}

Expand All @@ -59,7 +59,7 @@ impl Default for Driver {
processes: Arc::new(Mutex::new(Vec::new())),
childs: Arc::new(Mutex::new(HashMap::new())),
event_tx,
log_engine: logs::LogEngine::new(),
log_engine: Arc::new(Mutex::new(logs::LogEngine::new())),
config: None,
}
}
Expand All @@ -73,7 +73,7 @@ impl Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
childs: Arc<Mutex<HashMap<String, i32>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
) -> Self {
let config = service
.r#use
Expand Down Expand Up @@ -465,7 +465,7 @@ impl DriverPlugin for Driver {

pub async fn write_logs(
service: Service,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
project: String,
mut stream: impl Stream<Item = Result<tty::TtyChunk, shiplift::Error>> + Unpin,
) {
Expand Down Expand Up @@ -499,6 +499,7 @@ pub async fn write_logs(
.timestamp(),
),
};
let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -538,6 +539,7 @@ pub async fn write_logs(
.timestamp(),
),
};
let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down
8 changes: 5 additions & 3 deletions src/superviseur/drivers/exec/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
childs: Arc<Mutex<HashMap<String, i32>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
log_engine: logs::LogEngine,
log_engine: Arc<Mutex<logs::LogEngine>>,
}

impl Default for Driver {
Expand All @@ -48,7 +48,7 @@ impl Default for Driver {
processes: Arc::new(Mutex::new(Vec::new())),
childs: Arc::new(Mutex::new(HashMap::new())),
event_tx,
log_engine: logs::LogEngine::new(),
log_engine: Arc::new(Mutex::new(logs::LogEngine::new())),
}
}
}
Expand All @@ -60,7 +60,7 @@ impl Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
childs: Arc<Mutex<HashMap<String, i32>>>,
log_engine: logs::LogEngine,
log_engine: Arc<Mutex<logs::LogEngine>>,
) -> Self {
Self {
project,
Expand Down Expand Up @@ -95,6 +95,7 @@ impl Driver {
output: String::from("stdout"),
date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()),
};
let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -128,6 +129,7 @@ impl Driver {
output: String::from("stderr"),
date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()),
};
let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down
9 changes: 6 additions & 3 deletions src/superviseur/drivers/flox/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
childs: Arc<Mutex<HashMap<String, i32>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
}

impl Default for Driver {
Expand All @@ -52,7 +52,7 @@ impl Default for Driver {
processes: Arc::new(Mutex::new(Vec::new())),
childs: Arc::new(Mutex::new(HashMap::new())),
event_tx,
log_engine: LogEngine::new(),
log_engine: Arc::new(Mutex::new(LogEngine::new())),
}
}
}
Expand All @@ -64,7 +64,7 @@ impl Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
childs: Arc<Mutex<HashMap<String, i32>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
) -> Self {
Self {
project,
Expand Down Expand Up @@ -123,6 +123,8 @@ impl Driver {
output: String::from("stdout"),
date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()),
};

let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -156,6 +158,7 @@ impl Driver {
output: String::from("stderr"),
date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()),
};
let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down
8 changes: 5 additions & 3 deletions src/superviseur/drivers/nix/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
childs: Arc<Mutex<HashMap<String, i32>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
}

impl Default for Driver {
Expand All @@ -48,7 +48,7 @@ impl Default for Driver {
processes: Arc::new(Mutex::new(Vec::new())),
childs: Arc::new(Mutex::new(HashMap::new())),
event_tx,
log_engine: LogEngine::new(),
log_engine: Arc::new(Mutex::new(LogEngine::new())),
}
}
}
Expand All @@ -60,7 +60,7 @@ impl Driver {
processes: Arc<Mutex<Vec<(Process, String)>>>,
event_tx: mpsc::UnboundedSender<ProcessEvent>,
childs: Arc<Mutex<HashMap<String, i32>>>,
log_engine: LogEngine,
log_engine: Arc<Mutex<LogEngine>>,
) -> Self {
Self {
project,
Expand Down Expand Up @@ -95,6 +95,7 @@ impl Driver {
output: String::from("stdout"),
date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()),
};
let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -128,6 +129,7 @@ impl Driver {
output: String::from("stderr"),
date: tantivy::DateTime::from_timestamp_secs(chrono::Local::now().timestamp()),
};
let log_engine = log_engine.lock().unwrap();
match log_engine.insert(&log) {
Ok(_) => {}
Err(e) => {
Expand Down

0 comments on commit a6c2972

Please sign in to comment.