Skip to content
This repository has been archived by the owner on Jul 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #21 from tsirysndr/feat/watch-dir-changes
Browse files Browse the repository at this point in the history
feat(superviseur): restart service on events in working directory
  • Loading branch information
tsirysndr authored Mar 19, 2023
2 parents 66dc7f7 + f20c38b commit d0683f5
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 3 deletions.
80 changes: 80 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ tower = "0.4.13"
actix-web = "4.3.1"
rust-embed = { version = "6.6.0", features = ["debug-embed", "actix"] }
mime_guess = "2.0.4"
open = "3.4.0"
open = "3.2.0"
async-graphql = "5.0.6"
async-graphql-actix-web = "5.0.6"
local-ip-addr = "0.1.1"
Expand All @@ -47,6 +47,7 @@ once_cell = "1.17.1"
actix-cors = "0.6.4"
sha256 = "1.1.2"
names = "0.14.0"
notify = "5.1.0"

[build-dependencies]
tonic-build = "0.8"
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod graphql;
pub mod server;
pub mod superviseur;
pub mod types;
pub mod watch;
pub mod webui;
pub mod api {
#[path = ""]
Expand Down
32 changes: 30 additions & 2 deletions src/server/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ use crate::{
},
},
graphql::{
self, schema::objects::subscriptions::{AllServicesStarted, AllServicesStopped, AllServicesRestarted}, simple_broker::SimpleBroker,
self,
schema::objects::subscriptions::{
AllServicesRestarted, AllServicesStarted, AllServicesStopped,
},
simple_broker::SimpleBroker,
},
superviseur::{ProcessEvent, Superviseur, SuperviseurCommand},
types::{
Expand Down Expand Up @@ -79,6 +83,17 @@ impl ControlService for Control {
match old_config.services.iter().find(|s| s.name == service.name) {
Some(old_service) => {
service.id = old_service.id.clone();

// rewacth the directory if working_dir changed
if old_service.working_dir != service.working_dir {
self.cmd_tx
.send(SuperviseurCommand::WatchForChanges(
service.working_dir.clone(),
service.clone(),
config.project.clone(),
))
.unwrap();
}
}
None => {
service.id = Some(generator.next().unwrap());
Expand All @@ -99,6 +114,20 @@ impl ControlService for Control {
.collect();

config_map.insert(path.clone(), config.clone());

let services = config.services.clone();
let project = config.project.clone();

for service in services.into_iter() {
self.cmd_tx
.send(SuperviseurCommand::WatchForChanges(
service.working_dir.clone(),
service,
project.clone(),
))
.unwrap();
}

self.cmd_tx
.send(SuperviseurCommand::LoadConfig(
config.clone(),
Expand Down Expand Up @@ -291,7 +320,6 @@ impl ControlService for Control {
.collect::<Vec<graphql::schema::objects::service::Service>>();
SimpleBroker::publish(AllServicesRestarted { payload: services });


Ok(Response::new(RestartResponse { success: true }))
}

Expand Down
21 changes: 21 additions & 0 deletions src/superviseur.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
configuration::{ConfigurationData, Service},
process::{Process, State},
},
watch::WatchForChanges,
};

#[derive(Clone)]
Expand Down Expand Up @@ -61,6 +62,7 @@ pub enum SuperviseurCommand {
Stop(Service, String),
Restart(Service, String),
LoadConfig(ConfigurationData, String),
WatchForChanges(String, Service, String),
}

#[derive(Debug)]
Expand Down Expand Up @@ -327,6 +329,22 @@ impl SuperviseurInternal {
self.handle_stop(service.clone(), project.clone(), true)
}

fn handle_watch_for_changes(
&mut self,
dir: String,
service: Service,
project: String,
) -> Result<(), Error> {
let superviseur_tx = self.cmd_tx.clone();
thread::spawn(move || {
let _watcher = WatchForChanges::new(dir, superviseur_tx, service, project.clone());
loop {
thread::sleep(Duration::from_secs(5));
}
});
Ok(())
}

fn handle_command(&mut self, cmd: SuperviseurCommand) -> Result<(), Error> {
match cmd {
SuperviseurCommand::Load(service, project) => self.handle_load(service, project),
Expand All @@ -336,6 +354,9 @@ impl SuperviseurInternal {
SuperviseurCommand::LoadConfig(config, project) => {
self.handle_load_config(config, project)
}
SuperviseurCommand::WatchForChanges(dir, service, project) => {
self.handle_watch_for_changes(dir, service, project)
}
}
}

Expand Down
116 changes: 116 additions & 0 deletions src/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use std::{
path::Path,
pin::Pin,
task::{Context, Poll},
thread,
time::Duration,
};

use futures_util::Future;
use notify::{Config, Error, Event, PollWatcher, RecommendedWatcher, Watcher, WatcherKind};
use tokio::sync::mpsc;

use crate::{superviseur::SuperviseurCommand, types::configuration::Service};

pub struct WatchForChanges {}

impl WatchForChanges {
pub fn new(
dir: String,
superviseur_tx: mpsc::UnboundedSender<SuperviseurCommand>,
service: Service,
project: String,
) -> Self {
thread::spawn(move || {
let internal = WatchForChangesInternal::new(&dir, superviseur_tx, service, project);
futures::executor::block_on(internal);
});
Self {}
}
}

struct WatchForChangesInternal {
cmd_rx: mpsc::UnboundedReceiver<notify::Result<notify::Event>>,
superviseur_tx: mpsc::UnboundedSender<SuperviseurCommand>,
service: Service,
project: String,
watcher: Box<dyn Watcher>,
}

impl WatchForChangesInternal {
pub fn new(
dir: &str,
superviseur_tx: mpsc::UnboundedSender<SuperviseurCommand>,
service: Service,
project: String,
) -> Self {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let mut watcher: Box<dyn Watcher> =
if RecommendedWatcher::kind() == WatcherKind::PollWatcher {
// custom config for PollWatcher kind
// you
let config = Config::default().with_poll_interval(Duration::from_secs(1));
Box::new(
PollWatcher::new(
move |result: Result<Event, Error>| {
cmd_tx.send(result).unwrap();
},
config,
)
.unwrap(),
)
} else {
// use default config for everything else
Box::new(
RecommendedWatcher::new(
move |result: Result<Event, Error>| {
cmd_tx.send(result).unwrap();
},
Config::default(),
)
.unwrap(),
)
};
watcher
.watch(Path::new(dir), notify::RecursiveMode::Recursive)
.unwrap();
Self {
cmd_rx,
service,
project,
superviseur_tx,
watcher,
}
}
}

impl Future for WatchForChangesInternal {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let event = match self.cmd_rx.poll_recv(cx) {
Poll::Ready(Some(event)) => Some(event),
Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down.
_ => None,
};

if event.is_none() {
return Poll::Pending;
}

match event {
Some(Ok(_)) => {
self.superviseur_tx
.send(SuperviseurCommand::Restart(
self.service.clone(),
self.project.clone(),
))
.unwrap();
}
Some(Err(e)) => println!("watch error: {:?}", e),
None => {}
}
}
}
}

0 comments on commit d0683f5

Please sign in to comment.