diff --git a/common/src/lib.rs b/common/src/lib.rs index 2ddd481..1e528b2 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -28,6 +28,7 @@ pub struct WorkerHeartbeatRequest { pub disk_free_space_bytes: i64, pub worker_secret: String, pub performance: Option, + pub internet_connectivity: Option } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/server/migrations/2024-06-04-020234_add_internet_connectivity_to_workers/down.sql b/server/migrations/2024-06-04-020234_add_internet_connectivity_to_workers/down.sql new file mode 100644 index 0000000..2dab5da --- /dev/null +++ b/server/migrations/2024-06-04-020234_add_internet_connectivity_to_workers/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE workers DROP COLUMN internet_connectivity; diff --git a/server/migrations/2024-06-04-020234_add_internet_connectivity_to_workers/up.sql b/server/migrations/2024-06-04-020234_add_internet_connectivity_to_workers/up.sql new file mode 100644 index 0000000..9c5923a --- /dev/null +++ b/server/migrations/2024-06-04-020234_add_internet_connectivity_to_workers/up.sql @@ -0,0 +1,2 @@ +-- Your SQL goes here +ALTER TABLE workers ADD internet_connectivity BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/server/src/models.rs b/server/src/models.rs index cf11537..dbfda24 100644 --- a/server/src/models.rs +++ b/server/src/models.rs @@ -92,6 +92,7 @@ pub struct Worker { pub disk_free_space_bytes: i64, pub performance: Option, pub visible: bool, + pub internet_connectivity: bool } #[derive(Insertable, AsChangeset)] @@ -106,6 +107,7 @@ pub struct NewWorker { pub last_heartbeat_time: chrono::DateTime, pub disk_free_space_bytes: i64, pub performance: Option, + pub internet_connectivity: bool } #[derive(Queryable, Selectable)] diff --git a/server/src/routes/worker.rs b/server/src/routes/worker.rs index 073ddb9..5f96ca5 100644 --- a/server/src/routes/worker.rs +++ b/server/src/routes/worker.rs @@ -47,6 +47,7 @@ pub struct WorkerListResponseItem { disk_free_space_bytes: i64, is_live: bool, last_heartbeat_time: DateTime, + internet_connectivity: bool, // status running_job_id: Option, running_job_assign_time: Option>, @@ -115,6 +116,7 @@ pub async fn worker_list( disk_free_space_bytes: worker.disk_free_space_bytes, is_live: worker.last_heartbeat_time > deadline, last_heartbeat_time: worker.last_heartbeat_time, + internet_connectivity: worker.internet_connectivity, running_job_id: job.as_ref().map(|job| job.id), running_job_assign_time: job.and_then(|job| job.assign_time), }); @@ -156,6 +158,7 @@ pub async fn worker_heartbeat( disk_free_space_bytes.eq(payload.disk_free_space_bytes), last_heartbeat_time.eq(chrono::Utc::now()), performance.eq(payload.performance), + internet_connectivity.eq(payload.internet_connectivity.unwrap_or(false)), )) .execute(conn)?; } @@ -169,6 +172,7 @@ pub async fn worker_heartbeat( disk_free_space_bytes: payload.disk_free_space_bytes, last_heartbeat_time: chrono::Utc::now(), performance: payload.performance, + internet_connectivity: payload.internet_connectivity.unwrap_or(false), }; diesel::insert_into(crate::schema::workers::table) .values(&new_worker) diff --git a/server/src/schema.rs b/server/src/schema.rs index 50c24c5..67f0144 100644 --- a/server/src/schema.rs +++ b/server/src/schema.rs @@ -67,10 +67,16 @@ diesel::table! { disk_free_space_bytes -> Int8, performance -> Nullable, visible -> Bool, + internet_connectivity -> Bool, } } diesel::joinable!(jobs -> pipelines (pipeline_id)); diesel::joinable!(pipelines -> users (creator_user_id)); -diesel::allow_tables_to_appear_in_same_query!(jobs, pipelines, users, workers,); +diesel::allow_tables_to_appear_in_same_query!( + jobs, + pipelines, + users, + workers, +); diff --git a/worker/src/heartbeat.rs b/worker/src/heartbeat.rs index f2392ab..f7628dc 100644 --- a/worker/src/heartbeat.rs +++ b/worker/src/heartbeat.rs @@ -1,7 +1,30 @@ use crate::Args; use common::WorkerHeartbeatRequest; use log::{info, warn}; -use std::time::Duration; +use std::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, +}; + +static INTERNET_CONNECTIVITY: AtomicBool = AtomicBool::new(false); + +pub async fn internet_connectivity_worker() -> ! { + info!("Starting internet connectivity worker"); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .unwrap(); + loop { + let last = INTERNET_CONNECTIVITY.load(Ordering::SeqCst); + let next = client.get("https://github.com/").send().await.is_ok(); + if last != next { + info!("Internet connectivity changed from {} to {}", last, next); + } + INTERNET_CONNECTIVITY.store(next, Ordering::SeqCst); + + tokio::time::sleep(Duration::from_secs(60)).await; + } +} pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> { let client = reqwest::Client::builder() @@ -21,6 +44,7 @@ pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> { disk_free_space_bytes: fs2::free_space(std::env::current_dir()?)? as i64, logical_cores: num_cpus::get() as i32, performance: args.worker_performance, + internet_connectivity: Some(INTERNET_CONNECTIVITY.load(Ordering::SeqCst)), }) .send() .await?; @@ -29,6 +53,7 @@ pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> { } pub async fn heartbeat_worker(args: Args) -> ! { + tokio::spawn(internet_connectivity_worker()); loop { info!("Starting heartbeat worker"); if let Err(err) = heartbeat_worker_inner(&args).await {