Skip to content

Commit

Permalink
libublk: io: add UblkQueue::process_io_cmds() and UblkDev::get_nr_ios…
Browse files Browse the repository at this point in the history
…() for async/await

Now it is ready to write target io command handling code by async/await.

Signed-off-by: Ming Lei <[email protected]>
  • Loading branch information
ming1 committed Oct 14, 2023
1 parent ad3e787 commit 09a7264
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 1 deletion.
118 changes: 117 additions & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "fat_complete")]
use super::UblkFatRes;
use super::{ctrl::UblkCtrl, sys, UblkError, UblkIORes};
use super::{ctrl::UblkCtrl, exe::Executor, sys, UblkError, UblkIORes};
use io_uring::{cqueue, opcode, squeue, types, IoUring};
use log::{error, info, trace};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -123,6 +123,21 @@ impl<'a> UblkIOCtx<'a> {
tag as u64 | (op << 16) as u64 | (tgt_data << 24) as u64 | ((is_target_io as u64) << 63)
}

/// Build userdata for async io_uring OP
///
/// # Arguments:
/// * `tag`: io tag, length is 16bit
/// * `op`: io operation code, length is 8bit
/// * `op_id`: unique id in io task
///
/// The built userdata has to be unique in this io task, so that
/// our executor can figure out the exact submitted OP with
/// completed cqe
#[inline(always)]
pub fn build_user_data_async(tag: u16, op: u32, op_id: u32) -> u64 {
Self::build_user_data(tag, op, op_id, true)
}

/// Extract tag from userdata
#[inline(always)]
pub fn user_data_to_tag(user_data: u64) -> u32 {
Expand All @@ -140,6 +155,13 @@ impl<'a> UblkIOCtx<'a> {
fn is_target_io(user_data: u64) -> bool {
(user_data & (1_u64 << 63)) != 0
}

/// Check if this userdata is from IO command which is from
/// ublk driver
#[inline(always)]
fn is_io_command(user_data: u64) -> bool {
(user_data & (1_u64 << 63)) == 0
}
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -267,6 +289,13 @@ impl UblkDev {
..Default::default()
};
}

/// Return how many io slots, which is usually same with executor's
/// nr_tasks.
#[inline]
pub fn get_nr_ios(&self) -> u16 {
self.dev_info.queue_depth + self.tgt.extra_ios as u16
}
}

impl Drop for UblkDev {
Expand Down Expand Up @@ -854,6 +883,72 @@ impl UblkQueue<'_> {
}
}

pub(crate) fn process_io_cmds<F>(
&self,
exe: &Executor,
mut ops: F,
to_wait: usize,
) -> Result<i32, UblkError>
where
F: FnMut(&UblkQueue, u16, &UblkIOCtx),
{
match self.wait_ios(to_wait) {
Err(r) => Err(r),
Ok(done) => {
for idx in 0..done {
let cqe = {
match self.q_ring.borrow_mut().completion().next() {
None => return Err(UblkError::OtherError(-libc::EINVAL)),
Some(r) => r,
}
};

let e = UblkIOCtx(
&cqe,
if idx == 0 {
UblkIOCtx::UBLK_IO_F_FIRST
} else {
0
} | if idx + 1 == done {
UblkIOCtx::UBLK_IO_F_LAST
} else {
0
},
);

let data = e.user_data();
let res = e.result();
let tag = UblkIOCtx::user_data_to_tag(data);

{
let cmd_op = UblkIOCtx::user_data_to_op(data);
trace!(
"{}: res {} (qid {} tag {} cmd_op {} target {}) state {:?}",
"handle_cqe",
res,
self.q_id,
tag,
cmd_op,
UblkIOCtx::is_target_io(data),
self.state.borrow(),
);
}
if UblkIOCtx::is_io_command(data) {
self.update_state(e.0);

if res == sys::UBLK_IO_RES_OK as i32 {
assert!(tag < self.q_depth);
ops(self, tag as u16, &e);
}
} else {
exe.wake_with_uring_cqe(tag as u16, &cqe);
}
}
Ok(0)
}
}
}

/// Wait and handle incoming IO
///
/// # Arguments:
Expand All @@ -874,4 +969,25 @@ impl UblkQueue<'_> {
}
}
}

/// Wait and handle incoming IO command
///
/// # Arguments:
///
/// * `ops`: IO handling closure
///
/// Called in queue context. won't return unless error is observed.
/// Wait and handle any incoming cqe until queue is down.
///
pub fn wait_and_handle_io_cmd<F>(&self, exe: &Executor, mut ops: F)
where
F: FnMut(&UblkQueue, u16, &UblkIOCtx),
{
loop {
match self.process_io_cmds(exe, &mut ops, 1) {
Err(_) => break,
_ => continue,
}
}
}
}
56 changes: 56 additions & 0 deletions tests/basic.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#[cfg(test)]
mod integration {
use io_uring::opcode;
use libublk::dev_flags::*;
use libublk::exe::{Executor, UringOpFuture};
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes};
use libublk::{sys, UblkSessionBuilder};
use std::env;
use std::path::Path;
use std::process::{Command, Stdio};
use std::rc::Rc;

fn read_ublk_disk(dev_id: i32) {
let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id);
Expand Down Expand Up @@ -108,6 +111,59 @@ mod integration {
);
}

/// make one async ublk-null and test if /dev/ublkbN can be created successfully
#[test]
fn test_ublk_null_async() {
fn null_submit_nop(q: &UblkQueue<'_>, user_data: u64) -> UringOpFuture {
let nop_e = opcode::Nop::new().build().user_data(user_data);

unsafe {
q.q_ring
.borrow_mut()
.submission()
.push(&nop_e)
.expect("submission fail");
};
UringOpFuture { user_data }
}

async fn null_handle_io_cmd(q: &UblkQueue<'_>, tag: u16, _data: u64) {
let _iod = q.get_iod(tag);
let iod = unsafe { &*_iod };
let bytes = (iod.nr_sectors << 9) as i32;
let op = iod.op_flags & 0xff;
let data = UblkIOCtx::build_user_data_async(tag, op, 0);
let data2 = UblkIOCtx::build_user_data_async(tag, op, 1);

//simulate our io command by joining two io_uring nops
let f = null_submit_nop(q, data);
let f2 = null_submit_nop(q, data2);
let (res, res2) = futures::join!(f, f2);
assert!(res == 0 && res2 == 0);

q.complete_io_cmd(tag, Ok(UblkIORes::Result(bytes)));
}

fn null_handle_queue_async(qid: u16, _dev: &UblkDev) {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap());
let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios()));
let q = q_rc.clone();
let exe = exe_rc.clone();

let io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
let q = q_rc.clone();
let data = io.user_data();

exe.spawn(tag as u16, async move {
null_handle_io_cmd(&q, tag, data).await;
});
};
q.wait_and_handle_io_cmd(&exe_rc, io_handler);
}

__test_ublk_null(UBLK_DEV_F_ADD_DEV, null_handle_queue_async);
}

/// make one ublk-ramdisk and test:
/// - if /dev/ublkbN can be created successfully
/// - if yes, then test format/mount/umount over this ublk-ramdisk
Expand Down

0 comments on commit 09a7264

Please sign in to comment.