Skip to content

Commit

Permalink
feat(storage-manager): Replication supports Wildcard Updates
Browse files Browse the repository at this point in the history
This commit adds support for Wildcard Updates (be it Delete or Put) when
aligning replicated Storage.

To make this feature possible the following structures and methods were
added / changed:

- A new enumeration `Action` was introduced. This enumeration builds on
  top of `SampleKind` and adds two variants: `WildcardPut` and
  `WildcardDelete`, each of these variants contains their full key
  expression.
  We track the full key expression to not have to deal with `Option` and
  the `strip_prefix`: there are cases where the `strip_prefix` cannot be
  removed from a Wildcard Update — yet the Wildcard Update should be
  recorded.
  For instance, if a storage subscribes to "test/replication/**" and has
  its strip prefix set to "test/replication", a delete on "test/**"
  should be recorded while the `strip_prefix` cannot be removed.

- The `LogLatest`, `Interval` and `SubInterval` structures now have the
  the method `remove_events_overridden_by_wildcard_update`: this method
  removes all the Events that are impacted by a Wildcard Update and
  updates the Fingerprint of the impacted Interval / SubInterval.

- The visibility of the fields of `Event` structure were set to
  `pub(crate)` to prevent modifying them without updating the
  Fingerprint.
  Setters and accessors were thus added.

- The core `Replication` structure now keeps an `Arc` over the
  `StorageService` such that it can add Wildcard Updates to it when it
  receives some.

- The visibility of the following field / structures were changed to
  `pub(crate)` such that the Replication can access them:
  - The `Update` structure that contains the payload and timestamp
    associated with a Wildcard Update.
  - The `configuration`, `wildcard_updates` fields of the
    `StorageService` structure.

- The signature of the method `register_wildcard_update` was changed: it
  no longer extracts all the metadata from the `sample` argument as,
  when called from the Replication Aligner, these values are not
  "correct" (they are linked to a reply and not to the contained
  payload).

* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  added the method `remove_events_overridden_by_wildcard_update` to the
  `Interval` and `SubInterval` structures.

* plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs:
  added the `prefix` accessor.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Changed the `Replication` structure to keep track of the
    `StorageService`. The latter is used to access and add/remove
    Wildcard Updates.
  - Added the method `remove_events_overridden_by_wildcard_updates`
    that takes a `HashMap` as this logic is shared for the latest cache
    and the Replication Log.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs:
  updated the `reply_event_retrieval` method to handle Wildcard Updates.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs:
  - Added some traces to ease debugging.
  - Added the method `needs_further_processing` that, given an
    EventMetadata, checks if it needs to be processed, processes it if it
    can and otherwise returns true -- indicating that further processing
    is needed.
  - Added the method `apply_wildcard_update` that applies a Wildcard
    Update.
  - Added the method `is_overridden_by_wildcard_update` that checks if
    there is a Wildcard Update that overrides the provided key
    expression for the provided timestamp.
  - Added the method `store_event_overridden_by_wildcard_update` that
    overrides the provided EventMetadata with the Wildcard Update and
    stores the result.

* plugins/zenoh-plugin-storage-manager/src/replication/log.rs:
  - Added the `Action` enumeration.
  - Changed the visibility of the fields of the `Event` structure to
    `pub(crate)` and added accessors / setters that properly update
    the Fingerprint.
  - Added the method `remove_events_overridden_by_wildcard_update` to
    the `LogLatest` structure.

* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: make the
  `Action` visible from the `replication` module.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  changed the `spawn_start` function to take an `Arc<StorageService>`
  instead of a reference.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs: wrap the
  `StorageService` inside of an `Arc` such that the `Replication` can use
  it in its tasks.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Changed the visibility of the `Update` structure to `pub(crate)` and
    added accessors.
  - Changed the visibility of the `configuration` and `wildcard_updates`
    fields of the `StorageService` structure to `pub(crate)` as the
    `Replication` uses them when dealing with Wildcard Updates.
  - Changed the signature of the method `register_wildcard_update` to
    not extract the metadata from the `Sample` -- as the values are not
    correct when processing an Alignment reply.
  - Changed the visibility of the methods `register_wildcard_update` and
    `overriding_wild_update` to `pub(crate)` as they are used by the
    Replication to deal with Wildcard Updates.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Oct 7, 2024
1 parent c164e5f commit bd6bf44
Show file tree
Hide file tree
Showing 10 changed files with 790 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,50 @@ impl Interval {

result
}

/// Removes and returns the [Event] present in this `Interval` that are overridden by the
/// provided Wildcard Update.
///
/// If the Wildcard Update should be recorded in this `Interval` then the index of the
/// `SubInterval` should also be provided as the removal can be stopped right after: all the
/// [Event]s contained in greater `SubInterval` will, by construction of the Replication Log,
/// only have greater timestamps and thus cannot be overridden by this Wildcard Update.
pub(crate) fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
wildcard_key_expr: &OwnedKeyExpr,
wildcard_timestamp: &Timestamp,
wildcard_sub_idx: Option<SubIntervalIdx>,
) -> HashSet<Event> {
let mut overridden_events = HashSet::new();
for (sub_interval_idx, sub_interval) in self.sub_intervals.iter_mut() {
let mut timestamp = None;
if let Some(wildcard_sub_idx) = wildcard_sub_idx {
if *sub_interval_idx > wildcard_sub_idx {
break;
}

// We only provide the timestamp of the Wildcard Update if the index of the
// SubInterval (if provided) equals the index of SubInterval where the Wildcard
// Update will be stored.
if *sub_interval_idx == wildcard_sub_idx {
timestamp = Some(wildcard_timestamp);
}
}

self.fingerprint ^= sub_interval.fingerprint;

overridden_events.extend(sub_interval.remove_events_overridden_by_wildcard_update(
prefix,
wildcard_key_expr,
timestamp,
));

self.fingerprint ^= sub_interval.fingerprint;
}

overridden_events
}
}

/// A `SubIntervalIdx` represents the index of a [SubInterval].
Expand Down Expand Up @@ -356,6 +400,34 @@ impl SubInterval {

EventRemoval::NotFound
}

/// Removes and returns the [Event] present in this `SubInterval` that are overridden by the
/// provided Wildcard Update.
///
/// The timestamp of the Wildcard Update should only be provided if the considered `SubInterval`
/// is where the Wildcard Update should be recorded.
/// It is only in that specific scenario that we are not sure that all [Event]s have a lower
/// timestamp.
fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
wildcard_key_expr: &OwnedKeyExpr,
wildcard_timestamp: Option<&Timestamp>,
) -> HashSet<Event> {
let overridden_events =
crate::replication::core::remove_events_overridden_by_wildcard_update(
&mut self.events,
prefix,
wildcard_key_expr,
wildcard_timestamp,
);

overridden_events
.iter()
.for_each(|overridden_event| self.fingerprint ^= overridden_event.fingerprint());

overridden_events
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl Configuration {
}
}

pub fn prefix(&self) -> Option<&OwnedKeyExpr> {
self.prefix.as_ref()
}

/// Returns the [Fingerprint] of the `Configuration`.
///
/// The fingerprint is the hash of all its fields, using the `xxhash_rust` crate.
Expand Down
69 changes: 60 additions & 9 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@ mod aligner_query;
mod aligner_reply;

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant, SystemTime},
};

use rand::Rng;
use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use tokio::{sync::RwLock, task::JoinHandle};
use tracing::{debug_span, Instrument};
use zenoh::{
key_expr::{
Expand All @@ -34,13 +31,16 @@ use zenoh::{
},
query::{ConsolidationMode, Selector},
sample::Locality,
time::Timestamp,
Session,
};
use zenoh_backend_traits::Storage;

use self::aligner_reply::AlignmentReply;
use super::{digest::Digest, log::LogLatest};
use crate::{replication::core::aligner_query::AlignmentQuery, storages_mgt::LatestUpdates};
use super::{digest::Digest, log::LogLatest, Action, Event};
use crate::{
replication::core::aligner_query::AlignmentQuery,
storages_mgt::{LatestUpdates, StorageService},
};

kedefine!(
pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
Expand All @@ -53,7 +53,7 @@ pub(crate) struct Replication {
pub(crate) replication_log: Arc<RwLock<LogLatest>>,
pub(crate) storage_key_expr: OwnedKeyExpr,
pub(crate) latest_updates: Arc<RwLock<LatestUpdates>>,
pub(crate) storage: Arc<Mutex<Box<dyn Storage>>>,
pub(crate) storage_service: Arc<StorageService>,
}

impl Replication {
Expand Down Expand Up @@ -579,3 +579,54 @@ impl Replication {
})
}
}

pub(crate) fn remove_events_overridden_by_wildcard_update(
events: &mut HashMap<Option<OwnedKeyExpr>, Event>,
prefix: Option<&OwnedKeyExpr>,
wildcard_ke: &OwnedKeyExpr,
wildcard_ts: Option<&Timestamp>,
) -> HashSet<Event> {
let mut overridden_events = HashSet::default();

events.retain(|stripped_key_expr, event| {
// We only provide the timestamp of the Wildcard Update if the Wildcard Update belongs
// in this SubInterval.
//
// Only then do we need to compare its timestamp with the timestamp of the Events
// contained in the SubInterval.
if let Some(wildcard_timestamp) = wildcard_ts {
if wildcard_timestamp < event.timestamp() {
return true;
}
}

let full_event_key_expr = match event.action() {
Action::Put | Action::Delete => {
match crate::prefix(prefix, stripped_key_expr.as_ref()) {
Ok(full_ke) => full_ke,
Err(e) => {
tracing::error!(
"Internal error while attempting to prefix < {:?} > with < {:?} >: \
{e:?}",
stripped_key_expr,
prefix
);
return true;
}
}
}
Action::WildcardPut(wildcard_ke) | Action::WildcardDelete(wildcard_ke) => {
wildcard_ke.clone()
}
};

if wildcard_ke.includes(&full_event_key_expr) {
overridden_events.insert(event.clone());
return false;
}

true
});

overridden_events
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};
use zenoh::{bytes::ZBytes, internal::Value, query::Query, sample::SampleKind};
use zenoh::{bytes::ZBytes, internal::Value, key_expr::keyexpr_tree::IKeyExprTree, query::Query};

use super::aligner_reply::AlignmentReply;
use crate::replication::{
classification::{IntervalIdx, SubIntervalIdx},
core::Replication,
digest::DigestDiff,
log::EventMetadata,
log::{Action, EventMetadata},
};

/// The `AlignmentQuery` enumeration represents the information requested by a Replica to align
Expand Down Expand Up @@ -271,54 +271,70 @@ impl Replication {

/// Replies to the [Query] with the [EventMetadata] and [Value] identified as missing.
///
/// This method will fetch the [StoredData] from the Storage.
/// Depending on the associated action, this method will fetch the [Value] either from the
/// Storage or from the wildcard updates.
pub(crate) async fn reply_event_retrieval(
&self,
query: &Query,
event_to_retrieve: EventMetadata,
) {
let mut value = None;
let value = match &event_to_retrieve.action {
// For a Delete or WildcardDelete there is no associated `Value`.
Action::Delete | Action::WildcardDelete(_) => None,
// For a Put we need to retrieve the `Value` in the Storage.
Action::Put => {
let stored_data = {
let mut storage = self.storage_service.storage.lock().await;
match storage
.get(event_to_retrieve.stripped_key.clone(), "")
.await
{
Ok(stored_data) => stored_data,
Err(e) => {
tracing::error!(
"Failed to retrieve data associated to key < {:?} >: {e:?}",
event_to_retrieve.key_expr()
);
return;
}
}
};

if event_to_retrieve.action == SampleKind::Put {
let stored_data = {
let mut storage = self.storage.lock().await;
match storage
.get(event_to_retrieve.stripped_key.clone(), "")
.await
{
Ok(stored_data) => stored_data,
Err(e) => {
tracing::error!(
"Failed to retrieve data associated to key < {:?} >: {e:?}",
event_to_retrieve.key_expr()
let requested_data = stored_data
.into_iter()
.find(|data| data.timestamp == *event_to_retrieve.timestamp());
match requested_data {
Some(data) => Some(data.value),
None => {
// NOTE: This is not necessarily an error. There is a possibility that the
// data associated with this specific key was updated between the time
// the [AlignmentQuery] was sent and when it is processed.
//
// Hence, at the time it was "valid" but it no longer is.
tracing::debug!(
"Found no data in the Storage associated to key < {:?} > with a \
Timestamp equal to: {}",
event_to_retrieve.key_expr(),
event_to_retrieve.timestamp()
);
return;
}
}
};
}
// For a WildcardPut we need to retrieve the `Value` in the `StorageService`.
Action::WildcardPut(wildcard_ke) => {
let wildcard_update_guard = self.storage_service.wildcard_updates.read().await;

let requested_data = stored_data
.into_iter()
.find(|data| data.timestamp == *event_to_retrieve.timestamp());
match requested_data {
Some(data) => {
value = Some(data.value);
}
None => {
// NOTE: This is not necessarily an error. There is a possibility that the data
// associated with this specific key was updated between the time the
// [AlignmentQuery] was sent and when it is processed.
//
// Hence, at the time it was "valid" but it no longer is.
tracing::debug!(
"Found no data in the Storage associated to key < {:?} > with a Timestamp \
equal to: {}",
event_to_retrieve.key_expr(),
event_to_retrieve.timestamp()
if let Some(update) = wildcard_update_guard.weight_at(wildcard_ke) {
Some(update.value().clone())
} else {
tracing::error!(
"Ignoring Wildcard Update < {wildcard_ke} >: found no associated `Update`."
);
return;
}
}
}
};

reply_to_query(query, AlignmentReply::Retrieval(event_to_retrieve), value).await;
}
Expand Down
Loading

0 comments on commit bd6bf44

Please sign in to comment.