Skip to content

Commit

Permalink
feat: ping github and report connectivity
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Jun 4, 2024
1 parent decd9a8 commit b835a03
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 2 deletions.
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct WorkerHeartbeatRequest {
pub disk_free_space_bytes: i64,
pub worker_secret: String,
pub performance: Option<i64>,
pub internet_connectivity: Option<bool>
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE workers DROP COLUMN internet_connectivity;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Your SQL goes here
ALTER TABLE workers ADD internet_connectivity BOOLEAN NOT NULL DEFAULT FALSE;
2 changes: 2 additions & 0 deletions server/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub struct Worker {
pub disk_free_space_bytes: i64,
pub performance: Option<i64>,
pub visible: bool,
pub internet_connectivity: bool
}

#[derive(Insertable, AsChangeset)]
Expand All @@ -106,6 +107,7 @@ pub struct NewWorker {
pub last_heartbeat_time: chrono::DateTime<chrono::Utc>,
pub disk_free_space_bytes: i64,
pub performance: Option<i64>,
pub internet_connectivity: bool
}

#[derive(Queryable, Selectable)]
Expand Down
4 changes: 4 additions & 0 deletions server/src/routes/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct WorkerListResponseItem {
disk_free_space_bytes: i64,
is_live: bool,
last_heartbeat_time: DateTime<Utc>,
internet_connectivity: bool,
// status
running_job_id: Option<i32>,
running_job_assign_time: Option<chrono::DateTime<chrono::Utc>>,
Expand Down Expand Up @@ -115,6 +116,7 @@ pub async fn worker_list(
disk_free_space_bytes: worker.disk_free_space_bytes,
is_live: worker.last_heartbeat_time > deadline,
last_heartbeat_time: worker.last_heartbeat_time,
internet_connectivity: worker.internet_connectivity,
running_job_id: job.as_ref().map(|job| job.id),
running_job_assign_time: job.and_then(|job| job.assign_time),
});
Expand Down Expand Up @@ -156,6 +158,7 @@ pub async fn worker_heartbeat(
disk_free_space_bytes.eq(payload.disk_free_space_bytes),
last_heartbeat_time.eq(chrono::Utc::now()),
performance.eq(payload.performance),
internet_connectivity.eq(payload.internet_connectivity.unwrap_or(false)),
))
.execute(conn)?;
}
Expand All @@ -169,6 +172,7 @@ pub async fn worker_heartbeat(
disk_free_space_bytes: payload.disk_free_space_bytes,
last_heartbeat_time: chrono::Utc::now(),
performance: payload.performance,
internet_connectivity: payload.internet_connectivity.unwrap_or(false),
};
diesel::insert_into(crate::schema::workers::table)
.values(&new_worker)
Expand Down
8 changes: 7 additions & 1 deletion server/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,16 @@ diesel::table! {
disk_free_space_bytes -> Int8,
performance -> Nullable<Int8>,
visible -> Bool,
internet_connectivity -> Bool,
}
}

diesel::joinable!(jobs -> pipelines (pipeline_id));
diesel::joinable!(pipelines -> users (creator_user_id));

diesel::allow_tables_to_appear_in_same_query!(jobs, pipelines, users, workers,);
diesel::allow_tables_to_appear_in_same_query!(
jobs,
pipelines,
users,
workers,
);
27 changes: 26 additions & 1 deletion worker/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,30 @@
use crate::Args;
use common::WorkerHeartbeatRequest;
use log::{info, warn};
use std::time::Duration;
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};

static INTERNET_CONNECTIVITY: AtomicBool = AtomicBool::new(false);

pub async fn internet_connectivity_worker() -> ! {
info!("Starting internet connectivity worker");
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap();
loop {
let last = INTERNET_CONNECTIVITY.load(Ordering::SeqCst);
let next = client.get("https://github.com/").send().await.is_ok();
if last != next {
info!("Internet connectivity changed from {} to {}", last, next);
}
INTERNET_CONNECTIVITY.store(next, Ordering::SeqCst);

tokio::time::sleep(Duration::from_secs(60)).await;
}
}

pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
let client = reqwest::Client::builder()
Expand All @@ -21,6 +44,7 @@ pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
disk_free_space_bytes: fs2::free_space(std::env::current_dir()?)? as i64,
logical_cores: num_cpus::get() as i32,
performance: args.worker_performance,
internet_connectivity: Some(INTERNET_CONNECTIVITY.load(Ordering::SeqCst)),
})
.send()
.await?;
Expand All @@ -29,6 +53,7 @@ pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
}

pub async fn heartbeat_worker(args: Args) -> ! {
tokio::spawn(internet_connectivity_worker());
loop {
info!("Starting heartbeat worker");
if let Err(err) = heartbeat_worker_inner(&args).await {
Expand Down

0 comments on commit b835a03

Please sign in to comment.