diff --git a/src/io.rs b/src/io.rs index 2f7341f..4ef900b 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,6 +1,6 @@ #[cfg(feature = "fat_complete")] use super::UblkFatRes; -use super::{ctrl::UblkCtrl, sys, UblkError, UblkIORes}; +use super::{ctrl::UblkCtrl, exe::Executor, sys, UblkError, UblkIORes}; use io_uring::{cqueue, opcode, squeue, types, IoUring}; use log::{error, info, trace}; use serde::{Deserialize, Serialize}; @@ -123,6 +123,21 @@ impl<'a> UblkIOCtx<'a> { tag as u64 | (op << 16) as u64 | (tgt_data << 24) as u64 | ((is_target_io as u64) << 63) } + /// Build userdata for async io_uring OP + /// + /// # Arguments: + /// * `tag`: io tag, length is 16bit + /// * `op`: io operation code, length is 8bit + /// * `op_id`: unique id in io task + /// + /// The built userdata has to be unique in this io task, so that + /// our executor can figure out the exact submitted OP with + /// completed cqe + #[inline(always)] + pub fn build_user_data_async(tag: u16, op: u32, op_id: u32) -> u64 { + Self::build_user_data(tag, op, op_id, true) + } + /// Extract tag from userdata #[inline(always)] pub fn user_data_to_tag(user_data: u64) -> u32 { @@ -140,6 +155,13 @@ impl<'a> UblkIOCtx<'a> { fn is_target_io(user_data: u64) -> bool { (user_data & (1_u64 << 63)) != 0 } + + /// Check if this userdata is from IO command which is from + /// ublk driver + #[inline(always)] + fn is_io_command(user_data: u64) -> bool { + (user_data & (1_u64 << 63)) == 0 + } } #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -267,6 +289,13 @@ impl UblkDev { ..Default::default() }; } + + /// Return how many io slots, which is usually same with executor's + /// nr_tasks. + #[inline] + pub fn get_nr_ios(&self) -> u16 { + self.dev_info.queue_depth + self.tgt.extra_ios as u16 + } } impl Drop for UblkDev { @@ -854,6 +883,72 @@ impl UblkQueue<'_> { } } + pub(crate) fn process_io_cmds( + &self, + exe: &Executor, + mut ops: F, + to_wait: usize, + ) -> Result + where + F: FnMut(&UblkQueue, u16, &UblkIOCtx), + { + match self.wait_ios(to_wait) { + Err(r) => Err(r), + Ok(done) => { + for idx in 0..done { + let cqe = { + match self.q_ring.borrow_mut().completion().next() { + None => return Err(UblkError::OtherError(-libc::EINVAL)), + Some(r) => r, + } + }; + + let e = UblkIOCtx( + &cqe, + if idx == 0 { + UblkIOCtx::UBLK_IO_F_FIRST + } else { + 0 + } | if idx + 1 == done { + UblkIOCtx::UBLK_IO_F_LAST + } else { + 0 + }, + ); + + let data = e.user_data(); + let res = e.result(); + let tag = UblkIOCtx::user_data_to_tag(data); + + { + let cmd_op = UblkIOCtx::user_data_to_op(data); + trace!( + "{}: res {} (qid {} tag {} cmd_op {} target {}) state {:?}", + "handle_cqe", + res, + self.q_id, + tag, + cmd_op, + UblkIOCtx::is_target_io(data), + self.state.borrow(), + ); + } + if UblkIOCtx::is_io_command(data) { + self.update_state(e.0); + + if res == sys::UBLK_IO_RES_OK as i32 { + assert!(tag < self.q_depth); + ops(self, tag as u16, &e); + } + } else { + exe.wake_with_uring_cqe(tag as u16, &cqe); + } + } + Ok(0) + } + } + } + /// Wait and handle incoming IO /// /// # Arguments: @@ -874,4 +969,25 @@ impl UblkQueue<'_> { } } } + + /// Wait and handle incoming IO command + /// + /// # Arguments: + /// + /// * `ops`: IO handling closure + /// + /// Called in queue context. won't return unless error is observed. + /// Wait and handle any incoming cqe until queue is down. + /// + pub fn wait_and_handle_io_cmd(&self, exe: &Executor, mut ops: F) + where + F: FnMut(&UblkQueue, u16, &UblkIOCtx), + { + loop { + match self.process_io_cmds(exe, &mut ops, 1) { + Err(_) => break, + _ => continue, + } + } + } } diff --git a/tests/basic.rs b/tests/basic.rs index 8d02736..ca63eab 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -1,12 +1,15 @@ #[cfg(test)] mod integration { + use io_uring::opcode; use libublk::dev_flags::*; + use libublk::exe::{Executor, UringOpFuture}; use libublk::io::{UblkDev, UblkIOCtx, UblkQueue}; use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes}; use libublk::{sys, UblkSessionBuilder}; use std::env; use std::path::Path; use std::process::{Command, Stdio}; + use std::rc::Rc; fn read_ublk_disk(dev_id: i32) { let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id); @@ -108,6 +111,59 @@ mod integration { ); } + /// make one async ublk-null and test if /dev/ublkbN can be created successfully + #[test] + fn test_ublk_null_async() { + fn null_submit_nop(q: &UblkQueue<'_>, user_data: u64) -> UringOpFuture { + let nop_e = opcode::Nop::new().build().user_data(user_data); + + unsafe { + q.q_ring + .borrow_mut() + .submission() + .push(&nop_e) + .expect("submission fail"); + }; + UringOpFuture { user_data } + } + + async fn null_handle_io_cmd(q: &UblkQueue<'_>, tag: u16, _data: u64) { + let _iod = q.get_iod(tag); + let iod = unsafe { &*_iod }; + let bytes = (iod.nr_sectors << 9) as i32; + let op = iod.op_flags & 0xff; + let data = UblkIOCtx::build_user_data_async(tag, op, 0); + let data2 = UblkIOCtx::build_user_data_async(tag, op, 1); + + //simulate our io command by joining two io_uring nops + let f = null_submit_nop(q, data); + let f2 = null_submit_nop(q, data2); + let (res, res2) = futures::join!(f, f2); + assert!(res == 0 && res2 == 0); + + q.complete_io_cmd(tag, Ok(UblkIORes::Result(bytes))); + } + + fn null_handle_queue_async(qid: u16, _dev: &UblkDev) { + let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap()); + let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios())); + let q = q_rc.clone(); + let exe = exe_rc.clone(); + + let io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| { + let q = q_rc.clone(); + let data = io.user_data(); + + exe.spawn(tag as u16, async move { + null_handle_io_cmd(&q, tag, data).await; + }); + }; + q.wait_and_handle_io_cmd(&exe_rc, io_handler); + } + + __test_ublk_null(UBLK_DEV_F_ADD_DEV, null_handle_queue_async); + } + /// make one ublk-ramdisk and test: /// - if /dev/ublkbN can be created successfully /// - if yes, then test format/mount/umount over this ublk-ramdisk