diff --git a/Cargo.lock b/Cargo.lock index 85d5715..0c27b95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1217,6 +1217,18 @@ dependencies = [ "instant", ] +[[package]] +name = "filetime" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a3de6e8d11b22ff9edc6d916f890800597d60f8b2da1caf2955c274638d6412" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "windows-sys 0.45.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -1248,6 +1260,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.26" @@ -1641,6 +1662,26 @@ dependencies = [ "serde", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -1693,6 +1734,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kqueue" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -1927,6 +1988,24 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "notify" +version = "5.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ea850aa68a06e48fdb069c0ec44d0d64c8dbffa49bf3b6f7f0a901fdea1ba9" +dependencies = [ + "bitflags", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "mio", + "walkdir", + "windows-sys 0.42.0", +] + [[package]] name = "ntapi" version = "0.4.0" @@ -2732,6 +2811,7 @@ dependencies = [ "mime_guess", "names", "nix", + "notify", "once_cell", "open", "owo-colors", diff --git a/Cargo.toml b/Cargo.toml index 2ddb58e..bd12872 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ tower = "0.4.13" actix-web = "4.3.1" rust-embed = { version = "6.6.0", features = ["debug-embed", "actix"] } mime_guess = "2.0.4" -open = "3.4.0" +open = "3.2.0" async-graphql = "5.0.6" async-graphql-actix-web = "5.0.6" local-ip-addr = "0.1.1" @@ -47,6 +47,7 @@ once_cell = "1.17.1" actix-cors = "0.6.4" sha256 = "1.1.2" names = "0.14.0" +notify = "5.1.0" [build-dependencies] tonic-build = "0.8" diff --git a/src/lib.rs b/src/lib.rs index 076e1bd..8cece97 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod graphql; pub mod server; pub mod superviseur; pub mod types; +pub mod watch; pub mod webui; pub mod api { #[path = ""] diff --git a/src/server/control.rs b/src/server/control.rs index efb425c..1c34a84 100644 --- a/src/server/control.rs +++ b/src/server/control.rs @@ -20,7 +20,11 @@ use crate::{ }, }, graphql::{ - self, schema::objects::subscriptions::{AllServicesStarted, AllServicesStopped, AllServicesRestarted}, simple_broker::SimpleBroker, + self, + schema::objects::subscriptions::{ + AllServicesRestarted, AllServicesStarted, AllServicesStopped, + }, + simple_broker::SimpleBroker, }, superviseur::{ProcessEvent, Superviseur, SuperviseurCommand}, types::{ @@ -79,6 +83,17 @@ impl ControlService for Control { match old_config.services.iter().find(|s| s.name == service.name) { Some(old_service) => { service.id = old_service.id.clone(); + + // rewacth the directory if working_dir changed + if old_service.working_dir != service.working_dir { + self.cmd_tx + .send(SuperviseurCommand::WatchForChanges( + service.working_dir.clone(), + service.clone(), + config.project.clone(), + )) + .unwrap(); + } } None => { service.id = Some(generator.next().unwrap()); @@ -99,6 +114,20 @@ impl ControlService for Control { .collect(); config_map.insert(path.clone(), config.clone()); + + let services = config.services.clone(); + let project = config.project.clone(); + + for service in services.into_iter() { + self.cmd_tx + .send(SuperviseurCommand::WatchForChanges( + service.working_dir.clone(), + service, + project.clone(), + )) + .unwrap(); + } + self.cmd_tx .send(SuperviseurCommand::LoadConfig( config.clone(), @@ -291,7 +320,6 @@ impl ControlService for Control { .collect::>(); SimpleBroker::publish(AllServicesRestarted { payload: services }); - Ok(Response::new(RestartResponse { success: true })) } diff --git a/src/superviseur.rs b/src/superviseur.rs index a45a652..5506a71 100644 --- a/src/superviseur.rs +++ b/src/superviseur.rs @@ -31,6 +31,7 @@ use crate::{ configuration::{ConfigurationData, Service}, process::{Process, State}, }, + watch::WatchForChanges, }; #[derive(Clone)] @@ -61,6 +62,7 @@ pub enum SuperviseurCommand { Stop(Service, String), Restart(Service, String), LoadConfig(ConfigurationData, String), + WatchForChanges(String, Service, String), } #[derive(Debug)] @@ -327,6 +329,22 @@ impl SuperviseurInternal { self.handle_stop(service.clone(), project.clone(), true) } + fn handle_watch_for_changes( + &mut self, + dir: String, + service: Service, + project: String, + ) -> Result<(), Error> { + let superviseur_tx = self.cmd_tx.clone(); + thread::spawn(move || { + let _watcher = WatchForChanges::new(dir, superviseur_tx, service, project.clone()); + loop { + thread::sleep(Duration::from_secs(5)); + } + }); + Ok(()) + } + fn handle_command(&mut self, cmd: SuperviseurCommand) -> Result<(), Error> { match cmd { SuperviseurCommand::Load(service, project) => self.handle_load(service, project), @@ -336,6 +354,9 @@ impl SuperviseurInternal { SuperviseurCommand::LoadConfig(config, project) => { self.handle_load_config(config, project) } + SuperviseurCommand::WatchForChanges(dir, service, project) => { + self.handle_watch_for_changes(dir, service, project) + } } } diff --git a/src/watch.rs b/src/watch.rs new file mode 100644 index 0000000..25b0341 --- /dev/null +++ b/src/watch.rs @@ -0,0 +1,116 @@ +use std::{ + path::Path, + pin::Pin, + task::{Context, Poll}, + thread, + time::Duration, +}; + +use futures_util::Future; +use notify::{Config, Error, Event, PollWatcher, RecommendedWatcher, Watcher, WatcherKind}; +use tokio::sync::mpsc; + +use crate::{superviseur::SuperviseurCommand, types::configuration::Service}; + +pub struct WatchForChanges {} + +impl WatchForChanges { + pub fn new( + dir: String, + superviseur_tx: mpsc::UnboundedSender, + service: Service, + project: String, + ) -> Self { + thread::spawn(move || { + let internal = WatchForChangesInternal::new(&dir, superviseur_tx, service, project); + futures::executor::block_on(internal); + }); + Self {} + } +} + +struct WatchForChangesInternal { + cmd_rx: mpsc::UnboundedReceiver>, + superviseur_tx: mpsc::UnboundedSender, + service: Service, + project: String, + watcher: Box, +} + +impl WatchForChangesInternal { + pub fn new( + dir: &str, + superviseur_tx: mpsc::UnboundedSender, + service: Service, + project: String, + ) -> Self { + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + let mut watcher: Box = + if RecommendedWatcher::kind() == WatcherKind::PollWatcher { + // custom config for PollWatcher kind + // you + let config = Config::default().with_poll_interval(Duration::from_secs(1)); + Box::new( + PollWatcher::new( + move |result: Result| { + cmd_tx.send(result).unwrap(); + }, + config, + ) + .unwrap(), + ) + } else { + // use default config for everything else + Box::new( + RecommendedWatcher::new( + move |result: Result| { + cmd_tx.send(result).unwrap(); + }, + Config::default(), + ) + .unwrap(), + ) + }; + watcher + .watch(Path::new(dir), notify::RecursiveMode::Recursive) + .unwrap(); + Self { + cmd_rx, + service, + project, + superviseur_tx, + watcher, + } + } +} + +impl Future for WatchForChangesInternal { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let event = match self.cmd_rx.poll_recv(cx) { + Poll::Ready(Some(event)) => Some(event), + Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down. + _ => None, + }; + + if event.is_none() { + return Poll::Pending; + } + + match event { + Some(Ok(_)) => { + self.superviseur_tx + .send(SuperviseurCommand::Restart( + self.service.clone(), + self.project.clone(), + )) + .unwrap(); + } + Some(Err(e)) => println!("watch error: {:?}", e), + None => {} + } + } + } +}