Skip to content

Commit

Permalink
Merge pull request #9 from ublk-org/rust-cleanup-error
Browse files Browse the repository at this point in the history
Rust cleanup error
  • Loading branch information
ming1 authored Feb 29, 2024
2 parents 1aa3743 + 6099e1e commit 31b3253
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 434 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ env_logger = "0.9"
smol = "1.3.0"
slab = "0.4.9"
derive_setters = "0.1"
bitflags = "2.4.1"

[dev-dependencies]
block-utils = "0.11.0"
Expand All @@ -59,4 +60,3 @@ ilog = "1.0.1"
async-std = {version = "1.12.0"}
ctrlc = "3.4.0"
daemonize = "0.5"
bitflags = "2.4.1"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn main() {
UblkCtrlBuilder::default()
.name("async_null")
.nr_queues(2)
.dev_flags(libublk::dev_flags::UBLK_DEV_F_ADD_DEV)
.dev_flags(libublk::UblkFlags::UBLK_DEV_F_ADD_DEV)
.build()
.unwrap(),
);
Expand Down
207 changes: 103 additions & 104 deletions examples/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use bitflags::bitflags;
use clap::{Arg, ArgAction, Command};
use ilog::IntLog;
use io_uring::{opcode, squeue, types};
use libublk::dev_flags::*;
use libublk::helpers::IoBuf;
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::uring_async::ublk_wait_and_handle_ios;
use libublk::{ctrl::UblkCtrl, sys, UblkError, UblkIORes};
use libublk::{ctrl::UblkCtrl, sys, UblkError, UblkFlags, UblkIORes};
use log::trace;
use serde::Serialize;
use std::os::unix::fs::FileTypeExt;
Expand Down Expand Up @@ -207,17 +206,111 @@ fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx, buf_addr: *
}
}

fn test_add(
fn q_fn(qid: u16, dev: &UblkDev) {
let bufs_rc = Rc::new(dev.alloc_queue_io_bufs());
let bufs = bufs_rc.clone();
let lo_io_handler = move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
let bufs = bufs_rc.clone();

lo_handle_io_cmd_sync(q, tag, io, bufs[tag as usize].as_mut_ptr());
};

UblkQueue::new(qid, dev)
.unwrap()
.regiser_io_bufs(Some(&bufs))
.submit_fetch_commands(Some(&bufs))
.wait_and_handle_io(lo_io_handler);
}

fn q_a_fn(qid: u16, dev: &UblkDev, depth: u16) {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &dev).unwrap());
let exe = smol::LocalExecutor::new();
let mut f_vec = Vec::new();

for tag in 0..depth {
let q = q_rc.clone();

f_vec.push(exe.spawn(async move {
let buf = IoBuf::<u8>::new(q.dev.dev_info.max_io_buf_bytes as usize);
let buf_addr = buf.as_mut_ptr();
let mut cmd_op = sys::UBLK_U_IO_FETCH_REQ;
let mut res = 0;

q.register_io_buf(tag, &buf);
loop {
let cmd_res = q.submit_io_cmd(tag, cmd_op, buf_addr, res).await;
if cmd_res == sys::UBLK_IO_RES_ABORT {
break;
}

res = lo_handle_io_cmd_async(&q, tag, buf_addr).await;
cmd_op = sys::UBLK_U_IO_COMMIT_AND_FETCH_REQ;
}
}));
}
ublk_wait_and_handle_ios(&q_rc, &exe);
smol::block_on(async { futures::future::join_all(f_vec).await });
}

fn __loop_add(
id: i32,
nr_queues: u32,
depth: u32,
depth: u16,
buf_sz: u32,
backing_file: &String,
ctrl_flags: u64,
lo_flags: LoFlags,
) {
let aio = lo_flags.intersects(LoFlags::ASYNC);
let oneshot = lo_flags.intersects(LoFlags::ONESHOT);
// LooTgt has to live in the whole device lifetime
let lo = LoopTgt {
back_file: std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&backing_file)
.unwrap(),
direct_io: 1,
back_file_path: backing_file.clone(),
};
let ctrl = libublk::ctrl::UblkCtrlBuilder::default()
.name("example_loop")
.id(id)
.ctrl_flags(ctrl_flags)
.nr_queues(nr_queues.try_into().unwrap())
.depth(depth)
.io_buf_bytes(buf_sz)
.dev_flags(UblkFlags::UBLK_DEV_F_ADD_DEV)
.build()
.unwrap();
let tgt_init = |dev: &mut UblkDev| lo_init_tgt(dev, &lo);
let wh = move |d_ctrl: &UblkCtrl| {
d_ctrl.dump();
if oneshot {
d_ctrl.kill_dev().unwrap();
}
};

if aio {
ctrl.run_target(tgt_init, move |qid, dev: &_| q_a_fn(qid, dev, depth), wh)
.unwrap();
} else {
ctrl.run_target(tgt_init, move |qid, dev: &_| q_fn(qid, dev), wh)
.unwrap();
}
}

fn loop_add(
id: i32,
nr_queues: u32,
depth: u16,
buf_sz: u32,
backing_file: &String,
ctrl_flags: u64,
lo_flags: LoFlags,
) {
if lo_flags.intersects(LoFlags::FOREGROUND) {
__test_add(
__loop_add(
id,
nr_queues,
depth,
Expand All @@ -232,7 +325,7 @@ fn test_add(
.stderr(daemonize::Stdio::keep());

match daemonize.start() {
Ok(_) => __test_add(
Ok(_) => __loop_add(
id,
nr_queues,
depth,
Expand All @@ -246,100 +339,6 @@ fn test_add(
}
}

fn __test_add(
id: i32,
nr_queues: u32,
depth: u32,
buf_sz: u32,
backing_file: &String,
ctrl_flags: u64,
lo_flags: LoFlags,
) {
let aio = lo_flags.intersects(LoFlags::ASYNC);
let oneshot = lo_flags.intersects(LoFlags::ONESHOT);
{
// LooTgt has to live in the whole device lifetime
let lo = LoopTgt {
back_file: std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&backing_file)
.unwrap(),
direct_io: 1,
back_file_path: backing_file.clone(),
};
let ctrl = libublk::ctrl::UblkCtrlBuilder::default()
.name("example_loop")
.id(id)
.ctrl_flags(ctrl_flags)
.nr_queues(nr_queues.try_into().unwrap())
.depth(depth.try_into().unwrap())
.io_buf_bytes(buf_sz)
.dev_flags(UBLK_DEV_F_ADD_DEV)
.build()
.unwrap();

let tgt_init = |dev: &mut UblkDev| lo_init_tgt(dev, &lo);
let q_async_fn = move |qid: u16, dev: &UblkDev| {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &dev).unwrap());
let exe = smol::LocalExecutor::new();
let mut f_vec = Vec::new();

for tag in 0..depth as u16 {
let q = q_rc.clone();

f_vec.push(exe.spawn(async move {
let buf = IoBuf::<u8>::new(q.dev.dev_info.max_io_buf_bytes as usize);
let buf_addr = buf.as_mut_ptr();
let mut cmd_op = sys::UBLK_U_IO_FETCH_REQ;
let mut res = 0;

q.register_io_buf(tag, &buf);
loop {
let cmd_res = q.submit_io_cmd(tag, cmd_op, buf_addr, res).await;
if cmd_res == sys::UBLK_IO_RES_ABORT {
break;
}

res = lo_handle_io_cmd_async(&q, tag, buf_addr).await;
cmd_op = sys::UBLK_U_IO_COMMIT_AND_FETCH_REQ;
}
}));
}
ublk_wait_and_handle_ios(&q_rc, &exe);
smol::block_on(async { futures::future::join_all(f_vec).await });
};

let q_sync_fn = move |qid: u16, dev: &UblkDev| {
let bufs_rc = Rc::new(dev.alloc_queue_io_bufs());
let bufs = bufs_rc.clone();
let lo_io_handler = move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
let bufs = bufs_rc.clone();

lo_handle_io_cmd_sync(q, tag, io, bufs[tag as usize].as_mut_ptr());
};

UblkQueue::new(qid, dev)
.unwrap()
.regiser_io_bufs(Some(&bufs))
.submit_fetch_commands(Some(&bufs))
.wait_and_handle_io(lo_io_handler);
};

let wh = move |d_ctrl: &UblkCtrl| {
d_ctrl.dump();
if oneshot {
d_ctrl.kill_dev().unwrap();
}
};
if aio {
ctrl.run_target(tgt_init, q_async_fn, wh).unwrap();
} else {
ctrl.run_target(tgt_init, q_sync_fn, wh).unwrap();
}
}
}

fn main() {
let matches = Command::new("ublk-loop-example")
.subcommand_required(true)
Expand Down Expand Up @@ -467,10 +466,10 @@ fn main() {
} else {
0
};
test_add(
loop_add(
id,
nr_queues,
depth,
depth.try_into().unwrap(),
buf_size,
backing_file,
ctrl_flags,
Expand All @@ -483,10 +482,10 @@ fn main() {
.unwrap()
.parse::<i32>()
.unwrap_or(-1);
UblkCtrl::new_simple(id, 0).unwrap().del_dev().unwrap();
UblkCtrl::new_simple(id).unwrap().del_dev().unwrap();
}
Some(("list", _add_matches)) => UblkCtrl::for_each_dev_id(|dev_id| {
UblkCtrl::new_simple(dev_id as i32, 0).unwrap().dump();
UblkCtrl::new_simple(dev_id as i32).unwrap().dump();
}),
_ => {
println!("unsupported command");
Expand Down
Loading

0 comments on commit 31b3253

Please sign in to comment.