diff --git a/Cargo.lock b/Cargo.lock index d6df5a5689a0..d43f3e8fd46c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1338,6 +1338,19 @@ dependencies = [ "tokio-tungstenite", ] +[[package]] +name = "hydro_cli_maelstrom" +version = "0.1.0" +dependencies = [ + "bincode", + "bytes", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-util", +] + [[package]] name = "hydroflow" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index 96a771c0c18c..f6fedee72a30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "benches", "hydro_cli", "hydro_cli_examples", + "hydro_cli_maelstrom", "hydroflow", "hydroflow_cli_integration", "hydroflow_datalog", diff --git a/hydro_cli/src/core/hydroflow_crate/mod.rs b/hydro_cli/src/core/hydroflow_crate/mod.rs index 7b379c4ffaf4..d7b17a966756 100644 --- a/hydro_cli/src/core/hydroflow_crate/mod.rs +++ b/hydro_cli/src/core/hydroflow_crate/mod.rs @@ -248,6 +248,15 @@ impl Service for HydroflowCrate { ) .await?; + // send the id over + binary + .write() + .await + .stdin() + .await + .send(format!("id: {}\n", self.id)) + .await?; + let mut bind_config = HashMap::new(); for (port_name, bind_type) in self.port_to_bind.iter() { bind_config.insert(port_name.clone(), launched_host.server_config(bind_type)); diff --git a/hydro_cli_examples/Cargo.toml b/hydro_cli_examples/Cargo.toml index 296db8e3c63f..0eb6a0cf38ad 100644 --- a/hydro_cli_examples/Cargo.toml +++ b/hydro_cli_examples/Cargo.toml @@ -46,6 +46,9 @@ name = "pn_counter_delta" [[example]] name = "ws_chat_server" +[[example]] +name = "maelstrom_unique_id" + [dev-dependencies] hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] } hydroflow_datalog = { path = "../hydroflow_datalog" } diff --git a/hydro_cli_examples/examples/echo/main.rs b/hydro_cli_examples/examples/echo/main.rs new file mode 100644 index 000000000000..5c1a18adf77b --- /dev/null +++ b/hydro_cli_examples/examples/echo/main.rs @@ -0,0 +1,71 @@ +use hydroflow::hydroflow_syntax; +use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource}; +use hydroflow::util::serialize_to_bytes; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EchoMsg { + pub msg_id: Value, + pub echo: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EchoOkMsg { + pub echo: String, + pub in_reply_to: Value, +} + +impl EchoMsg { + /// Generate EchoOkMsg response to this EchoMsg + fn response( + EchoMsg { + echo, + msg_id: source_msg_id, + }: Self, + ) -> EchoOkMsg { + EchoOkMsg { + echo, + in_reply_to: source_msg_id, + } + } +} + +#[hydroflow::main] +async fn main() { + let mut ports = hydroflow::util::cli::init().await; + + // TODO: use ConnectedDemux? + let echo_in = ports + .port("echo_in") + .connect::() + .await + .into_source(); + let echo_out = ports + .port("echo_out") + .connect::() + .await + .into_sink(); + + let df = hydroflow_syntax! { + input = source_stream(echo_in) + -> map(Result::unwrap) + -> map(|x| x.to_vec()) + -> map(String::from_utf8) + -> map(Result::unwrap); + + output = map(|x| serde_json::to_string(&x)) + -> map(Result::unwrap) + -> map(serialize_to_bytes) + -> dest_sink(echo_out); + + + input + -> map(|x| serde_json::from_str::(&x).unwrap()) + //-> map(|x| EchoMsg {msg_id: x.msg_id, echo: x.echo + "hi"}) + -> map(EchoMsg::response) + -> output; + }; + + hydroflow::util::cli::launch_flow(df).await; +} diff --git a/hydro_cli_examples/examples/maelstrom_unique_id/main.rs b/hydro_cli_examples/examples/maelstrom_unique_id/main.rs new file mode 100644 index 000000000000..051b89c446db --- /dev/null +++ b/hydro_cli_examples/examples/maelstrom_unique_id/main.rs @@ -0,0 +1,68 @@ +use hydroflow::hydroflow_syntax; +use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource}; +use hydroflow::util::serialize_to_bytes; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Generate { + pub msg_id: Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GenerateOk { + pub id: Value, + pub in_reply_to: Value, +} + +impl Generate { + /// Generate GenerateOk response to this Generate message + pub fn respond(self, i: usize, node_id: &str) -> GenerateOk { + let id = json!([i, node_id]); + + GenerateOk { + id, + in_reply_to: self.msg_id, + } + } +} + +#[hydroflow::main] +async fn main() { + let mut ports = hydroflow::util::cli::init().await; + let node_id = ports.node_id.clone(); + + // TODO: use ConnectedDemux? + let gen_in = ports + .port("gen_in") + .connect::() + .await + .into_source(); + let ok_out = ports + .port("ok_out") + .connect::() + .await + .into_sink(); + + let df = hydroflow_syntax! { + input = source_stream(gen_in) + -> map(Result::unwrap) + -> map(|x| x.to_vec()) + -> map(String::from_utf8) + -> map(Result::unwrap); + + output = map(|x| serde_json::to_string(&x)) + -> map(Result::unwrap) + -> map(serialize_to_bytes) + -> dest_sink(ok_out); + + + input + -> map(|x| serde_json::from_str::(&x).unwrap()) + -> enumerate::<'static>() //-> enumerate() will fail! + -> map(|(i, x)| x.respond(i, &node_id)) + -> output; + }; + + hydroflow::util::cli::launch_flow(df).await; +} diff --git a/hydro_cli_maelstrom/Cargo.toml b/hydro_cli_maelstrom/Cargo.toml new file mode 100644 index 000000000000..8eccb6f10926 --- /dev/null +++ b/hydro_cli_maelstrom/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "hydro_cli_maelstrom" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde_json = "1" +serde = { version = "1", features = [ "derive" ] } +futures = { version = "0.3" } +bytes = "1.1.0" +bincode = "1.3" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { version = "1.16", features = [ "full" ] } +tokio-util = { version = "0.7.4", features = [ "net", "codec" ] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { version = "1.16", features = [ "rt" , "sync", "macros", "io-util", "time" ] } +tokio-util = { version = "0.7.4", features = [ "codec" ] } \ No newline at end of file diff --git a/hydro_cli_maelstrom/src/main.rs b/hydro_cli_maelstrom/src/main.rs new file mode 100644 index 000000000000..ada9d109af3f --- /dev/null +++ b/hydro_cli_maelstrom/src/main.rs @@ -0,0 +1,351 @@ +use std::collections::HashMap; +use std::io::Result; +use std::net::SocketAddr; +use std::process::Stdio; + +use bytes::Bytes; +use futures::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::io::{stderr, stdin, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::process::{ChildStdin, ChildStdout, Command}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum ServerPort { + TcpPort(SocketAddr), +} + +#[derive(PartialEq, Eq, Serialize, Deserialize)] +pub enum Direction { + Source, + Sink, +} + +#[derive(Serialize, Deserialize)] +pub struct PortDefn { + pub maelstrom_type: String, + pub port_name: String, + /// If the port is a sink/source from the perspective of the hydroflow program + pub direction: Direction, +} + +/// forwards an in_stream into an out_stream, prefixed by `debug_message`. +async fn debug_link( + in_stream: R, + mut out_stream: W, + debug_message: String, +) -> Result<()> { + let mut lines = BufReader::new(in_stream).lines(); + while let Ok(Some(line)) = lines.next_line().await { + let text = format!("{debug_message}: {line}\n"); + out_stream.write_all(text.as_bytes()).await?; + } + Ok(()) +} + +/// Accepts the init payload from maelstrom and responds with an init_ok, returning the node_id of this node +async fn maelstrom_init() -> Result { + let mut init_msg = String::new(); + BufReader::new(stdin()).read_line(&mut init_msg).await?; + + let v: Value = serde_json::from_str(&init_msg)?; + let src = &v["src"]; + + let body = &v["body"]; + let msg_id = &body["msg_id"]; + let node_id = body["node_id"].as_str().unwrap(); + + let response = json!({ + "src": node_id, + "dest": src, + "body": { + "type": "init_ok", + "in_reply_to": msg_id + } + }); + + println!("{}", response); + + Ok(node_id.to_string()) +} + +/// Sends setup string which sets up all source ports. +async fn send_setup_string( + source_port_names: &[String], + child_stdin: &mut ChildStdin, +) -> Result<()> { + let localhost = HashMap::from([("TcpPort", "127.0.0.1")]); + let source_setup_pairs = source_port_names + .iter() + .map(|p| (p, localhost.clone())) + .collect::>(); + let source_setup_string = serde_json::to_string(&source_setup_pairs)? + "\n"; + child_stdin + .write_all(source_setup_string.as_bytes()) + .await?; // e.g. {"echo_in": {"TcpPort": "127.0.0.1"}} + Ok(()) +} + +/// Reads "ready" message which contains tcp port for all source ports. +async fn recieve_ready(child_stdout: &mut ChildStdout) -> Result> { + let mut child_stdout = BufReader::new(child_stdout); + + let mut line_buf = String::new(); + child_stdout.read_line(&mut line_buf).await?; + assert!(line_buf.starts_with("ready: ")); + let body = line_buf.trim_start_matches("ready: "); + let source_ports: HashMap = serde_json::from_str(body).unwrap(); + + Ok(source_ports) +} + +/// From a set of source_ports, creates a map from port name to writer which writes to that port's tcp stream +async fn source_demux( + source_ports: HashMap, +) -> Result>> { + let mut source_port_demux = HashMap::new(); + for (port_name, port) in source_ports { + let ServerPort::TcpPort(socket) = port; + let stream = TcpStream::connect(socket).await?; + let writer = FramedWrite::new(stream, LengthDelimitedCodec::new()); + source_port_demux.insert(port_name, writer); + } + Ok(source_port_demux) +} + +/// Creats a TcpListener bound on localhost for each sink port +async fn setup_sink_ports(sink_port_names: Vec) -> Result> { + let mut sink_name_to_listener: HashMap = HashMap::new(); + for port_name in sink_port_names { + let listener = TcpListener::bind("127.0.0.1:0").await?; + sink_name_to_listener.insert(port_name, listener); + } + Ok(sink_name_to_listener) +} + +/// Creates the "start: " string and sends it to the child with the connection definitions +async fn send_start_string( + sink_name_to_listener: &HashMap, + child_stdin: &mut ChildStdin, +) -> Result<()> { + let mut connection_defs: HashMap = HashMap::new(); + for (port_name, listener) in sink_name_to_listener { + let addr = listener.local_addr().unwrap(); + connection_defs.insert(port_name.clone(), ServerPort::TcpPort(addr)); + } + // send "start: {}" message + let connection_defs_str = serde_json::to_string(&connection_defs)?; + let formatted_defns = format!("start: {connection_defs_str}\n"); + child_stdin.write_all(formatted_defns.as_bytes()).await?; + #[cfg(debug_assertions)] + println!("sent {}", formatted_defns); + Ok(()) +} + +/// Demux from stdin to all source ports +/// type_to_port_name is a map from payload type to port name +async fn input_handler( + source_ports: HashMap, + type_to_port_name: HashMap, +) -> Result<()> { + let mut source_demux = source_demux(source_ports).await?; + + let mut lines = BufReader::new(stdin()).lines(); + while let Ok(Some(line)) = lines.next_line().await { + // Read the line as a json value (maelstrom payload) + let mut v: Value = serde_json::from_str(&line)?; + + let msg_id = Value::Array(vec![v["src"].clone(), v["body"]["msg_id"].clone()]); + + // Get the body of the payload + let body = v["body"].as_object_mut().unwrap(); + + // Update the msg_id to be the pair [src, msg_id] + body.insert("msg_id".into(), msg_id); + + // Get the payload type (like echo, echo_ok, etc) + let body_type = body.remove("type").unwrap(); + let body_type = body_type.as_str().unwrap(); + + // Get the target port for that payload type + let target_port_name = type_to_port_name.get(body_type).unwrap(); + let target_port = source_demux.get_mut(target_port_name).unwrap(); + + // Send the body string to the target port + let body_string = serde_json::to_string(body)?; + #[cfg(debug_assertions)] + println!("Sending line {}", body_string); + target_port.send(Bytes::from(body_string)).await?; + } + Ok(()) +} + +/// Accept a connection on each sink port which wraps outputs in maelstrom payload of specified type +/// Generated "msg_id"s will be `= output_id (mod output_count)` to ensure no overlap +async fn output_handler( + listener: TcpListener, + maelstrom_type: String, + node_id: String, + output_id: usize, + output_count: usize, +) -> Result<()> { + let in_stream = listener.accept().await?.0; + + let mut lines = FramedRead::new(in_stream, LengthDelimitedCodec::new()); + #[cfg(debug_assertions)] + println!("accepted connection"); + + // Itializer counter which tracks the next available msg_id + let mut msg_id_counter = output_id; + + while let Some(Ok(line)) = lines.next().await { + // Transforms output into maelstrom payload + // For example: + // {"echo":"hello world!","in_reply_to":["n1", 1]} + // -> + // {"src":"n1","dest":"c1","body":{"echo":"hello world!","msg_id":0,"in_reply_to":1,"type":"echo_ok"}} + + // parse line to string + let raw_line: String = bincode::deserialize(&line).unwrap(); + + // parse raw string to json value + let mut v: Value = serde_json::from_str(&raw_line)?; + let body = v.as_object_mut().unwrap(); + body.insert("type".to_string(), maelstrom_type.clone().into()); + + // in_reply_to is actually [src, msg_id] + let in_reply_to = v["in_reply_to"].as_array_mut().unwrap(); + assert_eq!( + in_reply_to.len(), + 2, + "in_reply_to was not a pair of [src, msg_id]" + ); + let dest = in_reply_to.remove(0); + let in_reply_to = in_reply_to.remove(0); + + // set in_reply_to to real value + v["in_reply_to"] = in_reply_to; + + // set msg_id to next available message id + v["msg_id"] = Value::Number(msg_id_counter.into()); + msg_id_counter += output_count; + + // wrap v in maelstrom header + let response = json!({ + "src": node_id.clone(), + "dest": dest, + "body": v + }); + + // send it to stdout + println!("{}", response); + } + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<()> { + // let path = r"C:\Users\rhala\Code\hydroflow\target\debug\examples\echo"; + let path = r"/mnt/c/Users/rhala/Code/hydroflow/target/debug/examples/maelstrom_unique_id"; + let mut child = Command::new(path) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let mut child_stdin = child.stdin.take().unwrap(); + let mut child_stdout = child.stdout.take().unwrap(); + let child_stderr = child.stderr.take().unwrap(); + + // this is supposed to be an input to the program + // vec![ + // PortDefn::new("echo".into(), "echo_in".into(), Direction::Source), + // PortDefn::new("echo_ok".into(), "echo_out".into(), Direction::Sink), + // ]; + // echo setup + // let port_input_str = r#"[{"maelstrom_type":"echo","port_name":"echo_in","direction":"Source"},{"maelstrom_type":"echo_ok","port_name":"echo_out","direction":"Sink"}]"#; + + // unique id gen setup + let port_input_str = r#"[{"maelstrom_type":"generate","port_name":"gen_in","direction":"Source"},{"maelstrom_type":"generate_ok","port_name":"ok_out","direction":"Sink"}]"#; + + let all_ports: Vec = serde_json::from_str(port_input_str)?; + + // Create some useful representations of all_ports: + + let (source_ports, sink_ports): (Vec<_>, Vec<_>) = all_ports + .iter() + .partition(|port| port.direction == Direction::Source); + + // port names for all sources (from the perspective of the hydro code) + let source_port_names: Vec<_> = source_ports + .iter() + .map(|port| port.port_name.clone()) + .collect(); + // maelstrom payload type to source port name + let type_to_source_name: HashMap<_, _> = source_ports + .iter() + .map(|port| (port.maelstrom_type.clone(), port.port_name.clone())) + .collect(); + + // port names for all sinks (from the perspective of the hydro code) + let sink_port_names: Vec<_> = sink_ports + .iter() + .map(|port| port.port_name.clone()) + .collect(); + // sink port name to maelstrom payload type + let sink_name_to_type: HashMap<_, _> = sink_ports + .iter() + .map(|port| (port.port_name.clone(), port.maelstrom_type.clone())) + .collect(); + + // accept the maelstrom initialize payload + let node_id = maelstrom_init().await?; + + // send node_id to hydro_cli + child_stdin + .write_all(format!("id: {node_id}\n").as_bytes()) + .await?; + + // setup source ports + send_setup_string(&source_port_names, &mut child_stdin).await?; + let source_ports = recieve_ready(&mut child_stdout).await?; + #[cfg(debug_assertions)] + println!("Parsed ready: {}", serde_json::to_string(&source_ports)?); + + // handle input (demux from stdin to tcp) + tokio::task::spawn(input_handler(source_ports, type_to_source_name)); + + // setup all sink ports + let sink_name_to_listener = setup_sink_ports(sink_port_names).await?; + + // start the crate + send_start_string(&sink_name_to_listener, &mut child_stdin).await?; + + let sink_count = sink_name_to_listener.len(); + // handle each output port individually (merge from tcp to stdout) + for (output_id, (port_name, listener)) in sink_name_to_listener.into_iter().enumerate() { + let maelstrom_type = sink_name_to_type.get(&port_name).unwrap().clone(); + tokio::task::spawn(output_handler( + listener, + maelstrom_type, + node_id.clone(), + output_id, + sink_count, + )); + } + + // forward child's stdout and stderr to our stderr + tokio::task::spawn(debug_link(child_stderr, stderr(), "child-stderr".into())); + tokio::task::spawn(debug_link(child_stdout, stderr(), "child-stdout".into())); + + // wait to finish + child.wait().await?; + + Ok(()) +} +// Example inputs (in order): +//{"src": "c1", "dest": "n1","body": {"msg_id": 0,"type": "init", "node_id": "n1"}} +//{"src": "c1", "dest": "n1","body": {"msg_id": 1,"type": "echo", "echo": "hello world!"}} +//{"src": "c1", "dest": "n1","body": {"msg_id": 1, "type": "generate"}} diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index 90928acd8a00..15a172b8dbfb 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -23,6 +23,7 @@ pub async fn launch_flow(mut flow: Hydroflow) { pub struct HydroCLI { ports: HashMap, + pub node_id: String, } impl HydroCLI { @@ -32,6 +33,14 @@ impl HydroCLI { } pub async fn init() -> HydroCLI { + let mut start_buf = String::new(); + std::io::stdin().read_line(&mut start_buf).unwrap(); + let id = if start_buf.starts_with("id: ") { + start_buf.trim_start_matches("id: ").trim().to_string() + } else { + panic!("expected id"); + }; + let mut input = String::new(); std::io::stdin().read_line(&mut input).unwrap(); let trimmed = input.trim(); @@ -72,5 +81,6 @@ pub async fn init() -> HydroCLI { HydroCLI { ports: all_connected, + node_id: id, } }