Skip to content

Commit

Permalink
refactor(storage-manager): optimize lookup
Browse files Browse the repository at this point in the history
WIP

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
Charles-Schleich authored and J-Loudet committed Oct 8, 2024
1 parent ffd213b commit 76f7551
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,26 @@ impl Interval {
}

/// Lookup the provided key expression and return, if found, its associated [Event].
pub(crate) fn lookup(&self, stripped_key: &Option<OwnedKeyExpr>) -> Option<&Event> {
for sub_interval in self.sub_intervals.values() {
if let Some(event) = sub_interval.events.get(stripped_key) {
return Some(event);
pub(crate) fn search_more_recent_event(
&self,
stripped_key: &Option<OwnedKeyExpr>,
timestamp: &Timestamp,
start_sub_interval_idx: Option<SubIntervalIdx>,
) -> Option<&Event> {
let sub_intervals = if let Some(start_sub_interval_idx) = start_sub_interval_idx {
self.sub_intervals
.iter()
.filter(|(&sub_idx, _)| sub_idx >= start_sub_interval_idx)
.map(|(_, sub_interval)| sub_interval)
.collect::<Vec<_>>()
} else {
self.sub_intervals.values().collect::<Vec<_>>()
};

for sub_interval in sub_intervals {
let search_result = sub_interval.events.get(stripped_key);
if search_result.is_some() {
return search_result;
}
}

Expand Down Expand Up @@ -293,6 +309,11 @@ impl SubInterval {
true
}

/// Returns a reference to an [Event] of a [SubInterval] for the given stripped key expression.
pub(crate) fn get(&self, key: &Option<OwnedKeyExpr>) -> Option<&Event> {
self.events.get(key)
}

/// Returns the [Fingerprint] of this `SubInterval`.
///
/// The [Fingerprint] of an `SubInterval` is equal to the XOR (exclusive or) of the fingerprints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ impl Replication {
match &replica_event.action {
SampleKind::Put => {
let replication_log_guard = self.replication_log.read().await;
if let Some(latest_event) =
replication_log_guard.lookup(&replica_event.stripped_key)
if let Some(latest_event) = replication_log_guard
.search_more_recent_event(&replica_event.stripped_key, &replica_event.timestamp)
{
if latest_event.timestamp >= replica_event.timestamp {
return None;
Expand Down
35 changes: 31 additions & 4 deletions plugins/zenoh-plugin-storage-manager/src/replication/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,41 @@ impl LogLatest {
}

/// Lookup the provided key expression and, if found, return its associated [Event].
pub fn lookup(&self, stripped_key: &Option<OwnedKeyExpr>) -> Option<&Event> {
pub fn search_more_recent_event(
&self,
stripped_key: &Option<OwnedKeyExpr>,
timestamp: &Timestamp,
) -> Option<&Event> {
if !self.bloom_filter_event.check(stripped_key) {
return None;
}

for interval in self.intervals.values().rev() {
if let Some(event) = interval.lookup(stripped_key) {
return Some(event);
// NOTE: If the call to `get_time_classification` returns an error it either means that the
// Timestamp is far in the future or the clock of the Host is in a weird state. In both
// cases, there is nothing we can do.
let Ok((start_interval_idx, start_sub_interval_idx)) =
self.configuration.get_time_classification(timestamp)
else {
return None;
};

for (interval_idx, interval) in self
.intervals
.iter()
.filter(|(&idx, _)| idx >= start_interval_idx)
{
let search_result = if interval_idx == &start_interval_idx {
interval.search_more_recent_event(
stripped_key,
timestamp,
Some(start_sub_interval_idx),
)
} else {
interval.search_more_recent_event(stripped_key, timestamp, None)
};

if search_result.is_some() {
return search_result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,11 @@ impl StorageService {
}

if let Some(replication_log) = &self.cache_latest.replication_log {
if let Some(event) = replication_log.read().await.lookup(stripped_key) {
if let Some(event) = replication_log
.read()
.await
.search_more_recent_event(stripped_key, received_ts)
{
if received_ts <= event.timestamp() {
return None;
}
Expand Down

0 comments on commit 76f7551

Please sign in to comment.