From 802e0e5f5a59133807632f90842dbc082061b3d8 Mon Sep 17 00:00:00 2001 From: Astrid Yu Date: Sun, 20 Aug 2023 23:02:48 -0700 Subject: [PATCH] Fix the line buffering issue it's the line buffering, it's always the fucking line buffering --- src/burn/child.rs | 12 ++++++++---- src/burn/handle.rs | 10 ++++++---- src/burn/ipc.rs | 26 ++++++++++++-------------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/src/burn/child.rs b/src/burn/child.rs index acb02a7..4e66d9d 100644 --- a/src/burn/child.rs +++ b/src/burn/child.rs @@ -7,7 +7,7 @@ use std::{ }; use bytesize::ByteSize; -use tracing::{debug, error, trace}; +use tracing::{debug, error, info, trace}; use tracing_unwrap::ResultExt; use valuable::Valuable; @@ -31,15 +31,17 @@ pub fn main() { init_logging_child(&args.logfile); set_hook(Box::new(|p| { - error!("{p}"); + error!("{p:?}"); })); - debug!("We are in child process mode with args {:#?}", args); + info!("We are in child process mode with args {:#?}", args); let final_msg = match run(&args) { Ok(_) => StatusMessage::Success, Err(e) => StatusMessage::Error(e), }; + + info!(?final_msg, "Completed"); send_msg(final_msg); } @@ -131,7 +133,9 @@ fn for_each_block( #[tracing::instrument(fields(msg = msg.as_value()))] pub fn send_msg(msg: StatusMessage) { - write_msg(std::io::stdout(), &msg).expect("Failed to write message"); + let mut stdout = std::io::stdout(); + write_msg(&mut stdout, &msg).expect("Failed to write message"); + stdout.flush().expect("Failed to flush stdout"); } trait BlockSink { diff --git a/src/burn/handle.rs b/src/burn/handle.rs index c739520..10ede30 100644 --- a/src/burn/handle.rs +++ b/src/burn/handle.rs @@ -54,6 +54,7 @@ impl Handle { tokio::process::Command::from(cmd) .stdin(Stdio::piped()) .stdout(Stdio::piped()) + .kill_on_drop(true) .spawn()? }; @@ -93,7 +94,7 @@ impl Handle { }) } - pub async fn next_message(&mut self) -> anyhow::Result> { + pub async fn next_message(&mut self) -> std::io::Result> { read_next_message(&mut self.rx).await } @@ -112,9 +113,10 @@ pub enum StartProcessError { Failed(Option), } -async fn read_next_message(rx: impl AsyncBufRead + Unpin) -> anyhow::Result> { - let message = read_msg_async(rx).await?; - Ok(Some(message)) +async fn read_next_message( + rx: impl AsyncBufRead + Unpin, +) -> std::io::Result> { + Ok(Some(read_msg_async(rx).await?)) } impl core::fmt::Debug for Handle { diff --git a/src/burn/ipc.rs b/src/burn/ipc.rs index 5db10ae..ba4b60d 100644 --- a/src/burn/ipc.rs +++ b/src/burn/ipc.rs @@ -6,7 +6,7 @@ use byteorder::{BigEndian, WriteBytesExt}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt}; -use tracing::{trace, trace_span, Instrument}; +use tracing::{trace, trace_span}; use valuable::Valuable; use crate::compression::CompressionFormat; @@ -28,19 +28,17 @@ pub fn write_msg(mut w: impl Write, msg: &StatusMessage) -> anyhow::Result<()> { Ok(()) } -pub async fn read_msg_async(mut r: impl AsyncRead + Unpin) -> anyhow::Result { - let span = trace_span!("Reading"); - async move { - let size = r.read_u32().await?; - let mut buf = vec![0; size as usize]; - r.read_exact(&mut buf).await?; - - let msg: StatusMessage = bincode_options().deserialize(&buf)?; - trace!(msg = msg.as_value(), "Parsed message"); - Ok(msg) - } - .instrument(span) - .await +#[tracing::instrument(level = "trace", skip_all)] +pub async fn read_msg_async(mut r: impl AsyncRead + Unpin) -> std::io::Result { + let size = r.read_u32().await?; + let mut buf = vec![0; size as usize]; + r.read_exact(&mut buf).await?; + + let msg: StatusMessage = bincode_options() + .deserialize(&buf) + .expect("Failed to parse bincode from stream"); + trace!(msg = msg.as_value(), "Parsed message"); + Ok(msg) } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Valuable)]