Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hydro_lang): avoid crashes and miscompilations with unused locations #1641

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions hydro_lang/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
}

fn extra_stmts(&self, env: &<D as Deploy<'a>>::CompileEnv) -> BTreeMap<usize, Vec<syn::Stmt>> {
let all_locations_count = self.nodes.len() + self.clusters.len();

let mut extra_stmts: BTreeMap<usize, Vec<syn::Stmt>> = BTreeMap::new();
for &c_id in self.clusters.keys() {
let self_id_ident = syn::Ident::new(
Expand All @@ -132,14 +130,14 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
let #self_id_ident = #self_id_expr;
});

for other_location in 0..all_locations_count {
for other_location in self.nodes.keys().chain(self.clusters.keys()) {
let other_id_ident = syn::Ident::new(
&format!("__hydro_lang_cluster_ids_{}", c_id),
Span::call_site(),
);
let other_id_expr = D::cluster_ids(env, c_id).splice_untyped();
extra_stmts
.entry(other_location)
.entry(*other_location)
.or_default()
.push(syn::parse_quote! {
let #other_id_ident = #other_id_expr;
Expand Down Expand Up @@ -176,26 +174,34 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
let (mut processes, mut clusters, mut externals) = (
std::mem::take(&mut self.nodes)
.into_iter()
.map(|(node_id, node)| {
node.instantiate(
env,
&mut meta,
compiled.remove(&node_id).unwrap(),
extra_stmts.remove(&node_id).unwrap_or_default(),
);
(node_id, node)
.filter_map(|(node_id, node)| {
if let Some(ir) = compiled.remove(&node_id) {
node.instantiate(
env,
&mut meta,
ir,
extra_stmts.remove(&node_id).unwrap_or_default(),
);
Some((node_id, node))
} else {
None
}
})
.collect::<HashMap<_, _>>(),
std::mem::take(&mut self.clusters)
.into_iter()
.map(|(cluster_id, cluster)| {
cluster.instantiate(
env,
&mut meta,
compiled.remove(&cluster_id).unwrap(),
extra_stmts.remove(&cluster_id).unwrap_or_default(),
);
(cluster_id, cluster)
.filter_map(|(cluster_id, cluster)| {
if let Some(ir) = compiled.remove(&cluster_id) {
cluster.instantiate(
env,
&mut meta,
ir,
extra_stmts.remove(&cluster_id).unwrap_or_default(),
);
Some((cluster_id, cluster))
} else {
None
}
})
.collect::<HashMap<_, _>>(),
std::mem::take(&mut self.externals)
Expand Down
2 changes: 1 addition & 1 deletion hydro_lang/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl<'a> Deploy<'a> for HydroDeploy {
fn cluster_ids(
_env: &Self::CompileEnv,
of_cluster: usize,
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
}

Expand Down
2 changes: 1 addition & 1 deletion hydro_lang/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<'a> Deploy<'a> for DeployRuntime {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a {
) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
crate::deploy_runtime::cluster_members(*env, of_cluster)
}

Expand Down
2 changes: 1 addition & 1 deletion hydro_lang/src/deploy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub trait Deploy<'a> {
fn cluster_ids(
env: &Self::CompileEnv,
of_cluster: usize,
) -> impl QuotedWithContext<'a, &'a Vec<u32>, ()> + Copy + 'a;
) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
}

Expand Down
9 changes: 7 additions & 2 deletions hydro_lang/src/deploy_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ pub struct HydroflowPlusMeta {
pub fn cluster_members(
cli: RuntimeData<&DeployPorts<HydroflowPlusMeta>>,
of_cluster: usize,
) -> impl QuotedWithContext<&Vec<u32>, ()> + Copy {
q!(cli.meta.clusters.get(&of_cluster).unwrap())
) -> impl QuotedWithContext<&[u32], ()> + Copy {
q!(cli
.meta
.clusters
.get(&of_cluster)
.map(|v| v.as_slice())
.unwrap_or(&[])) // we default to empty slice because this is the scenario where the cluster is unused in the graph
}

pub fn cluster_self_id(
Expand Down
2 changes: 1 addition & 1 deletion hydro_lang/src/location/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl<'a, C: 'a, Ctx> FreeVariableWithContext<Ctx> for ClusterIds<'a, C> {
(
None,
Some(
quote! { unsafe { ::std::mem::transmute::<_, &::std::vec::Vec<#root::ClusterId<#c_type>>>(#ident) } },
quote! { unsafe { ::std::mem::transmute::<_, &[#root::ClusterId<#c_type>]>(#ident) } },
),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ expression: built.ir()
| res | { let (id , b) = res . unwrap () ; (hydro_lang :: ClusterId :: < () > :: from_raw (id) , hydro_lang :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) },
),
input: FlatMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydro_lang :: location :: cluster :: cluster_id :: ClusterId < () > > , _ > > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydro_lang :: ClusterId < () > > > (__hydro_lang_cluster_ids_0) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydro_lang :: location :: cluster :: cluster_id :: ClusterId < () > > , _ > > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & [hydro_lang :: ClusterId < () >] > (__hydro_lang_cluster_ids_0) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
input: Source {
source: Iter(
{ use crate :: __staged :: cluster :: many_to_many :: * ; 0 .. 2 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ expression: built.ir()
| res | { hydro_lang :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () },
),
input: Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydro_lang :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker > > > (__hydro_lang_cluster_ids_1) } ; | (i , w) | (ids__free [i % ids__free . len ()] , w) }),
f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & [hydro_lang :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker >] > (__hydro_lang_cluster_ids_1) } ; | (i , w) | (ids__free [i % ids__free . len ()] , w) }),
input: Enumerate {
is_static: true,
input: Map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ expression: ir.surface_syntax_string()
1v1 = source_iter ({ use crate :: __staged :: cluster :: map_reduce :: * ; vec ! ["abc" , "abc" , "xyz" , "abc"] });
2v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < & str , std :: string :: String > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | s | s . to_string () }));
3v1 = enumerate :: < 'static > ();
4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydro_lang :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker > > > (__hydro_lang_cluster_ids_1) } ; | (i , w) | (ids__free [i % ids__free . len ()] , w) }));
4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydro_lang :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & [hydro_lang :: ClusterId < hydro_test :: cluster :: map_reduce :: Worker >] > (__hydro_lang_cluster_ids_1) } ; | (i , w) | (ids__free [i % ids__free . len ()] , w) }));
5v1 = map (| (id , data) : (hydro_lang :: ClusterId < _ > , std :: string :: String) | { (id . raw_id , hydro_lang :: runtime_support :: bincode :: serialize :: < std :: string :: String > (& data) . unwrap () . into ()) });
6v1 = dest_sink ({ use hydro_lang :: __staged :: deploy_runtime :: * ; let env__free = FAKE ; let p1_port__free = "port_0" ; { env__free . port (p1_port__free) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } });
7v1 = source_stream ({ use hydro_lang :: __staged :: deploy_runtime :: * ; let env__free = FAKE ; let p2_port__free = "port_1" ; { env__free . port (p2_port__free) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } });
Expand Down
Loading
Loading