diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index d1d49b63b..5df3e84c9 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -118,8 +118,11 @@ pub(crate) struct DownstairsClient { /// this handle should never be dropped before that point. client_task: ClientTaskHandle, - /// IO state counters - io_state_count: ClientIOStateCount, + /// Number of jobs in each IO state + io_state_job_count: ClientIOStateCount, + + /// Number of bytes associated with each IO state + io_state_byte_count: ClientIOStateCount, /// Jobs, write bytes, and total IO bytes in this client's queue /// @@ -229,7 +232,8 @@ impl DownstairsClient { skipped_jobs: BTreeSet::new(), region_metadata: None, repair_info: None, - io_state_count: ClientIOStateCount::default(), + io_state_job_count: ClientIOStateCount::default(), + io_state_byte_count: ClientIOStateCount::default(), backpressure_counters: BackpressureCounters::new(), connection_id: ConnectionId(0), client_delay_us, @@ -268,7 +272,8 @@ impl DownstairsClient { skipped_jobs: BTreeSet::new(), region_metadata: None, repair_info: None, - io_state_count: ClientIOStateCount::default(), + io_state_job_count: ClientIOStateCount::default(), + io_state_byte_count: ClientIOStateCount::default(), backpressure_counters: BackpressureCounters::new(), connection_id: ConnectionId(0), client_delay_us, @@ -339,17 +344,19 @@ impl DownstairsClient { } } - /// Sets a job state, handling `io_state_count` counters + /// Sets a job state, handling `io_state/byte_count` counters fn set_job_state( &mut self, job: &mut DownstairsIO, new_state: IOState, ) -> IOState { let is_running = matches!(new_state, IOState::InProgress); - self.io_state_count[&new_state] += 1; + self.io_state_job_count[&new_state] += 1; + self.io_state_byte_count[&new_state] += job.work.job_bytes(); let old_state = job.state.insert(self.client_id, new_state); let was_running = matches!(old_state, IOState::InProgress); - self.io_state_count[&old_state] -= 1; + self.io_state_job_count[&old_state] -= 1; + self.io_state_byte_count[&old_state] -= job.work.job_bytes(); // Update our bytes-in-flight counter if was_running && !is_running { @@ -372,14 +379,22 @@ impl DownstairsClient { old_state } - /// Retire a job state, handling `io_state_count` counters + /// Retire a job state, handling `io_state/byte_count` counters pub(crate) fn retire_job(&mut self, job: &DownstairsIO) { - self.io_state_count[&job.state[self.client_id]] -= 1; + let state = &job.state[self.client_id]; + self.io_state_job_count[state] -= 1; + self.io_state_byte_count[state] -= job.work.job_bytes(); + } + + /// Returns the number of jobs in each IO state + pub(crate) fn io_state_job_count(&self) -> ClientIOStateCount { + self.io_state_job_count } - /// Returns the current IO state counters - pub(crate) fn io_state_count(&self) -> ClientIOStateCount { - self.io_state_count + /// Returns the number of bytes associated with each IO state + #[allow(unused)] // XXX this will be used in the future! + pub(crate) fn io_state_byte_count(&self) -> ClientIOStateCount { + self.io_state_byte_count } /// Returns a client-specialized copy of the job's `IOop` @@ -889,7 +904,7 @@ impl DownstairsClient { /// Returns `true` if it should be sent and `false` otherwise /// /// If the job should be skipped, then it is added to `self.skipped_jobs`. - /// `self.io_state_count` is updated with the incoming job state. + /// `self.io_state_job_count` is updated with the incoming job state. #[must_use] pub(crate) fn enqueue( &mut self, @@ -912,11 +927,13 @@ impl DownstairsClient { } // Update our state counters based on the job state - self.io_state_count[if should_send { + let state = if should_send { &IOState::InProgress } else { &IOState::Skipped - }] += 1; + }; + self.io_state_job_count[&state] += 1; + self.io_state_byte_count[&state] += io.job_bytes(); should_send } @@ -2262,7 +2279,8 @@ impl DownstairsClient { } pub(crate) fn total_live_work(&self) -> usize { - (self.io_state_count.new + self.io_state_count.in_progress) as usize + (self.io_state_job_count.new + self.io_state_job_count.in_progress) + as usize } pub(crate) fn total_bytes_outstanding(&self) -> usize { diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 733378830..5716f9c95 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -2802,7 +2802,7 @@ impl Downstairs { } pub fn io_state_count(&self) -> IOStateCount { - let d = self.collect_stats(|c| c.io_state_count()); + let d = self.collect_stats(|c| c.io_state_job_count()); let f = |g: fn(ClientIOStateCount) -> u32| { ClientData([g(d[0]), g(d[1]), g(d[2])]) }; diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index ac1c675b9..d41745203 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -991,20 +991,7 @@ impl DownstairsIO { * We don't consider repair IOs in the size calculation. */ pub fn io_size(&self) -> usize { - match &self.work { - IOop::Write { data, .. } | IOop::WriteUnwritten { data, .. } => { - data.len() - } - IOop::Read { - count, block_size, .. - } => (*count * *block_size) as usize, - IOop::Flush { .. } - | IOop::Barrier { .. } - | IOop::ExtentFlushClose { .. } - | IOop::ExtentLiveRepair { .. } - | IOop::ExtentLiveReopen { .. } - | IOop::ExtentLiveNoOp { .. } => 0, - } + self.work.job_bytes() as usize } /* @@ -1432,8 +1419,8 @@ pub struct ClientIOStateCount { pub error: T, } -impl std::ops::Index<&IOState> for ClientIOStateCount { - type Output = u32; +impl std::ops::Index<&IOState> for ClientIOStateCount { + type Output = T; fn index(&self, index: &IOState) -> &Self::Output { match index { IOState::InProgress => &self.in_progress, @@ -1444,7 +1431,7 @@ impl std::ops::Index<&IOState> for ClientIOStateCount { } } -impl std::ops::IndexMut<&IOState> for ClientIOStateCount { +impl std::ops::IndexMut<&IOState> for ClientIOStateCount { fn index_mut(&mut self, index: &IOState) -> &mut Self::Output { match index { IOState::InProgress => &mut self.in_progress,