Skip to content

Commit

Permalink
feat: Doomsday timer to ensure crash (#196)
Browse files Browse the repository at this point in the history
Flips the paradigm from crash if failure detected to crash if success not asserted.

This is valuable as we may fail to detect failure. Can be used for components that run in environments with automatic restarts.
  • Loading branch information
davidbeesley committed Jan 20, 2023
1 parent 738f1a4 commit e97957e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-future"
version = "0.4.4"
version = "0.4.5"
edition = "2021"
authors = ["Fluvio Contributors <[email protected]>"]
description = "I/O futures for Fluvio project"
Expand Down Expand Up @@ -48,6 +48,7 @@ fs = ["async-fs", "futures-lite", "pin-utils", "async-trait"]
zero_copy = ["nix", "task_unstable"]
mmap = ["fs", "memmap", "task_unstable"]
retry = []
doomsday = []

[dependencies]
log = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion async-test-derive/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

134 changes: 134 additions & 0 deletions src/doomsday.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, Ordering};

use std::{
sync::Arc,
time::{Duration, Instant},
};

use async_std::sync::Mutex;
pub use async_std::task::JoinHandle;
use log::info;
use tracing::{debug, error};

#[derive(Clone)]
/// DoomsdayTimer will configurably panic or exit if it is not
/// `reset()` at least every `duration`
pub struct DoomsdayTimer {
time_to_explode: Arc<Mutex<Instant>>,
duration: Duration,
defused: Arc<AtomicBool>,
aggressive_mode: bool,
}

impl Display for DoomsdayTimer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let fail_mode = if self.aggressive_mode {
"Exits"
} else {
"Panics"
};
write!(
f,
"DoomsdayTimer(Duration: {:?}, {})",
self.duration, fail_mode,
)
}
}

impl DoomsdayTimer {
/// Spawn a new doomsday timer.
/// If `exit_on_explode` is true, it will terminate process with `exit(1)` if it explodes.
/// Otherwise it will call `panic()`. Note that `awaiting` on the jh will panic if the `DoomsdayTimer` panicked
pub fn spawn(duration: Duration, exit_on_explode: bool) -> (Self, JoinHandle<()>) {
let s = Self {
time_to_explode: Arc::new(Mutex::new(Instant::now() + duration)),
duration,
defused: Default::default(),
aggressive_mode: exit_on_explode,
};

let cloned = s.clone();
let jh = crate::task::spawn(async move {
cloned.main_loop().await;
});
(s, jh)
}

/// Reset the timer to it's full duration
pub async fn reset(&self) {
let new_time_to_explode = Instant::now() + self.duration;
*self.time_to_explode.lock().await = new_time_to_explode;
debug!("{} has been reset", self);
}

async fn main_loop(&self) {
loop {
if self.defused.load(Ordering::Relaxed) {
debug!("{} has been defused, terminating main loop", self);
return;
}
let now = Instant::now();
let time_to_explode = *self.time_to_explode.lock().await;
if now > time_to_explode {
error!("{} exploded due to timeout", self);
self.explode_inner();
} else {
let time_to_sleep = time_to_explode - now;
crate::timer::sleep(time_to_sleep).await;
}
}
}

/// Force the timer to explode
pub fn explode(&self) {
error!("{} was exploded manually", self);
self.explode_inner();
}

fn explode_inner(&self) {
if self.aggressive_mode {
error!("{} exiting", self);
std::process::exit(1);
} else {
error!("{} panicking", self);
panic!("DoomsdayTimer with Duration {:?} exploded", self.duration);
}
}
/// Defuse the timer. Cannot be undone and will no longer `explode`
pub fn defuse(&self) {
self.defused.store(true, Ordering::Relaxed);
info!("{} has been defused", self);
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::DoomsdayTimer;
use crate::task::run_block_on;
use crate::test_async;
use std::io::Error;

#[test_async(should_panic)]
async fn test_explode() -> Result<(), Error> {
let (_, jh) = DoomsdayTimer::spawn(Duration::from_millis(1), false);
crate::timer::sleep(Duration::from_millis(2)).await;
jh.await;
Ok(())
}

#[test_async]
async fn test_do_not_explode() -> Result<(), Error> {
let (bomb, jh) = DoomsdayTimer::spawn(Duration::from_millis(10), false);
crate::timer::sleep(Duration::from_millis(5)).await;
bomb.reset().await;
crate::timer::sleep(Duration::from_millis(5)).await;
bomb.reset().await;
crate::timer::sleep(Duration::from_millis(5)).await;
bomb.defuse();
run_block_on(jh);
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub mod subscriber {
}
}

#[cfg(feature = "doomsday")]
pub mod doomsday;

/// re-export tracing
pub mod tracing {

Expand Down
6 changes: 3 additions & 3 deletions src/zero_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ impl ZeroCopy {
) {
Ok(bytes_transferred) => {
trace!("bytes transferred: {}", bytes_transferred);
total_transferred += bytes_transferred as usize;
total_transferred += bytes_transferred;

// zero bytes transferred means it's EOF
if bytes_transferred == 0 {
return Ok(total_transferred as usize);
return Ok(total_transferred);
}

if total_transferred < size as usize {
Expand All @@ -97,7 +97,7 @@ impl ZeroCopy {
size
);

return Ok(total_transferred as usize);
return Ok(total_transferred);
}
}
Err(err) => match err {
Expand Down

0 comments on commit e97957e

Please sign in to comment.