Skip to content

Commit

Permalink
Fix the line buffering issue
Browse files Browse the repository at this point in the history
it's the line buffering, it's always the fucking line buffering
  • Loading branch information
ifd3f committed Aug 21, 2023
1 parent 86e0ce7 commit 802e0e5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 22 deletions.
12 changes: 8 additions & 4 deletions src/burn/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use bytesize::ByteSize;
use tracing::{debug, error, trace};
use tracing::{debug, error, info, trace};
use tracing_unwrap::ResultExt;
use valuable::Valuable;

Expand All @@ -31,15 +31,17 @@ pub fn main() {
init_logging_child(&args.logfile);

set_hook(Box::new(|p| {
error!("{p}");
error!("{p:?}");
}));

debug!("We are in child process mode with args {:#?}", args);
info!("We are in child process mode with args {:#?}", args);

let final_msg = match run(&args) {
Ok(_) => StatusMessage::Success,
Err(e) => StatusMessage::Error(e),
};

info!(?final_msg, "Completed");
send_msg(final_msg);
}

Expand Down Expand Up @@ -131,7 +133,9 @@ fn for_each_block(

#[tracing::instrument(fields(msg = msg.as_value()))]
pub fn send_msg(msg: StatusMessage) {
write_msg(std::io::stdout(), &msg).expect("Failed to write message");
let mut stdout = std::io::stdout();
write_msg(&mut stdout, &msg).expect("Failed to write message");
stdout.flush().expect("Failed to flush stdout");
}

trait BlockSink {
Expand Down
10 changes: 6 additions & 4 deletions src/burn/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl Handle {
tokio::process::Command::from(cmd)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.kill_on_drop(true)
.spawn()?
};

Expand Down Expand Up @@ -93,7 +94,7 @@ impl Handle {
})
}

pub async fn next_message(&mut self) -> anyhow::Result<Option<StatusMessage>> {
pub async fn next_message(&mut self) -> std::io::Result<Option<StatusMessage>> {
read_next_message(&mut self.rx).await
}

Expand All @@ -112,9 +113,10 @@ pub enum StartProcessError {
Failed(Option<ErrorType>),
}

async fn read_next_message(rx: impl AsyncBufRead + Unpin) -> anyhow::Result<Option<StatusMessage>> {
let message = read_msg_async(rx).await?;
Ok(Some(message))
async fn read_next_message(
rx: impl AsyncBufRead + Unpin,
) -> std::io::Result<Option<StatusMessage>> {
Ok(Some(read_msg_async(rx).await?))
}

impl core::fmt::Debug for Handle {
Expand Down
26 changes: 12 additions & 14 deletions src/burn/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use byteorder::{BigEndian, WriteBytesExt};
use serde::{Deserialize, Serialize};

use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::{trace, trace_span, Instrument};
use tracing::{trace, trace_span};
use valuable::Valuable;

use crate::compression::CompressionFormat;
Expand All @@ -28,19 +28,17 @@ pub fn write_msg(mut w: impl Write, msg: &StatusMessage) -> anyhow::Result<()> {
Ok(())
}

pub async fn read_msg_async(mut r: impl AsyncRead + Unpin) -> anyhow::Result<StatusMessage> {
let span = trace_span!("Reading");
async move {
let size = r.read_u32().await?;
let mut buf = vec![0; size as usize];
r.read_exact(&mut buf).await?;

let msg: StatusMessage = bincode_options().deserialize(&buf)?;
trace!(msg = msg.as_value(), "Parsed message");
Ok(msg)
}
.instrument(span)
.await
#[tracing::instrument(level = "trace", skip_all)]
pub async fn read_msg_async(mut r: impl AsyncRead + Unpin) -> std::io::Result<StatusMessage> {
let size = r.read_u32().await?;
let mut buf = vec![0; size as usize];
r.read_exact(&mut buf).await?;

let msg: StatusMessage = bincode_options()
.deserialize(&buf)
.expect("Failed to parse bincode from stream");
trace!(msg = msg.as_value(), "Parsed message");
Ok(msg)
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Valuable)]
Expand Down

0 comments on commit 802e0e5

Please sign in to comment.