Skip to content

Commit

Permalink
refactor(hydroflow): Update surface_lattice_join tests to use `Deep…
Browse files Browse the repository at this point in the history
…Reveal` (#1032)
  • Loading branch information
MingweiSamuel committed Jan 16, 2024
1 parent 356d00a commit 101cb28
Showing 1 changed file with 23 additions and 35 deletions.
58 changes: 23 additions & 35 deletions hydroflow/tests/surface_lattice_join.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::collections::HashSet;

use hydroflow::util::collect_ready;
use hydroflow_macro::hydroflow_syntax;
use lattices::collections::SingletonMap;
use lattices::DeepReveal;
use multiplatform_test::multiplatform_test;

#[multiplatform_test]
Expand All @@ -14,12 +18,9 @@ pub fn test_lattice_join_fused_join_reducing_behavior() {
source_iter([(7, Max::new(5)), (7, Max::new(6))]) -> [1]my_join;

my_join = _lattice_join_fused_join::<Min<usize>, Max<usize>>()
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> map(DeepReveal::deep_reveal)
-> assert_eq([
(7, (Min::new(5), Max::new(6)))
SingletonMap(7, (5, 6))
]);
};

Expand All @@ -36,11 +37,8 @@ pub fn test_lattice_join_fused_join_set_union() {
source_iter([(7, SetUnionSingletonSet::new_from(5)), (7, SetUnionSingletonSet::new_from(6))]) -> [1]my_join;

my_join = _lattice_join_fused_join::<SetUnionHashSet<usize>, SetUnionHashSet<usize>>()
-> map(|singleton_map| { // TODO(mingwei)
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
}) // TODO(mingwei)
-> assert_eq([(7, (SetUnionHashSet::new_from([5, 6]), SetUnionHashSet::new_from([5, 6])))]);
-> map(DeepReveal::deep_reveal)
-> assert_eq([SingletonMap(7, (HashSet::from_iter([5, 6]), HashSet::from_iter([5, 6])))]);
};

df.run_available();
Expand All @@ -57,20 +55,16 @@ pub fn test_lattice_join_fused_join_map_union() {
source_iter([(7, MapUnionSingletonMap::new_from((3, Min::new(5)))), (7, MapUnionSingletonMap::new_from((3, Min::new(4))))]) -> [1]my_join;

my_join = _lattice_join_fused_join::<MapUnionHashMap<usize, Min<usize>>, MapUnionHashMap<usize, Min<usize>>>()
-> map(|singleton_map| { // TODO(mingwei)
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, v.into_reveal())
})
-> assert(|(_k, (mu1, mu2))| mu1.as_reveal_ref().len() == 1 && mu2.as_reveal_ref().len() == 1)
-> map(|(k, (mu1, mu2))| {
let v1 = mu1.into_reveal().into_iter().next().unwrap();
let v2 = mu2.into_reveal().into_iter().next().unwrap();
-> map(DeepReveal::deep_reveal)
-> assert(|SingletonMap(_k, (mu1, mu2))| mu1.len() == 1 && mu2.len() == 1)
-> map(|SingletonMap(k, (mu1, mu2))| {
let v1 = mu1.into_iter().next().unwrap();
let v2 = mu2.into_iter().next().unwrap();
(k, (v1, v2))
}) // TODO(mingwei)
})
-> assert_eq([
(7, ( (3, Min::new(4)), (3, Min::new(4)) ))
(7, ((3, 4), (3, 4)))
]);
// -> null();
};

df.run_available();
Expand All @@ -83,7 +77,7 @@ pub fn test_lattice_join_fused_join() {
// 'static, 'tick.
{
let (out_tx, mut out_rx) =
hydroflow::util::unbounded_channel::<(usize, (Max<usize>, Max<usize>))>();
hydroflow::util::unbounded_channel::<SingletonMap<usize, (usize, usize)>>();
let (lhs_tx, lhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();
let (rhs_tx, rhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();

Expand All @@ -94,10 +88,7 @@ pub fn test_lattice_join_fused_join() {
source_stream(rhs_rx) -> [1]my_join;

my_join
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> map(DeepReveal::deep_reveal)
-> for_each(|v| out_tx.send(v).unwrap());
};

Expand All @@ -110,7 +101,7 @@ pub fn test_lattice_join_fused_join() {
df.run_tick();

let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]);
assert_eq!(out, [SingletonMap(7, (4, 6))]);

// Forgets rhs state
rhs_tx.send((7, Max::new(6))).unwrap();
Expand All @@ -119,13 +110,13 @@ pub fn test_lattice_join_fused_join() {
df.run_tick();

let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]);
assert_eq!(out, [SingletonMap(7, (4, 6))]);
}

// 'static, 'static.
{
let (out_tx, mut out_rx) =
hydroflow::util::unbounded_channel::<(usize, (Max<usize>, Max<usize>))>();
hydroflow::util::unbounded_channel::<SingletonMap<usize, (usize, usize)>>();
let (lhs_tx, lhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();
let (rhs_tx, rhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();

Expand All @@ -136,10 +127,7 @@ pub fn test_lattice_join_fused_join() {
source_stream(rhs_rx) -> [1]my_join;

my_join
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> map(DeepReveal::deep_reveal)
-> for_each(|v| out_tx.send(v).unwrap());
};

Expand All @@ -150,14 +138,14 @@ pub fn test_lattice_join_fused_join() {

df.run_tick();
let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]);
assert_eq!(out, [SingletonMap(7, (4, 6))]);

// Doesn't forget
lhs_tx.send((7, Max::new(4))).unwrap();
rhs_tx.send((7, Max::new(5))).unwrap();

df.run_tick();
let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, vec![(7, (Max::new(4), Max::new(6)))]);
assert_eq!(out, [SingletonMap(7, (4, 6))]);
}
}

0 comments on commit 101cb28

Please sign in to comment.