Skip to content

Commit

Permalink
Add blocking support for eventfd
Browse files Browse the repository at this point in the history
  • Loading branch information
tiif committed Oct 5, 2024
1 parent d3dba92 commit 82f2249
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 32 deletions.
2 changes: 2 additions & 0 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub enum BlockReason {
InitOnce(InitOnceId),
/// Blocked on epoll.
Epoll,
/// Blocked on eventfd.
Eventfd,
}

/// The state of a thread.
Expand Down
161 changes: 129 additions & 32 deletions src/shims/unix/linux/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;
use std::io::ErrorKind;

use crate::concurrency::VClock;
use crate::shims::unix::fd::FileDescriptionRef;
use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _};
use crate::shims::unix::*;
use crate::*;
Expand All @@ -26,6 +26,10 @@ struct Event {
counter: Cell<u64>,
is_nonblock: bool,
clock: RefCell<VClock>,
/// A thread blocked on read.
blocked_read_tid: RefCell<Option<ThreadId>>,
/// A thread blocked on write.
blocked_write_tid: RefCell<Option<ThreadId>>,
}

impl FileDescription for Event {
Expand Down Expand Up @@ -74,28 +78,34 @@ impl FileDescription for Event {

// Block when counter == 0.
let counter = self.counter.get();
let weak_eventfd = self_ref.downgrade();
if counter == 0 {
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
let dest = dest.clone();
let mut blocked_read_tid = self.blocked_read_tid.borrow_mut();
*blocked_read_tid = Some(ecx.active_thread());

throw_unsup_format!("eventfd: blocking is unsupported");
ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
blocking_eventfd_read_callback(&buf_place, &dest, weak_eventfd, this)?;
interp_ok(())
}
),
);
} else {
// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&self.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
ecx.write_int(counter, &buf_place)?;
self.counter.set(0);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;

// Tell userspace how many bytes we wrote.
ecx.write_int(buf_place.layout.size.bytes(), dest)?;
// TODO: why the compiler suggested not to use &dest??
blocking_eventfd_read_callback(&buf_place, dest, weak_eventfd, ecx)?;
}

interp_ok(())
}

Expand Down Expand Up @@ -137,27 +147,39 @@ impl FileDescription for Event {
}
// If the addition does not let the counter to exceed the maximum value, update the counter.
// Else, block.
let weak_eventfd = self_ref.downgrade();
match self.counter.get().checked_add(num) {
Some(new_count @ 0..=MAX_COUNTER) => {
// Future `read` calls will synchronize with this write, so update the FD clock.
if let Some(clock) = &ecx.release_clock() {
self.clock.borrow_mut().join(clock);
}
self.counter.set(new_count);
Some(_new_count @ 0..=MAX_COUNTER) => {
return blocking_eventfd_write_callback(num, &buf_place, dest, weak_eventfd, ecx);
}
None | Some(u64::MAX) =>
None | Some(u64::MAX) => {
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
throw_unsup_format!("eventfd: blocking is unsupported");
},
};
// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;
}

let dest = dest.clone();
let mut blocked_write_tid = self.blocked_write_tid.borrow_mut();
*blocked_write_tid = Some(ecx.active_thread());

// Return how many bytes we read.
ecx.write_int(buf_place.layout.size.bytes(), dest)
ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
blocking_eventfd_write_callback(num, &buf_place, &dest, weak_eventfd, this)?;
interp_ok(())
}
),
);
}
};
interp_ok(())
}
}

Expand Down Expand Up @@ -217,8 +239,83 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
counter: Cell::new(val.into()),
is_nonblock,
clock: RefCell::new(VClock::default()),
blocked_read_tid: RefCell::new(None),
blocked_write_tid: RefCell::new(None),
});

interp_ok(Scalar::from_i32(fd_value))
}
}

fn blocking_eventfd_read_callback<'tcx>(
buf_place: &MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the callback function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&eventfd.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
let counter = eventfd.counter.get();
ecx.write_int(counter, buf_place)?;
eventfd.counter.set(0);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;

// Unblock thread previously blocked on `write`.
if let Some(tid) = *eventfd.blocked_write_tid.borrow_mut() {
ecx.unblock_thread(tid, BlockReason::Eventfd)?;
}

// Tell userspace how many bytes we wrote.
ecx.write_int(buf_place.layout.size.bytes(), dest)
}
fn blocking_eventfd_write_callback<'tcx>(
num: u64,
buf_place: &MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the callback function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Future `read` calls will synchronize with this write, so update the FD clock.
if let Some(clock) = &ecx.release_clock() {
eventfd.clock.borrow_mut().join(clock);
}

// In the happy case, the new_count is checked before executing this callback.
// In the case where the counter previously overflows or has the value u64::MAX,
// the counter will be set to 0 before the callback is executed.
let new_count = eventfd.counter.get().checked_add(num).unwrap();
eventfd.counter.set(new_count);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;

// Unblock thread previously blocked on `read`.
if let Some(tid) = *eventfd.blocked_read_tid.borrow_mut() {
ecx.unblock_thread(tid, BlockReason::Eventfd)?;
}

// Return how many bytes we read.
ecx.write_int(buf_place.layout.size.bytes(), dest)
}

0 comments on commit 82f2249

Please sign in to comment.