feat(hydroflow): auto-configure Hydro Deploy based on Hydroflow+ plans
shadaj committed Dec 18, 2023
1 parent 27dabcf commit 2ca9d01
Showing 58 changed files with 1,662 additions and 511 deletions.
10 changes: 0 additions & 10 deletions .github/workflows/ci.yml
@@ -224,16 +224,6 @@ jobs:
cd python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
cd ../..
- name: Run Hydroflow+ Python tests
run: |
ulimit -c unlimited
cd hydro_cli
source .venv/bin/activate
cd ../hydroflow_plus_test/python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
lints:
name: Lints
26 changes: 20 additions & 6 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -11,6 +11,7 @@ members = [
"hydroflow_lang",
"hydroflow_macro",
"hydroflow_plus",
"hydroflow_plus_cli_integration",
"hydroflow_plus_test",
"hydroflow_plus_test_macro",
"lattices",
134 changes: 134 additions & 0 deletions docs/docs/hydroflow_plus/distributed.mdx
@@ -0,0 +1,134 @@
---
sidebar_position: 3
---

# Distributed Hydroflow+
Continuing from our previous example, we will now look at how to extend our program to run on multiple nodes. Recall that our previous flow graph looked like this:

```rust
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;

pub fn first_ten<'a, D: HfDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
) {
let node = graph.node(node_builder);
let numbers = node.source_iter(q!(0..10));
numbers.for_each(q!(|n| println!("{}", n)));
}
```

## The Flow Graph
Let's extend this example to print the numbers on a separate node. First, we need to specify that our flow graph will involve the network. We do this by replacing the `HfDeploy<'a>` trait bound with `HfNetworkedDeploy<'a>`. Then, we can use the `node_builder` to create a second node:
```rust
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;

pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
) {
let node = graph.node(node_builder);
let second_node = graph.node(node_builder);
}
```

Now, we can distribute our dataflow by using the `send_bincode` operator to mark where the data should be sent using bincode serialization.

```rust
let numbers = node.source_iter(q!(0..10));
numbers
.send_bincode(&second_node)
.for_each(q!(|n| println!("{}", n)));
```
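Assembled from the two fragments above, the complete distributed flow graph function reads:

```rust
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;

pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>(
    graph: &'a HfBuilder<'a, D>,
    node_builder: &mut impl HfNodeBuilder<'a, D>
) {
    // Two logical nodes in one flow graph.
    let node = graph.node(node_builder);
    let second_node = graph.node(node_builder);

    // Produce numbers on the first node, print them on the second.
    let numbers = node.source_iter(q!(0..10));
    numbers
        .send_bincode(&second_node)
        .for_each(q!(|n| println!("{}", n)));
}
```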

## The Runtime
Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with hardcoding node 0 as before. Instead, we must take the node ID as a runtime parameter (`node_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it (via `CLIRuntimeNodeBuilder`).

```rust
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder};

#[stageleft::entry]
pub fn first_ten_distributed_runtime<'a>(
graph: &'a HfBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI>,
node_id: RuntimeData<usize>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = first_ten_distributed(graph, &mut CLIRuntimeNodeBuilder::new(cli));
graph.build(node_id)
}
```

The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates the CLI and reads the node ID from the command line arguments:

```rust
#[tokio::main]
async fn main() {
let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = flow::first_ten_distributed_runtime!(&ports, node_id);

hydroflow::util::cli::launch_flow(joined).await;
}
```

## The Deployment
Finally, we need to deploy our dataflow with the appropriate network topology. We achieve this by using [Hydro Deploy](../deploy/index.md). Hydroflow+ integrates with Hydro Deploy to automatically construct the topology based on the flow graph. We can create a new file `examples/first_ten_distributed.rs` with the following contents:

```rust
use hydro_cli::core::Deployment;
use hydroflow_plus_cli_integration::CLIDeployNodeBuilder;

#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::HfBuilder::new();
hydroflow_plus_test::first_ten::first_ten_distributed(
&builder,
&mut CLIDeployNodeBuilder::new(|id| {
deployment.HydroflowCrate(
".",
localhost.clone(),
Some("first_ten_distributed".into()), // --bin
None,
Some("dev".into()), // --profile
None,
Some(vec![id.to_string()]), // command line arguments
None,
vec![],
)
}),
);

deployment.deploy().await.unwrap();

deployment.start().await.unwrap();

tokio::signal::ctrl_c().await.unwrap()
}
```

Most importantly, we specify a `CLIDeployNodeBuilder`, which takes a closure that constructs a Hydro Deploy node for each node in the flow graph. In our case, we use the `HydroflowCrate` node type, which deploys a Hydroflow+ binary. We also specify the node ID as a command line argument, which is read by our runtime binary.

We can then run our distributed dataflow with:

```bash
$ cargo run --example first_ten_distributed
[service/1] 0
[service/1] 1
[service/1] 2
[service/1] 3
[service/1] 4
[service/1] 5
[service/1] 6
[service/1] 7
[service/1] 8
[service/1] 9
```
22 changes: 22 additions & 0 deletions docs/docs/hydroflow_plus/index.mdx
@@ -0,0 +1,22 @@
---
sidebar_position: 1
---

# Introduction
Hydroflow+ layers a high-level Rust API over the Hydroflow IR, making it possible to write dataflow programs that span multiple nodes with straightline, functional Rust code. Hydroflow+ is built on top of [Stageleft](./stageleft.mdx), which allows Hydroflow+ to emit regular Hydroflow programs that are compiled into efficient Rust binaries. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs on a cluster.

The main logic of Hydroflow+ programs manipulates **streams**, which capture infinite ordered sequences of elements. Streams are transformed using classic functional operators such as `map`, `filter`, and `fold`, as well as relational operators such as `join`. To build **distributed** dataflow programs, Hydroflow+ also introduces the concept of **nodes**, which capture _where_ a stream is being processed.
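As a rough analogy (using plain Rust `std` iterators rather than the Hydroflow+ stream API), the functional operators named above behave like their iterator counterparts:

```rust
// Plain-Rust analogy for the functional stream operators named above.
// This uses std iterators only; Hydroflow+ streams expose the same
// map/filter/fold vocabulary over live dataflow instead of collections.
fn main() {
    let doubled: Vec<i32> = (0..10).map(|n| n * 2).collect();
    let evens: Vec<i32> = (0..10).filter(|n| n % 2 == 0).collect();
    let sum: i32 = (0..10).fold(0, |acc, n| acc + n);
    println!("doubled: {:?}", doubled);
    println!("evens:   {:?}", evens);
    println!("sum:     {}", sum);
}
```

In Hydroflow+ the same combinators apply to unbounded streams, and the added dimension is the node on which each transformation runs.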

## Quickstart
Hydroflow+ requires a particular workspace setup, as any crate that uses Hydroflow+ must have a supporting macro crate to drive the code generation. To get started, we recommend using the Hydroflow+ template.

```bash
$ cargo install cargo-generate
$ cargo generate hydro-project/hydroflow-plus-template
```

Then, you can test the example dataflow:

```bash
$ cargo test
```
8 changes: 8 additions & 0 deletions docs/docs/hydroflow_plus/stageleft.mdx
@@ -0,0 +1,8 @@
---
title: Stageleft
sidebar_position: 4
---

import StageleftDocs from '../../../stageleft/README.md'

<StageleftDocs/>
91 changes: 91 additions & 0 deletions docs/docs/hydroflow_plus/structure.mdx
@@ -0,0 +1,91 @@
---
sidebar_position: 2
---

# Building Dataflows
Hydroflow+ programs require special structure to support code generation and distributed deployments. There are three main components of a Hydroflow+ program:
- The **flow graph** describes the dataflow logic of the program.
- The **runtime** wraps the dataflow in an executable Rust binary.
- The **deployment** describes how to map the flow graph to instances of the runtime. This is only needed for distributed deployments.

Let's look at a minimal example of a Hydroflow+ program. We'll start with a simple flow graph that prints out the first 10 natural numbers. First, we'll define the **flow graph**.

## The Flow Graph
```rust
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;

pub fn first_ten<'a, D: HfDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
) {}
```

The `graph` variable gives us access to the builder API for constructing the flow graph. The `node_builder` variable defines how to construct the nodes of the flow graph. For now, we will only use the builder once, to create a single node.

```rust
pub fn first_ten<'a, D: HfDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
) {
let node = graph.node(node_builder);
}
```

Now, we can build out the logic of the node. First, we instantiate a stream that emits the first 10 natural numbers.

```rust
let numbers = node.source_iter(q!(0..10));
```

In Hydroflow+, whenever there are snippets of code that will be executed during runtime, we use the `q!` macro to mark them. This includes both static sources of data as well as closures that transform them. For example, to print out these numbers, we can use the `for_each` operator:

```rust
numbers.for_each(q!(|n| println!("{}", n)));
```
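Putting these pieces together, the complete flow graph function is:

```rust
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;

pub fn first_ten<'a, D: HfDeploy<'a>>(
    graph: &'a HfBuilder<'a, D>,
    node_builder: &mut impl HfNodeBuilder<'a, D>
) {
    let node = graph.node(node_builder);
    let numbers = node.source_iter(q!(0..10));
    numbers.for_each(q!(|n| println!("{}", n)));
}
```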

## The Runtime
Next, we need to instantiate our flow graph into a runnable Rust binary. We do this by defining a Stageleft entrypoint for the graph, and then instantiating this inside a separate Rust binary.

To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes the graph being built and returns a generated Hydroflow program:

```rust
#[stageleft::entry]
pub fn first_ten_runtime<'a>(
graph: &'a HfBuilder<'a, SingleGraph>
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(graph, &mut ());
graph.build(q!(0))
}
```

Hydroflow+ graphs can span multiple nodes, each running different pieces of logic. When a Hydroflow+ graph is built into a runnable program, the graph is sliced into components for each of these nodes, and an integer ID is used to determine which subgraph should be executed. In our case, we only have a single node, so we pass a static `q!(0)` node ID to the `build` method.

Stageleft entries are usable as macros from other programs. In our case, we will instantiate our entrypoint from the Rust binary for our dataflow. We can create a new file `src/bin/first_ten.rs` with the following contents:

```rust
#[tokio::main]
async fn main() {
flow::first_ten_runtime!().run_async().await;
}
```

We can now run this binary to see the output of our dataflow:

```bash
$ cargo run --bin first_ten
0
1
2
3
4
5
6
7
8
9
```

In the next section, we will look at how to extend this program to run on multiple nodes.
9 changes: 9 additions & 0 deletions docs/docusaurus.config.js
@@ -120,6 +120,11 @@ const config = {
sidebarId: 'hydroflowSidebar',
label: 'Hydroflow',
},
{
type: 'docSidebar',
sidebarId: 'hydroflowPlusSidebar',
label: 'Hydroflow+',
},
{
type: 'docSidebar',
sidebarId: 'deploySidebar',
@@ -160,6 +165,10 @@ const config = {
label: 'Hydroflow',
to: '/docs/hydroflow/',
},
{
label: 'Hydroflow+',
to: '/docs/hydroflow_plus/',
},
{
label: 'Hydro Deploy',
to: '/docs/deploy/',
1 change: 1 addition & 0 deletions docs/sidebars.js
@@ -15,6 +15,7 @@
const sidebars = {
// By default, Docusaurus generates a sidebar from the docs folder structure
hydroflowSidebar: [{type: 'autogenerated', dirName: 'hydroflow'}],
hydroflowPlusSidebar: [{type: 'autogenerated', dirName: 'hydroflow_plus'}],
deploySidebar: [{type: 'autogenerated', dirName: 'deploy'}],
};

4 changes: 2 additions & 2 deletions hydro_cli/Cargo.toml
@@ -8,9 +8,9 @@ documentation = "https://docs.rs/hydro_cli/"
description = "Hydro Deploy Command Line Interface"

[lib]
name = "hydro"
name = "hydro_cli"
# "cdylib" is necessary to produce a shared library for Python to import from.
crate-type = ["cdylib"]
crate-type = ["lib", "cdylib"]

[dependencies]
tokio = { version = "1.16", features = [ "full" ] }
