Skip to content

Commit

Permalink
feat(hydroflow_plus): push down persists and implement Pi example (#1021
Browse files Browse the repository at this point in the history
)

Also fixes type inference issues with reduce the same way as we did for fold.
  • Loading branch information
shadaj authored Jan 7, 2024
1 parent 7e4d577 commit af6e3be
Show file tree
Hide file tree
Showing 21 changed files with 463 additions and 101 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 17 additions & 2 deletions hydroflow_lang/src/graph/ops/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,18 @@ pub const REDUCE: OperatorConstraints = OperatorConstraints {
let #ident = {
let mut #input = #input;
let #accumulator_ident = #input.next();

#[inline(always)]
/// A: accumulator type
/// O: output type
fn call_comb_type<A, O>(acc: &mut A, item: A, f: impl Fn(&mut A, A) -> O) -> O {
f(acc, item)
}

if let ::std::option::Option::Some(mut #accumulator_ident) = #accumulator_ident {
for #iterator_item_ident in #input {
#[allow(clippy::redundant_closure_call)]
(#func)(&mut #accumulator_ident, #iterator_item_ident);
call_comb_type(&mut #accumulator_ident, #iterator_item_ident, #func);
}

::std::option::Option::Some(#accumulator_ident)
Expand All @@ -115,10 +123,17 @@ pub const REDUCE: OperatorConstraints = OperatorConstraints {
#input.next()
};

#[inline(always)]
/// A: accumulator type
/// O: output type
fn call_comb_type<A, O>(acc: &mut A, item: A, f: impl Fn(&mut A, A) -> O) -> O {
f(acc, item)
}

let #ret_ident = if let ::std::option::Option::Some(mut #accumulator_ident) = #accumulator_ident {
for #iterator_item_ident in #input {
#[allow(clippy::redundant_closure_call)]
(#func)(&mut #accumulator_ident, #iterator_item_ident);
call_comb_type(&mut #accumulator_ident, #iterator_item_ident, #func);
}

::std::option::Option::Some(#accumulator_ident)
Expand Down
3 changes: 3 additions & 0 deletions hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ hydroflow_lang = { path = "../hydroflow_lang", version = "^0.5.0" }
serde = { version = "1", features = [ "derive" ] }
bincode = "1.3"
stageleft = { path = "../stageleft", version = "^0.1.0" }

[build-dependencies]
stageleft_tool = { path = "../stageleft_tool", version = "^0.1.0" }
3 changes: 3 additions & 0 deletions hydroflow_plus/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
stageleft_tool::gen_final!();
}
8 changes: 4 additions & 4 deletions hydroflow_plus/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,25 @@ impl<'a, D: LocalDeploy<'a>> FlowBuilder<'a, D> {
(&self.next_id, &self.builders)
}

pub fn process(&'a self, builder: &impl ProcessSpec<'a, D>) -> D::Process {
pub fn process(&'a self, spec: &impl ProcessSpec<'a, D>) -> D::Process {
let mut next_node_id = self.next_node_id.borrow_mut();
let id = *next_node_id;
*next_node_id += 1;

let node = builder.build(id, self, &mut self.meta.borrow_mut());
let node = spec.build(id, self, &mut self.meta.borrow_mut());
self.nodes.borrow_mut().push(node.clone());

self.update_metas();

node
}

pub fn cluster(&'a self, builder: &impl ClusterSpec<'a, D>) -> D::Cluster {
pub fn cluster(&'a self, spec: &impl ClusterSpec<'a, D>) -> D::Cluster {
let mut next_node_id = self.next_node_id.borrow_mut();
let id = *next_node_id;
*next_node_id += 1;

let cluster = builder.build(id, self, &mut self.meta.borrow_mut());
let cluster = spec.build(id, self, &mut self.meta.borrow_mut());
self.clusters.borrow_mut().push(cluster.clone());

self.update_metas();
Expand Down
3 changes: 2 additions & 1 deletion hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ pub struct HfCycle<'a, T, W, N: Location<'a>> {
impl<'a, T, W, N: Location<'a>> HfCycle<'a, T, W, N> {
pub fn complete(self, stream: &Stream<'a, T, W, N>) {
let ident = self.ident;
let stream_ident = stream.ident.clone();
// TODO(shadaj): avoid having to concretize within cycles
let stream_ident = stream.ensure_concrete().ident.clone();

self.builders
.borrow_mut()
Expand Down
2 changes: 2 additions & 0 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
stageleft::stageleft_no_entry_crate!();

use std::marker::PhantomData;

use hydroflow::scheduled::context::Context;
Expand Down
87 changes: 85 additions & 2 deletions hydroflow_plus/src/location/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::cell::RefCell;
use std::io;
use std::marker::PhantomData;
use std::time::Duration;

use hydroflow::bytes::BytesMut;
use hydroflow::futures::stream::Stream as FuturesStream;
use proc_macro2::Span;
use stageleft::Quoted;
use stageleft::{q, Quoted};
use syn::parse_quote;

use crate::builder::Builders;
use crate::stream::{Async, Windowed};
use crate::{FlowBuilder, HfCycle, Stream};

mod graphs;
pub mod graphs;
pub use graphs::*;

pub mod network;
Expand Down Expand Up @@ -72,6 +73,48 @@ pub trait Location<'a>: Clone {

fn update_meta(&mut self, meta: &Self::Meta);

fn spin(&self) -> Stream<'a, (), Async, Self> {
let (next_id_cell, builders) = self.flow_builder();

let next_id = {
let mut next_id = next_id_cell.borrow_mut();
let id = *next_id;
*next_id += 1;
id
};

let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site());

builders
.borrow_mut()
.as_mut()
.unwrap()
.entry(self.id())
.or_default()
.add_statement(parse_quote! {
#ident = spin() -> tee();
});

Stream {
ident,
node: self.clone(),
next_id: next_id_cell,
builders,
is_delta: false,
_phantom: PhantomData,
}
}

fn spin_batch(
&self,
batch_size: impl Quoted<'a, usize> + Copy + 'a,
) -> Stream<'a, (), Windowed, Self> {
self.spin()
.flat_map(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.tick_batch()
}

fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
&self,
e: impl Quoted<'a, E>,
Expand Down Expand Up @@ -103,6 +146,7 @@ pub trait Location<'a>: Clone {
node: self.clone(),
next_id: next_id_cell,
builders,
is_delta: false,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -146,6 +190,7 @@ pub trait Location<'a>: Clone {
node: self.clone(),
next_id: next_id_cell,
builders,
is_delta: false,
_phantom: PhantomData,
},
)
Expand Down Expand Up @@ -182,6 +227,43 @@ pub trait Location<'a>: Clone {
node: self.clone(),
next_id: next_id_cell,
builders,
is_delta: false,
_phantom: PhantomData,
}
}

fn source_interval(
&self,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<'a, hydroflow::tokio::time::Instant, Async, Self> {
let (next_id_cell, builders) = self.flow_builder();

let next_id = {
let mut next_id = next_id_cell.borrow_mut();
let id = *next_id;
*next_id += 1;
id
};

let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site());
let interval = interval.splice();

builders
.borrow_mut()
.as_mut()
.unwrap()
.entry(self.id())
.or_default()
.add_statement(parse_quote! {
#ident = source_interval(#interval) -> tee();
});

Stream {
ident,
node: self.clone(),
next_id: next_id_cell,
builders,
is_delta: false,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -220,6 +302,7 @@ pub trait Location<'a>: Clone {
node: self.clone(),
next_id: next_id_cell,
builders,
is_delta: false,
_phantom: PhantomData,
},
)
Expand Down
Loading

0 comments on commit af6e3be

Please sign in to comment.