Skip to content

Commit

Permalink
Runs
Browse files Browse the repository at this point in the history
  • Loading branch information
davidchuyaya committed Dec 19, 2024
1 parent c65b4c4 commit 40315fd
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
p_received_p2b_ballots: Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
a_log: Singleton<L, Tick<Cluster<'a, Acceptor>>, Bounded>,
a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
) -> (
Singleton<Ballot, Tick<Cluster<'a, Proposer>>, Bounded>,
Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
Stream<L, Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Stream<(Option<usize>, L), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) {
let (p1b_fail_complete, p1b_fail) =
Expand Down Expand Up @@ -372,11 +372,11 @@ unsafe fn p_leader_heartbeat<'a>(
fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
acceptor_tick: &Tick<Cluster<'a, Acceptor>>,
p_to_acceptors_p1a: Stream<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded, NoOrder>,
a_log: Singleton<L, Tick<Cluster<'a, Acceptor>>, Bounded>,
a_log: Singleton<(Option<usize>, L), Tick<Cluster<'a, Acceptor>>, Bounded>,
proposers: &Cluster<'a, Proposer>,
) -> (
Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
Stream<(Ballot, Result<L, Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
Stream<(Ballot, Result<(Option<usize>, L), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
let a_max_ballot = p_to_acceptors_p1a
.clone()
Expand Down Expand Up @@ -414,7 +414,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
proposer_tick: &Tick<Cluster<'a, Proposer>>,
a_to_proposers_p1b: Stream<
(Ballot, Result<P, Ballot>),
(Ballot, Result<(Option<usize>, P), Ballot>),
Cluster<'a, Proposer>,
Unbounded,
NoOrder,
Expand All @@ -424,7 +424,7 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
f: usize,
) -> (
Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,
Stream<P, Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Stream<(Option<usize>, P), Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Stream<Ballot, Timestamped<Cluster<'a, Proposer>>, Unbounded, NoOrder>,
) {
let (quorums, fails) = collect_quorum_with_response(
Expand Down Expand Up @@ -471,8 +471,9 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn recommit_after_leader_election<'a, P: PaxosPayload>(
proposer_tick: &Tick<Cluster<'a, Proposer>>,
accepted_logs: Stream<
HashMap<usize, LogValue<P>>,
(Option<usize>, HashMap<usize, LogValue<P>>),
Tick<Cluster<'a, Proposer>>,
Bounded,
NoOrder,
Expand All @@ -483,7 +484,19 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
Stream<P2a<P>, Tick<Cluster<'a, Proposer>>, Bounded, NoOrder>,
Optional<usize, Tick<Cluster<'a, Proposer>>, Bounded>,
) {
let p_p1b_max_checkpoint = accepted_logs
.clone()
.map(q!(|(checkpoint, _log)| {
if let Some(checkpoint) = checkpoint {
checkpoint as i64
} else {
-1
}
}))
.max()
.unwrap_or(proposer_tick.singleton(q!(-1 as i64)));
let p_p1b_highest_entries_and_count = accepted_logs
.map(q!(|(_checkpoint, log)| log))
.flatten_unordered() // Convert HashMap log back to stream
.fold_keyed_commutative::<(usize, Option<LogValue<P>>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
if let Some(curr_entry_payload) = &mut curr_entry.1 {
Expand All @@ -510,8 +523,9 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
let p_log_to_try_commit = p_p1b_highest_entries_and_count
.clone()
.cross_singleton(p_ballot.clone())
.filter_map(q!(move |((slot, (count, entry)), ballot)| {
if count <= f {
.cross_singleton(p_p1b_max_checkpoint.clone())
.filter_map(q!(move |(((slot, (count, entry)), ballot), checkpoint)| {
if count <= f && slot as i64 > checkpoint {
Some(P2a {
ballot,
slot,
Expand All @@ -530,7 +544,9 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
.map(q!(|(slot, _)| slot));
let p_log_holes = p_max_slot
.clone()
.flat_map_ordered(q!(|max_slot| 0..max_slot))
.into_stream()
.cross_singleton(p_p1b_max_checkpoint)
.flat_map_ordered(q!(|(max_slot, checkpoint)| (checkpoint+1) as usize..max_slot))
.filter_not_in(p_proposed_slots)
.cross_singleton(p_ballot.clone())
.map(q!(|(slot, ballot)| P2a {
Expand Down Expand Up @@ -564,7 +580,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>(
p_is_leader: Optional<(), Tick<Cluster<'a, Proposer>>, Bounded>,

p_relevant_p1bs: Stream<
HashMap<usize, LogValue<P>>,
(Option<usize>, HashMap<usize, LogValue<P>>),
Tick<Cluster<'a, Proposer>>,
Bounded,
NoOrder,
Expand All @@ -574,11 +590,11 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>(
a_max_ballot: Singleton<Ballot, Tick<Cluster<'a, Acceptor>>, Bounded>,
) -> (
Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder>,
Singleton<HashMap<usize, LogValue<P>>, Timestamped<Cluster<'a, Acceptor>>, Unbounded>,
Singleton<(Option<usize>, HashMap<usize, LogValue<P>>), Timestamped<Cluster<'a, Acceptor>>, Unbounded>,
Stream<Ballot, Cluster<'a, Proposer>, Unbounded, NoOrder>,
) {
let (p_log_to_recommit, p_max_slot) =
recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f);
recommit_after_leader_election(proposer_tick, p_relevant_p1bs, p_ballot.clone(), f);

let indexed_payloads = index_payloads(proposer_tick, p_max_slot, unsafe {
// SAFETY: We batch payloads so that we can compute the correct slot based on
Expand Down Expand Up @@ -635,7 +651,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>(
p_to_replicas
.map(q!(|((slot, _ballot), (value, _))| (slot, value)))
.drop_timestamp(),
a_log.map(q!(|(_ckpnt, log)| log)),
a_log,
fails.map(q!(|(_, ballot)| ballot)).drop_timestamp(),
)
}
Expand Down

0 comments on commit 40315fd

Please sign in to comment.