Skip to content

Commit

Permalink
revert changes made to main
Browse files Browse the repository at this point in the history
  • Loading branch information
jhellerstein committed Nov 20, 2023
1 parent cab8994 commit b672372
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 16 deletions.
2 changes: 1 addition & 1 deletion hydroflow/examples/chat/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
// Pipeline 2: Broadcast messages to all clients
inbound_chan[ChatMsg] -> map(|(_addr, nickname, message, ts)| Message::ChatMsg { nickname, message, ts }) -> [0]broadcast;
clients[1] -> [1]broadcast;
broadcast = cross_join::<'tick, 'static>() -> [1]outbound_chan;
broadcast = cross_join::<'static>() -> [1]outbound_chan;
};

if let Some(graph) = opts.graph {
Expand Down
20 changes: 5 additions & 15 deletions hydroflow/examples/two_pc/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts:
outbound_chan[1] -> for_each(|(m, a)| println!("Sending {:?} to {:?}", m, a));

// setup broadcast channel to all subords
broadcast_join = cross_join::<'tick, 'static>() -> [pos]filtered;
broadcast_join = cross_join::<'static>() -> outbound_chan;
broadcast = union() -> [0]broadcast_join;
subords[1] -> [1]broadcast_join;
subords[2] -> for_each(|s| println!("Subordinate: {:?}", s));
filtered = anti_join::<'static, 'tick>() -> outbound_chan;


// Phase 1 initiate:
// Given a transaction commit request from stdio, broadcast a Prepare to subordinates
Expand All @@ -60,30 +58,22 @@ pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts:
-> [1]broadcast;

// count commit votes
commit_votes = msgs[commits] -> tee();
commit_vote_cnt = commit_votes
commit_votes = msgs[commits]
-> map(|m: SubordResponse| (m.xid, 1))
-> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val);

// count subordinates
subord_total = subords[0] -> fold::<'tick>(|| 0, |a: &mut _, _b| *a += 1); // -> for_each(|n| println!("There are {} subordinates.", n));

// If new commit_votes have come in, then check if
// commit_votes for this xid is the same as all_votes.
// if so, send a P2 Commit message
gate = cross_join::<'tick, 'tick>();
commit_votes -> [0]gate;
// If commit_votes for this xid is the same as all_votes, send a P2 Commit message
committed = join() -> map(|(_c, (xid, ()))| xid);
commit_vote_cnt -> [1]gate;
gate -> map(|(_, (xid, c))| (xid, c)) -> map(|(xid, c)| (c, xid)) -> [0]committed;
commit_votes -> map(|(xid, c)| (c, xid)) -> [0]committed;
subord_total -> map(|c| (c, ())) -> [1]committed;
committed -> map(|xid| CoordMsg{xid, mtype: MsgType::Commit}) -> [2]broadcast;

// Handle p2 acknowledgments by sending an End message
acks = msgs[acks] tee();
acks -> map(|m:SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::End,})
msgs[acks] -> map(|m:SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::End,})
-> [3]broadcast;
acks -> map(|m:SubordResponse| )

// Handler for ended acknowledgments not necessary; we just print them
};
Expand Down

0 comments on commit b672372

Please sign in to comment.