Skip to content

Commit

Permalink
WIP: wildcard update
Browse files Browse the repository at this point in the history
  • Loading branch information
J-Loudet committed Oct 4, 2024
1 parent c164e5f commit 4fcfb9a
Show file tree
Hide file tree
Showing 9 changed files with 544 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,50 @@ impl Interval {

result
}

/// Removes and returns the [Event] present in this `Interval` that are overridden by the
/// provided Wildcard Update.
///
/// If the Wildcard Update should be recorded in this `Interval` then the index of the
/// `SubInterval` should also be provided as the removal can be stopped right after: all the
/// [Event]s contained in greater `SubInterval` will, by construction of the Replication Log,
/// only have greater timestamps and thus cannot be overridden by this Wildcard Update.
pub(crate) fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
wildcard_key_expr: &OwnedKeyExpr,
wildcard_timestamp: &Timestamp,
wildcard_sub_idx: Option<SubIntervalIdx>,
) -> HashSet<Event> {
let mut overridden_events = HashSet::new();
for (sub_interval_idx, sub_interval) in self.sub_intervals.iter_mut() {
let mut timestamp = None;
if let Some(wildcard_sub_idx) = wildcard_sub_idx {
if *sub_interval_idx > wildcard_sub_idx {
break;
}

// We only provide the timestamp of the Wildcard Update if the index of the
// SubInterval (if provided) equals the index of SubInterval where the Wildcard
// Update will be stored.
if *sub_interval_idx == wildcard_sub_idx {
timestamp = Some(wildcard_timestamp);
}
}

self.fingerprint ^= sub_interval.fingerprint;

overridden_events.extend(sub_interval.remove_events_overridden_by_wildcard_update(
prefix,
wildcard_key_expr,
timestamp,
));

self.fingerprint ^= sub_interval.fingerprint;
}

overridden_events
}
}

/// A `SubIntervalIdx` represents the index of a [SubInterval].
Expand Down Expand Up @@ -356,6 +400,34 @@ impl SubInterval {

EventRemoval::NotFound
}

/// Removes and returns the [Event] present in this `SubInterval` that are overridden by the
/// provided Wildcard Update.
///
/// The timestamp of the Wildcard Update should only be provided if the considered `SubInterval`
/// is where the Wildcard Update should be recorded.
/// It is only in that specific scenario that we are not sure that all [Event]s have a lower
/// timestamp.
fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
wildcard_key_expr: &OwnedKeyExpr,
wildcard_timestamp: Option<&Timestamp>,
) -> HashSet<Event> {
let overridden_events =
crate::replication::core::remove_events_overridden_by_wildcard_update(
&mut self.events,
prefix,
wildcard_key_expr,
wildcard_timestamp,
);

overridden_events
.iter()
.for_each(|overridden_event| self.fingerprint ^= overridden_event.fingerprint());

overridden_events
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl Configuration {
}
}

pub fn prefix(&self) -> Option<&OwnedKeyExpr> {
self.prefix.as_ref()
}

/// Returns the [Fingerprint] of the `Configuration`.
///
/// The fingerprint is the hash of all its fields, using the `xxhash_rust` crate.
Expand Down
58 changes: 49 additions & 9 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@ mod aligner_query;
mod aligner_reply;

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant, SystemTime},
};

use rand::Rng;
use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use tokio::{sync::RwLock, task::JoinHandle};
use tracing::{debug_span, Instrument};
use zenoh::{
key_expr::{
Expand All @@ -34,13 +31,16 @@ use zenoh::{
},
query::{ConsolidationMode, Selector},
sample::Locality,
time::Timestamp,
Session,
};
use zenoh_backend_traits::Storage;

use self::aligner_reply::AlignmentReply;
use super::{digest::Digest, log::LogLatest};
use crate::{replication::core::aligner_query::AlignmentQuery, storages_mgt::LatestUpdates};
use super::{digest::Digest, log::LogLatest, Event};
use crate::{
replication::core::aligner_query::AlignmentQuery,
storages_mgt::{LatestUpdates, StorageService},
};

kedefine!(
pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
Expand All @@ -53,7 +53,7 @@ pub(crate) struct Replication {
pub(crate) replication_log: Arc<RwLock<LogLatest>>,
pub(crate) storage_key_expr: OwnedKeyExpr,
pub(crate) latest_updates: Arc<RwLock<LatestUpdates>>,
pub(crate) storage: Arc<Mutex<Box<dyn Storage>>>,
pub(crate) storage_service: Arc<StorageService>,
}

impl Replication {
Expand Down Expand Up @@ -579,3 +579,43 @@ impl Replication {
})
}
}

pub(crate) fn remove_events_overridden_by_wildcard_update(
events: &mut HashMap<Option<OwnedKeyExpr>, Event>,
prefix: Option<&OwnedKeyExpr>,
wildcard_ke: &OwnedKeyExpr,
wildcard_ts: Option<&Timestamp>,
) -> HashSet<Event> {
let mut overridden_events = HashSet::default();

events.retain(|stripped_key_expr, event| {
// We only provide the timestamp of the Wildcard Update if the Wildcard Update belongs
// in this SubInterval.
//
// Only then do we need to compare its timestamp with the timestamp of the Events
// contained in the SubInterval.
if let Some(wildcard_timestamp) = wildcard_ts {
if wildcard_timestamp < event.timestamp() {
return true;
}
}

let Ok(full_event_key_expr) = crate::prefix(prefix, stripped_key_expr.as_ref()) else {
tracing::error!(
"Internal error while attempting to prefix < {:?} > with < {:?} >",
stripped_key_expr,
prefix
);
return true;
};

if wildcard_ke.includes(&full_event_key_expr) {
overridden_events.insert(event.clone());
return false;
}

true
});

overridden_events
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};
use zenoh::{bytes::ZBytes, internal::Value, query::Query, sample::SampleKind};
use zenoh::{bytes::ZBytes, internal::Value, key_expr::keyexpr_tree::IKeyExprTree, query::Query};

use super::aligner_reply::AlignmentReply;
use crate::replication::{
classification::{IntervalIdx, SubIntervalIdx},
core::Replication,
digest::DigestDiff,
log::EventMetadata,
log::{Action, EventMetadata},
};

/// The `AlignmentQuery` enumeration represents the information requested by a Replica to align
Expand Down Expand Up @@ -108,7 +108,17 @@ impl Replication {
.get(&interval_idx)
{
interval.sub_intervals().for_each(|sub_interval| {
events_to_retrieve.extend(sub_interval.events().map(Into::into));
events_to_retrieve.extend(sub_interval
.events()
// .filter(|event| {
// // NOTE: We process Wildcard Update separately, as for `WildcardPut`
// // the payload is stored separately.
// !matches!(
// event.action,
// Action::WildcardPut(_) | Action::WildcardDelete(_)
// )
// })
.map(Into::into));
});
}

Expand All @@ -118,6 +128,39 @@ impl Replication {
self.reply_event_retrieval(&query, event_to_retrieve).await;
}
}

// let wildcard_updates = self
// .storage_service
// .wildcard_updates
// .read()
// .await
// .key_value_pairs()
// .map(|(wildcard_update_ke, update)| {
// let action = match update.kind() {
// SampleKind::Put => Action::WildcardPut(wildcard_update_ke.clone()),
// SampleKind::Delete => {
// Action::WildcardDelete(wildcard_update_ke.clone())
// }
// };

// let event_metadata = EventMetadata {
// stripped_key: Some(wildcard_update_ke.clone()),
// timestamp: *update.timestamp(),
// action,
// };

// (event_metadata, update.value().clone())
// })
// .collect::<HashMap<EventMetadata, Value>>();

// for (wildcard_update_metadata, update_value) in wildcard_updates {
// reply_to_query(
// &query,
// AlignmentReply::Retrieval(wildcard_update_metadata),
// Some(update_value),
// )
// .await;
// }
}
AlignmentQuery::Diff(digest_diff) => {
tracing::trace!("Processing `AlignmentQuery::Diff`");
Expand Down Expand Up @@ -271,54 +314,70 @@ impl Replication {

/// Replies to the [Query] with the [EventMetadata] and [Value] identified as missing.
///
/// This method will fetch the [StoredData] from the Storage.
/// Depending on the associated action, this method will fetch the [Value] either from the
/// Storage or from the wildcard updates.
pub(crate) async fn reply_event_retrieval(
&self,
query: &Query,
event_to_retrieve: EventMetadata,
) {
let mut value = None;
let value = match &event_to_retrieve.action {
// For a Delete or WildcardDelete there is no associated `Value`.
Action::Delete | Action::WildcardDelete(_) => None,
// For a Put we need to retrieve the `Value` in the Storage.
Action::Put => {
let stored_data = {
let mut storage = self.storage_service.storage.lock().await;
match storage
.get(event_to_retrieve.stripped_key.clone(), "")
.await
{
Ok(stored_data) => stored_data,
Err(e) => {
tracing::error!(
"Failed to retrieve data associated to key < {:?} >: {e:?}",
event_to_retrieve.key_expr()
);
return;
}
}
};

if event_to_retrieve.action == SampleKind::Put {
let stored_data = {
let mut storage = self.storage.lock().await;
match storage
.get(event_to_retrieve.stripped_key.clone(), "")
.await
{
Ok(stored_data) => stored_data,
Err(e) => {
tracing::error!(
"Failed to retrieve data associated to key < {:?} >: {e:?}",
event_to_retrieve.key_expr()
let requested_data = stored_data
.into_iter()
.find(|data| data.timestamp == *event_to_retrieve.timestamp());
match requested_data {
Some(data) => Some(data.value),
None => {
// NOTE: This is not necessarily an error. There is a possibility that the
// data associated with this specific key was updated between the time
// the [AlignmentQuery] was sent and when it is processed.
//
// Hence, at the time it was "valid" but it no longer is.
tracing::debug!(
"Found no data in the Storage associated to key < {:?} > with a \
Timestamp equal to: {}",
event_to_retrieve.key_expr(),
event_to_retrieve.timestamp()
);
return;
}
}
};
}
// For a WildcardPut we need to retrieve the `Value` in the `StorageService`.
Action::WildcardPut(wildcard_ke) => {
let wildcard_update_guard = self.storage_service.wildcard_updates.read().await;

let requested_data = stored_data
.into_iter()
.find(|data| data.timestamp == *event_to_retrieve.timestamp());
match requested_data {
Some(data) => {
value = Some(data.value);
}
None => {
// NOTE: This is not necessarily an error. There is a possibility that the data
// associated with this specific key was updated between the time the
// [AlignmentQuery] was sent and when it is processed.
//
// Hence, at the time it was "valid" but it no longer is.
tracing::debug!(
"Found no data in the Storage associated to key < {:?} > with a Timestamp \
equal to: {}",
event_to_retrieve.key_expr(),
event_to_retrieve.timestamp()
if let Some(update) = wildcard_update_guard.weight_at(wildcard_ke) {
Some(update.value().clone())
} else {
tracing::error!(
"Ignoring Wildcard Update < {wildcard_ke} >: found no associated `Update`."
);
return;
}
}
}
};

reply_to_query(query, AlignmentReply::Retrieval(event_to_retrieve), value).await;
}
Expand Down
Loading

0 comments on commit 4fcfb9a

Please sign in to comment.