Skip to content

Commit

Permalink
feat(hydroflow_plus): provide simpler API for launching and minimize …
Browse files Browse the repository at this point in the history
…dependencies

The new `hydroflow::cli::util::launch` API takes a function from the `HydroCLI` config to a `Hydroflow`, which allows us to wait until the `Hydroflow` struct is instantiated (and all network connections are established) before returning the "ack start" message.

This also disables depending on default features of Hydroflow from Hydroflow+, which should speed up builds because we no longer load the Hydroflow / Datalog macros.
  • Loading branch information
shadaj committed Dec 27, 2023
1 parent 1c31f9c commit 75b645c
Show file tree
Hide file tree
Showing 16 changed files with 50 additions and 45 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions docs/docs/hydroflow_plus/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ numbers
Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with `build_single`. Instead, we must take the subgraph ID as a runtime parameter (`subgraph_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it.

```rust
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand All @@ -67,10 +67,8 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates
```rust
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(
flow::first_ten_distributed_runtime!(&ports)
hydroflow_plus::util::cli::launch(
|ports| flow::first_ten_distributed_runtime!(ports)
).await;
}
```
Expand Down
23 changes: 20 additions & 3 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ use serde::de::DeserializeOwned;

use crate::scheduled::graph::Hydroflow;

pub async fn launch<T: DeserializeOwned + Default>(
flow: impl FnOnce(&HydroCLI<T>) -> Hydroflow<'_>,
) {
let ports = init_no_ack_start::<T>().await;
let flow = flow(&ports);

println!("ack start");

launch_flow(flow).await;
}

pub async fn launch_flow(mut flow: Hydroflow<'_>) {
let stop = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(|| {
Expand Down Expand Up @@ -43,7 +54,7 @@ impl<T> HydroCLI<T> {
}
}

pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
async fn init_no_ack_start<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();
Expand Down Expand Up @@ -82,8 +93,6 @@ pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
all_connected.insert(name, ServerOrBound::Bound(defn));
}

println!("ack start");

HydroCLI {
ports: RefCell::new(all_connected),
meta: bind_config
Expand All @@ -92,3 +101,11 @@ pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
.unwrap_or_default(),
}
}

pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let ret = init_no_ack_start::<T>().await;

println!("ack start");

ret
}
1 change: 1 addition & 0 deletions hydroflow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Helper utilities for the Hydroflow surface syntax.
pub mod clear;
#[cfg(feature = "hydroflow_macro")]
pub mod demux_enum;
pub mod monotonic_map;
pub mod multiset;
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ quote = "1.0.0"
syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] }
proc-macro2 = "1.0.57"
proc-macro-crate = "1.1.0"
hydroflow = { path = "../hydroflow", version = "^0.5.0" }
hydroflow = { path = "../hydroflow", version = "^0.5.0", default-features = false }
hydroflow_lang = { path = "../hydroflow_lang", version = "^0.5.0" }
serde = { version = "1", features = [ "derive" ] }
bincode = "1.3"
Expand Down
1 change: 0 additions & 1 deletion hydroflow_plus_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ version = "0.0.0"
edition = "2021"

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::futures::SinkExt;
use hydroflow::util::cli::ConnectedSink;
use hydroflow_plus::futures::SinkExt;
use hydroflow_plus::util::cli::ConnectedSink;
use hydroflow_plus_cli_integration::CLIDeployNodeBuilder;

#[tokio::main]
Expand Down
8 changes: 3 additions & 5 deletions hydroflow_plus_test/src/bin/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports),
)
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(ports)
})
.await;
}
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/bin/many_to_many.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::many_to_many_runtime!(&ports))
.await;
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::cluster::many_to_many_runtime!(ports)
})
.await;
}
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/bin/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ extern crate alloc;
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::map_reduce_runtime!(&ports))
.await;
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::cluster::map_reduce_runtime!(ports)
})
.await;
}
8 changes: 3 additions & 5 deletions hydroflow_plus_test/src/bin/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::networked::networked_basic_runtime!(
&ports
))
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::networked::networked_basic_runtime!(ports)
})
.await;
}
8 changes: 3 additions & 5 deletions hydroflow_plus_test/src/bin/simple_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::simple_cluster_runtime!(
&ports
))
hydroflow_plus::util::cli::launch(|ports| {
hydroflow_plus_test::cluster::simple_cluster_runtime!(ports)
})
.await;
}
3 changes: 1 addition & 2 deletions hydroflow_plus_test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn map_reduce<'a, D: Deploy<'a>>(
(node, cluster)
}

use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand Down Expand Up @@ -104,7 +104,6 @@ mod tests {
use std::cell::RefCell;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::lattices::cc_traits::Iter;
use hydroflow_plus_cli_integration::{
CLIDeployClusterBuilder, CLIDeployNodeBuilder, DeployCrateWrapper,
};
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus_test/src/first_ten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn first_ten_distributed<'a, D: Deploy<'a>>(
second_node
}

use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand Down
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/networked.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use hydroflow::bytes::BytesMut;
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::bytes::BytesMut;
use hydroflow_plus::node::{Deploy, HfNode, NodeBuilder};
use hydroflow_plus::scheduled::graph::Hydroflow;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus::GraphBuilder;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};
use stageleft::{q, Quoted, RuntimeData};
Expand Down Expand Up @@ -41,8 +41,8 @@ pub fn networked_basic_runtime<'a>(
#[cfg(test)]
mod tests {
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::futures::SinkExt;
use hydroflow::util::cli::ConnectedSink;
use hydroflow_plus::futures::SinkExt;
use hydroflow_plus::util::cli::ConnectedSink;
use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper};

#[tokio::test]
Expand Down
1 change: 0 additions & 1 deletion hydroflow_plus_test_macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ default = ["macro"]
macro = []

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
Expand Down

0 comments on commit 75b645c

Please sign in to comment.