diff --git a/worker/src/build.rs b/worker/src/build.rs index 45ac4a4..312019d 100644 --- a/worker/src/build.rs +++ b/worker/src/build.rs @@ -2,19 +2,17 @@ use crate::{get_memory_bytes, Args}; use chrono::Local; use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse}; use flume::{unbounded, Receiver, Sender}; -use futures_util::StreamExt; +use futures_util::{future::try_join3, StreamExt}; use log::{error, info, warn}; use reqwest::Url; use std::{ - os::fd::AsRawFd, - os::fd::FromRawFd, path::Path, process::{Output, Stdio}, time::{Duration, Instant}, }; use tokio::{ fs, - io::{AsyncBufReadExt, BufReader}, + io::{AsyncBufReadExt, AsyncRead, BufReader}, process::Command, time::sleep, }; @@ -38,33 +36,38 @@ async fn get_output_logged( logs.extend(msg.as_bytes()); info!("{}", msg.trim()); - // join stdout and stderr together - let (writer, reader) = tokio::net::unix::pipe::pipe()?; - let writer_fd = writer.into_blocking_fd()?; - let output = Command::new(cmd) + let mut output = Command::new(cmd) .args(args) .current_dir(cwd) - .stdout(unsafe { Stdio::from_raw_fd(writer_fd.as_raw_fd()) }) - .stderr(unsafe { Stdio::from_raw_fd(writer_fd.as_raw_fd()) }) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) .spawn()?; - let mut stdout_reader = BufReader::new(reader).lines(); - - let mut stdout_out = vec![]; - while let Ok(Some(v)) = stdout_reader.next_line().await { - for line in v.split("\r") { - tx.send_async(Message::Text(line.to_string())).await.ok(); + // learn from tokio wait_with_output + async fn read_and_send( + io: &mut Option, + tx: Sender, + ) -> tokio::io::Result { + let mut res = String::new(); + if let Some(io) = io.as_mut() { + let mut reader = BufReader::new(io).lines(); + while let Ok(Some(v)) = reader.next_line().await { + for line in v.split("\r") { + tx.send_async(Message::Text(line.to_string())).await.ok(); + res += &line; + res += "\n"; + } + } } - stdout_out.push(v); + Ok(res) } - let mut output = output.wait_with_output().await?; + let mut stdout_pipe = output.stdout.take(); + let stdout_future = read_and_send(&mut stdout_pipe, tx.clone()); + let mut stderr_pipe = output.stderr.take(); + let stderr_future = read_and_send(&mut stderr_pipe, tx.clone()); - // save data back to output.stdout, since we captured it manually - for line in &stdout_out { - output.stdout.extend_from_slice(line.as_bytes()); - output.stdout.push(b'\n'); - } + let (status, stdout, stderr) = try_join3(output.wait(), stdout_future, stderr_future).await?; let elapsed = begin.elapsed(); @@ -75,14 +78,20 @@ async fn get_output_logged( cmd, args.join(" "), elapsed, - output.status.success() + status.success() ) .as_bytes(), ); - logs.extend("STDOUT&STDERR:\n".as_bytes()); - logs.extend(stdout_out.join("\n").as_bytes()); - - Ok(output) + logs.extend("STDOUT:\n".as_bytes()); + logs.extend(stdout.as_bytes()); + logs.extend("STDERR:\n".as_bytes()); + logs.extend(stderr.as_bytes()); + + Ok(Output { + status, + stdout: stdout.into_bytes(), + stderr: stderr.into_bytes(), + }) } /// Run command and retry until it succeeds