From d8eb41c53a64ddd33bd6f221851d5d5a38a27fd2 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Thu, 18 Jan 2024 12:53:18 -0700 Subject: [PATCH] fix: use lifo for ranges for depth first traversal --- core/src/event_id.rs | 2 +- core/src/interest.rs | 2 +- recon/src/protocol.rs | 40 ++++++++++++++++++---------------------- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/core/src/event_id.rs b/core/src/event_id.rs index bfd7b77d4..6ba6d4239 100644 --- a/core/src/event_id.rs +++ b/core/src/event_id.rs @@ -135,7 +135,7 @@ impl std::fmt::Debug for EventId { } else { let bytes = self.as_slice(); if bytes.len() < 6 { - write!(f, "{}", hex::encode_upper(&bytes)) + write!(f, "{}", hex::encode_upper(bytes)) } else { write!(f, "{}", hex::encode_upper(&bytes[bytes.len() - 6..])) } diff --git a/core/src/interest.rs b/core/src/interest.rs index 465586939..424fce04b 100644 --- a/core/src/interest.rs +++ b/core/src/interest.rs @@ -101,7 +101,7 @@ impl std::fmt::Debug for Interest { } else { let bytes = self.as_slice(); if bytes.len() < 6 { - write!(f, "{}", hex::encode_upper(&bytes)) + write!(f, "{}", hex::encode_upper(bytes)) } else { write!(f, "{}", hex::encode_upper(&bytes[bytes.len() - 6..])) } diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index b43f0949f..c4d43bdb5 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -296,8 +296,9 @@ where { common: Common>, - rx_ranges: Receiver>, - tx_ranges: Sender>, + // Use a stack for buffered ranges as this ensures we traverse depth first + // through the key space tree. + ranges_stack: Vec>, pending_ranges: usize, metrics: Metrics, @@ -313,8 +314,8 @@ where fn new(stream: S, recon: R, tx_want_values: Sender) -> Self { let metrics = recon.metrics(); let stream = SinkFlusher::new(stream, metrics.clone()); - // Use a buffer size large enough to handle the split factor of range requests. - let (tx_ranges, rx_ranges) = channel(PENDING_RANGES_LIMIT * 10); + // Use a stack size large enough to handle the split factor of range requests. + let ranges_stack = Vec::with_capacity(PENDING_RANGES_LIMIT * 10); Self { common: Common { @@ -324,8 +325,7 @@ where tx_want_values, metrics: metrics.clone(), }, - rx_ranges, - tx_ranges, + ranges_stack, pending_ranges: 0, metrics, } @@ -356,10 +356,11 @@ where .await?; } else { for range in ranges { - if self.tx_ranges.try_send(range).is_err() { - self.metrics.record(&RangeEnqueueFailed); - } else { + if self.ranges_stack.len() < self.ranges_stack.capacity() { + self.ranges_stack.push(range); self.metrics.record(&RangeEnqueued); + } else { + self.metrics.record(&RangeEnqueueFailed); } } } @@ -398,21 +399,16 @@ where } async fn each(&mut self) -> Result<()> { if self.pending_ranges < PENDING_RANGES_LIMIT { - let range = match self.rx_ranges.try_recv() { - Ok(r) => { - self.metrics.record(&RangeDequeued); - r - } - Err(_) => return Ok(()), + if let Some(range) = self.ranges_stack.pop() { + self.metrics.record(&RangeDequeued); + self.pending_ranges += 1; + self.common + .stream + .send(InitiatorMessage::RangeRequest(range)) + .await?; }; - self.pending_ranges += 1; - self.common - .stream - .send(InitiatorMessage::RangeRequest(range)) - .await - } else { - Ok(()) } + Ok(()) } async fn finish(&mut self) -> Result<()> {