Skip to content

Commit

Permalink
feat(hydroflow_plus): provide simpler API for launching and minimize …
Browse files Browse the repository at this point in the history
…dependencies

The new `hydroflow::cli::util::launch` API takes a function from the `HydroCLI` config to a `Hydroflow`, which allows us to wait until the `Hydroflow` struct is instantiated (and all network connections are established) before returning the "ack start" message. This is also why this PR is stacked, because it triggers the race condition much more consistently.

This also disables depending on default features of Hydroflow from Hydroflow+, which should speed up builds because we no longer load the Hydroflow / Datalog macros.
  • Loading branch information
shadaj committed Dec 27, 2023
1 parent 1c31f9c commit 0143fe7
Show file tree
Hide file tree
Showing 16 changed files with 50 additions and 45 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions docs/docs/hydroflow_plus/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ numbers
Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with `build_single`. Instead, we must take the subgraph ID as a runtime parameter (`subgraph_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it.

```rust
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand All @@ -67,10 +67,8 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates
```rust
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(
flow::first_ten_distributed_runtime!(&ports)
hydroflow_plus::util::cli::launch(
|ports| flow::first_ten_distributed_runtime!(ports)
).await;
}
```
Expand Down
23 changes: 20 additions & 3 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ use serde::de::DeserializeOwned;

use crate::scheduled::graph::Hydroflow;

pub async fn launch<T: DeserializeOwned + Default>(
flow: impl FnOnce(&HydroCLI<T>) -> Hydroflow<'_>,
) {
let ports = init_no_ack_start::<T>().await;
let flow = flow(&ports);

println!("ack start");

launch_flow(flow).await;
}

pub async fn launch_flow(mut flow: Hydroflow<'_>) {
let stop = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(|| {
Expand Down Expand Up @@ -43,7 +54,7 @@ impl<T> HydroCLI<T> {
}
}

pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
async fn init_no_ack_start<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();
Expand Down Expand Up @@ -82,8 +93,6 @@ pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
all_connected.insert(name, ServerOrBound::Bound(defn));
}

println!("ack start");

HydroCLI {
ports: RefCell::new(all_connected),
meta: bind_config
Expand All @@ -92,3 +101,11 @@ pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
.unwrap_or_default(),
}
}

pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let ret = init_no_ack_start::<T>().await;

println!("ack start");

ret
}
1 change: 1 addition & 0 deletions hydroflow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Helper utilities for the Hydroflow surface syntax.
pub mod clear;
#[cfg(feature = "hydroflow_macro")]
pub mod demux_enum;
pub mod monotonic_map;
pub mod multiset;
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ quote = "1.0.0"
syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] }
proc-macro2 = "1.0.57"
proc-macro-crate = "1.1.0"
hydroflow = { path = "../hydroflow", version = "^0.5.0" }
hydroflow = { path = "../hydroflow", version = "^0.5.0", default-features = false }
hydroflow_lang = { path = "../hydroflow_lang", version = "^0.5.0" }
serde = { version = "1", features = [ "derive" ] }
bincode = "1.3"
Expand Down
1 change: 0 additions & 1 deletion hydroflow_plus_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ version = "0.0.0"
edition = "2021"

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::futures::SinkExt;
use hydroflow::util::cli::ConnectedSink;
use hydroflow_plus::futures::SinkExt;
use hydroflow_plus::util::cli::ConnectedSink;
use hydroflow_plus_cli_integration::CLIDeployNodeBuilder;

#[tokio::main]
Expand Down
8 changes: 3 additions & 5 deletions hydroflow_plus_test/src/bin/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports),
)
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(ports)
})
.await;
}
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/bin/many_to_many.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::many_to_many_runtime!(&ports))
.await;
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::cluster::many_to_many_runtime!(ports)
})
.await;
}
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/bin/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ extern crate alloc;
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::map_reduce_runtime!(&ports))
.await;
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::cluster::map_reduce_runtime!(ports)
})
.await;
}
8 changes: 3 additions & 5 deletions hydroflow_plus_test/src/bin/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::networked::networked_basic_runtime!(
&ports
))
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::networked::networked_basic_runtime!(ports)
})
.await;
}
8 changes: 3 additions & 5 deletions hydroflow_plus_test/src/bin/simple_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::simple_cluster_runtime!(
&ports
))
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::cluster::simple_cluster_runtime!(ports)
})
.await;
}
3 changes: 1 addition & 2 deletions hydroflow_plus_test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn map_reduce<'a, D: Deploy<'a>>(
(node, cluster)
}

use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand Down Expand Up @@ -104,7 +104,6 @@ mod tests {
use std::cell::RefCell;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::lattices::cc_traits::Iter;
use hydroflow_plus_cli_integration::{
CLIDeployClusterBuilder, CLIDeployNodeBuilder, DeployCrateWrapper,
};
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus_test/src/first_ten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn first_ten_distributed<'a, D: Deploy<'a>>(
second_node
}

use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand Down
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/networked.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use hydroflow::bytes::BytesMut;
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::bytes::BytesMut;
use hydroflow_plus::node::{Deploy, HfNode, NodeBuilder};
use hydroflow_plus::scheduled::graph::Hydroflow;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus::GraphBuilder;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};
use stageleft::{q, Quoted, RuntimeData};
Expand Down Expand Up @@ -41,8 +41,8 @@ pub fn networked_basic_runtime<'a>(
#[cfg(test)]
mod tests {
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::futures::SinkExt;
use hydroflow::util::cli::ConnectedSink;
use hydroflow_plus::futures::SinkExt;
use hydroflow_plus::util::cli::ConnectedSink;
use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper};

#[tokio::test]
Expand Down
1 change: 0 additions & 1 deletion hydroflow_plus_test_macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ default = ["macro"]
macro = []

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
Expand Down

0 comments on commit 0143fe7

Please sign in to comment.