Skip to content

Commit

Permalink
fix: use lifo for ranges for depth first traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 18, 2024
1 parent 2c1677d commit d8eb41c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 24 deletions.
2 changes: 1 addition & 1 deletion core/src/event_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl std::fmt::Debug for EventId {
} else {
let bytes = self.as_slice();
if bytes.len() < 6 {
write!(f, "{}", hex::encode_upper(&bytes))
write!(f, "{}", hex::encode_upper(bytes))
} else {
write!(f, "{}", hex::encode_upper(&bytes[bytes.len() - 6..]))
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl std::fmt::Debug for Interest {
} else {
let bytes = self.as_slice();
if bytes.len() < 6 {
write!(f, "{}", hex::encode_upper(&bytes))
write!(f, "{}", hex::encode_upper(bytes))
} else {
write!(f, "{}", hex::encode_upper(&bytes[bytes.len() - 6..]))
}
Expand Down
40 changes: 18 additions & 22 deletions recon/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,9 @@ where
{
common: Common<R, S, InitiatorValueResponseFn<R::Key, R::Hash>>,

rx_ranges: Receiver<Range<R::Key, R::Hash>>,
tx_ranges: Sender<Range<R::Key, R::Hash>>,
// Use a stack for buffered ranges as this ensures we traverse depth first
// through the key space tree.
ranges_stack: Vec<Range<R::Key, R::Hash>>,
pending_ranges: usize,

metrics: Metrics,
Expand All @@ -313,8 +314,8 @@ where
fn new(stream: S, recon: R, tx_want_values: Sender<R::Key>) -> Self {
let metrics = recon.metrics();
let stream = SinkFlusher::new(stream, metrics.clone());
// Use a buffer size large enough to handle the split factor of range requests.
let (tx_ranges, rx_ranges) = channel(PENDING_RANGES_LIMIT * 10);
// Use a stack size large enough to handle the split factor of range requests.
let ranges_stack = Vec::with_capacity(PENDING_RANGES_LIMIT * 10);

Self {
common: Common {
Expand All @@ -324,8 +325,7 @@ where
tx_want_values,
metrics: metrics.clone(),
},
rx_ranges,
tx_ranges,
ranges_stack,
pending_ranges: 0,
metrics,
}
Expand Down Expand Up @@ -356,10 +356,11 @@ where
.await?;
} else {
for range in ranges {
if self.tx_ranges.try_send(range).is_err() {
self.metrics.record(&RangeEnqueueFailed);
} else {
if self.ranges_stack.len() < self.ranges_stack.capacity() {
self.ranges_stack.push(range);
self.metrics.record(&RangeEnqueued);
} else {
self.metrics.record(&RangeEnqueueFailed);
}
}
}
Expand Down Expand Up @@ -398,21 +399,16 @@ where
}
async fn each(&mut self) -> Result<()> {
if self.pending_ranges < PENDING_RANGES_LIMIT {
let range = match self.rx_ranges.try_recv() {
Ok(r) => {
self.metrics.record(&RangeDequeued);
r
}
Err(_) => return Ok(()),
if let Some(range) = self.ranges_stack.pop() {
self.metrics.record(&RangeDequeued);
self.pending_ranges += 1;
self.common
.stream
.send(InitiatorMessage::RangeRequest(range))
.await?;
};
self.pending_ranges += 1;
self.common
.stream
.send(InitiatorMessage::RangeRequest(range))
.await
} else {
Ok(())
}
Ok(())
}

async fn finish(&mut self) -> Result<()> {
Expand Down

0 comments on commit d8eb41c

Please sign in to comment.