Skip to content

Commit

Permalink
libublk: io: add UblkQueueState and use it via RefCell
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Lei <[email protected]>
  • Loading branch information
ming1 committed Oct 6, 2023
1 parent b203090 commit 05aed9b
Showing 1 changed file with 88 additions and 35 deletions.
123 changes: 88 additions & 35 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::{ctrl::UblkCtrl, sys, UblkError, UblkIORes};
use io_uring::{cqueue, opcode, squeue, types, IoUring};
use log::{error, info, trace};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::fs;
use std::os::unix::io::AsRawFd;

Expand Down Expand Up @@ -357,6 +358,59 @@ impl UblkQueueCtx {
const UBLK_QUEUE_STOPPING: u32 = 1_u32 << 0;
const UBLK_QUEUE_IDLE: u32 = 1_u32 << 1;

#[derive(Debug, Clone, Default)]
pub struct UblkQueueState {
cmd_inflight: u32,
state: u32,
}

impl UblkQueueState {
pub fn new() -> Self {
Self {
..Default::default()
}
}

#[inline(always)]
fn queue_is_quiesced(&self) -> bool {
self.cmd_inflight == 0
}

#[inline(always)]
fn queue_is_done(&self) -> bool {
self.is_stopping() && self.queue_is_quiesced()
}

#[inline(always)]
fn get_nr_cmd_inflight(&self) -> u32 {
self.cmd_inflight
}

#[inline(always)]
fn is_stopping(&self) -> bool {
(self.state & UBLK_QUEUE_STOPPING) != 0
}

#[inline(always)]
fn is_idle(&self) -> bool {
(self.state & UBLK_QUEUE_IDLE) != 0
}

#[inline(always)]
fn inc_cmd_inflight(&mut self) {
self.cmd_inflight += 1;
}

#[inline(always)]
fn dec_cmd_inflight(&mut self) {
self.cmd_inflight -= 1;
}

fn mark_stopping(&mut self) {
self.state |= UBLK_QUEUE_STOPPING;
}
}

/// UBLK queue abstraction
///
/// UblkQueue is the core part of the whole stack, which communicates with
Expand All @@ -377,9 +431,8 @@ pub struct UblkQueue<'a> {
io_cmd_buf: u64,
//ops: Box<dyn UblkQueueImpl>,
pub dev: &'a UblkDev,
cmd_inflight: u32,
q_state: u32,
bufs: Vec<*mut u8>,
state: RefCell<UblkQueueState>,
q_ring: IoUring<squeue::Entry>,
}

Expand Down Expand Up @@ -512,8 +565,10 @@ impl UblkQueue<'_> {
q_depth: depth,
io_cmd_buf: io_cmd_buf as u64,
dev,
cmd_inflight: 0,
q_state: 0,
state: RefCell::new(UblkQueueState {
cmd_inflight: 0,
state: 0,
}),
q_ring: ring,
bufs,
};
Expand All @@ -538,7 +593,8 @@ impl UblkQueue<'_> {
#[inline(always)]
#[allow(unused_assignments)]
fn queue_io_cmd(&mut self, tag: u16, cmd_op: u32, buf_addr: u64, res: i32) -> i32 {
if (self.q_state & UBLK_QUEUE_STOPPING) != 0 {
let mut state = self.state.borrow_mut();
if state.is_stopping() {
return 0;
}

Expand All @@ -562,14 +618,15 @@ impl UblkQueue<'_> {
.expect("submission fail");
}

self.cmd_inflight += 1;
state.inc_cmd_inflight();

trace!(
"{}: (qid {} tag {} cmd_op {}) stopping {}",
"queue_io_cmd",
self.q_id,
tag,
cmd_op,
(self.q_state & UBLK_QUEUE_STOPPING) != 0
state.is_stopping(),
);

1
Expand Down Expand Up @@ -597,16 +654,6 @@ impl UblkQueue<'_> {
}
}

#[inline(always)]
fn queue_is_idle(&self) -> bool {
self.cmd_inflight == 0
}

#[inline(always)]
fn queue_is_done(&self) -> bool {
(self.q_state & UBLK_QUEUE_STOPPING) != 0 && self.queue_is_idle()
}

#[inline(always)]
fn complete_ios(&mut self, tag: usize, res: Result<UblkIORes, UblkError>) {
match res {
Expand Down Expand Up @@ -657,16 +704,18 @@ impl UblkQueue<'_> {
let tag = UblkIOCtx::user_data_to_tag(data);
let cmd_op = UblkIOCtx::user_data_to_op(data);

trace!(
"{}: res {} (qid {} tag {} cmd_op {} target {}) state {}",
"handle_cqe",
res,
self.q_id,
tag,
cmd_op,
is_target_io(data),
self.q_state,
);
{
trace!(
"{}: res {} (qid {} tag {} cmd_op {} target {}) state {:?}",
"handle_cqe",
res,
self.q_id,
tag,
cmd_op,
is_target_io(data),
self.state.borrow(),
);
}

if is_target_io(data) {
let res = e.result();
Expand All @@ -686,10 +735,13 @@ impl UblkQueue<'_> {
return;
}

self.cmd_inflight -= 1;
{
let mut state = self.state.borrow_mut();
state.dec_cmd_inflight();

if res == sys::UBLK_IO_RES_ABORT {
self.q_state |= UBLK_QUEUE_STOPPING;
if res == sys::UBLK_IO_RES_ABORT {
state.mark_stopping();
}
}

if res == sys::UBLK_IO_RES_OK as i32 {
Expand Down Expand Up @@ -724,13 +776,14 @@ impl UblkQueue<'_> {

#[inline(always)]
fn wait_ios(&mut self, to_wait: usize) -> Result<i32, UblkError> {
let state = self.state.borrow();
info!(
"dev{}-q{}: to_submit {} inflight cmd {} stopping {}",
self.dev.dev_info.dev_id,
self.q_id,
0,
self.cmd_inflight,
(self.q_state & UBLK_QUEUE_STOPPING)
state.get_nr_cmd_inflight(),
state.is_stopping(),
);

let ret = self
Expand All @@ -743,8 +796,8 @@ impl UblkQueue<'_> {
"submit result {}, nr_cqes {} stop {} idle {}",
ret,
nr_cqes,
(self.q_state & UBLK_QUEUE_STOPPING),
(self.q_state & UBLK_QUEUE_IDLE)
state.is_stopping(),
state.is_idle(),
);
Ok(nr_cqes)
}
Expand Down Expand Up @@ -803,7 +856,7 @@ impl UblkQueue<'_> {
where
F: FnMut(&mut UblkIOCtx) -> Result<UblkIORes, UblkError>,
{
if self.queue_is_done() && self.q_ring.submission().is_empty() {
if self.state.borrow().queue_is_done() && self.q_ring.submission().is_empty() {
return Err(UblkError::QueueIsDown(-libc::ENODEV));
}

Expand Down

0 comments on commit 05aed9b

Please sign in to comment.