Skip to content

Commit

Permalink
Merge c63fd4f into sapling-pr-archive-shadaj
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Dec 27, 2023
2 parents 7aa694c + c63fd4f commit 656c886
Show file tree
Hide file tree
Showing 16 changed files with 39 additions and 40 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions docs/docs/hydroflow_plus/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ numbers
Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with `build_single`. Instead, we must take the subgraph ID as a runtime parameter (`subgraph_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it.

```rust
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand All @@ -67,10 +67,8 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates
```rust
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(
flow::first_ten_distributed_runtime!(&ports)
hydroflow_plus::util::cli::launch(
|ports| flow::first_ten_distributed_runtime!(ports)
).await;
}
```
Expand Down
21 changes: 18 additions & 3 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ use serde::de::DeserializeOwned;

use crate::scheduled::graph::Hydroflow;

pub async fn launch<T: DeserializeOwned + Default>(flow: impl FnOnce(&HydroCLI<T>) -> Hydroflow<'_>) {
let ports = init_no_ack_start::<T>().await;
let flow = flow(&ports);

println!("ack start");

launch_flow(flow).await;
}

pub async fn launch_flow(mut flow: Hydroflow<'_>) {
let stop = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(|| {
Expand Down Expand Up @@ -43,7 +52,7 @@ impl<T> HydroCLI<T> {
}
}

pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
async fn init_no_ack_start<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();
Expand Down Expand Up @@ -82,8 +91,6 @@ pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
all_connected.insert(name, ServerOrBound::Bound(defn));
}

println!("ack start");

HydroCLI {
ports: RefCell::new(all_connected),
meta: bind_config
Expand All @@ -92,3 +99,11 @@ pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
.unwrap_or_default(),
}
}

pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let ret = init_no_ack_start::<T>().await;

println!("ack start");

ret
}
1 change: 1 addition & 0 deletions hydroflow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Helper utilities for the Hydroflow surface syntax.
pub mod clear;
#[cfg(feature = "hydroflow_macro")]
pub mod demux_enum;
pub mod monotonic_map;
pub mod multiset;
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ quote = "1.0.0"
syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] }
proc-macro2 = "1.0.57"
proc-macro-crate = "1.1.0"
hydroflow = { path = "../hydroflow", version = "^0.5.0" }
hydroflow = { path = "../hydroflow", version = "^0.5.0", default-features = false }
hydroflow_lang = { path = "../hydroflow_lang", version = "^0.5.0" }
serde = { version = "1", features = [ "derive" ] }
bincode = "1.3"
Expand Down
1 change: 0 additions & 1 deletion hydroflow_plus_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ version = "0.0.0"
edition = "2021"

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::futures::SinkExt;
use hydroflow::util::cli::ConnectedSink;
use hydroflow_plus::futures::SinkExt;
use hydroflow_plus::util::cli::ConnectedSink;
use hydroflow_plus_cli_integration::CLIDeployNodeBuilder;

#[tokio::main]
Expand Down
6 changes: 2 additions & 4 deletions hydroflow_plus_test/src/bin/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports),
hydroflow_plus::util::cli::launch(
|ports| hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(ports),
)
.await;
}
4 changes: 1 addition & 3 deletions hydroflow_plus_test/src/bin/many_to_many.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::many_to_many_runtime!(&ports))
hydroflow_plus::util::cli::launch(|ports| hydroflow_plus_test::cluster::many_to_many_runtime!(ports))
.await;
}
4 changes: 1 addition & 3 deletions hydroflow_plus_test/src/bin/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ extern crate alloc;
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::map_reduce_runtime!(&ports))
hydroflow_plus::util::cli::launch(|ports| hydroflow_plus_test::cluster::map_reduce_runtime!(ports))
.await;
}
6 changes: 2 additions & 4 deletions hydroflow_plus_test/src/bin/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::networked::networked_basic_runtime!(
&ports
hydroflow_plus::util::cli::launch(|ports| hydroflow_plus_test::networked::networked_basic_runtime!(
ports
))
.await;
}
6 changes: 2 additions & 4 deletions hydroflow_plus_test/src/bin/simple_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;

hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::simple_cluster_runtime!(
&ports
hydroflow_plus::util::cli::launch(|ports| hydroflow_plus_test::cluster::simple_cluster_runtime!(
ports
))
.await;
}
3 changes: 1 addition & 2 deletions hydroflow_plus_test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn map_reduce<'a, D: Deploy<'a>>(
(node, cluster)
}

use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand Down Expand Up @@ -105,7 +105,6 @@ mod tests {
use std::time::Duration;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::lattices::cc_traits::Iter;
use hydroflow_plus_cli_integration::{
CLIDeployClusterBuilder, CLIDeployNodeBuilder, DeployCrateWrapper,
};
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus_test/src/first_ten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn first_ten_distributed<'a, D: Deploy<'a>>(
second_node
}

use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
Expand Down
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/networked.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use hydroflow::bytes::BytesMut;
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus::bytes::BytesMut;
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus::node::{Deploy, HfNode, NodeBuilder};
use hydroflow_plus::scheduled::graph::Hydroflow;
use hydroflow_plus::GraphBuilder;
Expand Down Expand Up @@ -43,8 +43,8 @@ mod tests {
use std::time::Duration;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow::futures::SinkExt;
use hydroflow::util::cli::ConnectedSink;
use hydroflow_plus::futures::SinkExt;
use hydroflow_plus::util::cli::ConnectedSink;
use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper};

#[tokio::test]
Expand Down
1 change: 0 additions & 1 deletion hydroflow_plus_test_macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ default = ["macro"]
macro = []

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
Expand Down

0 comments on commit 656c886

Please sign in to comment.