diff --git a/Cargo.lock b/Cargo.lock index 3589e12..28fd4fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4675,6 +4675,7 @@ dependencies = [ "dotenv", "env_logger", "fs2", + "futures-channel", "futures-util", "gethostname", "log", diff --git a/worker/Cargo.toml b/worker/Cargo.toml index fd10662..e1fa733 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -22,6 +22,7 @@ sysinfo = "0.30.5" tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "fs"] } tokio-tungstenite = "0.23.1" futures-util = "0.3.30" +futures-channel = "0.3.30" [build-dependencies] vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] } diff --git a/worker/src/build.rs b/worker/src/build.rs index e5eee62..8b32465 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -2,6 +2,7 @@ use crate::{get_memory_bytes, Args}; use anyhow::Context; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; +use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_util::{SinkExt, StreamExt}; use log::{error, info, warn}; use std::{ @@ -13,18 +14,16 @@ use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, process::Command, - sync::mpsc::{unbounded_channel, UnboundedSender}, time::sleep, }; use tokio_tungstenite::{connect_async, tungstenite::Message}; - async fn get_output_logged( cmd: &str, args: &[&str], cwd: &Path, logs: &mut Vec, - tx: UnboundedSender, + mut tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let msg = format!( @@ -50,13 +49,13 @@ async fn get_output_logged( let mut stdout_reader = BufReader::new(stdout).lines(); let stderr = output.stderr.take().context("Failed to get stderr")?; - let txc = tx.clone(); + let mut txc = tx.clone(); let stderr_task = tokio::spawn(async move { let mut res = vec![]; let mut stderr_reader = BufReader::new(stderr).lines(); while let Ok(Some(v)) = stderr_reader.next_line().await { - let _ = txc.send(v.clone()); + let _ = txc.send(Message::Text(v.clone())).await; res.push(v); } @@ -65,7 +64,7 @@ async fn get_output_logged( let mut stdout_out = vec![]; while let Ok(Some(v)) = stdout_reader.next_line().await { - tx.send(v.clone())?; + tx.send(Message::Text(v.clone())).await?; stdout_out.push(v); } @@ -96,7 +95,7 @@ async fn run_logged_with_retry( args: &[&str], cwd: &Path, logs: &mut Vec, - tx: UnboundedSender, + tx: UnboundedSender, ) -> anyhow::Result { for i in 0..5 { if i > 0 { @@ -129,7 +128,7 @@ async fn build( job: &WorkerPollResponse, tree_path: &Path, args: &Args, - tx: UnboundedSender, + tx: UnboundedSender, ) -> anyhow::Result { let begin = Instant::now(); let mut successful_packages = vec![]; @@ -356,9 +355,9 @@ async fn build( async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { let ws = &args.websocket; let (ws_stream, _) = connect_async(ws).await?; - let (mut write, _) = ws_stream.split(); + let (write, _) = ws_stream.split(); - let (tx, mut rx): (UnboundedSender, _) = unbounded_channel(); + let (tx, rx) = unbounded(); let mut tree_path = args.ciel_path.clone(); tree_path.push("TREE"); @@ -378,11 +377,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> { logical_cores: num_cpus::get() as i32, }; - tokio::spawn(async move { - if let Some(v) = rx.recv().await { - let _ = write.send(Message::Text(v)).await; - } - }); + tokio::spawn(async move { rx.map(Ok).forward(write) }); loop { if let Some(job) = client