-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonolith.rs
125 lines (103 loc) · 5.37 KB
/
monolith.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::thread;
use std::time::Instant;
use hydroflow::futures::StreamExt;
use hydroflow::itertools::Itertools;
use crate::debug_println;
use crate::util::{lt_slice, DataCenterIndexType, LatencyType, PlacementType};
/// Builds and runs a Hydroflow dataflow that enumerates all two-data-center
/// placements, filters out pairs violating the distance constraint, removes
/// placements dominated by another placement (strictly better latency for
/// every client), and counts the remaining optimal placements.
///
/// The dataflow executes on a dedicated worker thread. This async function
/// raises an atomic flag to start the worker, awaits counts on a channel until
/// at least `expected_optimal_count` optimal placements are reported, lowers
/// the flag to stop the worker, and returns the elapsed wall-clock time.
///
/// Parameters:
/// * `expected_optimal_count` — number of optimal placements to wait for
///   before stopping the search.
/// * `num_d` — number of data centers.
/// * `num_c` — number of clients.
/// * `dist` — row-major `num_d * num_d` matrix of data-center distances
///   (assumed layout; indexed as `d_0 * num_d + d_1` — TODO confirm with caller).
/// * `latency` — row-major `num_d * num_c` matrix of DC-to-client latencies
///   (indexed as `d * num_c + c`).
/// * `dist_constraint` — placements whose DC pair is not strictly farther
///   apart than this are discarded.
// NOTE(review): the `'a` lifetime parameter is unused but kept so the
// signature stays byte-compatible with existing callers.
pub(crate) async fn create_and_run_hf<'a>(
    expected_optimal_count: usize,
    num_d: usize,
    num_c: usize,
    dist: &[usize],
    latency: &[LatencyType],
    dist_constraint: usize,
) -> std::time::Duration {
    // All data-center indices, in the index type the placement logic expects.
    let d: Vec<_> = (0..num_d as DataCenterIndexType).collect();
    let (tx_optimal_count, mut rx_optimal_count) = hydroflow::util::unbounded_channel();
    // Atomic flag coordinating the worker thread: it spins until the flag is
    // raised, runs the dataflow while the flag stays raised, then exits.
    let coordination_flag_main = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    // Own the input slices so they can be moved into the worker thread.
    let latency = latency.to_vec();
    let dist = dist.to_vec();
    let coordination_flag_thread = coordination_flag_main.clone();
    let executor_thread = thread::spawn(move || {
        let mut hf = hydroflow::hydroflow_syntax! {
            // Directly create pairs as combinations of data centers
            data_center_pairs = source_iter(d.into_iter().tuple_combinations());
            valid_placements = data_center_pairs
                -> inspect(|_| {debug_println!("Filtering DC pair")})
                // Apply distance constraint
                -> filter(|(d_0, d_1)|{
                    let dist_index = (*d_0) as usize * num_d + (*d_1) as usize;
                    dist[dist_index] > dist_constraint
                })
                // Create placement and compute its latency to all clients
                -> inspect(|_|{debug_println!("Computing placement latency")})
                -> map(|(d_0, d_1)| -> PlacementType {
                    let latency_c: Vec<_> = (0..num_c).into_iter().map(|c| {
                        let latency_index_0 = d_0 as usize * num_c + c;
                        let latency_index_1 = d_1 as usize * num_c + c;
                        let latency_0 = latency[latency_index_0];
                        let latency_1 = latency[latency_index_1];
                        // Writes must reach both replicas; reads hit the nearer one.
                        let write_latency = LatencyType::max(latency_0, latency_1);
                        let read_latency = LatencyType::min(latency_0, latency_1);
                        (write_latency, read_latency)
                    }).collect();
                    PlacementType {d_0, d_1, latency: latency_c}
                })
                -> tee();
            // Check which placements are dominated by any other placement and remove these
            valid_placements -> [0]compare_all_to_all;
            valid_placements -> [1]compare_all_to_all;
            compare_all_to_all = cross_join_multiset::<'static,'tick>();
            filtered_placements = compare_all_to_all
                -> inspect(|_|{debug_println!("Filter dominated placement");})
                -> demux(|(p_0, p_1): (PlacementType, PlacementType), var_args!(dominated, uncertain)|{
                    if p_0.has_same_dcs(&p_1) {
                        // Forward a copy of each placement
                        uncertain.give(p_0);
                    } else if lt_slice(&p_0.latency, &p_1.latency) {
                        dominated.give(p_1);
                    } else if lt_slice(&p_1.latency, &p_0.latency) {
                        dominated.give(p_0);
                    } else {
                        // NOOP, since we output each placement via the first branch
                    }
                });
            filtered_placements[dominated]
                -> inspect(|_|{debug_println!("Forwarding dominated placement");})
                -> [neg]optimal_placements;
            filtered_placements[uncertain] -> [pos]optimal_placements;
            optimal_placements = difference::<'tick,'static>();
            optimal_placements
                -> inspect(|_|{debug_println!("Found optimal placement");})
                -> map(|_|1) -> reduce::<'static>(|accum: &mut _, elem| {*accum += elem})
                -> for_each(|acc|{
                    debug_println!("Number of optimal placements: {}", acc);
                    tx_optimal_count.send(acc).unwrap();
                });
        };
        // Coordinate start: spin until the main task raises the flag.
        // Acquire pairs with the Release store performed by the main task.
        while !coordination_flag_thread.load(std::sync::atomic::Ordering::Acquire) {
            std::hint::spin_loop();
        }
        debug_println!("Computing optimal placements");
        while coordination_flag_thread.load(std::sync::atomic::Ordering::Acquire) {
            let _worked = hf.run_available();
            debug_println!("Iteration of dominated returned {}", _worked);
        }
    });
    // Time the search from the start signal until the expected count arrives.
    let start_time = Instant::now();
    coordination_flag_main.store(true, std::sync::atomic::Ordering::Release);
    // Await running counts; stop once the expected number of optimal
    // placements has been reached (the channel yields a cumulative count).
    while let Some(element) = rx_optimal_count.next().await {
        if element >= expected_optimal_count {
            println!("Found {}/{} optimal placements", element, expected_optimal_count);
            break;
        }
    }
    // Lower the flag so the worker's run loop terminates, then join it.
    coordination_flag_main.store(false, std::sync::atomic::Ordering::Release);
    let duration = start_time.elapsed();
    executor_thread.join().expect("Failed to join thread");
    duration
}