diff --git a/README.md b/README.md index 167407b..57547d7 100644 --- a/README.md +++ b/README.md @@ -116,6 +116,9 @@ see [`test_ublk_null_async():tests/basic.rs`](tests/basic.rs) Queue wide data is per-thread and can be shared in io handler by Rc() & RefCell(). + * [`examples/loop.rs`](examples/loop.rs): the whole example using async/await + + ## Test You can run the test of the library with ```cargo test``` diff --git a/examples/loop.rs b/examples/loop.rs index b8e25b3..c6759d2 100644 --- a/examples/loop.rs +++ b/examples/loop.rs @@ -2,10 +2,11 @@ use anyhow::Result; use io_uring::{opcode, squeue, types}; use libublk::dev_flags::*; use libublk::io::{UblkDev, UblkIOCtx, UblkQueue}; -use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes}; +use libublk::{ctrl::UblkCtrl, exe::Executor, exe::UringOpFuture, UblkError, UblkIORes}; use log::trace; use serde::Serialize; use std::os::unix::io::AsRawFd; +use std::rc::Rc; #[derive(Debug, Serialize)] struct LoJson { @@ -56,15 +57,14 @@ fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt) -> Result, tag: u16, _data: u64) { // either start to handle or retry let _iod = q.get_iod(tag); let iod = unsafe { &*_iod }; - let off = (iod.start_sector << 9) as u64; let bytes = (iod.nr_sectors << 9) as u32; let op = iod.op_flags & 0xff; - let data = UblkIOCtx::build_user_data(tag as u16, op, 0, true); + let data = UblkIOCtx::build_user_data_async(tag as u16, op, 0); let buf_addr = q.get_io_buf_addr(tag); if op == libublk::sys::UBLK_IO_OP_WRITE_ZEROES || op == libublk::sys::UBLK_IO_OP_DISCARD { @@ -115,30 +115,19 @@ fn loop_queue_tgt_io(q: &UblkQueue, tag: u16, _io: &UblkIOCtx) { .expect("submission fail"); } } - _ => q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL))), - }; -} - -fn _lo_handle_io(q: &UblkQueue, tag: u16, i: &UblkIOCtx) { - // our IO on backing file is done - if i.is_tgt_io() { - let user_data = i.user_data(); - let res = i.result(); - let cqe_tag = UblkIOCtx::user_data_to_tag(user_data); - - assert!(cqe_tag == tag as u32); - - if res != -(libc::EAGAIN) { - q.complete_io_cmd(tag, Ok(UblkIORes::Result(res))); + _ => { + q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL))); return; } - } + }; - loop_queue_tgt_io(q, tag, i); + // wait until the io_uring IO completed + let res = UringOpFuture { user_data: data }.await; + q.complete_io_cmd(tag, Ok(UblkIORes::Result(res))); } fn test_add() { - let back_file = std::env::args().nth(2).unwrap(); + let back_file = std::env::args().nth(3).unwrap(); let _pid = unsafe { libc::fork() }; if _pid == 0 { @@ -162,12 +151,20 @@ fn test_add() { let tgt_init = |dev: &mut UblkDev| lo_init_tgt(dev, &lo); let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap(); let q_fn = move |qid: u16, _dev: &UblkDev| { - let lo_io_handler = - move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| _lo_handle_io(q, tag, io); - - UblkQueue::new(qid, _dev) - .unwrap() - .wait_and_handle_io(lo_io_handler); + let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap()); + let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios())); + let q = q_rc.clone(); + let exe = exe_rc.clone(); + + let lo_io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| { + let q = q_rc.clone(); + let data = io.user_data(); + + exe.spawn(tag as u16, async move { + lo_handle_io_cmd(&q, tag, data).await; + }); + }; + q.wait_and_handle_io_cmd(&exe_rc, lo_io_handler); }; sess.run_target(&mut ctrl, &dev, q_fn, |dev_id| {