diff --git a/hydroflow/tests/surface_lattice_join.rs b/hydroflow/tests/surface_lattice_join.rs index 75730c2ef286..caf4ac8d95a3 100644 --- a/hydroflow/tests/surface_lattice_join.rs +++ b/hydroflow/tests/surface_lattice_join.rs @@ -1,5 +1,9 @@ +use std::collections::HashSet; + use hydroflow::util::collect_ready; use hydroflow_macro::hydroflow_syntax; +use lattices::collections::SingletonMap; +use lattices::DeepReveal; use multiplatform_test::multiplatform_test; #[multiplatform_test] @@ -14,12 +18,9 @@ pub fn test_lattice_join_fused_join_reducing_behavior() { source_iter([(7, Max::new(5)), (7, Max::new(6))]) -> [1]my_join; my_join = _lattice_join_fused_join::, Max>() - -> map(|singleton_map| { - let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal(); - (k, (v.into_reveal())) - }) + -> map(DeepReveal::deep_reveal) -> assert_eq([ - (7, (Min::new(5), Max::new(6))) + SingletonMap(7, (5, 6)) ]); }; @@ -36,11 +37,8 @@ pub fn test_lattice_join_fused_join_set_union() { source_iter([(7, SetUnionSingletonSet::new_from(5)), (7, SetUnionSingletonSet::new_from(6))]) -> [1]my_join; my_join = _lattice_join_fused_join::, SetUnionHashSet>() - -> map(|singleton_map| { // TODO(mingwei) - let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal(); - (k, (v.into_reveal())) - }) // TODO(mingwei) - -> assert_eq([(7, (SetUnionHashSet::new_from([5, 6]), SetUnionHashSet::new_from([5, 6])))]); + -> map(DeepReveal::deep_reveal) + -> assert_eq([SingletonMap(7, (HashSet::from_iter([5, 6]), HashSet::from_iter([5, 6])))]); }; df.run_available(); @@ -57,20 +55,16 @@ pub fn test_lattice_join_fused_join_map_union() { source_iter([(7, MapUnionSingletonMap::new_from((3, Min::new(5)))), (7, MapUnionSingletonMap::new_from((3, Min::new(4))))]) -> [1]my_join; my_join = _lattice_join_fused_join::>, MapUnionHashMap>>() - -> map(|singleton_map| { // TODO(mingwei) - let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal(); - (k, v.into_reveal()) - }) - -> assert(|(_k, (mu1, mu2))| mu1.as_reveal_ref().len() == 1 && mu2.as_reveal_ref().len() == 1) - -> map(|(k, (mu1, mu2))| { - let v1 = mu1.into_reveal().into_iter().next().unwrap(); - let v2 = mu2.into_reveal().into_iter().next().unwrap(); + -> map(DeepReveal::deep_reveal) + -> assert(|SingletonMap(_k, (mu1, mu2))| mu1.len() == 1 && mu2.len() == 1) + -> map(|SingletonMap(k, (mu1, mu2))| { + let v1 = mu1.into_iter().next().unwrap(); + let v2 = mu2.into_iter().next().unwrap(); (k, (v1, v2)) - }) // TODO(mingwei) + }) -> assert_eq([ - (7, ( (3, Min::new(4)), (3, Min::new(4)) )) + (7, ((3, 4), (3, 4))) ]); - // -> null(); }; df.run_available(); @@ -83,7 +77,7 @@ pub fn test_lattice_join_fused_join() { // 'static, 'tick. { let (out_tx, mut out_rx) = - hydroflow::util::unbounded_channel::<(usize, (Max, Max))>(); + hydroflow::util::unbounded_channel::>(); let (lhs_tx, lhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max)>(); let (rhs_tx, rhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max)>(); @@ -94,10 +88,7 @@ pub fn test_lattice_join_fused_join() { source_stream(rhs_rx) -> [1]my_join; my_join - -> map(|singleton_map| { - let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal(); - (k, (v.into_reveal())) - }) + -> map(DeepReveal::deep_reveal) -> for_each(|v| out_tx.send(v).unwrap()); }; @@ -110,7 +101,7 @@ pub fn test_lattice_join_fused_join() { df.run_tick(); let out: Vec<_> = collect_ready(&mut out_rx); - assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]); + assert_eq!(out, [SingletonMap(7, (4, 6))]); // Forgets rhs state rhs_tx.send((7, Max::new(6))).unwrap(); @@ -119,13 +110,13 @@ pub fn test_lattice_join_fused_join() { df.run_tick(); let out: Vec<_> = collect_ready(&mut out_rx); - assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]); + assert_eq!(out, [SingletonMap(7, (4, 6))]); } // 'static, 'static. { let (out_tx, mut out_rx) = - hydroflow::util::unbounded_channel::<(usize, (Max, Max))>(); + hydroflow::util::unbounded_channel::>(); let (lhs_tx, lhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max)>(); let (rhs_tx, rhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max)>(); @@ -136,10 +127,7 @@ pub fn test_lattice_join_fused_join() { source_stream(rhs_rx) -> [1]my_join; my_join - -> map(|singleton_map| { - let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal(); - (k, (v.into_reveal())) - }) + -> map(DeepReveal::deep_reveal) -> for_each(|v| out_tx.send(v).unwrap()); }; @@ -150,7 +138,7 @@ pub fn test_lattice_join_fused_join() { df.run_tick(); let out: Vec<_> = collect_ready(&mut out_rx); - assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]); + assert_eq!(out, [SingletonMap(7, (4, 6))]); // Doesn't forget lhs_tx.send((7, Max::new(4))).unwrap(); @@ -158,6 +146,6 @@ pub fn test_lattice_join_fused_join() { df.run_tick(); let out: Vec<_> = collect_ready(&mut out_rx); - assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]); + assert_eq!(out, [SingletonMap(7, (4, 6))]); } }