diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs index dcae85109a..3d92860d30 100644 --- a/src/concurrency/thread.rs +++ b/src/concurrency/thread.rs @@ -173,6 +173,8 @@ pub enum BlockReason { InitOnce(InitOnceId), /// Blocked on epoll. Epoll, + /// Blocked on eventfd. + Eventfd, } /// The state of a thread. diff --git a/src/shims/unix/linux/eventfd.rs b/src/shims/unix/linux/eventfd.rs index 910ab7e90f..5df78ffc99 100644 --- a/src/shims/unix/linux/eventfd.rs +++ b/src/shims/unix/linux/eventfd.rs @@ -4,7 +4,7 @@ use std::io; use std::io::ErrorKind; use crate::concurrency::VClock; -use crate::shims::unix::fd::FileDescriptionRef; +use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef}; use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _}; use crate::shims::unix::*; use crate::*; @@ -26,6 +26,10 @@ struct Event { counter: Cell, is_nonblock: bool, clock: RefCell, + /// A thread blocked on read. + blocked_read_tid: RefCell>, + /// A thread blocked on write. + blocked_write_tid: RefCell>, } impl FileDescription for Event { @@ -74,28 +78,34 @@ impl FileDescription for Event { // Block when counter == 0. let counter = self.counter.get(); + let weak_eventfd = self_ref.downgrade(); if counter == 0 { if self.is_nonblock { return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); } + let dest = dest.clone(); + let mut blocked_read_tid = self.blocked_read_tid.borrow_mut(); + *blocked_read_tid = Some(ecx.active_thread()); - throw_unsup_format!("eventfd: blocking is unsupported"); + ecx.block_thread( + BlockReason::Eventfd, + None, + callback!( + @capture<'tcx> { + buf_place: MPlaceTy<'tcx>, + dest: MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + } + @unblock = |this| { + blocking_eventfd_read_callback(&buf_place, &dest, weak_eventfd, this)?; + interp_ok(()) + } + ), + ); } else { - // Synchronize with all prior `write` calls to this FD. - ecx.acquire_clock(&self.clock.borrow()); - - // Give old counter value to userspace, and set counter value to 0. - ecx.write_int(counter, &buf_place)?; - self.counter.set(0); - - // When any of the event happened, we check and update the status of all supported event - // types for current file description. - ecx.check_and_update_readiness(self_ref)?; - - // Tell userspace how many bytes we wrote. - ecx.write_int(buf_place.layout.size.bytes(), dest)?; + // TODO: why the compiler suggested not to use &dest?? + blocking_eventfd_read_callback(&buf_place, dest, weak_eventfd, ecx)?; } - interp_ok(()) } @@ -137,27 +147,39 @@ impl FileDescription for Event { } // If the addition does not let the counter to exceed the maximum value, update the counter. // Else, block. + let weak_eventfd = self_ref.downgrade(); match self.counter.get().checked_add(num) { - Some(new_count @ 0..=MAX_COUNTER) => { - // Future `read` calls will synchronize with this write, so update the FD clock. - if let Some(clock) = &ecx.release_clock() { - self.clock.borrow_mut().join(clock); - } - self.counter.set(new_count); + Some(_new_count @ 0..=MAX_COUNTER) => { + return blocking_eventfd_write_callback(num, &buf_place, dest, weak_eventfd, ecx); } - None | Some(u64::MAX) => + None | Some(u64::MAX) => { if self.is_nonblock { return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } else { - throw_unsup_format!("eventfd: blocking is unsupported"); - }, - }; - // When any of the event happened, we check and update the status of all supported event - // types for current file description. - ecx.check_and_update_readiness(self_ref)?; + } + + let dest = dest.clone(); + let mut blocked_write_tid = self.blocked_write_tid.borrow_mut(); + *blocked_write_tid = Some(ecx.active_thread()); - // Return how many bytes we read. - ecx.write_int(buf_place.layout.size.bytes(), dest) + ecx.block_thread( + BlockReason::Eventfd, + None, + callback!( + @capture<'tcx> { + num: u64, + buf_place: MPlaceTy<'tcx>, + dest: MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + } + @unblock = |this| { + blocking_eventfd_write_callback(num, &buf_place, &dest, weak_eventfd, this)?; + interp_ok(()) + } + ), + ); + } + }; + interp_ok(()) } } @@ -217,8 +239,83 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { counter: Cell::new(val.into()), is_nonblock, clock: RefCell::new(VClock::default()), + blocked_read_tid: RefCell::new(None), + blocked_write_tid: RefCell::new(None), }); interp_ok(Scalar::from_i32(fd_value)) } } + +fn blocking_eventfd_read_callback<'tcx>( + buf_place: &MPlaceTy<'tcx>, + dest: &MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + ecx: &mut MiriInterpCx<'tcx>, +) -> InterpResult<'tcx> { + let Some(eventfd_ref) = weak_eventfd.upgrade() else { + throw_unsup_format!("eventfd FD got closed while blocking.") + }; + + // Since we pass the weak file description ref to the callback function, it is guaranteed to be + // an eventfd file description. + let eventfd = eventfd_ref.downcast::().unwrap(); + + // Synchronize with all prior `write` calls to this FD. + ecx.acquire_clock(&eventfd.clock.borrow()); + + // Give old counter value to userspace, and set counter value to 0. + let counter = eventfd.counter.get(); + ecx.write_int(counter, buf_place)?; + eventfd.counter.set(0); + + // When any of the event happened, we check and update the status of all supported event + // types for current file description. + ecx.check_and_update_readiness(&eventfd_ref)?; + + // Unblock thread previously blocked on `write`. + if let Some(tid) = *eventfd.blocked_write_tid.borrow_mut() { + ecx.unblock_thread(tid, BlockReason::Eventfd)?; + } + + // Tell userspace how many bytes we wrote. + ecx.write_int(buf_place.layout.size.bytes(), dest) +} +fn blocking_eventfd_write_callback<'tcx>( + num: u64, + buf_place: &MPlaceTy<'tcx>, + dest: &MPlaceTy<'tcx>, + weak_eventfd: WeakFileDescriptionRef, + ecx: &mut MiriInterpCx<'tcx>, +) -> InterpResult<'tcx> { + let Some(eventfd_ref) = weak_eventfd.upgrade() else { + throw_unsup_format!("eventfd FD got closed while blocking.") + }; + + // Since we pass the weak file description ref to the callback function, it is guaranteed to be + // an eventfd file description. + let eventfd = eventfd_ref.downcast::().unwrap(); + + // Future `read` calls will synchronize with this write, so update the FD clock. + if let Some(clock) = &ecx.release_clock() { + eventfd.clock.borrow_mut().join(clock); + } + + // In the happy case, the new_count is checked before executing this callback. + // In the case where the counter previously overflows or has the value u64::MAX, + // the counter will be set to 0 before the callback is executed. + let new_count = eventfd.counter.get().checked_add(num).unwrap(); + eventfd.counter.set(new_count); + + // When any of the event happened, we check and update the status of all supported event + // types for current file description. + ecx.check_and_update_readiness(&eventfd_ref)?; + + // Unblock thread previously blocked on `read`. + if let Some(tid) = *eventfd.blocked_read_tid.borrow_mut() { + ecx.unblock_thread(tid, BlockReason::Eventfd)?; + } + + // Return how many bytes we read. + ecx.write_int(buf_place.layout.size.bytes(), dest) +}