Skip to content

Commit

Permalink
Merge pull request #4 from fpco/capture-recent-output
Browse files Browse the repository at this point in the history
Capture recent output for nicer error messages
  • Loading branch information
snoyberg authored Jun 13, 2024
2 parents 8bf2ae8 + b3f3a58 commit b7c44fc
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 34 deletions.
98 changes: 66 additions & 32 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use parking_lot::Mutex;
use reqwest::Url;
use signal_hook::consts::{SIGINT, SIGTERM};
use std::{
collections::VecDeque,
io::{Read, Write},
process::{Child, Command, ExitStatus, Stdio},
str::FromStr,
Expand Down Expand Up @@ -48,6 +49,9 @@ pub(crate) struct Cli {
/// Arguments to the process
#[arg(required = false)]
pub(crate) args: Vec<String>,
/// How many lines of output should we store for error messages?
#[arg(long, default_value_t = 10)]
pub(crate) output_lines: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -82,45 +86,59 @@ impl Cli {
let mut command = Command::new(&self.command);
command.args(&self.args[..]);

if self.task_output_timeout.is_some() {
command.stdout(Stdio::piped()).stderr(Stdio::piped());
}
command.stdout(Stdio::piped()).stderr(Stdio::piped());

let mut child = command
.spawn()
.context(format!("Failed to spawn {}", self.command))?;

let (send, recv) = mpsc::channel::<MainMessage>();
let send = SendMainMessage(send);
let max_recent_output = self.output_lines;
let recent_output = Arc::new(Mutex::new(VecDeque::with_capacity(max_recent_output)));

match self.task_output_timeout {
Some(task_output_timeout) => {
let last_output = Arc::new(Mutex::new(Instant::now()));
let child_stdout = child.stdout.take().context("child stdout is None")?;
let child_stderr = child.stderr.take().context("child stderr is None")?;
let send_clone = send.clone();
let last_output_clone = last_output.clone();
std::thread::spawn(|| {
process_std_handle(child_stdout, send_clone, StdType::Stdout, last_output_clone)
});
let send_clone = send.clone();
let last_output_clone = last_output.clone();
std::thread::spawn(|| {
process_std_handle(child_stderr, send_clone, StdType::Stderr, last_output_clone)
});
let send_clone = send.clone();
std::thread::spawn(move || {
detect_deadlock(
last_output,
send_clone,
Duration::from_secs(task_output_timeout),
)
});
}
None => {
anyhow::ensure!(child.stdout.is_none());
anyhow::ensure!(child.stderr.is_none());
}
// Always capture output so we can keep recent output available for error messages.
let last_output = Arc::new(Mutex::new(Instant::now()));
{
let child_stdout = child.stdout.take().context("child stdout is None")?;
let child_stderr = child.stderr.take().context("child stderr is None")?;
let send_clone = send.clone();
let last_output_clone = last_output.clone();
let recent_output_clone = recent_output.clone();
std::thread::spawn(move || {
process_std_handle(
child_stdout,
send_clone,
StdType::Stdout,
last_output_clone,
recent_output_clone,
max_recent_output,
)
});
let send_clone = send.clone();
let last_output_clone = last_output.clone();
let recent_output_clone = recent_output.clone();
std::thread::spawn(move || {
process_std_handle(
child_stderr,
send_clone,
StdType::Stderr,
last_output_clone,
recent_output_clone,
max_recent_output,
)
});
}

if let Some(task_output_timeout) = self.task_output_timeout {
let send_clone = send.clone();
std::thread::spawn(move || {
detect_deadlock(
last_output,
send_clone,
Duration::from_secs(task_output_timeout),
)
});
}

let send_clone = send.clone();
Expand Down Expand Up @@ -174,7 +192,12 @@ impl Cli {
self.app_version,
self.image_url,
);
let result = slack_app.send_notification(&e);
let mut msg = String::new();
for line in &*recent_output.lock() {
msg.push_str(line);
msg.push('\n');
}
let result = slack_app.send_notification(&e, &msg);
if let Err(err) = result {
eprintln!("Slack notification failed: {err:?}");
}
Expand All @@ -190,6 +213,8 @@ fn process_std_handle(
send: SendMainMessage,
std_type: StdType,
last_output: Arc<Mutex<Instant>>,
recent_output: Arc<Mutex<VecDeque<String>>>,
max_recent_output: usize,
) {
let mut buffer = [0u8; 4096];
loop {
Expand Down Expand Up @@ -217,6 +242,15 @@ fn process_std_handle(
send.send(MainMessage::Error(e));
break;
}

let mut guard = recent_output.lock();
for line in buffer.split(|x| *x == b'\n') {
if guard.len() >= max_recent_output {
guard.pop_front();
}
let line = line.strip_suffix(&[b'\r']).unwrap_or(line);
guard.push_back(String::from_utf8_lossy(line).into_owned());
}
}
Err(e) => {
send.send(MainMessage::Error(e));
Expand Down
15 changes: 13 additions & 2 deletions src/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ impl SlackApp {
)
}

pub(crate) fn send_notification(&self, message: &anyhow::Error) -> Result<()> {
pub(crate) fn send_notification(
&self,
message: &anyhow::Error,
latest_output: &str,
) -> Result<()> {
let description = self.compute_description();
let mut value = serde_json::json!(
{
Expand All @@ -69,7 +73,14 @@ impl SlackApp {
"type": "mrkdwn",
"text": description
},
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": latest_output,
}
},
]
});
if let Some(image_url) = &self.app_info.image_url {
Expand Down

0 comments on commit b7c44fc

Please sign in to comment.