Skip to content

Commit

Permalink
examples/loop: handle io command by async/await
Browse files Browse the repository at this point in the history
async/await can handling io handle in async style, meantime the
written Rust code can be just like sync programming.

This way is very helpful for writing complicated ublk target.

Signed-off-by: Ming Lei <[email protected]>
  • Loading branch information
ming1 committed Oct 14, 2023
1 parent cb5423f commit 8c41501
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ driver, and it just takes async `q_handler`, such as:
}
```

* [`examples/loop.rs`](examples/loop.rs): the whole example using async/await


## Test

You can run the test of the library with ```cargo test```
Expand Down
51 changes: 24 additions & 27 deletions examples/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use anyhow::Result;
use io_uring::{opcode, squeue, types};
use libublk::dev_flags::*;
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes};
use libublk::{ctrl::UblkCtrl, exe::Executor, exe::UringOpFuture, UblkError, UblkIORes};
use log::trace;
use serde::Serialize;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;

#[derive(Debug, Serialize)]
struct LoJson {
Expand Down Expand Up @@ -56,15 +57,14 @@ fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt) -> Result<serde_json::Value, Ubl
)
}

fn loop_queue_tgt_io(q: &UblkQueue, tag: u16, _io: &UblkIOCtx) {
async fn lo_handle_io_cmd(q: &UblkQueue<'_>, tag: u16, _data: u64) {
// either start to handle or retry
let _iod = q.get_iod(tag);
let iod = unsafe { &*_iod };

let off = (iod.start_sector << 9) as u64;
let bytes = (iod.nr_sectors << 9) as u32;
let op = iod.op_flags & 0xff;
let data = UblkIOCtx::build_user_data(tag as u16, op, 0, true);
let data = UblkIOCtx::build_user_data_async(tag as u16, op, 0);
let buf_addr = q.get_io_buf_addr(tag);

if op == libublk::sys::UBLK_IO_OP_WRITE_ZEROES || op == libublk::sys::UBLK_IO_OP_DISCARD {
Expand Down Expand Up @@ -115,26 +115,15 @@ fn loop_queue_tgt_io(q: &UblkQueue, tag: u16, _io: &UblkIOCtx) {
.expect("submission fail");
}
}
_ => q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL))),
};
}

fn _lo_handle_io(q: &UblkQueue, tag: u16, i: &UblkIOCtx) {
// our IO on backing file is done
if i.is_tgt_io() {
let user_data = i.user_data();
let res = i.result();
let cqe_tag = UblkIOCtx::user_data_to_tag(user_data);

assert!(cqe_tag == tag as u32);

if res != -(libc::EAGAIN) {
q.complete_io_cmd(tag, Ok(UblkIORes::Result(res)));
_ => {
q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL)));
return;
}
}
};

loop_queue_tgt_io(q, tag, i);
// wait until the io_uring IO completed
let res = UringOpFuture { user_data: data }.await;
q.complete_io_cmd(tag, Ok(UblkIORes::Result(res)));
}

fn test_add() {
Expand Down Expand Up @@ -162,12 +151,20 @@ fn test_add() {
let tgt_init = |dev: &mut UblkDev| lo_init_tgt(dev, &lo);
let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap();
let q_fn = move |qid: u16, _dev: &UblkDev| {
let lo_io_handler =
move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| _lo_handle_io(q, tag, io);

UblkQueue::new(qid, _dev)
.unwrap()
.wait_and_handle_io(lo_io_handler);
let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap());
let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios()));
let q = q_rc.clone();
let exe = exe_rc.clone();

let lo_io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
let q = q_rc.clone();
let data = io.user_data();

exe.spawn(tag as u16, async move {
lo_handle_io_cmd(&q, tag, data).await;
});
};
q.wait_and_handle_io_cmd(&exe_rc, lo_io_handler);
};

sess.run_target(&mut ctrl, &dev, q_fn, |dev_id| {
Expand Down

0 comments on commit 8c41501

Please sign in to comment.