Only send flushes when Downstairs is idle; send Barrier otherwise
mkeeter committed Oct 14, 2024
1 parent dbc44be commit 7ee640f
Showing 5 changed files with 235 additions and 42 deletions.
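In short: when the periodic deadline fires, the Upstairs now submits a real Flush only if the Downstairs are idle, and submits a Barrier when jobs are still in flight, so completed jobs can be retired without flush semantics. A minimal sketch of that dispatch, assuming hypothetical submit_barrier() and submit_flush() helpers — only has_live_jobs() is actually defined in this diff; the real dispatch lives in the fifth changed file, which did not render below:

    // Sketch only; helper names other than has_live_jobs() are assumed.
    if self.downstairs.has_live_jobs() {
        // Jobs are still in flight: a Barrier lets completed jobs be
        // retired without flush semantics, at the cost of making replay
        // impossible past this point.
        self.downstairs.submit_barrier();
    } else {
        // Idle: a Flush is cheap now and keeps replay possible.
        self.submit_flush(None);
    }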
upstairs/src/client.rs (14 additions & 4 deletions)
@@ -495,10 +495,13 @@ impl DownstairsClient {
     }
 
     /// When the downstairs is marked as missing, handle its state transition
-    pub(crate) fn on_missing(&mut self) {
+    pub(crate) fn on_missing(&mut self, can_replay: bool) {
         let current = &self.state;
         let new_state = match current {
-            DsState::Active | DsState::Offline => DsState::Offline,
+            DsState::Active | DsState::Offline if can_replay => {
+                DsState::Offline
+            }
+            DsState::Active | DsState::Offline => DsState::Faulted,
 
             DsState::Faulted
             | DsState::LiveRepair
@@ -837,8 +840,12 @@ impl DownstairsClient {
         reason: ClientStopReason,
     ) {
         let new_state = match self.state {
-            DsState::Active => DsState::Offline,
-            DsState::Offline => DsState::Offline,
+            DsState::Active | DsState::Offline
+                if matches!(reason, ClientStopReason::IneligibleForReplay) =>
+            {
+                DsState::Faulted
+            }
+            DsState::Active | DsState::Offline => DsState::Offline,
             DsState::Migrating => DsState::Faulted,
             DsState::Faulted => DsState::Faulted,
             DsState::Deactivated => DsState::New,
@@ -2405,6 +2412,9 @@ pub(crate) enum ClientStopReason {
 
     /// The upstairs has requested that we deactivate when we were offline
     OfflineDeactivated,
+
+    /// The Upstairs has dropped jobs that would be needed for replay
+    IneligibleForReplay,
 }
 
 /// Response received from the I/O task
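Taken together, the two client.rs hunks above implement a single rule: a client that disconnects may only park in Offline (and later replay its buffered jobs) while replay is still possible; otherwise it goes straight to Faulted and must be live-repaired. A condensed, illustrative restatement (not the actual code; unchanged arms elided):

    // Sketch only: where a disconnected client lands under the new rules.
    fn disconnect_target(state: DsState, can_replay: bool) -> DsState {
        match state {
            // Replay still possible: wait in Offline, replay on reconnect
            DsState::Active | DsState::Offline if can_replay => DsState::Offline,
            // Jobs needed for replay were already retired: fault now
            DsState::Active | DsState::Offline => DsState::Faulted,
            // Migrating, Faulted, Deactivated, etc. keep their old mappings
            other => other,
        }
    }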
upstairs/src/downstairs.rs (37 additions & 4 deletions)
@@ -82,6 +82,12 @@ pub(crate) struct Downstairs {
     /// (including automatic flushes).
     next_flush: u64,
 
+    /// Indicates whether we are eligible for replay
+    ///
+    /// We are only eligible for replay if all jobs since the last flush are
+    /// buffered (i.e. none have been retired by a `Barrier` operation).
+    can_replay: bool,
+
     /// Ringbuf of completed downstairs job IDs.
     completed: AllocRingBuffer<JobId>,
 
@@ -278,6 +284,7 @@ impl Downstairs {
             },
             cfg,
             next_flush: 0,
+            can_replay: true,
             ds_active: ActiveJobs::new(),
             completed: AllocRingBuffer::new(2048),
             completed_jobs: AllocRingBuffer::new(8),
@@ -499,7 +506,7 @@ impl Downstairs {
         // If the connection goes down here, we need to know what state we were
         // in to decide what state to transition to. The on_missing method will
         // do that for us!
-        self.clients[client_id].on_missing();
+        self.clients[client_id].on_missing(self.can_replay);
 
         // If the IO task stops on its own, then under certain circumstances,
         // we want to skip all of its jobs. (If we requested that the IO task
@@ -2442,11 +2449,17 @@ impl Downstairs {
         Ok(ReplaceResult::Started)
     }
 
+    /// Checks whether the given client state should go from Offline -> Faulted
+    ///
+    /// # Panics
+    /// If the given client is not in the `Offline` state
     pub(crate) fn check_gone_too_long(
         &mut self,
         client_id: ClientId,
         up_state: &UpstairsState,
     ) {
+        assert_eq!(self.clients[client_id].state(), DsState::Offline);
+
         let byte_count = self.clients[client_id].total_bytes_outstanding();
         let work_count = self.clients[client_id].total_live_work();
         let failed = if work_count > crate::IO_OUTSTANDING_MAX_JOBS {
@@ -2461,6 +2474,13 @@
                 "downstairs failed, too many outstanding bytes {byte_count}"
             );
             Some(ClientStopReason::TooManyOutstandingBytes)
+        } else if !self.can_replay {
+            // XXX can this actually happen?
+            warn!(
+                self.log,
+                "downstairs became ineligible for replay while offline"
+            );
+            Some(ClientStopReason::IneligibleForReplay)
         } else {
             None
         };
@@ -2592,9 +2612,12 @@ impl Downstairs {
     /// writes and if they aren't included in replay then the write will
     /// never start.
     fn retire_check(&mut self, ds_id: JobId) {
-        if !self.is_flush(ds_id) {
-            return;
-        }
+        let job = self.ds_active.get(&ds_id).expect("checked missing job");
+        let can_replay = match job.work {
+            IOop::Flush { .. } => true,
+            IOop::Barrier { .. } => false,
+            _ => return,
+        };
 
         // Only a completed flush will remove jobs from the active queue -
         // currently we have to keep everything around for use during replay
@@ -2670,6 +2693,9 @@ impl Downstairs {
             for cid in ClientId::iter() {
                 self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id);
             }
+
+            // Update the flag indicating whether replay is allowed
+            self.can_replay = can_replay;
         }
     }
 
@@ -4174,6 +4200,13 @@ impl Downstairs {
         self.ddef = Some(ddef);
     }
 
+    /// Checks whether there are any in-progress jobs present
+    pub(crate) fn has_live_jobs(&self) -> bool {
+        self.clients
+            .iter()
+            .any(|c| c.backpressure_counters.get_jobs() > 0)
+    }
+
     /// Returns the per-client state for the given job
     ///
     /// This is a helper function to make unit tests shorter
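The new can_replay flag ties the downstairs.rs changes together: retire_check() clears it when a completed Barrier retires jobs and sets it when a completed Flush does, and check_gone_too_long() faults an Offline client once the flag is clear. A worked timeline (job IDs illustrative):

    // Write(1000), Write(1001), Barrier(1002) all complete:
    //   retire_check(1002) retires them and sets can_replay = false,
    //   since the retired writes can no longer be resent on reconnect.
    //
    // A Downstairs that disconnects now goes straight to Faulted via
    // on_missing(false); one already sitting in Offline is stopped with
    // ClientStopReason::IneligibleForReplay by check_gone_too_long().
    //
    // Write(1003), Flush(1004) complete:
    //   retire_check(1004) retires them and sets can_replay = true again,
    //   because everything up to the flush is durable on disk and nothing
    //   newer has been dropped.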
upstairs/src/dummy_downstairs_tests.rs (133 additions & 26 deletions)
@@ -263,23 +263,26 @@ impl DownstairsHandle {
     /// # Panics
     /// If a non-flush message arrives
     pub async fn ack_flush(&mut self) -> u64 {
-        let Message::Flush {
-            job_id,
-            flush_number,
-            upstairs_id,
-            ..
-        } = self.recv().await.unwrap()
-        else {
-            panic!("saw non flush!");
-        };
-        self.send(Message::FlushAck {
-            upstairs_id,
-            session_id: self.upstairs_session_id.unwrap(),
-            job_id,
-            result: Ok(()),
-        })
-        .unwrap();
-        flush_number
+        match self.recv().await.unwrap() {
+            Message::Flush {
+                job_id,
+                flush_number,
+                upstairs_id,
+                ..
+            } => {
+                self.send(Message::FlushAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Ok(()),
+                })
+                .unwrap();
+                flush_number
+            }
+            m => {
+                panic!("saw non flush {m:?}");
+            }
+        }
     }
 
     /// Awaits a `Message::Write { .. }` and sends a `WriteAck`
@@ -302,6 +305,42 @@ impl DownstairsHandle {
         header.job_id
     }
 
+    /// Awaits and acks a `Message::Write { .. }` or `Message::Barrier { .. }`
+    ///
+    /// Returns `true` if the message was a `Barrier`
+    ///
+    /// # Panics
+    /// If any other message arrives
+    pub async fn ack_write_or_barrier(&mut self) -> bool {
+        let (r, was_barrier) = match self.recv().await.unwrap() {
+            Message::Write { header, .. } => (
+                Message::WriteAck {
+                    upstairs_id: header.upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id: header.job_id,
+                    result: Ok(()),
+                },
+                false,
+            ),
+            Message::Barrier {
+                upstairs_id,
+                job_id,
+                ..
+            } => (
+                Message::BarrierAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Ok(()),
+                },
+                true,
+            ),
+            m => panic!("saw unexpected message {m:?}"),
+        };
+        self.send(r).unwrap();
+        was_barrier
+    }
+
     /// Awaits a `Message::Read` and sends a blank `ReadResponse`
     ///
     /// Returns the job ID for further checks
@@ -470,14 +509,31 @@ pub struct TestHarness {
 
 /// Number of extents in `TestHarness::default_config`
 const DEFAULT_EXTENT_COUNT: u32 = 25;
+const DEFAULT_BLOCK_COUNT: u64 = 10;
+
+struct TestOpts {
+    flush_timeout: f32,
+    read_only: bool,
+    disable_backpressure: bool,
+}
 
 impl TestHarness {
     pub async fn new() -> TestHarness {
-        Self::new_(false).await
+        Self::new_with_opts(TestOpts {
+            flush_timeout: 86400.0,
+            read_only: false,
+            disable_backpressure: true,
+        })
+        .await
     }
 
    pub async fn new_ro() -> TestHarness {
-        Self::new_(true).await
+        Self::new_with_opts(TestOpts {
+            flush_timeout: 86400.0,
+            read_only: true,
+            disable_backpressure: true,
+        })
+        .await
     }
 
     pub fn ds1(&mut self) -> &mut DownstairsHandle {
@@ -493,18 +549,18 @@ impl TestHarness {
             // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS,
             // i.e. letting us test both byte and job fault conditions.
             extent_count: DEFAULT_EXTENT_COUNT,
-            extent_size: Block::new_512(10),
+            extent_size: Block::new_512(DEFAULT_BLOCK_COUNT),
 
             gen_numbers: vec![0u64; DEFAULT_EXTENT_COUNT as usize],
             flush_numbers: vec![0u64; DEFAULT_EXTENT_COUNT as usize],
             dirty_bits: vec![false; DEFAULT_EXTENT_COUNT as usize],
         }
     }
 
-    async fn new_(read_only: bool) -> TestHarness {
+    async fn new_with_opts(opts: TestOpts) -> TestHarness {
         let log = csl();
 
-        let cfg = Self::default_config(read_only);
+        let cfg = Self::default_config(opts.read_only);
 
         let ds1 = cfg.clone().start(log.new(o!("downstairs" => 1))).await;
         let ds2 = cfg.clone().start(log.new(o!("downstairs" => 2))).await;
@@ -513,15 +569,17 @@
         // Configure our guest without backpressure, to speed up tests which
         // require triggering a timeout
         let (g, mut io) = Guest::new(Some(log.clone()));
-        io.disable_queue_backpressure();
-        io.disable_byte_backpressure();
+        if opts.disable_backpressure {
+            io.disable_queue_backpressure();
+            io.disable_byte_backpressure();
+        }
         let guest = Arc::new(g);
 
         let crucible_opts = CrucibleOpts {
            id: Uuid::new_v4(),
             target: vec![ds1.local_addr, ds2.local_addr, ds3.local_addr],
-            flush_timeout: Some(86400.0),
-            read_only,
+            flush_timeout: Some(opts.flush_timeout),
+            read_only: opts.read_only,
 
             ..Default::default()
         };
@@ -2646,3 +2704,52 @@ async fn test_write_replay() {
     // requires going to the worker thread.
     harness.guest.get_uuid().await.unwrap();
 }
+
+/// Test that barrier operations are sent periodically
+#[tokio::test]
+async fn test_periodic_barrier() {
+    let mut harness = TestHarness::new_with_opts(TestOpts {
+        flush_timeout: 0.5,
+        read_only: false,
+        disable_backpressure: false,
+    })
+    .await;
+
+    let start_time = std::time::Instant::now();
+    let expected_time = std::time::Duration::from_secs(1);
+
+    loop {
+        // Send a write, which will succeed
+        let write_handle = harness.spawn(|guest| async move {
+            let mut data = BytesMut::new();
+            data.resize(512, 1u8);
+            guest.write(BlockIndex(0), data).await.unwrap();
+        });
+
+        // Check what all three clients received (a write or a barrier)
+        let b1 = harness.ds1().ack_write_or_barrier().await;
+        let b2 = harness.ds2.ack_write_or_barrier().await;
+        let b3 = harness.ds3.ack_write_or_barrier().await;
+        assert_eq!(b1, b2);
+        assert_eq!(b2, b3);
+
+        if b1 {
+            harness.ds1().ack_write().await;
+            harness.ds2.ack_write().await;
+            harness.ds3.ack_write().await;
+            write_handle.await.unwrap();
+
+            break;
+        }
+
+        write_handle.await.unwrap();
+
+        if start_time.elapsed() > expected_time {
+            panic!("expected a barrier");
+        }
+    }
+    // The now-idle Upstairs should send an automatic flush here
+    harness.ds1().ack_flush().await;
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+}
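To exercise the new test on its own (assuming the upstairs package keeps its crate name, crucible):

    cargo test -p crucible test_periodic_barrier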
upstairs/src/lib.rs (1 addition & 0 deletions)
@@ -306,6 +306,7 @@ mod cdt {
     fn up__action_deferred_message(_: u64) {}
     fn up__action_leak_check(_: u64) {}
     fn up__action_flush_check(_: u64) {}
+    fn up__action_barrier_check(_: u64) {}
     fn up__action_stat_check(_: u64) {}
     fn up__action_repair_check(_: u64) {}
     fn up__action_control_check(_: u64) {}
[diff for the fifth changed file (+50 −8) did not render]
