chore: use Duration for LatencyMovAvg
nikolay-komarevskiy committed Jun 27, 2024
1 parent bfc28fe commit 973b377
Showing 1 changed file with 28 additions and 22 deletions.
@@ -12,7 +12,9 @@ const MAX_LATENCY: Duration = Duration::from_secs(500);

const WINDOW_SIZE: usize = 15;

-type LatencyMovAvg = SumTreeSMA<f64, f64, WINDOW_SIZE>;
+// Algorithmic complexity: add sample - O(log(N)), get average - O(1).
+// Space complexity: O(N)
+type LatencyMovAvg = SumTreeSMA<Duration, u32, WINDOW_SIZE>;

#[derive(Clone, Debug)]
struct WeightedNode {
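The new alias stores Duration samples directly, with a u32 divisor for the fixed-size window, instead of f64 milliseconds. A minimal, self-contained sketch of the alias in isolation; it assumes SumTreeSMA and the SMA trait come from the simple_moving_average crate (the crate is not named in this diff), and uses only the from_zero, add_sample, and get_average calls that appear in the change:

```rust
use std::time::Duration;

// Assumption: SumTreeSMA and the SMA trait are provided by the `simple_moving_average` crate.
use simple_moving_average::{SumTreeSMA, SMA};

const WINDOW_SIZE: usize = 15;

// Same shape as the new alias: Duration samples, u32 divisor, fixed window size.
type LatencyMovAvg = SumTreeSMA<Duration, u32, WINDOW_SIZE>;

fn main() {
    // Start from a zero sum, mirroring LatencyMovAvg::from_zero(Duration::ZERO) in the diff.
    let mut avg = LatencyMovAvg::from_zero(Duration::ZERO);
    avg.add_sample(Duration::from_secs(1));
    avg.add_sample(Duration::from_secs(2));
    // With only two samples added, the average is taken over those two samples: 1.5 s,
    // matching the behavior asserted in the updated tests below.
    assert_eq!(avg.get_average(), Duration::from_millis(1500));
}
```

Keeping the samples as Duration removes the unit ambiguity of the old f64 millisecond values; conversion to f64 now happens only at the weight calculation.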
@@ -108,19 +110,19 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
}

// If latency is None (meaning Node is unhealthy), we assign some big value
-let latency = health.latency.unwrap_or(MAX_LATENCY).as_millis() as f64;
+let latency = health.latency.unwrap_or(MAX_LATENCY);

if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) {
// Node is already in the array (it is not the first update_node() call).
self.weighted_nodes[idx].latency_mov_avg.add_sample(latency);
let latency_avg = self.weighted_nodes[idx].latency_mov_avg.get_average();
// As nodes with smaller average latencies are preferred for routing, we use inverted values for weights.
-self.weighted_nodes[idx].weight = 1.0 / latency_avg;
+self.weighted_nodes[idx].weight = 1.0 / latency_avg.as_secs_f64();
} else {
// Node is not yet in array (first update_node() call).
-let mut latency_mov_avg = LatencyMovAvg::new();
+let mut latency_mov_avg = LatencyMovAvg::from_zero(Duration::ZERO);
latency_mov_avg.add_sample(latency);
-let weight = 1.0 / latency_mov_avg.get_average();
+let weight = 1.0 / latency_mov_avg.get_average().as_secs_f64();
self.weighted_nodes.push(WeightedNode {
latency_mov_avg,
node: node.clone(),
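Because the weight is the reciprocal of the average latency in seconds, lower-latency nodes get proportionally larger weights. A hypothetical illustration of what that implies under proportional (weighted) selection; the node names and latencies below are invented, and this is not the snapshot's actual next() routine, only the same weight = 1 / avg_secs rule applied to made-up data:

```rust
use std::time::Duration;

fn main() {
    // Invented example nodes with hypothetical average latencies.
    let nodes = [
        ("fast-node", Duration::from_millis(500)),
        ("slow-node", Duration::from_secs(2)),
    ];

    // Same rule as in the diff: weight = 1 / average latency in seconds.
    let weights: Vec<f64> = nodes
        .iter()
        .map(|(_, avg)| 1.0 / avg.as_secs_f64())
        .collect();
    let total: f64 = weights.iter().sum();

    for ((name, avg), weight) in nodes.iter().zip(&weights) {
        // Under proportional selection, a node's share is weight / total.
        println!(
            "{name}: avg = {avg:?}, weight = {weight:.2}, share = {:.0}%",
            100.0 * weight / total
        );
    }
    // fast-node gets weight 2.0 (an 80% share), slow-node gets 0.5 (a 20% share).
}
```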
@@ -194,8 +196,11 @@ mod tests {
assert!(is_updated);
assert!(snapshot.has_nodes());
let weighted_node = snapshot.weighted_nodes.first().unwrap();
-assert_eq!(weighted_node.latency_mov_avg.get_average(), 1000.0);
-assert_eq!(weighted_node.weight, 0.001);
+assert_eq!(
+weighted_node.latency_mov_avg.get_average(),
+Duration::from_secs(1)
+);
+assert_eq!(weighted_node.weight, 1.0);
assert_eq!(snapshot.next().unwrap(), node);
// Check second update
let health = HealthCheckStatus {
@@ -206,8 +211,11 @@
.expect("node update failed");
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
-assert_eq!(weighted_node.latency_mov_avg.get_average(), 1500.0);
-assert_eq!(weighted_node.weight, 1.0 / 1500.0);
+assert_eq!(
+weighted_node.latency_mov_avg.get_average(),
+Duration::from_millis(1500)
+);
+assert_eq!(weighted_node.weight, 1.0 / 1.5);
// Check third update
let health = HealthCheckStatus {
latency: Some(Duration::from_secs(3)),
@@ -217,23 +225,21 @@
.expect("node update failed");
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
-assert_eq!(weighted_node.latency_mov_avg.get_average(), 2000.0);
-assert_eq!(weighted_node.weight, 1.0 / 2000.0);
+assert_eq!(
+weighted_node.latency_mov_avg.get_average(),
+Duration::from_millis(2000)
+);
+assert_eq!(weighted_node.weight, 0.5);
// Check fourth update with none
let health = HealthCheckStatus { latency: None };
let is_updated = snapshot
.update_node(&node, health)
.expect("node update failed");
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
-assert_eq!(
-weighted_node.latency_mov_avg.get_average(),
-(MAX_LATENCY.as_millis() as f64 + 6000.0) / 4.0
-);
-assert_eq!(
-weighted_node.weight,
-4.0 / (MAX_LATENCY.as_millis() as f64 + 6000.0)
-);
+let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0);
+assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency);
+assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64());
assert_eq!(snapshot.weighted_nodes.len(), 1);
assert_eq!(snapshot.existing_nodes.len(), 1);
assert_eq!(snapshot.next().unwrap(), node);
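For reference, the expected values in this test follow directly from averaging the four samples fed to update_node(): 1 s, 2 s, 3 s, and MAX_LATENCY (the stand-in for the unhealthy None case). Since only four samples have been added, the 15-slot window is not yet full and the average runs over the samples seen so far, which the asserted values above confirm. A short sketch of that arithmetic:

```rust
use std::time::Duration;

fn main() {
    // Same constant as in the source file.
    const MAX_LATENCY: Duration = Duration::from_secs(500);

    // The latency sequence used by the test; the final None is replaced by MAX_LATENCY.
    let samples = [
        Duration::from_secs(1),
        Duration::from_secs(2),
        Duration::from_secs(3),
        MAX_LATENCY,
    ];

    let mut sum = Duration::ZERO;
    for (i, sample) in samples.iter().enumerate() {
        sum += *sample;
        let avg = sum / (i as u32 + 1);
        let weight = 1.0 / avg.as_secs_f64();
        // Prints 1 s, 1.5 s, 2 s, and finally (1 + 2 + 3 + 500) / 4 = 126.5 s.
        println!("after {} sample(s): avg = {avg:?}, weight = {weight}", i + 1);
    }
}
```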
@@ -255,7 +261,7 @@
// Add node_1 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
node: node_1.clone(),
-latency_mov_avg: LatencyMovAvg::new(),
+latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
});
// Sync with node_1 again
@@ -279,7 +285,7 @@
// Add node_2 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
node: node_2.clone(),
-latency_mov_avg: LatencyMovAvg::new(),
+latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
});
// Sync with [node_2, node_3]
@@ -296,7 +302,7 @@
// Add node_3 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
node: node_3,
-latency_mov_avg: LatencyMovAvg::new(),
+latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
});
// Sync with []