Skip to content

Commit

Permalink
Merge pull request #11 from ublk-org/pre-dev-0.3
Browse files Browse the repository at this point in the history
Pre dev 0.3: cleanup io & uring_async
  • Loading branch information
ming1 authored Mar 2, 2024
2 parents 6cf5ae3 + 4940e93 commit 202d278
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 39 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn main() {
// target initialization
|dev| {
dev.set_default_params(250_u64 << 30);
Ok(0)
Ok(())
},
// queue IO logic
|tag, dev| q_fn(tag, dev),
Expand Down
14 changes: 4 additions & 10 deletions examples/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn lo_file_size(f: &std::fs::File) -> Result<(u64, u8, u8)> {
}

// setup loop target
fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt) -> Result<i32, UblkError> {
fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt) -> Result<(), UblkError> {
trace!("loop: init_tgt {}", dev.dev_info.dev_id);
if lo.direct_io != 0 {
unsafe {
Expand Down Expand Up @@ -113,7 +113,7 @@ fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt) -> Result<i32, UblkError> {
let val = serde_json::json!({"loop": LoJson { back_file_path: lo.back_file_path.clone(), direct_io: 1 } });
dev.set_target_json(val);

Ok(0)
Ok(())
}

#[inline]
Expand Down Expand Up @@ -195,14 +195,8 @@ fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx, buf_addr: *
// either start to handle or retry
let off = (iod.start_sector << 9) as u64;
let bytes = (iod.nr_sectors << 9) as u32;
let sqe = __lo_make_io_sqe(op, off, bytes, buf_addr).user_data(data);
unsafe {
q.q_ring
.borrow_mut()
.submission()
.push(&sqe)
.expect("submission fail");
}
let sqe = __lo_make_io_sqe(op, off, bytes, buf_addr);
q.ublk_submit_sqe_sync(sqe, data).unwrap();
}
}

Expand Down
14 changes: 8 additions & 6 deletions examples/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,18 @@ fn q_async_fn(qid: u16, dev: &UblkDev, user_copy: bool) {
let q = q_rc.clone();

f_vec.push(exe.spawn(async move {
let buf = IoBuf::<u8>::new(q.dev.dev_info.max_io_buf_bytes as usize);
let mut cmd_op = libublk::sys::UBLK_U_IO_FETCH_REQ;
let mut res = 0;
let buf_addr = if user_copy {
std::ptr::null_mut()
let (_buf, buf_addr) = if user_copy {
(None, std::ptr::null_mut())
} else {
buf.as_mut_ptr()
let buf = IoBuf::<u8>::new(q.dev.dev_info.max_io_buf_bytes as usize);

q.register_io_buf(tag, &buf);
let addr = buf.as_mut_ptr();
(Some(buf), addr)
};

q.register_io_buf(tag, &buf);
loop {
let cmd_res = q.submit_io_cmd(tag, cmd_op, buf_addr, res).await;
if cmd_res == libublk::sys::UBLK_IO_RES_ABORT {
Expand Down Expand Up @@ -107,7 +109,7 @@ fn __null_add(
.unwrap();
let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(250_u64 << 30);
Ok(0)
Ok(())
};
let user_copy = (ctrl.dev_info().flags & libublk::sys::UBLK_F_USER_COPY as u64) != 0;
let wh = move |d_ctrl: &UblkCtrl| {
Expand Down
2 changes: 1 addition & 1 deletion examples/ramdisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn rd_add_dev(dev_id: i32, buf_addr: *mut u8, size: u64, for_add: bool) {

let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(size);
Ok(0)
Ok(())
};
let dev_arc = Arc::new(UblkDev::new(ctrl.get_name(), tgt_init, &ctrl).unwrap());
let dev_clone = dev_arc.clone();
Expand Down
11 changes: 5 additions & 6 deletions src/ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,14 @@ const CTRL_CMD_BUF_READ: u32 = 8;
/// case of unprivileged ublk, such as get_features(), add_dev().
const CTRL_CMD_NO_NEED_DEV_PATH: u32 = 16;

#[allow(dead_code)]
#[derive(Debug, Default, Copy, Clone)]
struct UblkCtrlCmdData {
cmd_op: u32,
flags: u32,
data: u64,
dev_path_len: u16,
pad: u16,
reserved: u32,
_pad: u16,
_reserved: u32,

addr: u64,
len: u32,
Expand Down Expand Up @@ -1425,7 +1424,7 @@ impl UblkCtrl {
/// io handler, such as setup async/await for handling io command.
pub fn run_target<T, Q, W>(&self, tgt_fn: T, q_fn: Q, device_fn: W) -> Result<i32, UblkError>
where
T: FnOnce(&mut UblkDev) -> Result<i32, UblkError>,
T: FnOnce(&mut UblkDev) -> Result<(), UblkError>,
Q: FnOnce(u16, &UblkDev) + Send + Sync + Clone + 'static,
W: FnOnce(&UblkCtrl) + Send + Sync + 'static,
{
Expand Down Expand Up @@ -1539,7 +1538,7 @@ mod tests {
let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(250_u64 << 30);
dev.set_target_json(serde_json::json!({"null": "test_data" }));
Ok(0)
Ok(())
};
let dev = UblkDev::new(ctrl.get_name(), tgt_init, &ctrl).unwrap();

Expand All @@ -1565,7 +1564,7 @@ mod tests {
let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(250_u64 << 30);
dev.set_target_json(serde_json::json!({"null": "test_data" }));
Ok(0)
Ok(())
};
let q_fn = move |qid: u16, dev: &UblkDev| {
let bufs_rc = Rc::new(dev.alloc_queue_io_bufs());
Expand Down
105 changes: 95 additions & 10 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl UblkDev {
/// structure which implements UblkTgtImpl.
pub fn new<F>(tgt_name: String, ops: F, ctrl: &UblkCtrl) -> Result<UblkDev, UblkError>
where
F: FnOnce(&mut UblkDev) -> Result<i32, UblkError>,
F: FnOnce(&mut UblkDev) -> Result<(), UblkError>,
{
let info = ctrl.dev_info();
let mut tgt = UblkTgt {
Expand Down Expand Up @@ -418,7 +418,6 @@ impl UblkQueueState {
///
/// So far, each queue is handled by one its own io_uring.
///
#[allow(dead_code)]
pub struct UblkQueue<'a> {
flags: UblkFlags,
q_id: u16,
Expand All @@ -429,9 +428,10 @@ pub struct UblkQueue<'a> {
bufs: RefCell<Vec<*mut u8>>,
state: RefCell<UblkQueueState>,

/// uring is shared for handling target IO, so has to be
/// public
pub q_ring: RefCell<IoUring<squeue::Entry>>,
// call uring_op() and uring_op_mut() for manipulating
// q_ring, and in future it is likely to change to
// thread_local variable
q_ring: RefCell<IoUring<squeue::Entry>>,
}

impl AsRawFd for UblkQueue<'_> {
Expand Down Expand Up @@ -564,6 +564,26 @@ impl UblkQueue<'_> {
Ok(q)
}

// Manipulate immutable queue uring
pub fn uring_op<R, H>(&self, op_handler: H) -> Result<R, UblkError>
where
H: Fn(&IoUring<squeue::Entry>) -> Result<R, UblkError>,
{
let uring = self.q_ring.borrow();

op_handler(&uring)
}

// Manipulate mutable queue uring
pub fn uring_op_mut<R, H>(&self, op_handler: H) -> Result<R, UblkError>
where
H: Fn(&mut IoUring<squeue::Entry>) -> Result<R, UblkError>,
{
let mut uring = self.q_ring.borrow_mut();

op_handler(&mut uring)
}

/// Return queue depth
///
/// Queue depth decides the max count of inflight io command
Expand Down Expand Up @@ -596,8 +616,7 @@ impl UblkQueue<'_> {
unsafe { &*iod }
}

#[inline(always)]
pub fn get_io_buf_addr(&self, tag: u16) -> *mut u8 {
fn get_io_buf_addr(&self, tag: u16) -> *mut u8 {
self.bufs.borrow()[tag as usize]
}

Expand Down Expand Up @@ -638,7 +657,6 @@ impl UblkQueue<'_> {
}

#[inline(always)]
#[allow(unused_assignments)]
fn __queue_io_cmd(
&self,
r: &mut IoUring<squeue::Entry>,
Expand Down Expand Up @@ -777,6 +795,29 @@ impl UblkQueue<'_> {
f
}

#[inline]
pub fn ublk_submit_sqe_sync(
&self,
sqe: io_uring::squeue::Entry,
user_data: u64,
) -> Result<(), UblkError> {
let sqe = sqe.user_data(user_data);

loop {
let res = unsafe { self.q_ring.borrow_mut().submission().push(&sqe) };

match res {
Ok(_) => break,
Err(_) => {
log::debug!("ublk_submit_sqe: flush and retry");
self.q_ring.borrow().submit_and_wait(0)?;
}
}
}

Ok(())
}

/// Submit all commands for fetching IO
///
/// Only called during queue initialization. After queue is setup,
Expand Down Expand Up @@ -866,7 +907,6 @@ impl UblkQueue<'_> {
}

#[inline(always)]
#[allow(unused_assignments)]
fn handle_cqe<F>(&self, mut ops: F, e: &UblkIOCtx)
where
F: FnMut(&UblkQueue, u16, &UblkIOCtx),
Expand Down Expand Up @@ -1143,7 +1183,7 @@ impl UblkQueue<'_> {
///
/// # Arguments:
///
/// * `exe`: async executor
/// * `wake_handler`: handler for wakeup io tasks pending on this uring
///
/// * `to_wait`: passed to io_uring_enter(), wait until `to_wait` events
/// are available. It won't block in waiting for events if `to_wait` is
Expand Down Expand Up @@ -1181,3 +1221,48 @@ impl UblkQueue<'_> {
}
}
}

#[cfg(test)]
mod tests {
use crate::ctrl::UblkCtrlBuilder;
use crate::io::{UblkDev, UblkQueue};
use crate::{UblkError, UblkFlags};
use io_uring::IoUring;

fn __submit_uring_nop(ring: &mut IoUring<io_uring::squeue::Entry>) -> Result<usize, UblkError> {
let nop_e = io_uring::opcode::Nop::new().build().user_data(0x42).into();

unsafe {
let mut queue = ring.submission();
queue.push(&nop_e).expect("queue is full");
}

ring.submit_and_wait(1).map_err(UblkError::IOError)
}

#[test]
fn test_queue_uring_op() {
let ctrl = UblkCtrlBuilder::default()
.dev_flags(UblkFlags::UBLK_DEV_F_ADD_DEV)
.build()
.unwrap();

let tgt_init = |dev: &mut _| {
let q = UblkQueue::new(0, dev)?;

q.uring_op(|ring: &_| {
ring.submitter().unregister_files()?;
ring.submitter()
.register_files(&dev.tgt.fds)
.map_err(UblkError::IOError)
})?;
q.uring_op_mut(|ring: &mut _| -> Result<usize, UblkError> {
__submit_uring_nop(ring)
})?;

Ok(())
};

UblkDev::new(ctrl.get_name(), tgt_init, &ctrl).unwrap();
}
}
15 changes: 13 additions & 2 deletions src/uring_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ impl Future for UblkUringOpFuture {
}
}

//// Wakeup the pending future/task
/// Wakeup the pending task, which will be marked as runnable
/// by smol, and the task's future poll() will be run by smol
/// executor's try_tick()
#[inline]
pub fn ublk_wake_task(data: u64, cqe: &cqueue::Entry) {
MY_SLAB.with(|refcell| {
Expand Down Expand Up @@ -124,6 +126,7 @@ pub fn ublk_run_task<T, F>(
where
F: Fn(&smol::LocalExecutor) -> Result<(), UblkError>,
{
// make sure the spawned task is started by `try_tick()`
while exe.try_tick() {}
while !task.is_finished() {
handler(exe)?;
Expand All @@ -146,7 +149,15 @@ pub fn ublk_run_io_task<T>(
ublk_run_task(exe, task, handler)
}

/// Run one task in this local Executor until the task is finished
/// Run one control task in this local Executor until the task is finished,
/// control task is queued in the thread_local io_uring CTRL_URING.
///
/// The current queue is passed in because some control command depends on
/// IO command, such as START command, so ublk_run_ctrl_task() has to drive
/// both data and control urings.
///
/// Rust isn't friendly for using native poll or epoll, so use one dedicated
/// uring for polling data and control urings.
pub fn ublk_run_ctrl_task<T>(
exe: &smol::LocalExecutor,
q: &UblkQueue,
Expand Down
6 changes: 3 additions & 3 deletions tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ mod integration {
.unwrap();
let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(250_u64 << 30);
Ok(0)
Ok(())
};

let q_fn = move |qid: u16, _dev: &UblkDev| {
Expand Down Expand Up @@ -172,7 +172,7 @@ mod integration {

let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(250_u64 << 30);
Ok(0)
Ok(())
};
// device data is shared among all queue contexts
let dev_data = Arc::new(Mutex::new(DevData { done: 0 }));
Expand Down Expand Up @@ -310,7 +310,7 @@ mod integration {
.unwrap();
let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(size);
Ok(0)
Ok(())
};

let q_fn = move |qid: u16, dev: &UblkDev| {
Expand Down

0 comments on commit 202d278

Please sign in to comment.