diff --git a/hydro_test/src/cluster/paxos.rs b/hydro_test/src/cluster/paxos.rs index 93f4f0bb06f..fc6e4907c15 100644 --- a/hydro_test/src/cluster/paxos.rs +++ b/hydro_test/src/cluster/paxos.rs @@ -485,8 +485,9 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( ) { let p_p1b_max_checkpoint = accepted_logs .clone() - .map(q!(|(checkpoint, _log)| checkpoint)) - .max(); + .filter_map(q!(|(checkpoint, _log)| checkpoint)) + .max() + .into_singleton(); let p_p1b_highest_entries_and_count = accepted_logs .map(q!(|(_checkpoint, log)| log)) .flatten_unordered() // Convert HashMap log back to stream diff --git a/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap index d53ab94f81b..fad726e1c0a 100644 --- a/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -788,15 +788,30 @@ expression: built.ir() }, ), Tee { - inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < usize > , core :: option :: Option < usize > , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (checkpoint , _log) | checkpoint }), - input: Tee { - inner: , + inner: : Chain( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydro_lang :: __staged :: optional :: * ; | v | Some (v) }), + input: Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (checkpoint , _log) | checkpoint }), + input: Tee { + inner: , + }, + }, }, }, - }, + Persist( + Source { + source: Iter( + [:: std :: option :: Option :: None], + ), + location_kind: Cluster( + 0, + ), + }, + ), + ), }, ), },