diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 173d33213f54..aaef3be3f673 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -224,16 +224,6 @@ jobs: cd python_tests pip install -r requirements.txt RUST_BACKTRACE=1 pytest *.py - cd ../.. - - - name: Run Hydroflow+ Python tests - run: | - ulimit -c unlimited - cd hydro_cli - source .venv/bin/activate - cd ../hydroflow_plus_test/python_tests - pip install -r requirements.txt - RUST_BACKTRACE=1 pytest *.py lints: name: Lints diff --git a/Cargo.lock b/Cargo.lock index c10dd202a585..90df0f745286 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1481,25 +1481,40 @@ dependencies = [ name = "hydroflow_plus" version = "0.5.0" dependencies = [ + "bincode", "hydroflow", "hydroflow_lang", "proc-macro-crate", "proc-macro2", "quote", + "serde", "stageleft", "syn 2.0.14", ] +[[package]] +name = "hydroflow_plus_cli_integration" +version = "0.5.0" +dependencies = [ + "async-channel", + "hydro_cli", + "hydroflow_plus", + "proc-macro-crate", + "stageleft", + "syn 2.0.14", + "tokio", +] + [[package]] name = "hydroflow_plus_test" version = "0.0.0" dependencies = [ + "hydro_cli", "hydroflow", "hydroflow_plus", + "hydroflow_plus_cli_integration", "hydroflow_plus_test_macro", "insta", - "regex", - "serde", "stageleft", "stageleft_tool", "tokio", @@ -1511,8 +1526,7 @@ version = "0.0.0" dependencies = [ "hydroflow", "hydroflow_plus", - "regex", - "serde", + "hydroflow_plus_cli_integration", "stageleft", "stageleft_tool", "tokio", @@ -1987,9 +2001,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.17.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "oorandom" diff --git a/Cargo.toml b/Cargo.toml index 1ef74afd2113..5b886f148ed6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "hydroflow_lang", "hydroflow_macro", "hydroflow_plus", + "hydroflow_plus_cli_integration", "hydroflow_plus_test", "hydroflow_plus_test_macro", "lattices", diff --git a/docs/docs/hydroflow_plus/distributed.mdx b/docs/docs/hydroflow_plus/distributed.mdx new file mode 100644 index 000000000000..f5a9a3416bb9 --- /dev/null +++ b/docs/docs/hydroflow_plus/distributed.mdx @@ -0,0 +1,134 @@ +--- +sidebar_position: 3 +--- + +# Distributed Hydroflow+ +Continuing from our previous example, we will now look at how to extend our program to run on multiple nodes. Recall that our previous flow graph looked like this: + +```rust +use hydroflow_plus::*; +use hydroflow_plus::node::*; +use stageleft::*; + +pub fn first_ten<'a, D: HfDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &mut impl HfNodeBuilder<'a, D> +) { + let node = graph.node(node_builder); + let numbers = node.source_iter(q!(0..10)); + numbers.for_each(q!(|n| println!("{}", n))); +} +``` + +## The Flow Graph +Let's extend this example to print the numbers on a separate node. First, we need to specify that our flow graph will involve the network. We do this by replacing the `HfDeploy<'a>` trait bound with `HfNetworkedDeploy<'a>`. Then, we can use the `node_builder` to create a second node: +```rust +use hydroflow_plus::*; +use hydroflow_plus::node::*; +use stageleft::*; + +pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &mut impl HfNodeBuilder<'a, D> +) { + let node = graph.node(node_builder); + let second_node = graph.node(node_builder); +} +``` + +Now, we can distribute our dataflow by using the `send_bincode` operator to mark where the data should be sent using bincode serialization. + +```rust +let numbers = node.source_iter(q!(0..10)); +numbers + .send_bincode(&second_node) + .for_each(q!(|n| println!("{}", n))); +``` + +## The Runtime +Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with hardcoding node 0 as before. Instead, we must take the node ID as a runtime parameter (`node_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it (via `CLIRuntimeNodeBuilder`). + +```rust +use hydroflow::util::cli::HydroCLI; +use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder}; + +#[stageleft::entry] +pub fn first_ten_distributed_runtime<'a>( + graph: &'a HfBuilder<'a, CLIRuntime>, + cli: RuntimeData<&'a HydroCLI>, + node_id: RuntimeData, +) -> impl Quoted<'a, Hydroflow<'a>> { + let _ = first_ten_distributed(graph, &mut CLIRuntimeNodeBuilder::new(cli)); + graph.build(node_id) +} +``` + +The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates the CLI and reads the node ID from the command line arguments: + +```rust +#[tokio::main] +async fn main() { + let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let ports = hydroflow::util::cli::init().await; + + let joined = flow::first_ten_distributed_runtime!(&ports, node_id); + + hydroflow::util::cli::launch_flow(joined).await; +} +``` + +## The Deployment +Finally, we need to deploy our dataflow with the appropriate network topology. We achieve this by using [Hydro Deploy](../deploy/index.md). Hydroflow+ integrates with Hydro Deploy to automatically construct the topology based on the flow graph. We can create a new file `examples/first_ten_distributed.rs` with the following contents: + +```rust +use hydro_cli::core::Deployment; +use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; + +#[tokio::main] +async fn main() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::HfBuilder::new(); + hydroflow_plus_test::first_ten::first_ten_distributed( + &builder, + &mut CLIDeployNodeBuilder::new(|id| { + deployment.HydroflowCrate( + ".", + localhost.clone(), + Some("first_ten_distributed".into()), // --bin + None, + Some("dev".into()), // --profile + None, + Some(vec![id.to_string()]), // command line arguments + None, + vec![], + ) + }), + ); + + deployment.deploy().await.unwrap(); + + deployment.start().await.unwrap(); + + tokio::signal::ctrl_c().await.unwrap() +} +``` + +Most importantly, we specify a `CLIDeployNodeBuilder`, which takes a closure that constructs a Hydro Deploy node for each node in the flow graph. In our case, we use the `HydroflowCrate` node type, which deploys a Hydroflow+ binary. We also specify the node ID as a command line argument, which is read by our runtime binary. + +We can then run our distributed dataflow with: + +```bash +$ cargo run --example first_ten_distributed +[service/1] 0 +[service/1] 1 +[service/1] 2 +[service/1] 3 +[service/1] 4 +[service/1] 5 +[service/1] 6 +[service/1] 7 +[service/1] 8 +[service/1] 9 +``` diff --git a/docs/docs/hydroflow_plus/index.mdx b/docs/docs/hydroflow_plus/index.mdx new file mode 100644 index 000000000000..64fdb7af161e --- /dev/null +++ b/docs/docs/hydroflow_plus/index.mdx @@ -0,0 +1,22 @@ +--- +sidebar_position: 1 +--- + +# Introduction +Hydroflow+ layers a high-level Rust API over the Hydroflow IR, making it possible to write dataflow programs that span multiple nodes with straightline, functional Rust code. Hydroflow+ is built on top of [Stageleft](./stageleft.mdx), which allows Hydroflow+ to emit regular Hydroflow programs that are compiled into efficient Rust binaries. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs on a cluster. + +The main logic of Hydroflow+ programs manipulates **streams**, which capture infinite ordered sequences of elements. Streams are transformed using classic functional operators such as `map`, `filter`, and `fold`, as well as relational operators such as `join`. To build **distributed** dataflow programs, Hydroflow+ also introduces the concept of **nodes**, which capture _where_ a stream is being processed. + +## Quickstart +Hydroflow+ requires a particular workspace setup, as any crate that uses Hydroflow+ must have an supporting macro crate to drive the code generation. To get started, we recommend using the Hydroflow+ template. + +```bash +$ cargo install cargo-generate +$ cargo generate hydro-project/hydroflow-plus-template +``` + +Then, you can test the example dataflow: + +```bash +$ cargo test +``` diff --git a/docs/docs/hydroflow_plus/stageleft.mdx b/docs/docs/hydroflow_plus/stageleft.mdx new file mode 100644 index 000000000000..3da1d77384eb --- /dev/null +++ b/docs/docs/hydroflow_plus/stageleft.mdx @@ -0,0 +1,8 @@ +--- +title: Stageleft +sidebar_position: 4 +--- + +import StageleftDocs from '../../../stageleft/README.md' + + diff --git a/docs/docs/hydroflow_plus/structure.mdx b/docs/docs/hydroflow_plus/structure.mdx new file mode 100644 index 000000000000..3650c3128a53 --- /dev/null +++ b/docs/docs/hydroflow_plus/structure.mdx @@ -0,0 +1,91 @@ +--- +sidebar_position: 2 +--- + +# Building Dataflows +Hydroflow+ programs require special structure to support code generation and distributed deployments. There are three main components of a Hydroflow+ program: +- The **flow graph** describes the dataflow logic of the program. +- The **runtime** wraps the dataflow in an executable Rust binary. +- The **deployment** describes how to map the flow graph to instances of the runtime. This is only needed for distributed deployments. + +Let's look a minimal example of a Hydroflow+ program. We'll start with a simple flow graph that prints out the first 10 natural numbers. First, we'll define the **flow graph**. + + +## The Flow Graph +```rust +use hydroflow_plus::*; +use hydroflow_plus::node::*; +use stageleft::*; + +pub fn first_ten<'a, D: HfDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &mut impl HfNodeBuilder<'a, D> +) {} +``` + +The `graph` variable gives us access to the builder API for constructing the flow graph. The `node_builder` variable defines how to construct the nodes of the flow graph. For now, we will only use the builder once, to create a single node. + +```rust +pub fn first_ten<'a, D: HfDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &mut impl HfNodeBuilder<'a, D> +) { + let node = graph.node(node_builder); +} +``` + +Now, we can build out the logic of the node. First, we instantiate a stream that emits the first 10 natural numbers. + +```rust +let numbers = node.source_iter(q!(0..10)); +``` + +In Hydroflow+, whenever there are snippets of code that will be executed during runtime, we use the `q!` macro to mark them. This includes both static sources of data as well as closures that transform them. For example, to print out these numbers, we can use the `for_each` operator: + +```rust +numbers.for_each(q!(|n| println!("{}", n))); +``` + +## The Runtime +Next, we need to instantiate our flow graph into a runnable Rust binary. We do this by defining a Stageleft entrypoint for the graph, and then instantiating this inside a separate Rust binary. + +To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes the graph being built and returns a generated Hydroflow program: + +```rust +#[stageleft::entry] +pub fn first_ten_runtime<'a>( + graph: &'a HfBuilder<'a, SingleGraph> +) -> impl Quoted<'a, Hydroflow<'a>> { + first_ten(graph, &mut ()); + graph.build(q!(0)) +} +``` + +Hydroflow+ graphs can span multiple nodes, each running different pieces of logic. When a Hydroflow+ graph is built into a runnable program, the graph is sliced into components for each of these nodes, and an integer ID is used to determine which subgraph should be executed. In our case, we only have a single node, so we pass a static `q!(0)` node ID to the `build` method. + +Stageleft entries are usable as macros from other programs. In our case, we will instantiate our entrypoint from the Rust binary for our dataflow. We can create a new file `src/bin/first_ten.rs` with the following contents: + +```rust +#[tokio::main] +async fn main() { + flow::first_ten_runtime!().run_async().await; +} +``` + +We can now run this binary to see the output of our dataflow: + +```bash +$ cargo run --bin first_ten +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +``` + +In the next section, we will look at how to extend this program to run on multiple nodes. diff --git a/docs/docusaurus.config.js b/docs/docusaurus.config.js index 2b37bfc6e5dd..e89445e0d761 100644 --- a/docs/docusaurus.config.js +++ b/docs/docusaurus.config.js @@ -120,6 +120,11 @@ const config = { sidebarId: 'hydroflowSidebar', label: 'Hydroflow', }, + { + type: 'docSidebar', + sidebarId: 'hydroflowPlusSidebar', + label: 'Hydroflow+', + }, { type: 'docSidebar', sidebarId: 'deploySidebar', @@ -160,6 +165,10 @@ const config = { label: 'Hydroflow', to: '/docs/hydroflow/', }, + { + label: 'Hydroflow+', + to: '/docs/hydroflow_plus/', + }, { label: 'Hydro Deploy', to: '/docs/deploy/', diff --git a/docs/sidebars.js b/docs/sidebars.js index 4ce0ff60ab05..d433609f4d87 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -15,6 +15,7 @@ const sidebars = { // By default, Docusaurus generates a sidebar from the docs folder structure hydroflowSidebar: [{type: 'autogenerated', dirName: 'hydroflow'}], + hydroflowPlusSidebar: [{type: 'autogenerated', dirName: 'hydroflow_plus'}], deploySidebar: [{type: 'autogenerated', dirName: 'deploy'}], }; diff --git a/hydro_cli/Cargo.toml b/hydro_cli/Cargo.toml index 4c802528b34f..7338b7da0623 100644 --- a/hydro_cli/Cargo.toml +++ b/hydro_cli/Cargo.toml @@ -8,9 +8,9 @@ documentation = "https://docs.rs/hydro_cli/" description = "Hydro Deploy Command Line Interface" [lib] -name = "hydro" +name = "hydro_cli" # "cdylib" is necessary to produce a shared library for Python to import from. -crate-type = ["cdylib"] +crate-type = ["lib", "cdylib"] [dependencies] tokio = { version = "1.16", features = [ "full" ] } diff --git a/hydro_cli/src/core/custom_service.rs b/hydro_cli/src/core/custom_service.rs index 3f5695f3a90a..90aebd401c06 100644 --- a/hydro_cli/src/core/custom_service.rs +++ b/hydro_cli/src/core/custom_service.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Weak}; use anyhow::{bail, Result}; use async_trait::async_trait; -use hydroflow_cli_integration::ServerPort; +use hydroflow_cli_integration::{ConnectedDirect, ServerPort}; use tokio::sync::RwLock; use super::hydroflow_crate::ports::{ @@ -32,6 +32,10 @@ impl CustomService { launched_host: None, } } + + pub fn declare_client(&self, self_arc: &Arc>) -> CustomClientPort { + CustomClientPort::new(Arc::downgrade(self_arc)) + } } #[async_trait] @@ -95,6 +99,17 @@ impl CustomClientPort { .load_instantiated(&|p| p) .await } + + pub async fn connect(&self) -> ConnectedDirect { + self.client_port + .as_ref() + .unwrap() + .load_instantiated(&|p| p) + .await + .instantiate() + .connect::() + .await + } } impl HydroflowSource for CustomClientPort { diff --git a/hydro_cli/src/core/deployment.rs b/hydro_cli/src/core/deployment.rs index 7cc7a1442239..9750ee053be6 100644 --- a/hydro_cli/src/core/deployment.rs +++ b/hydro_cli/src/core/deployment.rs @@ -1,10 +1,15 @@ +use std::path::PathBuf; use std::sync::{Arc, Weak}; use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use tokio::sync::RwLock; -use super::{progress, Host, ResourcePool, ResourceResult, Service}; +use super::gcp::GCPNetwork; +use super::{ + progress, CustomService, GCPComputeEngineHost, Host, HydroflowCrate, LocalhostHost, + ResourcePool, ResourceResult, Service, +}; #[derive(Default)] pub struct Deployment { @@ -17,6 +22,76 @@ pub struct Deployment { } impl Deployment { + pub fn new() -> Self { + Self::default() + } + + #[allow(non_snake_case)] + pub fn Localhost(&mut self) -> Arc> { + self.add_host(LocalhostHost::new) + } + + #[allow(non_snake_case)] + pub fn GCPComputeEngineHost( + &mut self, + project: &str, + machine_type: &str, + image: &str, + region: &str, + network: Arc>, + user: Option, + ) -> Arc> { + self.add_host(|id| { + crate::core::GCPComputeEngineHost::new( + id, + project, + machine_type, + image, + region, + network, + user, + ) + }) + } + + #[allow(non_snake_case)] + pub fn CustomService( + &mut self, + on: Arc>, + external_ports: Vec, + ) -> Arc> { + self.add_service(|id| CustomService::new(id, on, external_ports)) + } + + #[allow(non_snake_case, clippy::too_many_arguments)] + pub fn HydroflowCrate( + &mut self, + src: impl Into, + on: Arc>, + bin: Option, + example: Option, + profile: Option, + features: Option>, + args: Option>, + display_id: Option, + external_ports: Vec, + ) -> Arc> { + self.add_service(|id| { + crate::core::HydroflowCrate::new( + id, + src.into(), + on, + bin, + example, + profile, + features, + args, + display_id, + external_ports, + ) + }) + } + pub async fn deploy(&mut self) -> Result<()> { progress::ProgressTracker::with_group("deploy", None, || async { let mut resource_batch = super::ResourceBatch::new(); @@ -136,7 +211,7 @@ impl Deployment { arc } - pub fn add_service( + fn add_service( &mut self, service: impl FnOnce(usize) -> T, ) -> Arc> { diff --git a/hydro_cli/src/core/gcp.rs b/hydro_cli/src/core/gcp.rs index 8c236e84bcf0..3934befa266c 100644 --- a/hydro_cli/src/core/gcp.rs +++ b/hydro_cli/src/core/gcp.rs @@ -146,9 +146,9 @@ pub struct GCPNetwork { } impl GCPNetwork { - pub fn new(project: String, existing_vpc: Option) -> Self { + pub fn new(project: &str, existing_vpc: Option) -> Self { Self { - project, + project: project.to_string(), existing_vpc, id: nanoid!(8, &TERRAFORM_ALPHABET), } @@ -277,19 +277,19 @@ pub struct GCPComputeEngineHost { impl GCPComputeEngineHost { pub fn new( id: usize, - project: String, - machine_type: String, - image: String, - region: String, + project: &str, + machine_type: &str, + image: &str, + region: &str, network: Arc>, user: Option, ) -> Self { Self { id, - project, - machine_type, - image, - region, + project: project.to_string(), + machine_type: machine_type.to_string(), + image: image.to_string(), + region: region.to_string(), network, user, launched: None, diff --git a/hydro_cli/src/core/hydroflow_crate/mod.rs b/hydro_cli/src/core/hydroflow_crate/mod.rs index 6e9be1987256..42b79da90b6e 100644 --- a/hydro_cli/src/core/hydroflow_crate/mod.rs +++ b/hydro_cli/src/core/hydroflow_crate/mod.rs @@ -13,7 +13,8 @@ use tokio::sync::RwLock; use self::ports::{HydroflowPortConfig, HydroflowSink, SourcePath}; use super::progress::ProgressTracker; use super::{ - Host, LaunchedBinary, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service, + Host, HostTargetType, LaunchedBinary, LaunchedHost, ResourceBatch, ResourceResult, + ServerStrategy, Service, }; mod build; @@ -32,6 +33,8 @@ pub struct HydroflowCrate { display_id: Option, external_ports: Vec, + target_type: HostTargetType, + /// Configuration for the ports this service will connect to as a client. port_to_server: HashMap, @@ -64,6 +67,8 @@ impl HydroflowCrate { display_id: Option, external_ports: Vec, ) -> Self { + let target_type = on.try_read().unwrap().target_type(); + Self { id, src, @@ -74,6 +79,7 @@ impl HydroflowCrate { features, args, display_id, + target_type, external_ports, port_to_server: HashMap::new(), port_to_bind: HashMap::new(), @@ -168,9 +174,8 @@ impl HydroflowCrate { let bin_cloned = self.bin.clone(); let example_cloned = self.example.clone(); let features_cloned = self.features.clone(); - let host = self.on.clone(); let profile_cloned = self.profile.clone(); - let target_type = host.try_read().unwrap().target_type(); + let target_type = self.target_type.clone(); let built_binary_cloned = self.built_binary.clone(); async move { diff --git a/hydro_cli/src/core/localhost.rs b/hydro_cli/src/core/localhost.rs index 090969e24d4d..1647ff345008 100644 --- a/hydro_cli/src/core/localhost.rs +++ b/hydro_cli/src/core/localhost.rs @@ -128,15 +128,15 @@ pub fn create_broadcast( } if let Some(cli_receivers) = weak_cli_receivers.upgrade() { - let cli_receivers = cli_receivers.write().await; - for r in cli_receivers.iter() { + let mut cli_receivers = cli_receivers.write().await; + for r in cli_receivers.drain(..) { r.close(); } } if let Some(receivers) = weak_receivers.upgrade() { - let receivers = receivers.write().await; - for r in receivers.iter() { + let mut receivers = receivers.write().await; + for r in receivers.drain(..) { r.close(); } } diff --git a/hydro_cli/src/lib.rs b/hydro_cli/src/lib.rs index 59c2c3e80ea7..be8c8533ff8d 100644 --- a/hydro_cli/src/lib.rs +++ b/hydro_cli/src/lib.rs @@ -159,10 +159,7 @@ impl Deployment { #[allow(non_snake_case)] fn Localhost(&self, py: Python<'_>) -> PyResult> { - let arc = self - .underlying - .blocking_write() - .add_host(crate::core::LocalhostHost::new); + let arc = self.underlying.blocking_write().Localhost(); Ok(Py::new( py, @@ -188,10 +185,10 @@ impl Deployment { let arc = self.underlying.blocking_write().add_host(|id| { crate::core::GCPComputeEngineHost::new( id, - project, - machine_type, - image, - region, + &project, + &machine_type, + &image, + ®ion, network.underlying, user, ) @@ -214,9 +211,10 @@ impl Deployment { on: &Host, external_ports: Vec, ) -> PyResult> { - let service = self.underlying.blocking_write().add_service(|id| { - crate::core::CustomService::new(id, on.underlying.clone(), external_ports) - }); + let service = self + .underlying + .blocking_write() + .CustomService(on.underlying.clone(), external_ports); Ok(Py::new( py, @@ -244,20 +242,17 @@ impl Deployment { display_id: Option, external_ports: Option>, ) -> PyResult> { - let service = self.underlying.blocking_write().add_service(|id| { - crate::core::HydroflowCrate::new( - id, - src.into(), - on.underlying.clone(), - bin, - example, - profile, - features, - args, - display_id, - external_ports.unwrap_or_default(), - ) - }); + let service = self.underlying.blocking_write().HydroflowCrate( + src, + on.underlying.clone(), + bin, + example, + profile, + features, + args, + display_id, + external_ports.unwrap_or_default(), + ); Ok(Py::new( py, @@ -338,7 +333,7 @@ impl GCPNetwork { fn new(project: String, existing: Option) -> Self { GCPNetwork { underlying: Arc::new(RwLock::new(crate::core::gcp::GCPNetwork::new( - project, existing, + &project, existing, ))), } } diff --git a/hydroflow_cli_integration/src/lib.rs b/hydroflow_cli_integration/src/lib.rs index 8841c5157f4c..5e85269b0568 100644 --- a/hydroflow_cli_integration/src/lib.rs +++ b/hydroflow_cli_integration/src/lib.rs @@ -41,6 +41,12 @@ pub enum ServerPort { Null, } +impl ServerPort { + pub fn instantiate(&self) -> ServerOrBound { + ServerOrBound::Server(self.into()) + } +} + #[derive(Debug)] pub enum RealizedServerPort { UnixSocket(JoinHandle>), diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml index 395a4ff09b46..ef7f016b9cf3 100644 --- a/hydroflow_plus/Cargo.toml +++ b/hydroflow_plus/Cargo.toml @@ -13,6 +13,7 @@ path = "src/lib.rs" [features] default = [] diagnostics = [ "hydroflow_lang/diagnostics" ] +cli_integration = [ "hydroflow/cli_integration" ] [dependencies] quote = "1.0.0" @@ -21,4 +22,6 @@ proc-macro2 = "1.0.57" proc-macro-crate = "1.1.0" hydroflow = { path = "../hydroflow", version = "^0.5.0" } hydroflow_lang = { path = "../hydroflow_lang", version = "^0.5.0" } +serde = { version = "1", features = [ "derive" ] } +bincode = "1.3" stageleft = { path = "../stageleft", version = "^0.1.0" } diff --git a/hydroflow_plus/src/builder.rs b/hydroflow_plus/src/builder.rs index 256423a011ff..dc9f5a864663 100644 --- a/hydroflow_plus/src/builder.rs +++ b/hydroflow_plus/src/builder.rs @@ -2,39 +2,59 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::marker::PhantomData; -use hydroflow::futures::stream::Stream; use hydroflow_lang::graph::{ eliminate_extra_unions_tees, partition_graph, propegate_flow_props, FlatGraphBuilder, }; use proc_macro2::{Span, TokenStream}; use quote::quote; -use stageleft::{IntoQuotedOnce, Quoted, QuotedContext}; +use stageleft::{Quoted, QuotedContext}; use syn::parse_quote; -use crate::{HfBuilt, HfStream, RuntimeContext}; +use crate::node::{HfDeploy, HfNodeBuilder}; +use crate::{HfBuilt, RuntimeContext}; -pub struct HfBuilder<'a> { +pub type Builders = RefCell>>; + +pub struct HfBuilder<'a, D: HfDeploy<'a> + ?Sized> { pub(crate) next_id: RefCell, - pub(crate) builders: RefCell>>, + pub(crate) builders: Builders, + nodes: RefCell>, + next_node_id: RefCell, _phantom: PhantomData<&'a mut &'a ()>, } -impl<'a> QuotedContext for HfBuilder<'a> { +impl<'a, D: HfDeploy<'a>> QuotedContext for HfBuilder<'a, D> { fn create() -> Self { HfBuilder::new() } } -impl<'a> HfBuilder<'a> { +impl<'a, D: HfDeploy<'a>> HfBuilder<'a, D> { #[allow(clippy::new_without_default)] - pub fn new() -> HfBuilder<'a> { + pub fn new() -> HfBuilder<'a, D> { HfBuilder { next_id: RefCell::new(0), builders: RefCell::new(Some(Default::default())), + nodes: RefCell::new(Vec::new()), + next_node_id: RefCell::new(0), _phantom: PhantomData, } } + pub fn builder_components(&self) -> (&RefCell, &Builders) { + (&self.next_id, &self.builders) + } + + pub fn node(&'a self, builder: &mut impl HfNodeBuilder<'a, D>) -> D::Node { + let mut next_node_id = self.next_node_id.borrow_mut(); + let id = *next_node_id; + *next_node_id += 1; + + let node = builder.build(id, self); + self.nodes.borrow_mut().push(node.clone()); + node + } + pub fn build(&self, id: impl Quoted<'a, usize>) -> HfBuilt<'a> { let builders = self.builders.borrow_mut().take().unwrap(); @@ -99,131 +119,4 @@ impl<'a> HfBuilder<'a> { _phantom: PhantomData, } } - - pub fn source_stream + Unpin>( - &'a self, - node_id: usize, - e: impl Quoted<'a, E>, - ) -> HfStream<'a, T> { - let next_id = { - let mut next_id = self.next_id.borrow_mut(); - let id = *next_id; - *next_id += 1; - id - }; - - let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - let e = e.splice(); - - self.builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(node_id) - .or_default() - .add_statement(parse_quote! { - #ident = source_stream(#e) -> tee(); - }); - - HfStream { - ident, - node_id, - graph: self, - _phantom: PhantomData, - } - } - - pub fn source_iter>( - &'a self, - node_id: usize, - e: impl IntoQuotedOnce<'a, E>, - ) -> HfStream<'a, T> { - let next_id = { - let mut next_id = self.next_id.borrow_mut(); - let id = *next_id; - *next_id += 1; - id - }; - - let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - let e = e.splice(); - - self.builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(node_id) - .or_default() - .add_statement(parse_quote! { - #ident = source_iter(#e) -> tee(); - }); - - HfStream { - ident, - node_id, - graph: self, - _phantom: PhantomData, - } - } - - pub fn cycle(&'a self, node_id: usize) -> (HfCycle<'a, T>, HfStream<'a, T>) { - let next_id = { - let mut next_id = self.next_id.borrow_mut(); - let id = *next_id; - *next_id += 1; - id - }; - - let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - - self.builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(node_id) - .or_default() - .add_statement(parse_quote! { - #ident = tee(); - }); - - ( - HfCycle { - ident: ident.clone(), - node_id, - graph: self, - _phantom: PhantomData, - }, - HfStream { - ident, - node_id, - graph: self, - _phantom: PhantomData, - }, - ) - } -} - -pub struct HfCycle<'a, T> { - ident: syn::Ident, - node_id: usize, - graph: &'a HfBuilder<'a>, - _phantom: PhantomData, -} - -impl<'a, T> HfCycle<'a, T> { - pub fn complete(self, stream: &HfStream<'a, T>) { - let ident = self.ident; - let stream_ident = stream.ident.clone(); - - self.graph - .builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(self.node_id) - .or_default() - .add_statement(parse_quote! { - #stream_ident -> #ident; - }); - } } diff --git a/hydroflow_plus/src/cycle.rs b/hydroflow_plus/src/cycle.rs new file mode 100644 index 000000000000..185826e4145e --- /dev/null +++ b/hydroflow_plus/src/cycle.rs @@ -0,0 +1,31 @@ +use std::marker::PhantomData; + +use syn::parse_quote; + +use crate::builder::Builders; +use crate::node::HfNode; +use crate::HfStream; + +pub struct HfCycle<'a, T, N: HfNode<'a>> { + pub(crate) ident: syn::Ident, + pub(crate) node: N, + pub(crate) builders: &'a Builders, + pub(crate) _phantom: PhantomData, +} + +impl<'a, T, N: HfNode<'a>> HfCycle<'a, T, N> { + pub fn complete(self, stream: &HfStream<'a, T, N>) { + let ident = self.ident; + let stream_ident = stream.ident.clone(); + + self.builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default() + .add_statement(parse_quote! { + #stream_ident -> #ident; + }); + } +} diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index f51cb79e4761..d9f68705402d 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -1,17 +1,26 @@ use std::marker::PhantomData; use hydroflow::scheduled::context::Context; -use hydroflow::scheduled::graph::Hydroflow; +pub use hydroflow::scheduled::graph::Hydroflow; pub use hydroflow::*; use proc_macro2::TokenStream; use quote::quote; use stageleft::runtime_support::FreeVariable; use stageleft::Quoted; +pub mod runtime_support { + pub use bincode; +} + mod stream; pub use stream::HfStream; -mod builder; +pub mod node; + +pub mod cycle; +pub use cycle::HfCycle; + +pub mod builder; pub use builder::HfBuilder; #[derive(Clone)] diff --git a/hydroflow_plus/src/node/graphs.rs b/hydroflow_plus/src/node/graphs.rs new file mode 100644 index 000000000000..7f22f1a0ad50 --- /dev/null +++ b/hydroflow_plus/src/node/graphs.rs @@ -0,0 +1,90 @@ +use std::cell::RefCell; + +use hydroflow_lang::parse::Pipeline; + +use super::{HfDeploy, HfNode, HfNodeBuilder}; +use crate::builder::Builders; +use crate::HfBuilder; + +pub struct SingleGraph {} + +impl<'a> HfDeploy<'a> for SingleGraph { + type Node = SingleNode<'a>; +} + +impl<'a> HfNodeBuilder<'a, SingleGraph> for () { + fn build(&mut self, _id: usize, builder: &'a HfBuilder<'a, SingleGraph>) -> SingleNode<'a> { + SingleNode { builder } + } +} + +#[derive(Clone)] +pub struct SingleNode<'a> { + builder: &'a HfBuilder<'a, SingleGraph>, +} + +impl<'a> HfNode<'a> for SingleNode<'a> { + type Port = (); + + fn id(&self) -> usize { + 0 + } + + fn graph_builder(&self) -> (&'a RefCell, &'a Builders) { + (&self.builder.next_id, &self.builder.builders) + } + + fn next_port(&self) { + panic!(); + } + + fn gen_source_statement(&self, _port: &()) -> Pipeline { + panic!(); + } + + fn gen_sink_statement(&self, _port: &()) -> Pipeline { + panic!(); + } +} + +pub struct MultiGraph {} + +impl<'a> HfDeploy<'a> for MultiGraph { + type Node = MultiNode<'a>; +} + +impl<'a> HfNodeBuilder<'a, MultiGraph> for () { + fn build(&mut self, id: usize, builder: &'a HfBuilder<'a, MultiGraph>) -> MultiNode<'a> { + MultiNode { builder, id } + } +} + +#[derive(Clone)] +pub struct MultiNode<'a> { + builder: &'a HfBuilder<'a, MultiGraph>, + id: usize, +} + +impl<'a> HfNode<'a> for MultiNode<'a> { + type Port = (); + + fn id(&self) -> usize { + self.id + } + + fn graph_builder(&self) -> (&'a RefCell, &'a Builders) { + (&self.builder.next_id, &self.builder.builders) + } + + fn next_port(&self) { + panic!(); + } + + fn gen_source_statement(&self, _port: &()) -> Pipeline { + panic!(); + } + + fn gen_sink_statement(&self, _port: &()) -> Pipeline { + panic!(); + } +} diff --git a/hydroflow_plus/src/node/mod.rs b/hydroflow_plus/src/node/mod.rs new file mode 100644 index 000000000000..aed0c818485e --- /dev/null +++ b/hydroflow_plus/src/node/mod.rs @@ -0,0 +1,193 @@ +use std::cell::RefCell; +use std::io; +use std::marker::PhantomData; + +use hydroflow::bytes::BytesMut; +use hydroflow::futures::stream::Stream; +use hydroflow_lang::parse::Pipeline; +use proc_macro2::Span; +use stageleft::Quoted; +use syn::parse_quote; + +use crate::builder::Builders; +use crate::{HfBuilder, HfCycle, HfStream}; + +mod graphs; +pub use graphs::*; + +pub trait HfDeploy<'a> { + type Node: HfNode<'a>; +} + +pub trait HfNetworkedDeploy<'a>: HfDeploy<'a, Node = Self::NetworkedNode> { + type NetworkedNode: HfNode<'a, Port = Self::Port> + HfSendTo<'a, Self::NetworkedNode>; + type Port; +} + +impl<'a, T: HfDeploy<'a, Node = N>, N: HfSendTo<'a, N>> HfNetworkedDeploy<'a> for T { + type NetworkedNode = N; + type Port = N::Port; +} + +pub trait HfNodeBuilder<'a, D: HfDeploy<'a> + ?Sized> { + fn build(&mut self, id: usize, builder: &'a HfBuilder<'a, D>) -> D::Node; +} + +pub trait HfSendTo<'a, O: HfNode<'a>>: HfNode<'a> { + fn send_to(&self, other: &O, source_port: &Self::Port, recipient_port: &O::Port); +} + +pub trait HfNode<'a>: Clone { + type Port; + + fn id(&self) -> usize; + fn graph_builder(&self) -> (&'a RefCell, &'a Builders); + fn next_port(&self) -> Self::Port; + fn gen_source_statement(&self, port: &Self::Port) -> Pipeline; + fn gen_sink_statement(&self, port: &Self::Port) -> Pipeline; + + fn source_stream + Unpin>( + &self, + e: impl Quoted<'a, E>, + ) -> HfStream<'a, T, Self> { + let (next_id_cell, builders) = self.graph_builder(); + + let next_id = { + let mut next_id = next_id_cell.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); + let e = e.splice(); + + builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.id()) + .or_default() + .add_statement(parse_quote! { + #ident = source_stream(#e) -> tee(); + }); + + HfStream { + ident, + node: self.clone(), + next_id: next_id_cell, + builders, + _phantom: PhantomData, + } + } + + fn source_external(&self) -> (Self::Port, HfStream<'a, Result, Self>) { + let (next_id_cell, builders) = self.graph_builder(); + + let next_id = { + let mut next_id = next_id_cell.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); + let port = self.next_port(); + let source_pipeline = self.gen_source_statement(&port); + + builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.id()) + .or_default() + .add_statement(parse_quote! { + #ident = #source_pipeline -> tee(); + }); + + ( + port, + HfStream { + ident, + node: self.clone(), + next_id: next_id_cell, + builders, + _phantom: PhantomData, + }, + ) + } + + fn source_iter>( + &self, + e: impl Quoted<'a, E>, + ) -> HfStream<'a, T, Self> { + let (next_id_cell, builders) = self.graph_builder(); + + let next_id = { + let mut next_id = next_id_cell.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); + let e = e.splice(); + + builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.id()) + .or_default() + .add_statement(parse_quote! { + #ident = source_iter(#e) -> tee(); + }); + + HfStream { + ident, + node: self.clone(), + next_id: next_id_cell, + builders, + _phantom: PhantomData, + } + } + + fn cycle(&self) -> (HfCycle<'a, T, Self>, HfStream<'a, T, Self>) { + let (next_id_cell, builders) = self.graph_builder(); + + let next_id = { + let mut next_id = next_id_cell.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); + + builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.id()) + .or_default() + .add_statement(parse_quote! { + #ident = tee(); + }); + + ( + HfCycle { + ident: ident.clone(), + node: self.clone(), + builders, + _phantom: PhantomData, + }, + HfStream { + ident, + node: self.clone(), + next_id: next_id_cell, + builders, + _phantom: PhantomData, + }, + ) + } +} diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index b1fae1be60a8..8444e3ed9d19 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -1,28 +1,32 @@ +use std::cell::RefCell; use std::hash::Hash; use std::io; use std::marker::PhantomData; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::futures::Sink; -use hydroflow::util::cli::HydroCLI; use proc_macro2::Span; use quote::quote; -use stageleft::{IntoQuotedMut, Quoted, RuntimeData}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use stageleft::{IntoQuotedMut, Quoted}; use syn::parse_quote; -use crate::HfBuilder; +use crate::builder::Builders; +use crate::node::{HfNode, HfSendTo}; -pub struct HfStream<'a, T> { +pub struct HfStream<'a, T, N: HfNode<'a>> { pub(crate) ident: syn::Ident, - pub(crate) node_id: usize, - pub(crate) graph: &'a HfBuilder<'a>, + pub(crate) node: N, + pub(crate) next_id: &'a RefCell, + pub(crate) builders: &'a Builders, pub(crate) _phantom: PhantomData<&'a mut &'a T>, } -impl<'a, T> HfStream<'a, T> { - pub fn map U + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, U> { +impl<'a, T, N: HfNode<'a>> HfStream<'a, T, N> { + pub fn map U + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, U, N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -32,12 +36,11 @@ impl<'a, T> HfStream<'a, T> { let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let f = f.splice(); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> map(#f) -> tee(); @@ -45,15 +48,49 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } - pub fn filter bool + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, T> { + pub fn inspect(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, T, N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); + let f = f.splice(); + + self.builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default() + .add_statement(parse_quote! { + #ident = #self_ident -> inspect(#f) -> tee(); + }); + + HfStream { + ident, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, + _phantom: PhantomData, + } + } + + pub fn filter bool + 'a>( + &self, + f: impl IntoQuotedMut<'a, F>, + ) -> HfStream<'a, T, N> { + let next_id = { + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -63,12 +100,11 @@ impl<'a, T> HfStream<'a, T> { let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let f = f.splice(); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> filter(#f) -> tee(); @@ -76,8 +112,9 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } @@ -85,9 +122,9 @@ impl<'a, T> HfStream<'a, T> { pub fn filter_map Option + 'a>( &self, f: impl IntoQuotedMut<'a, F>, - ) -> HfStream<'a, U> { + ) -> HfStream<'a, U, N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -97,12 +134,11 @@ impl<'a, T> HfStream<'a, T> { let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let f = f.splice(); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> filter_map(#f) -> tee(); @@ -110,8 +146,9 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } @@ -120,9 +157,9 @@ impl<'a, T> HfStream<'a, T> { &self, init: impl IntoQuotedMut<'a, I>, comb: impl IntoQuotedMut<'a, C>, - ) -> HfStream<'a, A> { + ) -> HfStream<'a, A, N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -132,12 +169,11 @@ impl<'a, T> HfStream<'a, T> { let init = init.splice(); let comb = comb.splice(); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> fold(#init, #comb) -> tee(); @@ -145,15 +181,16 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } - pub fn persist(&self) -> HfStream<'a, T> { + pub fn persist(&self) -> HfStream<'a, T, N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -162,12 +199,11 @@ impl<'a, T> HfStream<'a, T> { let self_ident = &self.ident; let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> persist() -> tee(); @@ -175,15 +211,16 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } - pub fn delta(&self) -> HfStream<'a, T> { + pub fn delta(&self) -> HfStream<'a, T, N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -192,12 +229,11 @@ impl<'a, T> HfStream<'a, T> { let self_ident = &self.ident; let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> multiset_delta() -> tee(); @@ -205,18 +241,19 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } - pub fn unique(&self) -> HfStream<'a, T> + pub fn unique(&self) -> HfStream<'a, T, N> where T: Eq + Hash, { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -225,12 +262,11 @@ impl<'a, T> HfStream<'a, T> { let self_ident = &self.ident; let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> unique::<'tick>() -> tee(); @@ -238,15 +274,16 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } - pub fn cross_product(&self, other: &HfStream) -> HfStream<'a, (T, O)> { + pub fn cross_product(&self, other: &HfStream<'a, O, N>) -> HfStream<'a, (T, O), N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -256,8 +293,12 @@ impl<'a, T> HfStream<'a, T> { let other_ident = &other.ident; let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - let mut builders = self.graph.builders.borrow_mut(); - let builder = builders.as_mut().unwrap().entry(self.node_id).or_default(); + let mut builders = self.builders.borrow_mut(); + let builder = builders + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default(); builder.add_statement(parse_quote! { #ident = cross_join::<'tick, 'tick>() -> tee(); @@ -273,15 +314,16 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } - pub fn union(&self, other: &HfStream) -> HfStream<'a, T> { + pub fn union(&self, other: &HfStream<'a, T, N>) -> HfStream<'a, T, N> { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -291,8 +333,12 @@ impl<'a, T> HfStream<'a, T> { let other_ident = &other.ident; let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - let mut builders = self.graph.builders.borrow_mut(); - let builder = builders.as_mut().unwrap().entry(self.node_id).or_default(); + let mut builders = self.builders.borrow_mut(); + let builder = builders + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default(); builder.add_statement(parse_quote! { #ident = union() -> tee(); @@ -308,46 +354,37 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } pub fn for_each(&self, f: impl IntoQuotedMut<'a, F>) { - let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); - let id = *next_id; - *next_id += 1; - id - }; - let self_ident = &self.ident; - let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let f = f.splice(); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { - #ident = #self_ident -> for_each(#f); + #self_ident -> for_each(#f); }); } pub fn dest_sink + 'a>(&self, sink: impl Quoted<'a, S>) { - let sink = sink.splice(); let self_ident = &self.ident; + let sink = sink.splice(); - self.graph - .builders + self.builders .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #self_ident -> dest_sink(#sink); @@ -355,16 +392,72 @@ impl<'a, T> HfStream<'a, T> { } } -impl<'a> HfStream<'a, Bytes> { - pub fn send_to( +impl<'a, N: HfNode<'a>> HfStream<'a, Bytes, N> { + pub fn send_bytes>( &self, - other: usize, - port_name: &str, - cli: RuntimeData<&'a HydroCLI>, - ) -> HfStream<'a, Result> { + other: &N2, + ) -> HfStream<'a, Result, N2> + where + N: HfSendTo<'a, N2>, + { + let self_ident = &self.ident; + + let mut builders_borrowed = self.builders.borrow_mut(); + let builders = builders_borrowed.as_mut().unwrap(); + + let source_name = self.node.next_port(); + let self_sink = self.node.gen_sink_statement(&source_name); + + builders + .entry(self.node.id()) + .or_default() + .add_statement(parse_quote! { + #self_ident -> #self_sink; + }); + + let recipient_next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); + + let recipient_port_name = other.next_port(); + let recipient_source = other.gen_source_statement(&recipient_port_name); + + builders + .entry(other.id()) + .or_default() + .add_statement(parse_quote! { + #ident = #recipient_source -> tee(); + }); + + self.node.send_to(other, &source_name, &recipient_port_name); + + HfStream { + ident, + node: other.clone(), + next_id: self.next_id, + builders: self.builders, + _phantom: PhantomData, + } + } +} + +impl<'a, T: Serialize + DeserializeOwned, N: HfNode<'a>> HfStream<'a, T, N> { + pub fn send_bincode>(&self, other: &N2) -> HfStream<'a, T, N2> + where + N: HfSendTo<'a, N2>, + { let self_ident = &self.ident; - let cli_splice = cli.splice(); + let mut builders_borrowed = self.builders.borrow_mut(); + let builders = builders_borrowed.as_mut().unwrap(); + + let source_name = self.node.next_port(); + let self_sink = self.node.gen_sink_statement(&source_name); let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") .expect("hydroflow_plus should be present in `Cargo.toml`"); @@ -376,25 +469,20 @@ impl<'a> HfStream<'a, Bytes> { } }; - self.graph - .builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(self.node_id) + // TODO(shadaj): this may fail when instantiated in an environment with different deps + let t_type: syn::Type = syn::parse_str(std::any::type_name::()).unwrap(); + + builders + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { - #self_ident -> dest_sink({ - use #root::util::cli::ConnectedSink; - #cli_splice - .port(#port_name) - .connect_local_blocking::<#root::util::cli::ConnectedDirect>() - .into_sink() - }); + #self_ident -> map(|data| { + #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into() + }) -> #self_sink; }); let recipient_next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -402,39 +490,37 @@ impl<'a> HfStream<'a, Bytes> { let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); - self.graph - .builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(other) + let recipient_port_name = other.next_port(); + let recipient_source = other.gen_source_statement(&recipient_port_name); + + builders + .entry(other.id()) .or_default() .add_statement(parse_quote! { - #ident = source_stream({ - use #root::util::cli::ConnectedSource; - #cli_splice - .port(#port_name) - .connect_local_blocking::<#root::util::cli::ConnectedDirect>() - .into_source() + #ident = #recipient_source -> map(|b| { + #root::runtime_support::bincode::deserialize::<#t_type>(&b.unwrap()).unwrap() }) -> tee(); }); + self.node.send_to(other, &source_name, &recipient_port_name); + HfStream { ident, - node_id: other, - graph: self.graph, + node: other.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } } -impl<'a, K, V1> HfStream<'a, (K, V1)> { - pub fn join(&self, n: &HfStream<(K, V2)>) -> HfStream<'a, (K, (V1, V2))> +impl<'a, K, V1, N: HfNode<'a>> HfStream<'a, (K, V1), N> { + pub fn join(&self, n: &HfStream<'a, (K, V2), N>) -> HfStream<'a, (K, (V1, V2)), N> where K: Eq + Hash, { let next_id = { - let mut next_id = self.graph.next_id.borrow_mut(); + let mut next_id = self.next_id.borrow_mut(); let id = *next_id; *next_id += 1; id @@ -444,8 +530,12 @@ impl<'a, K, V1> HfStream<'a, (K, V1)> { let other_ident = &n.ident; let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); - let mut builders = self.graph.builders.borrow_mut(); - let builder = builders.as_mut().unwrap().entry(self.node_id).or_default(); + let mut builders = self.builders.borrow_mut(); + let builder = builders + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default(); builder.add_statement(parse_quote! { #ident = join::<'tick, 'tick>() -> tee(); @@ -461,8 +551,9 @@ impl<'a, K, V1> HfStream<'a, (K, V1)> { HfStream { ident, - node_id: self.node_id, - graph: self.graph, + node: self.node.clone(), + next_id: self.next_id, + builders: self.builders, _phantom: PhantomData, } } diff --git a/hydroflow_plus_cli_integration/Cargo.toml b/hydroflow_plus_cli_integration/Cargo.toml new file mode 100644 index 000000000000..c8ed720998f2 --- /dev/null +++ b/hydroflow_plus_cli_integration/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "hydroflow_plus_cli_integration" +publish = false +version = "0.5.0" +edition = "2021" + +[features] +default = [] +deploy = [ "hydro_cli" ] + +[dependencies] +stageleft = { path = "../stageleft", version = "^0.1.0" } +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0", features = [ "cli_integration" ] } +proc-macro-crate = "1.1.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } +hydro_cli = { path = "../hydro_cli", version = "^0.5.0", optional = true } +tokio = { version = "1.16", features = [ "full" ] } +async-channel = "1.8.0" diff --git a/hydroflow_plus_cli_integration/src/deploy.rs b/hydroflow_plus_cli_integration/src/deploy.rs new file mode 100644 index 000000000000..ea2a5c9239fe --- /dev/null +++ b/hydroflow_plus_cli_integration/src/deploy.rs @@ -0,0 +1,142 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::sync::Arc; + +use async_channel::Receiver; +use hydro_cli::core::custom_service::CustomClientPort; +use hydro_cli::core::hydroflow_crate::ports::HydroflowSource; +use hydro_cli::core::{Deployment, Host, HydroflowCrate}; +use hydroflow_plus::builder::Builders; +use hydroflow_plus::node::{HfDeploy, HfNode, HfNodeBuilder, HfSendTo}; +use hydroflow_plus::HfBuilder; +use stageleft::internal::syn::parse_quote; +use tokio::sync::RwLock; + +pub struct CLIDeploy {} + +impl<'a> HfDeploy<'a> for CLIDeploy { + type Node = CLIDeployNode<'a>; +} + +#[derive(Clone)] +pub struct CLIDeployNode<'a> { + id: usize, + builder: &'a HfBuilder<'a, CLIDeploy>, + next_port: Rc>, + underlying: Arc>, +} + +impl<'a> CLIDeployNode<'a> { + pub async fn create_sender( + &self, + port: &str, + deployment: &mut Deployment, + on: &Arc>, + ) -> CustomClientPort { + let sender_service = deployment.CustomService(on.clone(), vec![]); + let mut sender_port = sender_service.read().await.declare_client(&sender_service); + let mut recipient = self + .underlying + .read() + .await + .get_port(port.to_string(), &self.underlying); + + sender_port.send_to(&mut recipient); + sender_port + } + + pub async fn stdout(&self) -> Receiver { + self.underlying.read().await.stdout().await + } + + pub async fn stderr(&self) -> Receiver { + self.underlying.read().await.stderr().await + } +} + +pub struct CLIDeployPort<'a> { + node: CLIDeployNode<'a>, + port: String, +} + +impl<'a> CLIDeployPort<'a> { + pub async fn create_sender( + &self, + deployment: &mut Deployment, + on: &Arc>, + ) -> CustomClientPort { + self.node.create_sender(&self.port, deployment, on).await + } +} + +impl<'a> HfNode<'a> for CLIDeployNode<'a> { + type Port = CLIDeployPort<'a>; + + fn id(&self) -> usize { + self.id + } + + fn graph_builder(&self) -> (&'a RefCell, &'a Builders) { + self.builder.builder_components() + } + + fn next_port(&self) -> CLIDeployPort<'a> { + let next_port = *self.next_port.borrow(); + *self.next_port.borrow_mut() += 1; + + CLIDeployPort { + node: self.clone(), + port: format!("port_{}", next_port), + } + } + + fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline { + parse_quote!(null()) + } + + fn gen_source_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline { + parse_quote!(null()) + } +} + +impl<'a> HfSendTo<'a, CLIDeployNode<'a>> for CLIDeployNode<'a> { + fn send_to( + &self, + other: &CLIDeployNode<'a>, + source_port: &CLIDeployPort<'a>, + recipient_port: &CLIDeployPort<'a>, + ) { + let mut source_port = self + .underlying + .try_read() + .unwrap() + .get_port(source_port.port.clone(), &self.underlying); + + let mut recipient_port = other + .underlying + .try_read() + .unwrap() + .get_port(recipient_port.port.clone(), &other.underlying); + + source_port.send_to(&mut recipient_port); + } +} + +pub struct CLIDeployNodeBuilder<'a>(Box Arc> + 'a>); + +impl<'a> CLIDeployNodeBuilder<'a> { + pub fn new Arc> + 'a>(f: F) -> Self { + Self(Box::new(f)) + } +} + +impl<'a: 'b, 'b> HfNodeBuilder<'a, CLIDeploy> for CLIDeployNodeBuilder<'b> { + fn build(&mut self, id: usize, builder: &'a HfBuilder<'a, CLIDeploy>) -> CLIDeployNode<'a> { + CLIDeployNode { + id, + builder, + next_port: Rc::new(RefCell::new(0)), + underlying: (self.0)(id), + } + } +} diff --git a/hydroflow_plus_cli_integration/src/lib.rs b/hydroflow_plus_cli_integration/src/lib.rs new file mode 100644 index 000000000000..d0ae1e5e7178 --- /dev/null +++ b/hydroflow_plus_cli_integration/src/lib.rs @@ -0,0 +1,8 @@ +mod runtime; +pub use runtime::*; + +#[cfg(feature = "deploy")] +mod deploy; + +#[cfg(feature = "deploy")] +pub use deploy::*; diff --git a/hydroflow_plus_cli_integration/src/runtime.rs b/hydroflow_plus_cli_integration/src/runtime.rs new file mode 100644 index 000000000000..b9da8b0949dc --- /dev/null +++ b/hydroflow_plus_cli_integration/src/runtime.rs @@ -0,0 +1,117 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use hydroflow_plus::lang::parse::Pipeline; +use hydroflow_plus::node::{HfDeploy, HfNode, HfNodeBuilder, HfSendTo}; +use hydroflow_plus::util::cli::HydroCLI; +use hydroflow_plus::HfBuilder; +use stageleft::internal::{quote, Span}; +use stageleft::{Quoted, RuntimeData}; +use syn::parse_quote; + +pub struct CLIRuntime {} + +impl<'a> HfDeploy<'a> for CLIRuntime { + type Node = CLIRuntimeNode<'a>; +} + +#[derive(Clone)] +pub struct CLIRuntimeNode<'a> { + id: usize, + builder: &'a HfBuilder<'a, CLIRuntime>, + next_port: Rc>, + cli: RuntimeData<&'a HydroCLI>, +} + +impl<'a> HfNode<'a> for CLIRuntimeNode<'a> { + type Port = String; + + fn id(&self) -> usize { + self.id + } + + fn graph_builder(&self) -> (&'a RefCell, &'a hydroflow_plus::builder::Builders) { + self.builder.builder_components() + } + + fn next_port(&self) -> String { + let next_send_port = *self.next_port.borrow(); + *self.next_port.borrow_mut() += 1; + format!("port_{}", next_send_port) + } + + fn gen_source_statement(&self, port: &String) -> Pipeline { + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let self_cli_splice = self.cli.splice(); + parse_quote! { + source_stream({ + use #root::util::cli::ConnectedSource; + #self_cli_splice + .port(#port) + .connect_local_blocking::<#root::util::cli::ConnectedDirect>() + .into_source() + }) + } + } + + fn gen_sink_statement(&self, port: &String) -> Pipeline { + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let self_cli_splice = self.cli.splice(); + parse_quote! { + dest_sink({ + use #root::util::cli::ConnectedSink; + #self_cli_splice + .port(#port) + .connect_local_blocking::<#root::util::cli::ConnectedDirect>() + .into_sink() + }) + } + } +} + +impl<'a> HfSendTo<'a, CLIRuntimeNode<'a>> for CLIRuntimeNode<'a> { + fn send_to(&self, _other: &CLIRuntimeNode, _source_port: &String, _recipient_port: &String) {} +} + +pub struct CLIRuntimeNodeBuilder<'a> { + cli: RuntimeData<&'a HydroCLI>, +} + +impl CLIRuntimeNodeBuilder<'_> { + pub fn new(cli: RuntimeData<&HydroCLI>) -> CLIRuntimeNodeBuilder { + CLIRuntimeNodeBuilder { cli } + } +} + +impl<'cli> HfNodeBuilder<'cli, CLIRuntime> for CLIRuntimeNodeBuilder<'cli> { + fn build( + &mut self, + id: usize, + builder: &'cli HfBuilder<'cli, CLIRuntime>, + ) -> CLIRuntimeNode<'cli> { + CLIRuntimeNode { + id, + builder, + next_port: Rc::new(RefCell::new(0)), + cli: self.cli, + } + } +} diff --git a/hydroflow_plus_test/Cargo.toml b/hydroflow_plus_test/Cargo.toml index c5e701fbce09..194e5e23eaf2 100644 --- a/hydroflow_plus_test/Cargo.toml +++ b/hydroflow_plus_test/Cargo.toml @@ -9,8 +9,8 @@ hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integ hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } tokio = { version = "1.16", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.1.0" } -regex = "1" -serde = "1" +hydroflow_plus_cli_integration = { path = "../hydroflow_plus_cli_integration", version = "^0.5.0" } + hydroflow_plus_test_macro = { path = "../hydroflow_plus_test_macro" } [build-dependencies] @@ -18,3 +18,5 @@ stageleft_tool = { path = "../stageleft_tool", version = "^0.1.0" } [dev-dependencies] insta = "1.7.1" +hydro_cli = { path = "../hydro_cli", version = "^0.5.0" } +hydroflow_plus_cli_integration = { path = "../hydroflow_plus_cli_integration", version = "^0.5.0", features = [ "deploy" ] } diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs new file mode 100644 index 000000000000..6923611991e8 --- /dev/null +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use hydro_cli::core::gcp::GCPNetwork; +use hydro_cli::core::{Deployment, Host}; +use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; +use tokio::sync::RwLock; + +type HostCreator = Box Arc>>; + +// run with no args for localhost, with `gcp ` for GCP +#[tokio::main] +async fn main() { + let mut deployment = Deployment::new(); + let host_arg = std::env::args().nth(1).unwrap_or_default(); + + let (create_host, profile): (HostCreator, Option) = if host_arg == *"gcp" { + let project = std::env::args().nth(2).unwrap(); + let network = Arc::new(RwLock::new(GCPNetwork::new(&project, None))); + + ( + Box::new(move |deployment| -> Arc> { + deployment.GCPComputeEngineHost( + &project, + "e2-micro", + "debian-cloud/debian-11", + "us-west1-a", + network.clone(), + None, + ) + }), + None, + ) + } else { + let localhost = deployment.Localhost(); + ( + Box::new(move |_| -> Arc> { localhost.clone() }), + Some("dev".to_string()), + ) + }; + + let builder = hydroflow_plus::HfBuilder::new(); + hydroflow_plus_test::first_ten::first_ten_distributed( + &builder, + &mut CLIDeployNodeBuilder::new(|id| { + let host = create_host(&mut deployment); + deployment.HydroflowCrate( + ".", + host, + Some("first_ten_distributed".into()), + None, + profile.clone(), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + ); + + deployment.deploy().await.unwrap(); + + deployment.start().await.unwrap(); + + tokio::signal::ctrl_c().await.unwrap() +} diff --git a/hydroflow_plus_test/examples/networked_basic_deploy.rs b/hydroflow_plus_test/examples/networked_basic_deploy.rs new file mode 100644 index 000000000000..54ad96a91afe --- /dev/null +++ b/hydroflow_plus_test/examples/networked_basic_deploy.rs @@ -0,0 +1,42 @@ +use hydro_cli::core::Deployment; +use hydroflow::futures::SinkExt; +use hydroflow::util::cli::ConnectedSink; +use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; + +#[tokio::main] +async fn main() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::HfBuilder::new(); + let (source_zero_port, _, _) = hydroflow_plus_test::networked::networked_basic( + &builder, + &mut CLIDeployNodeBuilder::new(|id| { + deployment.HydroflowCrate( + ".", + localhost.clone(), + Some("networked_basic".into()), + None, + Some("dev".into()), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + ); + + let port_to_zero = source_zero_port + .create_sender(&mut deployment, &localhost) + .await; + + deployment.deploy().await.unwrap(); + + let mut conn_to_zero = port_to_zero.connect().await.into_sink(); + + deployment.start().await.unwrap(); + + for line in std::io::stdin().lines() { + conn_to_zero.send(line.unwrap().into()).await.unwrap(); + } +} diff --git a/hydroflow_plus_test/python_tests/basic.py b/hydroflow_plus_test/python_tests/basic.py deleted file mode 100644 index 2061a37516ab..000000000000 --- a/hydroflow_plus_test/python_tests/basic.py +++ /dev/null @@ -1,48 +0,0 @@ -from codecs import decode -import json -from pathlib import Path -import pytest -import hydro - -@pytest.mark.asyncio -async def test_networked_basic(): - deployment = hydro.Deployment() - localhost_machine = deployment.Localhost() - - sender = deployment.CustomService( - external_ports=[], - on=localhost_machine.client_only(), - ) - - program_zero = deployment.HydroflowCrate( - src=str((Path(__file__).parent.parent).absolute()), - args=["0"], - example="networked_basic", - profile="dev", - on=localhost_machine - ) - - program_one = deployment.HydroflowCrate( - src=str((Path(__file__).parent.parent).absolute()), - args=["1"], - example="networked_basic", - profile="dev", - on=localhost_machine - ) - - sender_port = sender.client_port() - sender_port.send_to(program_zero.ports.node_zero_input) - - program_zero.ports.zero_to_one.send_to(program_one.ports.zero_to_one) - - await deployment.deploy() - - receiver_out = await program_one.stdout() - connection = await (await sender_port.server_port()).into_sink() - - await deployment.start() - await connection.send(bytes("hi!", "utf-8")) - - async for log in receiver_out: - assert log == "node one received: \"hi!\"" - break diff --git a/hydroflow_plus_test/python_tests/requirements.txt b/hydroflow_plus_test/python_tests/requirements.txt deleted file mode 100644 index ee4ba018603b..000000000000 --- a/hydroflow_plus_test/python_tests/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -pytest -pytest-asyncio diff --git a/hydroflow_plus_test/src/bin/first_ten_distributed.rs b/hydroflow_plus_test/src/bin/first_ten_distributed.rs new file mode 100644 index 000000000000..735567deda86 --- /dev/null +++ b/hydroflow_plus_test/src/bin/first_ten_distributed.rs @@ -0,0 +1,10 @@ +// cannot use hydroflow::main because connect_local_blocking causes a deadlock +#[tokio::main] +async fn main() { + let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let ports = hydroflow::util::cli::init().await; + + let joined = hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports, node_id); + + hydroflow::util::cli::launch_flow(joined).await; +} diff --git a/hydroflow_plus_test/examples/networked_basic.rs b/hydroflow_plus_test/src/bin/networked_basic.rs similarity index 88% rename from hydroflow_plus_test/examples/networked_basic.rs rename to hydroflow_plus_test/src/bin/networked_basic.rs index 1f18b163b070..c9d171730c52 100644 --- a/hydroflow_plus_test/examples/networked_basic.rs +++ b/hydroflow_plus_test/src/bin/networked_basic.rs @@ -1,12 +1,10 @@ -use hydroflow_plus_test::*; - // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = hydroflow_plus_test::networked::networked_basic!(&ports, node_id); + let joined = hydroflow_plus_test::networked::networked_basic_runtime!(&ports, node_id); hydroflow::util::cli::launch_flow(joined).await; } diff --git a/hydroflow_plus_test/src/first_ten.rs b/hydroflow_plus_test/src/first_ten.rs new file mode 100644 index 000000000000..0071dadc5659 --- /dev/null +++ b/hydroflow_plus_test/src/first_ten.rs @@ -0,0 +1,97 @@ +use hydroflow_plus::node::*; +use hydroflow_plus::*; +use stageleft::*; + +pub fn first_ten<'a, D: HfDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &mut impl HfNodeBuilder<'a, D>, +) { + let node = graph.node(node_builder); + let numbers = node.source_iter(q!(0..10)); + numbers.for_each(q!(|n| println!("{}", n))); +} + +#[stageleft::entry] +pub fn first_ten_runtime<'a>( + graph: &'a HfBuilder<'a, SingleGraph>, +) -> impl Quoted<'a, Hydroflow<'a>> { + first_ten(graph, &mut ()); + graph.build(q!(0)) +} + +pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &mut impl HfNodeBuilder<'a, D>, +) -> D::Node { + let node = graph.node(node_builder); + let second_node = graph.node(node_builder); + + let numbers = node.source_iter(q!(0..10)); + numbers + .send_bincode(&second_node) + .for_each(q!(|n| println!("{}", n))); + + second_node +} + +use hydroflow::util::cli::HydroCLI; +use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder}; + +#[stageleft::entry] +pub fn first_ten_distributed_runtime<'a>( + graph: &'a HfBuilder<'a, CLIRuntime>, + cli: RuntimeData<&'a HydroCLI>, + node_id: RuntimeData, +) -> impl Quoted<'a, Hydroflow<'a>> { + let _ = first_ten_distributed(graph, &mut CLIRuntimeNodeBuilder::new(cli)); + graph.build(node_id) +} + +#[stageleft::runtime] +#[cfg(test)] +mod tests { + use std::time::Duration; + + use hydro_cli::core::Deployment; + use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; + + #[tokio::test] + async fn first_ten_distributed() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::HfBuilder::new(); + let second_node = super::first_ten_distributed( + &builder, + &mut CLIDeployNodeBuilder::new(|id| { + deployment.HydroflowCrate( + ".", + localhost.clone(), + Some("first_ten_distributed".into()), + None, + Some("dev".into()), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + ); + + deployment.deploy().await.unwrap(); + + let second_node_stdout = second_node.stdout().await; + + deployment.start().await.unwrap(); + + for i in 0..10 { + assert_eq!( + tokio::time::timeout(Duration::from_secs(1), second_node_stdout.recv()) + .await + .unwrap() + .unwrap(), + i.to_string() + ); + } + } +} diff --git a/hydroflow_plus_test/src/lib.rs b/hydroflow_plus_test/src/lib.rs index d532634f1de4..32d0a4046fa0 100644 --- a/hydroflow_plus_test/src/lib.rs +++ b/hydroflow_plus_test/src/lib.rs @@ -1,23 +1,28 @@ stageleft::stageleft_crate!(hydroflow_plus_test_macro); use hydroflow_plus::futures::stream::Stream; +use hydroflow_plus::node::{HfNode, MultiGraph, SingleGraph}; use hydroflow_plus::scheduled::graph::Hydroflow; use hydroflow_plus::tokio::sync::mpsc::UnboundedSender; use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::HfBuilder; use stageleft::{q, Quoted, RuntimeData}; +pub mod first_ten; pub mod networked; #[stageleft::entry(UnboundedReceiverStream)] pub fn teed_join<'a, S: Stream + Unpin + 'a>( - graph: &'a HfBuilder<'a>, + graph: &'a HfBuilder<'a, MultiGraph>, input_stream: RuntimeData, output: RuntimeData<&'a UnboundedSender>, send_twice: bool, node_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { - let source = graph.source_stream(0, input_stream); + let node_zero = graph.node(&mut ()); + let node_one = graph.node(&mut ()); + + let source = node_zero.source_stream(input_stream); let map1 = source.map(q!(|v| (v + 1, ()))); let map2 = source.map(q!(|v| (v - 1, ()))); @@ -33,7 +38,7 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( })); } - let source_node_id_1 = graph.source_iter(1, q!(0..5)); + let source_node_id_1 = node_one.source_iter(q!(0..5)); source_node_id_1.for_each(q!(|v| { output.send(v).unwrap(); })); @@ -43,14 +48,16 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( #[stageleft::entry] pub fn chat_app<'a>( - graph: &'a HfBuilder<'a>, + graph: &'a HfBuilder<'a, SingleGraph>, users_stream: RuntimeData>, messages: RuntimeData>, output: RuntimeData<&'a UnboundedSender<(u32, String)>>, replay_messages: bool, ) -> impl Quoted<'a, Hydroflow<'a>> { - let users = graph.source_stream(0, users_stream).persist(); - let mut messages = graph.source_stream(0, messages); + let node = graph.node(&mut ()); + + let users = node.source_stream(users_stream).persist(); + let mut messages = node.source_stream(messages); if replay_messages { messages = messages.persist(); } @@ -69,15 +76,17 @@ pub fn chat_app<'a>( #[stageleft::entry] pub fn graph_reachability<'a>( - graph: &'a HfBuilder<'a>, + graph: &'a HfBuilder<'a, SingleGraph>, roots: RuntimeData>, edges: RuntimeData>, reached_out: RuntimeData<&'a UnboundedSender>, ) -> impl Quoted<'a, Hydroflow<'a>> { - let roots = graph.source_stream(0, roots); - let edges = graph.source_stream(0, edges); + let node = graph.node(&mut ()); - let (set_reached_cycle, reached_cycle) = graph.cycle(0); + let roots = node.source_stream(roots); + let edges = node.source_stream(edges); + + let (set_reached_cycle, reached_cycle) = node.cycle(); let reached = roots.union(&reached_cycle); let reachable = reached @@ -95,11 +104,13 @@ pub fn graph_reachability<'a>( #[stageleft::entry(String)] pub fn count_elems<'a, T: 'a>( - graph: &'a HfBuilder<'a>, + graph: &'a HfBuilder<'a, SingleGraph>, input_stream: RuntimeData>, output: RuntimeData<&'a UnboundedSender>, ) -> impl Quoted<'a, Hydroflow<'a>> { - let source = graph.source_stream(0, input_stream); + let node = graph.node(&mut ()); + + let source = node.source_stream(input_stream); let count = source.map(q!(|_| 1)).fold(q!(|| 0), q!(|a, b| *a += b)); count.for_each(q!(|v| { diff --git a/hydroflow_plus_test/src/networked.rs b/hydroflow_plus_test/src/networked.rs index 454a2f2b7398..8d27e45a7e39 100644 --- a/hydroflow_plus_test/src/networked.rs +++ b/hydroflow_plus_test/src/networked.rs @@ -1,27 +1,23 @@ use hydroflow::bytes::BytesMut; -use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, HydroCLI}; +use hydroflow::util::cli::HydroCLI; +use hydroflow_plus::node::{HfNetworkedDeploy, HfNode, HfNodeBuilder}; use hydroflow_plus::scheduled::graph::Hydroflow; use hydroflow_plus::HfBuilder; +use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder}; use stageleft::{q, Quoted, RuntimeData}; -#[stageleft::entry] -pub fn networked_basic<'a>( - graph: &'a HfBuilder<'a>, - cli: RuntimeData<&'a HydroCLI>, - node_id: RuntimeData, -) -> impl Quoted<'a, Hydroflow<'a>> { - let source_zero = graph.source_stream( - 0, - q!({ - cli.port("node_zero_input") - .connect_local_blocking::() - .into_source() - }), - ); +pub fn networked_basic<'a, D: HfNetworkedDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &mut impl HfNodeBuilder<'a, D>, +) -> (D::Port, D::Node, D::Node) { + let node_zero = graph.node(node_builder); + let node_one = graph.node(node_builder); + + let (source_zero_port, source_zero) = node_zero.source_external(); source_zero .map(q!(|v| v.unwrap().freeze())) - .send_to(1, "zero_to_one", cli) + .send_bytes(&node_one) .for_each(q!(|v: Result| { println!( "node one received: {:?}", @@ -29,5 +25,71 @@ pub fn networked_basic<'a>( ); })); + (source_zero_port, node_zero, node_one) +} + +#[stageleft::entry] +pub fn networked_basic_runtime<'a>( + graph: &'a HfBuilder<'a, CLIRuntime>, + cli: RuntimeData<&'a HydroCLI>, + node_id: RuntimeData, +) -> impl Quoted<'a, Hydroflow<'a>> { + let _ = networked_basic(graph, &mut CLIRuntimeNodeBuilder::new(cli)); graph.build(node_id) } + +#[stageleft::runtime] +#[cfg(test)] +mod tests { + use std::time::Duration; + + use hydro_cli::core::Deployment; + use hydroflow::futures::SinkExt; + use hydroflow::util::cli::ConnectedSink; + use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; + + #[tokio::test] + async fn networked_basic() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::HfBuilder::new(); + let (source_zero_port, _, node_one) = super::networked_basic( + &builder, + &mut CLIDeployNodeBuilder::new(|id| { + deployment.HydroflowCrate( + ".", + localhost.clone(), + Some("networked_basic".into()), + None, + Some("dev".into()), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + ); + + let port_to_zero = source_zero_port + .create_sender(&mut deployment, &localhost) + .await; + + deployment.deploy().await.unwrap(); + + let mut conn_to_zero = port_to_zero.connect().await.into_sink(); + let node_one_stdout = node_one.stdout().await; + + deployment.start().await.unwrap(); + + conn_to_zero.send("hello world!".into()).await.unwrap(); + + assert_eq!( + tokio::time::timeout(Duration::from_secs(1), node_one_stdout.recv()) + .await + .unwrap() + .unwrap(), + "node one received: \"hello world!\"" + ); + } +} diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_dot.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_dot.snap index f2409a82ce7d..edc770bf70a7 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_dot.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_dot.snap @@ -39,10 +39,6 @@ digraph { label="var stream_3" n7v1 } - subgraph "cluster_sg_1v1_var_stream_4" { - label="var stream_4" - n9v1 - } } } diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_mermaid.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_mermaid.snap index b37a365c38f9..47b4e8d5b69f 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_mermaid.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_no_replay@graphvis_mermaid.snap @@ -35,8 +35,5 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"] subgraph sg_1v1_var_stream_3 ["var stream_3"] 7v1 end - subgraph sg_1v1_var_stream_4 ["var stream_4"] - 9v1 - end end diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_dot.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_dot.snap index c2c57aa1d843..c406ce38cd27 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_dot.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_dot.snap @@ -53,10 +53,6 @@ digraph { label="var stream_5" n11v1 } - subgraph "cluster_sg_1v1_var_stream_6" { - label="var stream_6" - n13v1 - } } } diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_mermaid.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_mermaid.snap index 99161c4f7657..1606d7b2f9bd 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_mermaid.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__chat_app_replay@graphvis_mermaid.snap @@ -47,8 +47,5 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"] subgraph sg_1v1_var_stream_5 ["var stream_5"] 11v1 end - subgraph sg_1v1_var_stream_6 ["var stream_6"] - 13v1 - end end diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_dot.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_dot.snap index b7b178870cc0..0bac8a3fd0dd 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_dot.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_dot.snap @@ -39,10 +39,6 @@ digraph { label="var stream_2" n5v1 } - subgraph "cluster_sg_2v1_var_stream_3" { - label="var stream_3" - n7v1 - } } } diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_mermaid.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_mermaid.snap index 79b38a6249ee..4a2184d692ed 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_mermaid.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__count@graphvis_mermaid.snap @@ -33,8 +33,5 @@ subgraph sg_2v1 ["sg_2v1 stratum 1"] subgraph sg_2v1_var_stream_2 ["var stream_2"] 5v1 end - subgraph sg_2v1_var_stream_3 ["var stream_3"] - 7v1 - end end diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_dot.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_dot.snap index efec6ed6274f..bdda76412bf3 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_dot.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_dot.snap @@ -68,10 +68,6 @@ digraph { label="var stream_7" n14v1 } - subgraph "cluster_sg_1v1_var_stream_8" { - label="var stream_8" - n16v1 - } } } diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_mermaid.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_mermaid.snap index f5b59d5a318c..4408301d8305 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_mermaid.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__reachability@graphvis_mermaid.snap @@ -61,8 +61,5 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"] subgraph sg_1v1_var_stream_7 ["var stream_7"] 14v1 end - subgraph sg_1v1_var_stream_8 ["var stream_8"] - 16v1 - end end diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_dot.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_dot.snap index 308ad21d6e11..c1d61efa862c 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_dot.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_dot.snap @@ -60,10 +60,6 @@ digraph { label="var stream_4" n9v1 } - subgraph "cluster_sg_2v1_var_stream_5" { - label="var stream_5" - n11v1 - } } } diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_mermaid.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_mermaid.snap index 98595ac2d587..257614e9cf06 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_mermaid.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join@graphvis_mermaid.snap @@ -52,8 +52,5 @@ subgraph sg_2v1 ["sg_2v1 stratum 0"] subgraph sg_2v1_var_stream_4 ["var stream_4"] 9v1 end - subgraph sg_2v1_var_stream_5 ["var stream_5"] - 11v1 - end end diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_dot.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_dot.snap index 247ff3d0a3e7..ffd86cc1aba7 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_dot.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_dot.snap @@ -14,14 +14,10 @@ digraph { label = "sg_1v1\nstratum 0" n1v1 n3v1 - subgraph "cluster_sg_1v1_var_stream_7" { - label="var stream_7" + subgraph "cluster_sg_1v1_var_stream_5" { + label="var stream_5" n1v1 } - subgraph "cluster_sg_1v1_var_stream_8" { - label="var stream_8" - n3v1 - } } } diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_mermaid.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_mermaid.snap index f05fd609c74f..9d1dbadc8263 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_mermaid.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_multi_node@graphvis_mermaid.snap @@ -14,11 +14,8 @@ linkStyle default stroke:#aaa subgraph sg_1v1 ["sg_1v1 stratum 0"] 1v1 3v1 - subgraph sg_1v1_var_stream_7 ["var stream_7"] + subgraph sg_1v1_var_stream_5 ["var stream_5"] 1v1 end - subgraph sg_1v1_var_stream_8 ["var stream_8"] - 3v1 - end end diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_dot.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_dot.snap index 6982aa4d9573..ace69bfc4365 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_dot.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_dot.snap @@ -67,14 +67,6 @@ digraph { n9v1 n10v1 } - subgraph "cluster_sg_2v1_var_stream_5" { - label="var stream_5" - n11v1 - } - subgraph "cluster_sg_2v1_var_stream_6" { - label="var stream_6" - n12v1 - } } } diff --git a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_mermaid.snap b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_mermaid.snap index e46040d95c6d..d81160f1a859 100644 --- a/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_mermaid.snap +++ b/hydroflow_plus_test/src/snapshots/hydroflow_plus_test__tests__teed_join_twice@graphvis_mermaid.snap @@ -59,11 +59,5 @@ subgraph sg_2v1 ["sg_2v1 stratum 0"] 9v1 10v1 end - subgraph sg_2v1_var_stream_5 ["var stream_5"] - 11v1 - end - subgraph sg_2v1_var_stream_6 ["var stream_6"] - 12v1 - end end diff --git a/hydroflow_plus_test_macro/Cargo.toml b/hydroflow_plus_test_macro/Cargo.toml index c3ff206a97a8..909f6759d81f 100644 --- a/hydroflow_plus_test_macro/Cargo.toml +++ b/hydroflow_plus_test_macro/Cargo.toml @@ -17,8 +17,7 @@ hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integ hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } tokio = { version = "1.16", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.1.0" } -regex = "1" -serde = "1" +hydroflow_plus_cli_integration = { path = "../hydroflow_plus_cli_integration", version = "^0.5.0" } [build-dependencies] stageleft_tool = { path = "../stageleft_tool", version = "^0.1.0" } diff --git a/stageleft/README.md b/stageleft/README.md index e0f99bcf038e..067b015f1d5b 100644 --- a/stageleft/README.md +++ b/stageleft/README.md @@ -1,4 +1,4 @@ -# Stageleft +

Stageleft

Stageleft brings the magic of staged programming to Rust, making it easy to write macros with type-safe logic and high-level APIs that can generate efficient code under the hood. ## Example diff --git a/stageleft/src/lib.rs b/stageleft/src/lib.rs index 28fbcca36115..f74cf37d3f33 100644 --- a/stageleft/src/lib.rs +++ b/stageleft/src/lib.rs @@ -18,6 +18,8 @@ pub use stageleft_macro::{entry, q, quse_fn, runtime}; pub mod runtime_support; use runtime_support::FreeVariable; +use crate::runtime_support::get_final_crate_name; + #[macro_export] macro_rules! stageleft_crate { ($macro_crate:ident) => { @@ -27,7 +29,7 @@ macro_rules! stageleft_crate { #[cfg(not(feature = "macro"))] #[doc(hidden)] - #[allow(unused)] + #[allow(unused, ambiguous_glob_reexports)] pub mod __staged { include!(concat!(env!("OUT_DIR"), "/lib_pub.rs")); } @@ -122,37 +124,40 @@ impl< ) }); - let final_crate = proc_macro_crate::crate_name(crate_name) - .unwrap_or_else(|_| panic!("{crate_name} should be present in `Cargo.toml`")); - let final_crate_root = match final_crate { - proc_macro_crate::FoundCrate::Itself => quote!(crate), - proc_macro_crate::FoundCrate::Name(name) => { - let ident = syn::Ident::new(&name, Span::call_site()); - quote! { #ident } - } - }; + let final_crate_root = get_final_crate_name(crate_name); let module_path: syn::Path = syn::parse_str(&module_path).unwrap(); - let module_path = module_path + let module_path_segments = module_path .segments .iter() .skip(1) // skip crate .cloned() .collect::>(); - let module_path = syn::Path { - leading_colon: None, - segments: syn::punctuated::Punctuated::from_iter(module_path), + let module_path = if module_path_segments.is_empty() { + None + } else { + Some(syn::Path { + leading_colon: None, + segments: syn::punctuated::Punctuated::from_iter(module_path_segments), + }) }; - let expr: syn::Expr = syn::parse(expr_tokens.into()).unwrap(); - ( - None, - Some(quote!({ + let expr: syn::Expr = syn::parse2(expr_tokens).unwrap(); + let with_env = if let Some(module_path) = module_path { + quote!({ use #final_crate_root::#module_path::*; #(#instantiated_free_variables)* #expr - })), - ) + }) + } else { + quote!({ + use #final_crate_root::*; + #(#instantiated_free_variables)* + #expr + }) + }; + + (None, Some(with_env)) } } diff --git a/stageleft/src/runtime_support.rs b/stageleft/src/runtime_support.rs index a32dcae36e8e..868feb27fb61 100644 --- a/stageleft/src/runtime_support.rs +++ b/stageleft/src/runtime_support.rs @@ -4,6 +4,27 @@ use std::mem::MaybeUninit; use proc_macro2::{Span, TokenStream}; use quote::quote; +pub fn get_final_crate_name(crate_name: &str) -> TokenStream { + let final_crate = proc_macro_crate::crate_name(crate_name) + .unwrap_or_else(|_| panic!("{crate_name} should be present in `Cargo.toml`")); + + match final_crate { + proc_macro_crate::FoundCrate::Itself => { + if std::env::var("CARGO_BIN_NAME").is_ok() { + let underscored = crate_name.replace('-', "_"); + let underscored_ident = syn::Ident::new(&underscored, Span::call_site()); + quote! { #underscored_ident } + } else { + quote! { crate } + } + } + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + } +} + pub trait ParseFromLiteral { fn parse_from_literal(literal: &syn::Expr) -> Self; } @@ -93,15 +114,7 @@ pub fn create_import( impl FreeVariable for Import { fn to_tokens(self) -> (Option, Option) { - let final_crate = proc_macro_crate::crate_name(self.crate_name) - .unwrap_or_else(|_| panic!("{} should be present in `Cargo.toml`", self.crate_name)); - let final_crate_root = match final_crate { - proc_macro_crate::FoundCrate::Itself => quote!(crate), - proc_macro_crate::FoundCrate::Name(name) => { - let ident = syn::Ident::new(&name, Span::call_site()); - quote! { #ident } - } - }; + let final_crate_root = get_final_crate_name(self.crate_name); let module_path = syn::parse_str::(self.module_path).unwrap(); let parsed = syn::parse_str::(self.path).unwrap(); diff --git a/stageleft_macro/src/lib.rs b/stageleft_macro/src/lib.rs index 975e9b09aaab..2fd610eaada1 100644 --- a/stageleft_macro/src/lib.rs +++ b/stageleft_macro/src/lib.rs @@ -331,15 +331,7 @@ pub fn entry( }; let final_crate_name = env!("STAGELEFT_FINAL_CRATE_NAME"); - let final_crate = #root::internal::proc_macro_crate::crate_name(final_crate_name) - .unwrap_or_else(|_| panic!("{final_crate_name} should be present in `Cargo.toml`")); - let final_crate_root = match final_crate { - #root::internal::proc_macro_crate::FoundCrate::Itself => ::#root::internal::quote! { crate }, - #root::internal::proc_macro_crate::FoundCrate::Name(name) => { - let ident = #root::internal::syn::Ident::new(&name, #root::internal::Span::call_site()); - ::#root::internal::quote! { #pound ident } - } - }; + let final_crate_root = #root::runtime_support::get_final_crate_name(final_crate_name); let module_path: #root::internal::syn::Path = #root::internal::syn::parse_str(module_path!()).unwrap(); let module_path = module_path.segments.iter().skip(1).cloned().collect::>(); diff --git a/stageleft_tool/src/lib.rs b/stageleft_tool/src/lib.rs index 35776fc64a59..20501b97bceb 100644 --- a/stageleft_tool/src/lib.rs +++ b/stageleft_tool/src/lib.rs @@ -221,6 +221,6 @@ pub fn gen_final_helper(final_crate: &str) { #[macro_export] macro_rules! gen_final { () => { - $crate::gen_final_helper(env!("CARGO_CRATE_NAME")) + $crate::gen_final_helper(env!("CARGO_PKG_NAME")) }; }