Skip to content

Commit

Permalink
working chat
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Sep 19, 2024
1 parent 6ed3aa6 commit a6c01c0
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 10 deletions.
2 changes: 1 addition & 1 deletion hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ name = "modules_triple_cross_join"
required-features = [ "debugging" ]

[[example]]
name = "echoserver_websocket"
name = "chat_websocket"
required-features = [ "websocket" ]

[dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ async fn main() {
inbound_chan[0]
-> for_each(|(msg, addr): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), msg, addr));

// Echo back the Echo messages with updated timestamp
inbound_chan[1]
-> map(|(msg, addr)| (msg, addr) ) -> dest_sink(outbound);
clients = inbound_chan[1] -> map(|(_msg, addr)| addr) -> unique::<'static>();
messages = inbound_chan[2] -> map(|(msg, _addr)| msg);

messages -> [0]cj;
clients -> [1]cj;
cj = cross_join::<'tick, 'static>() -> inspect(|msg| println!("SEND {:?}", msg)) -> dest_sink(outbound);
};

// run the server
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/echoserver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() {
.unwrap_or_else(|| ipv4_resolve("localhost:0").unwrap());

// allocate `outbound` sink and `inbound` stream
let (outbound, inbound, addr) = bind_udp_bytes(addr).await.unwrap();
let (outbound, inbound, addr) = bind_udp_bytes(addr).await;
println!("Listening on {:?}", addr);

match opts.role {
Expand Down
18 changes: 13 additions & 5 deletions hydroflow/src/util/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::pin;
use std::rc::Rc;

use futures::{SinkExt, StreamExt};
use tokio::net::{TcpListener};
use tokio::net::TcpListener;
use tokio::task::spawn_local;
use tokio_tungstenite::tungstenite::{Error, Message};

use crate::util::unsync::mpsc::{Receiver, Sender};
use crate::util::unsync_channel;


pub async fn bind_websocket(endpoint: SocketAddr) -> Result<(Sender<(Message, SocketAddr)>, Receiver<Result<(Message, SocketAddr), Error>>, SocketAddr), std::io::Error>{
pub async fn bind_websocket(
endpoint: SocketAddr,
) -> Result<
(
Sender<(Message, SocketAddr)>,
Receiver<Result<(Message, SocketAddr), Error>>,
SocketAddr,
),
std::io::Error,
> {
let listener = TcpListener::bind(endpoint).await.unwrap();

let bound_endpoint = listener.local_addr()?;
Expand Down Expand Up @@ -70,9 +80,7 @@ pub async fn bind_websocket(endpoint: SocketAddr) -> Result<(Sender<(Message, So
}
});
}

});

Ok((tx_egress, rx_ingress, bound_endpoint))
}

0 comments on commit a6c01c0

Please sign in to comment.