diff --git a/Cargo.lock b/Cargo.lock index f8aa1dc5ae08..fae78b851489 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1581,7 +1581,6 @@ version = "0.0.0" dependencies = [ "futures", "hydro_deploy", - "hydroflow", "hydroflow_plus", "hydroflow_plus_cli_integration", "hydroflow_plus_test_macro", @@ -1595,7 +1594,6 @@ dependencies = [ name = "hydroflow_plus_test_macro" version = "0.0.0" dependencies = [ - "hydroflow", "hydroflow_plus", "hydroflow_plus_cli_integration", "stageleft", diff --git a/docs/docs/hydroflow_plus/distributed.mdx b/docs/docs/hydroflow_plus/distributed.mdx index 430b9673e386..82ba347716d3 100644 --- a/docs/docs/hydroflow_plus/distributed.mdx +++ b/docs/docs/hydroflow_plus/distributed.mdx @@ -49,7 +49,7 @@ numbers Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with `build_single`. Instead, we must take the subgraph ID as a runtime parameter (`subgraph_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it. ```rust -use hydroflow::util::cli::HydroCLI; +use hydroflow_plus::util::cli::HydroCLI; use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; #[stageleft::entry] @@ -67,10 +67,8 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates ```rust #[tokio::main] async fn main() { - let ports = hydroflow::util::cli::init().await; - - hydroflow::util::cli::launch_flow( - flow::first_ten_distributed_runtime!(&ports) + hydroflow_plus::util::cli::launch( + |ports| flow::first_ten_distributed_runtime!(ports) ).await; } ``` diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index b634a5505220..b83de573c95c 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -8,6 +8,17 @@ use serde::de::DeserializeOwned; use crate::scheduled::graph::Hydroflow; +pub async fn launch( + flow: impl FnOnce(&HydroCLI) -> Hydroflow<'_>, +) { + let ports = init_no_ack_start::().await; + let flow = flow(&ports); + + println!("ack start"); + + launch_flow(flow).await; +} + pub async fn launch_flow(mut flow: Hydroflow<'_>) { let stop = tokio::sync::oneshot::channel(); tokio::task::spawn_blocking(|| { @@ -43,7 +54,7 @@ impl HydroCLI { } } -pub async fn init() -> HydroCLI { +async fn init_no_ack_start() -> HydroCLI { let mut input = String::new(); std::io::stdin().read_line(&mut input).unwrap(); let trimmed = input.trim(); @@ -82,8 +93,6 @@ pub async fn init() -> HydroCLI { all_connected.insert(name, ServerOrBound::Bound(defn)); } - println!("ack start"); - HydroCLI { ports: RefCell::new(all_connected), meta: bind_config @@ -92,3 +101,11 @@ pub async fn init() -> HydroCLI { .unwrap_or_default(), } } + +pub async fn init() -> HydroCLI { + let ret = init_no_ack_start::().await; + + println!("ack start"); + + ret +} diff --git a/hydroflow/src/util/mod.rs b/hydroflow/src/util/mod.rs index fdc52039ba5b..25c8423d10a0 100644 --- a/hydroflow/src/util/mod.rs +++ b/hydroflow/src/util/mod.rs @@ -2,6 +2,7 @@ //! Helper utilities for the Hydroflow surface syntax. pub mod clear; +#[cfg(feature = "hydroflow_macro")] pub mod demux_enum; pub mod monotonic_map; pub mod multiset; diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml index ef7f016b9cf3..2a9aa18c135e 100644 --- a/hydroflow_plus/Cargo.toml +++ b/hydroflow_plus/Cargo.toml @@ -20,7 +20,7 @@ quote = "1.0.0" syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } proc-macro2 = "1.0.57" proc-macro-crate = "1.1.0" -hydroflow = { path = "../hydroflow", version = "^0.5.0" } +hydroflow = { path = "../hydroflow", version = "^0.5.0", default-features = false } hydroflow_lang = { path = "../hydroflow_lang", version = "^0.5.0" } serde = { version = "1", features = [ "derive" ] } bincode = "1.3" diff --git a/hydroflow_plus_test/Cargo.toml b/hydroflow_plus_test/Cargo.toml index 5dfde76e7977..c20b4f95e633 100644 --- a/hydroflow_plus_test/Cargo.toml +++ b/hydroflow_plus_test/Cargo.toml @@ -5,7 +5,6 @@ version = "0.0.0" edition = "2021" [dependencies] -hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] } hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } tokio = { version = "1.16", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.1.0" } diff --git a/hydroflow_plus_test/examples/networked_basic.rs b/hydroflow_plus_test/examples/networked_basic.rs index d6de39692656..a70f4e16df7d 100644 --- a/hydroflow_plus_test/examples/networked_basic.rs +++ b/hydroflow_plus_test/examples/networked_basic.rs @@ -1,6 +1,6 @@ use hydro_deploy::{Deployment, HydroflowCrate}; -use hydroflow::futures::SinkExt; -use hydroflow::util::cli::ConnectedSink; +use hydroflow_plus::futures::SinkExt; +use hydroflow_plus::util::cli::ConnectedSink; use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; #[tokio::main] diff --git a/hydroflow_plus_test/src/bin/first_ten_distributed.rs b/hydroflow_plus_test/src/bin/first_ten_distributed.rs index 6ec405c55018..dad227e12876 100644 --- a/hydroflow_plus_test/src/bin/first_ten_distributed.rs +++ b/hydroflow_plus_test/src/bin/first_ten_distributed.rs @@ -1,10 +1,8 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let ports = hydroflow::util::cli::init().await; - - hydroflow::util::cli::launch_flow( - hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports), - ) + hydroflow_plus::util::cli::launch(|ports| { + hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(ports) + }) .await; } diff --git a/hydroflow_plus_test/src/bin/many_to_many.rs b/hydroflow_plus_test/src/bin/many_to_many.rs index 4fe217b6e0d6..96b8cb43b9b7 100644 --- a/hydroflow_plus_test/src/bin/many_to_many.rs +++ b/hydroflow_plus_test/src/bin/many_to_many.rs @@ -1,8 +1,8 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let ports = hydroflow::util::cli::init().await; - - hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::many_to_many_runtime!(&ports)) - .await; + hydroflow_plus::util::cli::launch(|ports| { + hydroflow_plus_test::cluster::many_to_many_runtime!(ports) + }) + .await; } diff --git a/hydroflow_plus_test/src/bin/map_reduce.rs b/hydroflow_plus_test/src/bin/map_reduce.rs index 17526ca37efe..0a78e3eba638 100644 --- a/hydroflow_plus_test/src/bin/map_reduce.rs +++ b/hydroflow_plus_test/src/bin/map_reduce.rs @@ -4,8 +4,8 @@ extern crate alloc; // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let ports = hydroflow::util::cli::init().await; - - hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::map_reduce_runtime!(&ports)) - .await; + hydroflow_plus::util::cli::launch(|ports| { + hydroflow_plus_test::cluster::map_reduce_runtime!(ports) + }) + .await; } diff --git a/hydroflow_plus_test/src/bin/networked_basic.rs b/hydroflow_plus_test/src/bin/networked_basic.rs index 5e85fc102eb6..cc0ab9bd5b08 100644 --- a/hydroflow_plus_test/src/bin/networked_basic.rs +++ b/hydroflow_plus_test/src/bin/networked_basic.rs @@ -1,10 +1,8 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let ports = hydroflow::util::cli::init().await; - - hydroflow::util::cli::launch_flow(hydroflow_plus_test::networked::networked_basic_runtime!( - &ports - )) + hydroflow_plus::util::cli::launch(|ports| { + hydroflow_plus_test::networked::networked_basic_runtime!(ports) + }) .await; } diff --git a/hydroflow_plus_test/src/bin/simple_cluster.rs b/hydroflow_plus_test/src/bin/simple_cluster.rs index 8851839da7d4..ff0d41d99e6a 100644 --- a/hydroflow_plus_test/src/bin/simple_cluster.rs +++ b/hydroflow_plus_test/src/bin/simple_cluster.rs @@ -1,10 +1,8 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let ports = hydroflow::util::cli::init().await; - - hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::simple_cluster_runtime!( - &ports - )) + hydroflow_plus::util::cli::launch(|ports| { + hydroflow_plus_test::cluster::simple_cluster_runtime!(ports) + }) .await; } diff --git a/hydroflow_plus_test/src/cluster.rs b/hydroflow_plus_test/src/cluster.rs index 30a9bc1c8302..a7711c14b752 100644 --- a/hydroflow_plus_test/src/cluster.rs +++ b/hydroflow_plus_test/src/cluster.rs @@ -68,7 +68,7 @@ pub fn map_reduce<'a, D: Deploy<'a>>( (node, cluster) } -use hydroflow::util::cli::HydroCLI; +use hydroflow_plus::util::cli::HydroCLI; use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; #[stageleft::entry] @@ -104,7 +104,6 @@ mod tests { use std::cell::RefCell; use hydro_deploy::{Deployment, HydroflowCrate}; - use hydroflow::lattices::cc_traits::Iter; use hydroflow_plus_cli_integration::{ CLIDeployClusterBuilder, CLIDeployNodeBuilder, DeployCrateWrapper, }; diff --git a/hydroflow_plus_test/src/first_ten.rs b/hydroflow_plus_test/src/first_ten.rs index daee194518eb..e4ab7b090ba6 100644 --- a/hydroflow_plus_test/src/first_ten.rs +++ b/hydroflow_plus_test/src/first_ten.rs @@ -34,7 +34,7 @@ pub fn first_ten_distributed<'a, D: Deploy<'a>>( second_node } -use hydroflow::util::cli::HydroCLI; +use hydroflow_plus::util::cli::HydroCLI; use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; #[stageleft::entry] diff --git a/hydroflow_plus_test/src/networked.rs b/hydroflow_plus_test/src/networked.rs index 8ef6e66134a2..829fd61d8edc 100644 --- a/hydroflow_plus_test/src/networked.rs +++ b/hydroflow_plus_test/src/networked.rs @@ -1,7 +1,7 @@ -use hydroflow::bytes::BytesMut; -use hydroflow::util::cli::HydroCLI; +use hydroflow_plus::bytes::BytesMut; use hydroflow_plus::node::{Deploy, HfNode, NodeBuilder}; use hydroflow_plus::scheduled::graph::Hydroflow; +use hydroflow_plus::util::cli::HydroCLI; use hydroflow_plus::GraphBuilder; use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; use stageleft::{q, Quoted, RuntimeData}; @@ -41,8 +41,8 @@ pub fn networked_basic_runtime<'a>( #[cfg(test)] mod tests { use hydro_deploy::{Deployment, HydroflowCrate}; - use hydroflow::futures::SinkExt; - use hydroflow::util::cli::ConnectedSink; + use hydroflow_plus::futures::SinkExt; + use hydroflow_plus::util::cli::ConnectedSink; use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper}; #[tokio::test] diff --git a/hydroflow_plus_test_macro/Cargo.toml b/hydroflow_plus_test_macro/Cargo.toml index 79549e5dff78..dfbd21b7a7a4 100644 --- a/hydroflow_plus_test_macro/Cargo.toml +++ b/hydroflow_plus_test_macro/Cargo.toml @@ -13,7 +13,6 @@ default = ["macro"] macro = [] [dependencies] -hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] } hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } tokio = { version = "1.16", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.1.0" }