diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index 36f432266..af7f00e65 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -495,10 +495,13 @@ impl DownstairsClient {
     }
 
     /// When the downstairs is marked as missing, handle its state transition
-    pub(crate) fn on_missing(&mut self) {
+    pub(crate) fn on_missing(&mut self, can_replay: bool) {
         let current = &self.state;
         let new_state = match current {
-            DsState::Active | DsState::Offline => DsState::Offline,
+            DsState::Active | DsState::Offline if can_replay => {
+                DsState::Offline
+            }
+            DsState::Active | DsState::Offline => DsState::Faulted,
 
             DsState::Faulted
             | DsState::LiveRepair
@@ -837,8 +840,12 @@
         reason: ClientStopReason,
     ) {
         let new_state = match self.state {
-            DsState::Active => DsState::Offline,
-            DsState::Offline => DsState::Offline,
+            DsState::Active | DsState::Offline
+                if matches!(reason, ClientStopReason::IneligibleForReplay) =>
+            {
+                DsState::Faulted
+            }
+            DsState::Active | DsState::Offline => DsState::Offline,
             DsState::Migrating => DsState::Faulted,
             DsState::Faulted => DsState::Faulted,
             DsState::Deactivated => DsState::New,
@@ -2405,6 +2412,9 @@ pub(crate) enum ClientStopReason {
 
     /// The upstairs has requested that we deactivate when we were offline
     OfflineDeactivated,
+
+    /// The Upstairs has dropped jobs that would be needed for replay
+    IneligibleForReplay,
 }
 
 /// Response received from the I/O task
diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index ba4eb592d..1c5777184 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -82,6 +82,12 @@ pub(crate) struct Downstairs {
     /// (including automatic flushes).
     next_flush: u64,
 
+    /// Indicates whether we are eligible for replay
+    ///
+    /// We are only eligible for replay if all jobs since the last flush are
+    /// buffered (i.e. none have been retired by a `Barrier` operation).
+    can_replay: bool,
+
     /// Ringbuf of completed downstairs job IDs.
     completed: AllocRingBuffer<JobId>,
 
@@ -278,6 +284,7 @@ impl Downstairs {
             },
             cfg,
             next_flush: 0,
+            can_replay: true,
             ds_active: ActiveJobs::new(),
             completed: AllocRingBuffer::new(2048),
             completed_jobs: AllocRingBuffer::new(8),
@@ -499,7 +506,7 @@
         // If the connection goes down here, we need to know what state we were
        // in to decide what state to transition to. The on_missing method will
         // do that for us!
-        self.clients[client_id].on_missing();
+        self.clients[client_id].on_missing(self.can_replay);
 
         // If the IO task stops on its own, then under certain circumstances,
         // we want to skip all of its jobs. (If we requested that the IO task
@@ -2442,11 +2449,17 @@
         Ok(ReplaceResult::Started)
     }
 
+    /// Checks whether the given client state should go from Offline -> Faulted
+    ///
+    /// # Panics
+    /// If the given client is not in the `Offline` state
     pub(crate) fn check_gone_too_long(
         &mut self,
         client_id: ClientId,
         up_state: &UpstairsState,
     ) {
+        assert_eq!(self.clients[client_id].state(), DsState::Offline);
+
         let byte_count = self.clients[client_id].total_bytes_outstanding();
         let work_count = self.clients[client_id].total_live_work();
         let failed = if work_count > crate::IO_OUTSTANDING_MAX_JOBS {
@@ -2461,6 +2474,13 @@
                 "downstairs failed, too many outstanding bytes {byte_count}"
             );
             Some(ClientStopReason::TooManyOutstandingBytes)
+        } else if !self.can_replay {
+            // XXX can this actually happen?
+            warn!(
+                self.log,
+                "downstairs became ineligible for replay while offline"
+            );
+            Some(ClientStopReason::IneligibleForReplay)
         } else {
             None
         };
@@ -2592,9 +2612,12 @@
     /// writes and if they aren't included in replay then the write will
     /// never start.
     fn retire_check(&mut self, ds_id: JobId) {
-        if !self.is_flush(ds_id) {
-            return;
-        }
+        let job = self.ds_active.get(&ds_id).expect("checked missing job");
+        let can_replay = match job.work {
+            IOop::Flush { .. } => true,
+            IOop::Barrier { .. } => false,
+            _ => return,
+        };
 
         // Only a completed flush will remove jobs from the active queue -
         // currently we have to keep everything around for use during replay
@@ -2670,6 +2693,9 @@
             for cid in ClientId::iter() {
                 self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id);
             }
+
+            // Update the flag indicating whether replay is allowed
+            self.can_replay = can_replay;
         }
     }
 
@@ -4174,6 +4200,13 @@
         self.ddef = Some(ddef);
     }
 
+    /// Checks whether there are any in-progress jobs present
+    pub(crate) fn has_live_jobs(&self) -> bool {
+        self.clients
+            .iter()
+            .any(|c| c.backpressure_counters.get_jobs() > 0)
+    }
+
     /// Returns the per-client state for the given job
     ///
     /// This is a helper function to make unit tests shorter
diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs
index a7d382e69..c9686ed4e 100644
--- a/upstairs/src/dummy_downstairs_tests.rs
+++ b/upstairs/src/dummy_downstairs_tests.rs
@@ -263,23 +263,26 @@ impl DownstairsHandle {
     /// # Panics
     /// If a non-flush message arrives
     pub async fn ack_flush(&mut self) -> u64 {
-        let Message::Flush {
-            job_id,
-            flush_number,
-            upstairs_id,
-            ..
-        } = self.recv().await.unwrap()
-        else {
-            panic!("saw non flush!");
-        };
-        self.send(Message::FlushAck {
-            upstairs_id,
-            session_id: self.upstairs_session_id.unwrap(),
-            job_id,
-            result: Ok(()),
-        })
-        .unwrap();
-        flush_number
+        match self.recv().await.unwrap() {
+            Message::Flush {
+                job_id,
+                flush_number,
+                upstairs_id,
+                ..
+            } => {
+                self.send(Message::FlushAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Ok(()),
+                })
+                .unwrap();
+                flush_number
+            }
+            m => {
+                panic!("saw non flush {m:?}");
+            }
+        }
     }
 
     /// Awaits a `Message::Write { .. }` and sends a `WriteAck`
@@ -302,6 +305,42 @@
         header.job_id
     }
 
+    /// Awaits and acks a `Message::Write { .. }` or `Message::Barrier { .. }`
+    ///
+    /// Returns `true` if the acked message was a barrier, `false` for a write
+    ///
+    /// # Panics
+    /// If any other message arrives
+    pub async fn ack_write_or_barrier(&mut self) -> bool {
+        let (r, was_barrier) = match self.recv().await.unwrap() {
+            Message::Write { header, .. } => (
+                Message::WriteAck {
+                    upstairs_id: header.upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id: header.job_id,
+                    result: Ok(()),
+                },
+                false,
+            ),
+            Message::Barrier {
+                upstairs_id,
+                job_id,
+                ..
+            } => (
+                Message::BarrierAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Ok(()),
+                },
+                true,
+            ),
+            m => panic!("saw unexpected message {m:?}"),
+        };
+        self.send(r).unwrap();
+        was_barrier
+    }
+
     /// Awaits a `Message::Read` and sends a blank `ReadResponse`
     ///
     /// Returns the job ID for further checks
@@ -470,14 +509,31 @@ pub struct TestHarness {
 
 /// Number of extents in `TestHarness::default_config`
 const DEFAULT_EXTENT_COUNT: u32 = 25;
+const DEFAULT_BLOCK_COUNT: u64 = 10;
+
+struct TestOpts {
+    flush_timeout: f32,
+    read_only: bool,
+    disable_backpressure: bool,
+}
 
 impl TestHarness {
     pub async fn new() -> TestHarness {
-        Self::new_(false).await
+        Self::new_with_opts(TestOpts {
+            flush_timeout: 86400.0,
+            read_only: false,
+            disable_backpressure: true,
+        })
+        .await
     }
 
     pub async fn new_ro() -> TestHarness {
-        Self::new_(true).await
+        Self::new_with_opts(TestOpts {
+            flush_timeout: 86400.0,
+            read_only: true,
+            disable_backpressure: true,
+        })
+        .await
     }
 
     pub fn ds1(&mut self) -> &mut DownstairsHandle {
@@ -493,7 +549,7 @@
             // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS,
             // i.e. letting us test both byte and job fault conditions.
             extent_count: DEFAULT_EXTENT_COUNT,
-            extent_size: Block::new_512(10),
+            extent_size: Block::new_512(DEFAULT_BLOCK_COUNT),
 
             gen_numbers: vec![0u64; DEFAULT_EXTENT_COUNT as usize],
             flush_numbers: vec![0u64; DEFAULT_EXTENT_COUNT as usize],
@@ -501,10 +557,10 @@
         }
     }
 
-    async fn new_(read_only: bool) -> TestHarness {
+    async fn new_with_opts(opts: TestOpts) -> TestHarness {
         let log = csl();
 
-        let cfg = Self::default_config(read_only);
+        let cfg = Self::default_config(opts.read_only);
 
         let ds1 = cfg.clone().start(log.new(o!("downstairs" => 1))).await;
         let ds2 = cfg.clone().start(log.new(o!("downstairs" => 2))).await;
@@ -513,15 +569,17 @@
         // Configure our guest without backpressure, to speed up tests which
         // require triggering a timeout
         let (g, mut io) = Guest::new(Some(log.clone()));
-        io.disable_queue_backpressure();
-        io.disable_byte_backpressure();
+        if opts.disable_backpressure {
+            io.disable_queue_backpressure();
+            io.disable_byte_backpressure();
+        }
         let guest = Arc::new(g);
 
         let crucible_opts = CrucibleOpts {
             id: Uuid::new_v4(),
             target: vec![ds1.local_addr, ds2.local_addr, ds3.local_addr],
-            flush_timeout: Some(86400.0),
-            read_only,
+            flush_timeout: Some(opts.flush_timeout),
+            read_only: opts.read_only,
             ..Default::default()
         };
 
@@ -2646,3 +2704,52 @@
     // requires going to the worker thread.
     harness.guest.get_uuid().await.unwrap();
 }
+
+/// Test that barrier operations are sent periodically
+#[tokio::test]
+async fn test_periodic_barrier() {
+    let mut harness = TestHarness::new_with_opts(TestOpts {
+        flush_timeout: 0.5,
+        read_only: false,
+        disable_backpressure: false,
+    })
+    .await;
+
+    let start_time = std::time::Instant::now();
+    let expected_time = std::time::Duration::from_secs(1);
+
+    loop {
+        // Send a write, which will succeed
+        let write_handle = harness.spawn(|guest| async move {
+            let mut data = BytesMut::new();
+            data.resize(512, 1u8);
+            guest.write(BlockIndex(0), data).await.unwrap();
+        });
+
+        // Ensure that all three clients got the write (or barrier) request
+        let b1 = harness.ds1().ack_write_or_barrier().await;
+        let b2 = harness.ds2.ack_write_or_barrier().await;
+        let b3 = harness.ds3.ack_write_or_barrier().await;
+        assert_eq!(b1, b2);
+        assert_eq!(b2, b3);
+
+        if b1 {
+            harness.ds1().ack_write().await;
+            harness.ds2.ack_write().await;
+            harness.ds3.ack_write().await;
+            write_handle.await.unwrap();
+
+            break;
+        }
+
+        write_handle.await.unwrap();
+
+        if start_time.elapsed() > expected_time {
+            panic!("expected a barrier");
+        }
+    }
+    // We should also automatically send a flush here
+    harness.ds1().ack_flush().await;
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+}
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index 81488f2e5..0fd8636b9 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -306,6 +306,7 @@ mod cdt {
     fn up__action_deferred_message(_: u64) {}
     fn up__action_leak_check(_: u64) {}
     fn up__action_flush_check(_: u64) {}
+    fn up__action_barrier_check(_: u64) {}
     fn up__action_stat_check(_: u64) {}
     fn up__action_repair_check(_: u64) {}
     fn up__action_control_check(_: u64) {}
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 8ebccc5c3..b7008e22c 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -87,6 +87,7 @@ pub struct UpCounters {
     action_deferred_message: u64,
     action_leak_check: u64,
     action_flush_check: u64,
+    action_barrier_check: u64,
     action_stat_check: u64,
     action_repair_check: u64,
     action_control_check: u64,
@@ -103,6 +104,7 @@
             action_deferred_message: 0,
             action_leak_check: 0,
             action_flush_check: 0,
+            action_barrier_check: 0,
             action_stat_check: 0,
             action_repair_check: 0,
             action_control_check: 0,
@@ -204,14 +206,21 @@ pub(crate) struct Upstairs {
 
     /// Marks whether a flush is needed
     ///
-    /// The Upstairs keeps all IOs in memory until a flush is ACK'd back from
-    /// all three downstairs. If there are IOs we have accepted into the work
-    /// queue that don't end with a flush, then we set this to indicate that the
-    /// upstairs may need to issue a flush of its own to be sure that data is
-    /// pushed to disk. Note that this is not an indication of an ACK'd flush,
-    /// just that the last IO command we put on the work queue was not a flush.
+    /// If there are IOs we have accepted into the work queue that don't end
+    /// with a flush, then we set this to indicate that the upstairs may need to
+    /// issue a flush of its own to be sure that data is pushed to disk. Note
+    /// that this is not an indication of an ACK'd flush, just that the last IO
+    /// command we put on the work queue was not a flush.
     need_flush: bool,
 
+    /// Marks whether a barrier is needed
+    ///
+    /// The Upstairs keeps all IOs in memory until a flush or barrier is ACK'd
+    /// back from all three downstairs. This flag indicates that we have sent
+    /// IOs and have not sent a flush or barrier; we should send a flush or
+    /// barrier periodically to keep the dependency list down.
+    need_barrier: bool,
+
     /// Statistics for this upstairs
     ///
     /// Shared with the metrics producer, so this `struct` wraps a
@@ -232,6 +241,9 @@
     /// Next time to leak IOP / bandwidth tokens from the Guest
     leak_deadline: Instant,
 
+    /// Next time to trigger a dependency barrier
+    barrier_deadline: Instant,
+
     /// Next time to trigger an automatic flush
     flush_deadline: Instant,
 
@@ -273,6 +285,7 @@ pub(crate) enum UpstairsAction {
     LeakCheck,
     FlushCheck,
+    BarrierCheck,
     StatUpdate,
     RepairCheck,
     Control(ControlRequest),
@@ -405,6 +418,7 @@
             cfg,
             repair_check_interval: None,
             leak_deadline: deadline_secs(1.0),
+            barrier_deadline: deadline_secs(flush_timeout_secs),
             flush_deadline: deadline_secs(flush_timeout_secs),
             stat_deadline: deadline_secs(STAT_INTERVAL_SECS),
             flush_timeout_secs,
@@ -412,6 +426,7 @@
             guest_dropped: false,
             ddef: rd_status,
             need_flush: false,
+            need_barrier: false,
             stats,
             counters,
             log,
@@ -518,6 +533,9 @@
             _ = sleep_until(self.leak_deadline) => {
                 UpstairsAction::LeakCheck
             }
+            _ = sleep_until(self.barrier_deadline) => {
+                UpstairsAction::BarrierCheck
+            }
             _ = sleep_until(self.flush_deadline) => {
                 UpstairsAction::FlushCheck
             }
@@ -585,6 +603,22 @@
                     self.submit_flush(None, None);
                 }
                 self.flush_deadline = deadline_secs(self.flush_timeout_secs);
+                self.barrier_deadline = deadline_secs(self.flush_timeout_secs);
+            }
+            UpstairsAction::BarrierCheck => {
+                self.counters.action_barrier_check += 1;
+                cdt::up__action_barrier_check!(|| (self
+                    .counters
+                    .action_barrier_check));
+                // Upgrade from a Barrier to a Flush if eligible
+                if self.need_flush && !self.downstairs.has_live_jobs() {
+                    self.submit_flush(None, None);
+                    self.flush_deadline =
+                        deadline_secs(self.flush_timeout_secs);
+                } else if self.need_barrier {
+                    self.submit_barrier();
+                }
+                self.barrier_deadline = deadline_secs(self.flush_timeout_secs);
             }
             UpstairsAction::StatUpdate => {
                 self.counters.action_stat_check += 1;
@@ -618,6 +652,12 @@
         // because too many jobs have piled up.
         self.gone_too_long();
 
+        // Only send automatic flushes if the downstairs is fully idle;
+        // otherwise, keep delaying the flush deadline.
+        if self.downstairs.has_live_jobs() {
+            self.flush_deadline = deadline_secs(self.flush_timeout_secs);
+        }
+
        // Check to see whether live-repair can continue
        //
        // This must be called before acking jobs, because it looks in
@@ -1270,6 +1310,7 @@
         // BlockOp::Flush level above.
 
         self.need_flush = false;
+        self.need_barrier = false; // flushes also serve as a barrier
 
         /*
          * Get the next ID for our new guest work job. Note that the flush
@@ -1288,13 +1329,12 @@
         cdt::up__to__ds__flush__start!(|| (ds_id.0));
     }
 
-    #[allow(dead_code)] // XXX this will be used soon!
     fn submit_barrier(&mut self) {
         // Notice that unlike submit_read and submit_write, we do not check for
         // guest_io_ready here. The upstairs itself calls submit_barrier
         // without the guest being involved; indeed the guest is not allowed to
         // call it!
-
+        self.need_barrier = false;
         let ds_id = self.downstairs.submit_barrier();
         self.guest.guest_work.submit_job(ds_id, None);
 
@@ -1359,6 +1399,7 @@
         }
 
         self.need_flush = true;
+        self.need_barrier = true;
 
         /*
          * Given the offset and buffer size, figure out what extent and
@@ -1492,6 +1533,7 @@
          * handles the operation(s) on the storage side.
          */
         self.need_flush = true;
+        self.need_barrier = true;
 
         /*
          * Grab this ID after extent_from_offset: in case of Err we don't
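
For orientation only (not part of the patch): a self-contained sketch of the reconnect decision that `on_missing(can_replay)` now encodes. The `DsState` enum and the function name here are simplified stand-ins; the real client state machine has many more variants, each with its own transition rules.

// Sketch only: a reduced DsState with just the variants relevant to replay.
#[derive(Copy, Clone, Debug, PartialEq)]
enum DsState {
    Active,
    Offline,
    Faulted,
}

// If every job since the last flush is still buffered upstairs (`can_replay`),
// a missing downstairs may stay Offline and have those jobs replayed when it
// reconnects.  Once a Barrier has retired jobs, replay is no longer possible,
// so the client is marked Faulted and must go through live-repair instead.
fn on_missing_sketch(current: DsState, can_replay: bool) -> DsState {
    match current {
        DsState::Active | DsState::Offline if can_replay => DsState::Offline,
        DsState::Active | DsState::Offline => DsState::Faulted,
        other => other,
    }
}

fn main() {
    assert_eq!(on_missing_sketch(DsState::Offline, true), DsState::Offline);
    assert_eq!(on_missing_sketch(DsState::Offline, false), DsState::Faulted);
}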