refactor(paxos): disentangle KV replicas from Paxos (#1656)
Makes it possible to substitute Paxos with other sequencing protocols.
shadaj authored Jan 16, 2025
1 parent ad8a7dd commit 965e38c
Showing 8 changed files with 660 additions and 616 deletions.
11 changes: 7 additions & 4 deletions hydro_test/examples/paxos.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use hydro_deploy::gcp::GcpNetwork;
use hydro_deploy::{Deployment, Host};
use hydro_lang::deploy::TrybuildHost;
use hydro_test::cluster::paxos::PaxosConfig;
use tokio::sync::RwLock;

type HostCreator = Box<dyn Fn(&mut Deployment) -> Arc<dyn Host>>;
@@ -43,13 +44,15 @@ async fn main() {

let (proposers, acceptors, clients, replicas) = hydro_test::cluster::paxos_bench::paxos_bench(
&builder,
f,
num_clients_per_node,
median_latency_window_size,
checkpoint_frequency,
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
PaxosConfig {
f,
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
},
);

let rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off";
164 changes: 164 additions & 0 deletions hydro_test/src/cluster/bench_client.rs
@@ -0,0 +1,164 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;

use hydro_lang::*;
use tokio::time::Instant;

pub struct Client {}

pub fn bench_client<'a>(
clients: &Cluster<'a, Client>,
transaction_cycle: impl FnOnce(
Stream<(u32, u32), Cluster<'a, Client>, Unbounded>,
) -> Stream<(u32, u32), Cluster<'a, Client>, Unbounded, NoOrder>,
num_clients_per_node: usize,
median_latency_window_size: usize,
) {
let client_tick = clients.tick();
// r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload)));

// Set up an initial set of payloads on the first tick
let start_this_tick = client_tick.singleton_first_tick(q!(()));

let c_new_payloads_on_start = start_this_tick.clone().flat_map_ordered(q!(move |_| (0
..num_clients_per_node)
.map(move |i| (
(CLUSTER_SELF_ID.raw_id * (num_clients_per_node as u32)) + i as u32,
0
))));

let (c_to_proposers_complete_cycle, c_to_proposers) =
clients.forward_ref::<Stream<_, _, _, TotalOrder>>();
let c_received_quorum_payloads = unsafe {
// SAFETY: because the transaction processor is required to handle arbitrary reordering
// across *different* keys, we are safe because delaying a transaction result for a key
// will only affect when the next request for that key is emitted with respect to other
// keys
transaction_cycle(c_to_proposers)
.timestamped(&client_tick)
.tick_batch()
};

// Whenever all replicas confirm that a payload was committed, send another payload
let c_new_payloads_when_committed = c_received_quorum_payloads
.clone()
.map(q!(|payload| (payload.0, payload.1 + 1)));
c_to_proposers_complete_cycle.complete(
c_new_payloads_on_start
.chain(unsafe {
// SAFETY: we don't send a new write for the same key until the previous one is committed,
// so this contains only a single write per key, and we don't care about order
// across keys
c_new_payloads_when_committed.assume_ordering::<TotalOrder>()
})
.all_ticks()
.drop_timestamp(),
);

// Track statistics
let (c_timers_complete_cycle, c_timers) =
client_tick.cycle::<Stream<(usize, Instant), _, _, NoOrder>>();
let c_new_timers_when_leader_elected = start_this_tick
.map(q!(|_| Instant::now()))
.flat_map_ordered(q!(
move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
));
let c_updated_timers = c_received_quorum_payloads
.clone()
.map(q!(|(key, _prev_count)| (key as usize, Instant::now())));
let c_new_timers = c_timers
.clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency)
.chain(c_new_timers_when_leader_elected)
.chain(c_updated_timers.clone())
.reduce_keyed_commutative(q!(|curr_time, new_time| {
if new_time > *curr_time {
*curr_time = new_time;
}
}));
c_timers_complete_cycle.complete_next_tick(c_new_timers);

let c_stats_output_timer = unsafe {
// SAFETY: intentionally sampling statistics
clients
.source_interval(q!(Duration::from_secs(1)))
.timestamped(&client_tick)
.tick_batch()
}
.first();

let c_latency_reset = c_stats_output_timer.clone().map(q!(|_| None)).defer_tick();

let c_latencies = c_timers
.join(c_updated_timers)
.map(q!(|(_virtual_id, (prev_time, curr_time))| Some(
curr_time.duration_since(prev_time)
)))
.chain(c_latency_reset.into_stream())
.all_ticks()
.flatten_ordered()
.fold_commutative(
// Create window with ring buffer using vec + wraparound index
// TODO: Would be nice if I could use vec![] instead, but that doesn't work in Hydro with RuntimeData *median_latency_window_size
q!(move || (
Rc::new(RefCell::new(Vec::<Duration>::with_capacity(
median_latency_window_size
))),
0usize,
)),
q!(move |(latencies, write_index), latency| {
let mut latencies_mut = latencies.borrow_mut();
if *write_index < latencies_mut.len() {
latencies_mut[*write_index] = latency;
} else {
latencies_mut.push(latency);
}
// Increment write index and wrap around
*write_index = (*write_index + 1) % median_latency_window_size;
}),
)
.map(q!(|(latencies, _)| latencies));

let c_throughput_new_batch = c_received_quorum_payloads
.clone()
.count()
.continue_unless(c_stats_output_timer.clone())
.map(q!(|batch_size| (batch_size, false)));

let c_throughput_reset = c_stats_output_timer
.clone()
.map(q!(|_| (0, true)))
.defer_tick();

let c_throughput = c_throughput_new_batch
.union(c_throughput_reset)
.all_ticks()
.fold(
q!(|| 0),
q!(|total, (batch_size, reset)| {
if reset {
*total = 0;
} else {
*total += batch_size;
}
}),
);

unsafe {
// SAFETY: intentionally sampling statistics
c_latencies.zip(c_throughput).latest_tick()
}
.continue_if(c_stats_output_timer)
.all_ticks()
.for_each(q!(move |(latencies, throughput)| {
let mut latencies_mut = latencies.borrow_mut();
if latencies_mut.len() > 0 {
let middle_idx = latencies_mut.len() / 2;
let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx);
println!("Median latency: {}ms", median.as_micros() as f64 / 1000.0);
}

println!("Throughput: {} requests/s", throughput);
}));
// End track statistics
}
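
The latency tracking above boils down to a plain-Rust windowed median: a Vec used as a ring buffer with a wraparound write index, and select_nth_unstable to pick the median. A standalone sketch of just that technique, outside Hydro, with invented sample values:

use std::time::Duration;

fn main() {
    let window = 5; // plays the role of median_latency_window_size
    let mut latencies: Vec<Duration> = Vec::with_capacity(window);
    let mut write_index = 0usize;

    for ms in [12u64, 7, 30, 9, 15, 8, 22] {
        let latency = Duration::from_millis(ms);
        if write_index < latencies.len() {
            latencies[write_index] = latency; // overwrite the oldest sample
        } else {
            latencies.push(latency); // still filling the window
        }
        write_index = (write_index + 1) % window;
    }

    // Median via partial sort, as in the for_each at the end of bench_client.
    let middle_idx = latencies.len() / 2;
    let (_, median, _) = latencies.select_nth_unstable(middle_idx);
    println!("Median latency: {}ms", median.as_micros() as f64 / 1000.0);
}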
hydro_test/src/cluster/kv_replica.rs (renamed from paxos_kv.rs)
@@ -6,8 +6,6 @@ use hydro_lang::*;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

use super::paxos::{paxos_core, Acceptor, Ballot, Proposer};

pub struct Replica {}

pub trait KvKey: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug {}
@@ -41,70 +39,22 @@ impl<K: KvKey, V: KvValue> PartialOrd for SequencedKv<K, V> {
}
}

/// Sets up a linearizable key-value store using Paxos.
///
/// # Safety
/// Notifications for leader election are non-deterministic. When the leader is changing,
/// writes may be dropped by the old leader.
#[expect(
clippy::type_complexity,
clippy::too_many_arguments,
reason = "internal paxos code // TODO"
)]
pub unsafe fn paxos_kv<'a, K: KvKey, V: KvValue>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
replicas: &Cluster<'a, Replica>,
c_to_proposers: Stream<KvPayload<K, V>, Cluster<'a, Proposer>, Unbounded>,
f: usize,
i_am_leader_send_timeout: u64,
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
checkpoint_frequency: usize,
) -> (
Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
Stream<KvPayload<K, V>, Cluster<'a, Replica>, Unbounded>,
) {
let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) =
replicas.forward_ref::<Stream<_, _, _>>();

let (p_to_clients_new_leader_elected, p_to_replicas) = unsafe {
// SAFETY: Leader election non-determinism and non-deterministic dropping of writes is documented.
paxos_core(
proposers,
acceptors,
r_to_acceptors_checkpoint.broadcast_bincode(acceptors),
c_to_proposers,
f,
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
)
};

let (r_to_acceptors_checkpoint_new, r_new_processed_payloads) = replica(
replicas,
p_to_replicas
.map(q!(|(slot, kv)| SequencedKv { seq: slot, kv }))
.broadcast_bincode_interleaved(replicas),
checkpoint_frequency,
);

r_to_acceptors_checkpoint_complete_cycle.complete(r_to_acceptors_checkpoint_new);

(p_to_clients_new_leader_elected, r_new_processed_payloads)
}

// Replicas. All relations for replicas will be prefixed with r. Expects ReplicaPayload on p_to_replicas, outputs a stream of (client address, ReplicaPayload) after processing.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn replica<'a, K: KvKey, V: KvValue>(
pub fn kv_replica<'a, K: KvKey, V: KvValue>(
replicas: &Cluster<'a, Replica>,
p_to_replicas: Stream<SequencedKv<K, V>, Cluster<'a, Replica>, Unbounded, NoOrder>,
p_to_replicas: impl Into<
Stream<(usize, Option<KvPayload<K, V>>), Cluster<'a, Replica>, Unbounded, NoOrder>,
>,
checkpoint_frequency: usize,
) -> (
Stream<usize, Cluster<'a, Replica>, Unbounded>,
Stream<KvPayload<K, V>, Cluster<'a, Replica>, Unbounded>,
) {
let p_to_replicas = p_to_replicas
.into()
.map(q!(|(slot, kv)| SequencedKv { seq: slot, kv }));

let replica_tick = replicas.tick();

let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replica_tick.cycle();
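
This is the half of the refactor that makes the commit's goal concrete: kv_replica no longer knows about Paxos clusters or timeouts, it only consumes (slot, Option<KvPayload<K, V>>) pairs on the replica cluster. A minimal sketch, not part of this commit, of wiring it to some other sequencing protocol; it assumes kv_replica, KvKey, KvValue, KvPayload, and Replica are all public in the renamed module, as the diff above suggests:

use hydro_lang::*;
use hydro_test::cluster::kv_replica::{kv_replica, KvKey, KvPayload, KvValue, Replica};

// Any sequencer that can emit (slot, Option<payload>) pairs on the replica cluster can
// drive the KV replicas directly, without going through Paxos.
pub fn replicate_with_custom_sequencer<'a, K: KvKey, V: KvValue>(
    replicas: &Cluster<'a, Replica>,
    sequenced: Stream<(usize, Option<KvPayload<K, V>>), Cluster<'a, Replica>, Unbounded, NoOrder>,
    checkpoint_frequency: usize,
) -> (
    Stream<usize, Cluster<'a, Replica>, Unbounded>, // checkpoint sequence numbers
    Stream<KvPayload<K, V>, Cluster<'a, Replica>, Unbounded>, // committed payloads
) {
    kv_replica(replicas, sequenced, checkpoint_frequency)
}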
4 changes: 3 additions & 1 deletion hydro_test/src/cluster/mod.rs
@@ -1,8 +1,10 @@
pub mod bench_client;
pub mod compute_pi;
pub mod kv_replica;
pub mod many_to_many;
pub mod map_reduce;
pub mod paxos;
pub mod paxos_bench;
pub mod paxos_kv;
pub mod paxos_with_client;
pub mod simple_cluster;
pub mod two_pc;
41 changes: 31 additions & 10 deletions hydro_test/src/cluster/paxos.rs
@@ -12,6 +12,18 @@ use serde::{Deserialize, Serialize};
pub struct Proposer {}
pub struct Acceptor {}

#[derive(Clone, Copy)]
pub struct PaxosConfig {
/// Maximum number of faulty nodes
pub f: usize,
/// How often to send "I am leader" heartbeats
pub i_am_leader_send_timeout: u64,
/// How often to check if the leader has expired
pub i_am_leader_check_timeout: u64,
/// Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts
pub i_am_leader_check_timeout_delay_multiplier: usize,
}

pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}

@@ -64,11 +76,7 @@ struct P2a<P> {
/// in deterministic order. However, when the leader is changing, payloads may be
/// non-deterministically dropped. The stream of ballots is also non-deterministic because
/// leaders are elected in a non-deterministic process.
#[expect(
clippy::too_many_arguments,
clippy::type_complexity,
reason = "internal paxos code // TODO"
)]
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub unsafe fn paxos_core<'a, P: PaxosPayload, R>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
@@ -78,15 +86,20 @@ pub unsafe fn paxos_core<'a, P: PaxosPayload, R>(
Unbounded,
NoOrder,
>,
c_to_proposers: Stream<P, Cluster<'a, Proposer>, Unbounded>,
f: usize,
i_am_leader_send_timeout: u64,
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
c_to_proposers: impl FnOnce(
Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
config: PaxosConfig,
) -> (
Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
let f = config.f;
let i_am_leader_send_timeout = config.i_am_leader_send_timeout;
let i_am_leader_check_timeout = config.i_am_leader_check_timeout;
let i_am_leader_check_timeout_delay_multiplier =
config.i_am_leader_check_timeout_delay_multiplier;

proposers
.source_iter(q!(["Proposers say hello"]))
.for_each(q!(|s| println!("{}", s)));
@@ -127,6 +140,14 @@ pub unsafe fn paxos_core<'a, P: PaxosPayload, R>(
.clone()
.continue_unless(p_is_leader.clone().defer_tick());

let c_to_proposers = c_to_proposers(
just_became_leader
.clone()
.then(p_ballot.clone())
.all_ticks()
.drop_timestamp(),
);

let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe {
// SAFETY: The relevant p1bs are non-deterministic because they come from an arbitrary quorum, but because
// we use a quorum, if we remain the leader there are no missing committed values when we combine the logs.
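
For callers, the other visible change in paxos.rs is that the leader-election knobs travel as a single PaxosConfig value and the client payload stream is now produced by a closure over the stream of newly elected ballots. A trivial, illustrative sketch of constructing the config (the values are made up, not taken from the example):

use hydro_test::cluster::paxos::PaxosConfig;

fn main() {
    // Illustrative values only; the real ones in hydro_test/examples/paxos.rs are
    // defined above the visible hunk.
    let config = PaxosConfig {
        f: 1,
        i_am_leader_send_timeout: 1,
        i_am_leader_check_timeout: 2,
        i_am_leader_check_timeout_delay_multiplier: 1,
    };
    // PaxosConfig derives Clone + Copy, so the same value can be handed to paxos_core
    // or to wrappers such as paxos_bench without extra plumbing.
    let _ = config;
}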