Skip to content

Commit

Permalink
simplify handling of network timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
stlankes committed Sep 29, 2024
1 parent 3d00fbf commit 075d816
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 114 deletions.
90 changes: 14 additions & 76 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,10 @@ pub(crate) fn now() -> u64 {
}

/// Blocks the current thread on `f`, running the executor when idling.
pub(crate) fn poll_on<F, T>(future: F, timeout: Option<Duration>) -> io::Result<T>
pub(crate) fn poll_on<F, T>(future: F) -> io::Result<T>
where
F: Future<Output = io::Result<T>>,
{
#[cfg(any(feature = "tcp", feature = "udp"))]
let nic = get_network_driver();

// disable network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
let no_retransmission = if let Some(nic) = nic {
let mut guard = nic.lock();
guard.set_polling_mode(true);
guard.get_checksums().tcp.tx()
} else {
true
};

let start = now();
let mut cx = Context::from_waker(Waker::noop());
let mut future = pin!(future);

Expand All @@ -129,42 +115,8 @@ where
run();

if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let wakeup_time =
network_delay(Instant::from_micros_const(now().try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return t;
}

if let Some(duration) = timeout {
if Duration::from_micros(now() - start) >= duration {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let wakeup_time =
network_delay(Instant::from_micros_const(now().try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return Err(io::Error::ETIME);
}
}
}
}

Expand All @@ -176,16 +128,6 @@ where
#[cfg(any(feature = "tcp", feature = "udp"))]
let nic = get_network_driver();

// disable network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
let no_retransmission = if let Some(nic) = nic {
let mut guard = nic.lock();
guard.set_polling_mode(true);
!guard.get_checksums().tcp.tx()
} else {
true
};

let backoff = Backoff::new();
let start = now();
let task_notify = Arc::new(TaskNotify::new());
Expand All @@ -200,17 +142,16 @@ where
let now = now();
if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
{
let network_timer =
network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
// allow network interrupts
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}
}

return t;
Expand All @@ -219,17 +160,16 @@ where
if let Some(duration) = timeout {
if Duration::from_micros(now - start) >= duration {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
{
let network_timer =
network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
// allow network interrupts
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}
}

return Err(io::Error::ETIME);
Expand All @@ -244,11 +184,9 @@ where
if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 {
let wakeup_time =
timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
if !no_retransmission {
let ticks = crate::arch::processor::get_timer_ticks();
let network_timer = delay.map(|d| ticks + d);
core_scheduler().add_network_timer(network_timer);
}
let ticks = crate::arch::processor::get_timer_ticks();
let network_timer = delay.map(|d| ticks + d);
core_scheduler().add_network_timer(network_timer);

// allow network interrupts
if let Some(nic) = nic {
Expand Down
20 changes: 10 additions & 10 deletions src/executor/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,18 +324,18 @@ impl<'a> NetworkInterface<'a> {

#[inline]
pub(crate) fn network_delay(timestamp: Instant) -> Option<Duration> {
crate::executor::network::NIC
.lock()
.as_nic_mut()
.unwrap()
.poll_delay(timestamp)
if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
nic.poll_delay(timestamp)
} else {
None
}
}

#[inline]
fn network_poll(timestamp: Instant) {
crate::executor::network::NIC
.lock()
.as_nic_mut()
.unwrap()
.poll_common(timestamp);
if let Ok(nic) = crate::executor::network::NIC.lock().as_nic_mut() {
nic.poll_common(timestamp);
} else {
warn!("Unable to poll network interface!");
}
}
53 changes: 25 additions & 28 deletions src/scheduler/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,35 +462,32 @@ impl Task {
))))
.unwrap();
let objmap = OBJECT_MAP.get().unwrap().clone();
let _ = poll_on(
async {
let mut guard = objmap.write().await;
if env::is_uhyve() {
guard
.try_insert(STDIN_FILENO, Arc::new(UhyveStdin::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDOUT_FILENO, Arc::new(UhyveStdout::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDERR_FILENO, Arc::new(UhyveStderr::new()))
.map_err(|_| io::Error::EIO)?;
} else {
guard
.try_insert(STDIN_FILENO, Arc::new(GenericStdin::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDOUT_FILENO, Arc::new(GenericStdout::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDERR_FILENO, Arc::new(GenericStderr::new()))
.map_err(|_| io::Error::EIO)?;
}
let _ = poll_on(async {
let mut guard = objmap.write().await;
if env::is_uhyve() {
guard
.try_insert(STDIN_FILENO, Arc::new(UhyveStdin::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDOUT_FILENO, Arc::new(UhyveStdout::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDERR_FILENO, Arc::new(UhyveStderr::new()))
.map_err(|_| io::Error::EIO)?;
} else {
guard
.try_insert(STDIN_FILENO, Arc::new(GenericStdin::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDOUT_FILENO, Arc::new(GenericStdout::new()))
.map_err(|_| io::Error::EIO)?;
guard
.try_insert(STDERR_FILENO, Arc::new(GenericStderr::new()))
.map_err(|_| io::Error::EIO)?;
}

Ok(())
},
None,
);
Ok(())
});
}

Task {
Expand Down

0 comments on commit 075d816

Please sign in to comment.