Skip to content

Commit

Permalink
feat(storage-manager): Replication supports Wildcard Updates
Browse files Browse the repository at this point in the history
-----
WIP:
- reword commit
- remove extra logging
- check for duplicated code
-----

This commit adds support for Wildcard Updates (be it Delete or Put) when
aligning replicated Storage.

To make this feature possible the following structures and methods were
added / changed:

- A new enumeration `Action` was introduced. This enumeration builds on
  top of `SampleKind` and adds two variants: `WildcardPut` and
  `WildcardDelete`, each of these variants contains their full key
  expression.
  We track the full key expression to not have to deal with `Option` and
  the `strip_prefix`: there are cases where the `strip_prefix` cannot be
  removed from a Wildcard Update — yet the Wildcard Update should be
  recorded.
  For instance, if a storage subscribes to "test/replication/**" and has
  its strip prefix set to "test/replication", a delete on "test/**"
  should be recorded while the `strip_prefix` cannot be removed.

- The `LogLatest`, `Interval` and `SubInterval` structures now have the
  the method `remove_events_overridden_by_wildcard_update`: this method
  removes all the Events that are impacted by a Wildcard Update and
  updates the Fingerprint of the impacted Interval / SubInterval.

- The visibility of the fields of `Event` structure were set to
  `pub(crate)` to prevent modifying them without updating the
  Fingerprint.
  Setters and accessors were thus added.

- The core `Replication` structure now keeps an `Arc` over the
  `StorageService` such that it can add Wildcard Updates to it when it
  receives some.

- The visibility of the following field / structures were changed to
  `pub(crate)` such that the Replication can access them:
  - The `Update` structure that contains the payload and timestamp
    associated with a Wildcard Update.
  - The `configuration`, `wildcard_updates` fields of the
    `StorageService` structure.

- The signature of the method `register_wildcard_update` was changed: it
  no longer extracts all the metadata from the `sample` argument as,
  when called from the Replication Aligner, these values are not
  "correct" (they are linked to a reply and not to the contained
  payload).

* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  added the method `remove_events_overridden_by_wildcard_update` to the
  `Interval` and `SubInterval` structures.

* plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs:
  added the `prefix` accessor.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Changed the `Replication` structure to keep track of the
    `StorageService`. The latter is used to access and add/remove
    Wildcard Updates.
  - Added the method `remove_events_overridden_by_wildcard_updates`
    that takes a `HashMap` as this logic is shared for the latest cache
    and the Replication Log.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs:
  updated the `reply_event_retrieval` method to handle Wildcard Updates.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs:
  - Added some traces to ease debugging.
  - Added the method `needs_further_processing` that, given an
    EventMetadata, checks if it needs to be processed, processes it if it
    can and otherwise returns true -- indicating that further processing
    is needed.
  - Added the method `apply_wildcard_update` that applies a Wildcard
    Update.
  - Added the method `is_overridden_by_wildcard_update` that checks if
    there is a Wildcard Update that overrides the provided key
    expression for the provided timestamp.
  - Added the method `store_event_overridden_by_wildcard_update` that
    overrides the provided EventMetadata with the Wildcard Update and
    stores the result.

* plugins/zenoh-plugin-storage-manager/src/replication/log.rs:
  - Added the `Action` enumeration.
  - Changed the visibility of the fields of the `Event` structure to
    `pub(crate)` and added accessors / setters that properly update
    the Fingerprint.
  - Added the method `remove_events_overridden_by_wildcard_update` to
    the `LogLatest` structure.

* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: make the
  `Action` visible from the `replication` module.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  changed the `spawn_start` function to take an `Arc<StorageService>`
  instead of a reference.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs: wrap the
  `StorageService` inside of an `Arc` such that the `Replication` can use
  it in its tasks.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Changed the visibility of the `Update` structure to `pub(crate)` and
    added accessors.
  - Changed the visibility of the `configuration` and `wildcard_updates`
    fields of the `StorageService` structure to `pub(crate)` as the
    `Replication` uses them when dealing with Wildcard Updates.
  - Changed the signature of the method `register_wildcard_update` to
    not extract the metadata from the `Sample` -- as the values are not
    correct when processing an Alignment reply.
  - Changed the visibility of the methods `register_wildcard_update` and
    `overriding_wild_update` to `pub(crate)` as they are used by the
    Replication to deal with Wildcard Updates.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Oct 10, 2024
1 parent b61fbe7 commit 902f984
Show file tree
Hide file tree
Showing 12 changed files with 1,374 additions and 451 deletions.
139 changes: 111 additions & 28 deletions plugins/zenoh-plugin-storage-manager/src/replication/classification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ use std::{
};

use serde::{Deserialize, Serialize};
use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp};
use zenoh::{key_expr::OwnedKeyExpr, sample::SampleKind, time::Timestamp};

use super::{digest::Fingerprint, log::Event};
use super::{
digest::Fingerprint,
log::{Event, EventMetadata, LogLatestKey},
};

/// The `EventRemoval` enumeration lists the possible outcomes when searching for an older [Event]
/// and removing it if one was found.
Expand Down Expand Up @@ -103,7 +106,7 @@ impl Interval {
#[cfg(debug_assertions)]
pub(crate) fn assert_only_one_event_per_key_expr(
&self,
events: &mut HashSet<Option<OwnedKeyExpr>>,
events: &mut HashSet<LogLatestKey>,
) -> bool {
for sub_interval in self.sub_intervals.values() {
if !sub_interval.assert_only_one_event_per_key_expr(events) {
Expand Down Expand Up @@ -137,9 +140,9 @@ impl Interval {
}

/// Lookup the provided key expression and return, if found, its associated [Event].
pub(crate) fn lookup(&self, stripped_key: &Option<OwnedKeyExpr>) -> Option<&Event> {
pub(crate) fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
for sub_interval in self.sub_intervals.values() {
if let Some(event) = sub_interval.events.get(stripped_key) {
if let Some(event) = sub_interval.lookup(event_to_lookup) {
return Some(event);
}
}
Expand Down Expand Up @@ -187,16 +190,12 @@ impl Interval {
/// The [Fingerprint] of this Interval will be updated accordingly.
///
/// This method returns, through the [EventRemoval] enumeration, the action that was performed.
pub(crate) fn remove_older(
&mut self,
key_expr: &Option<OwnedKeyExpr>,
timestamp: &Timestamp,
) -> EventRemoval {
pub(crate) fn remove_older(&mut self, event_to_remove: &EventMetadata) -> EventRemoval {
let mut sub_interval_idx_to_remove = None;
let mut result = EventRemoval::NotFound;

for (sub_interval_idx, sub_interval) in self.sub_intervals.iter_mut() {
result = sub_interval.remove_older(key_expr, timestamp);
result = sub_interval.remove_older(event_to_remove);
if let EventRemoval::RemovedOlder(ref old_event) = result {
self.fingerprint ^= old_event.fingerprint();
if sub_interval.events.is_empty() {
Expand All @@ -216,6 +215,54 @@ impl Interval {

result
}

pub(crate) fn remove_event(
&mut self,
sub_interval_idx: &SubIntervalIdx,
event_to_remove: &EventMetadata,
) -> Option<Event> {
let removed_event = self
.sub_intervals
.get_mut(sub_interval_idx)
.and_then(|sub_interval| sub_interval.remove_event(event_to_remove));

if let Some(event) = &removed_event {
self.fingerprint ^= event.fingerprint();
}

removed_event
}

/// Removes and returns the [Event] present in this `Interval` that are overridden by the
/// provided Wildcard Update.
///
/// If the Wildcard Update should be recorded in this `Interval` then the index of the
/// `SubInterval` should also be provided as the removal can be stopped right after: all the
/// [Event]s contained in greater `SubInterval` will, by construction of the Replication Log,
/// only have greater timestamps and thus cannot be overridden by this Wildcard Update.
pub(crate) fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
wildcard_key_expr: &OwnedKeyExpr,
wildcard_timestamp: &Timestamp,
wildcard_kind: SampleKind,
) -> HashSet<Event> {
let mut overridden_events = HashSet::new();
for sub_interval in self.sub_intervals.values_mut() {
self.fingerprint ^= sub_interval.fingerprint;

overridden_events.extend(sub_interval.remove_events_overridden_by_wildcard_update(
prefix,
wildcard_key_expr,
wildcard_timestamp,
wildcard_kind,
));

self.fingerprint ^= sub_interval.fingerprint;
}

overridden_events
}
}

/// A `SubIntervalIdx` represents the index of a [SubInterval].
Expand Down Expand Up @@ -249,7 +296,7 @@ pub(crate) struct SubInterval {
fingerprint: Fingerprint,
// ⚠️ This field should remain private: we cannot manipulate the `Events` without updating the
// Fingerprint.
events: HashMap<Option<OwnedKeyExpr>, Event>,
events: HashMap<LogLatestKey, Event>,
}

impl<const N: usize> From<[Event; N]> for SubInterval {
Expand All @@ -262,7 +309,7 @@ impl<const N: usize> From<[Event; N]> for SubInterval {
fingerprint,
events: events
.into_iter()
.map(|event| (event.key_expr().clone(), event))
.map(|event| (event.log_key(), event))
.collect(),
}
}
Expand All @@ -276,15 +323,12 @@ impl SubInterval {
///
/// ⚠️ This method will only be called if Zenoh is compiled in Debug mode.
#[cfg(debug_assertions)]
fn assert_only_one_event_per_key_expr(
&self,
events: &mut HashSet<Option<OwnedKeyExpr>>,
) -> bool {
for event_ke in self.events.keys() {
if !events.insert(event_ke.clone()) {
fn assert_only_one_event_per_key_expr(&self, events: &mut HashSet<LogLatestKey>) -> bool {
for event_log_key in self.events.keys() {
if !events.insert(event_log_key.clone()) {
tracing::error!(
"FATAL ERROR, REPLICATION LOG INVARIANT VIOLATED, KEY APPEARS MULTIPLE TIMES: \
< {event_ke:?} >"
< {event_log_key:?} >"
);
return false;
}
Expand Down Expand Up @@ -323,7 +367,7 @@ impl SubInterval {
/// be updated to keep it correct and a warning message will be emitted.
fn insert_unchecked(&mut self, event: Event) {
self.fingerprint ^= event.fingerprint();
if let Some(replaced_event) = self.events.insert(event.key_expr().clone(), event) {
if let Some(replaced_event) = self.events.insert(event.log_key(), event) {
tracing::warn!(
"Call to `insert_unchecked` replaced an Event in the replication Log, this should \
NOT have happened: {replaced_event:?}"
Expand All @@ -339,13 +383,9 @@ impl SubInterval {
/// performed.
///
/// The [Fingerprint] of this SubInterval will be updated accordingly.
fn remove_older(
&mut self,
key_expr: &Option<OwnedKeyExpr>,
timestamp: &Timestamp,
) -> EventRemoval {
if let Some((key_expr, event)) = self.events.remove_entry(key_expr) {
if event.timestamp() < timestamp {
fn remove_older(&mut self, event_to_remove: &EventMetadata) -> EventRemoval {
if let Some((key_expr, event)) = self.events.remove_entry(&event_to_remove.log_key()) {
if event.timestamp() < &event_to_remove.timestamp {
self.fingerprint ^= event.fingerprint();
return EventRemoval::RemovedOlder(event);
} else {
Expand All @@ -356,6 +396,49 @@ impl SubInterval {

EventRemoval::NotFound
}

fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
self.events.get(&event_to_lookup.log_key())
}

fn remove_event(&mut self, event_to_remove: &EventMetadata) -> Option<Event> {
let removed_event = self.events.remove(&event_to_remove.log_key());
if let Some(event) = &removed_event {
self.fingerprint ^= event.fingerprint();
}

removed_event
}

/// Removes and returns the [Event] present in this `SubInterval` that are overridden by the
/// provided Wildcard Update.
///
/// The timestamp of the Wildcard Update should only be provided if the considered `SubInterval`
/// is where the Wildcard Update should be recorded.
/// It is only in that specific scenario that we are not sure that all [Event]s have a lower
/// timestamp.
fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
wildcard_key_expr: &OwnedKeyExpr,
wildcard_timestamp: &Timestamp,
wildcard_kind: SampleKind,
) -> HashSet<Event> {
let overridden_events =
crate::replication::core::remove_events_overridden_by_wildcard_update(
&mut self.events,
prefix,
wildcard_key_expr,
wildcard_timestamp,
wildcard_kind,
);

overridden_events
.iter()
.for_each(|overridden_event| self.fingerprint ^= overridden_event.fingerprint());

overridden_events
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl Configuration {
}
}

pub fn prefix(&self) -> Option<&OwnedKeyExpr> {
self.prefix.as_ref()
}

/// Returns the [Fingerprint] of the `Configuration`.
///
/// The fingerprint is the hash of all its fields, using the `xxhash_rust` crate.
Expand Down
110 changes: 100 additions & 10 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,31 @@ mod aligner_query;
mod aligner_reply;

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant, SystemTime},
};

use rand::Rng;
use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use tokio::{sync::RwLock, task::JoinHandle};
use tracing::{debug_span, Instrument};
use zenoh::{
key_expr::{
format::{kedefine, keformat},
OwnedKeyExpr,
},
query::{ConsolidationMode, Selector},
sample::Locality,
sample::{Locality, SampleKind},
time::Timestamp,
Session,
};
use zenoh_backend_traits::Storage;

use self::aligner_reply::AlignmentReply;
use super::{digest::Digest, log::LogLatest};
use crate::{replication::core::aligner_query::AlignmentQuery, storages_mgt::LatestUpdates};
use super::{digest::Digest, log::LogLatest, Action, Event, LogLatestKey};
use crate::{
replication::core::aligner_query::AlignmentQuery,
storages_mgt::{LatestUpdates, StorageService},
};

kedefine!(
pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
Expand All @@ -53,7 +53,7 @@ pub(crate) struct Replication {
pub(crate) replication_log: Arc<RwLock<LogLatest>>,
pub(crate) storage_key_expr: OwnedKeyExpr,
pub(crate) latest_updates: Arc<RwLock<LatestUpdates>>,
pub(crate) storage: Arc<Mutex<Box<dyn Storage>>>,
pub(crate) storage_service: Arc<StorageService>,
}

impl Replication {
Expand Down Expand Up @@ -579,3 +579,93 @@ impl Replication {
})
}
}

pub(crate) fn remove_events_overridden_by_wildcard_update(
events: &mut HashMap<LogLatestKey, Event>,
prefix: Option<&OwnedKeyExpr>,
wildcard_ke: &OwnedKeyExpr,
wildcard_ts: &Timestamp,
wildcard_kind: SampleKind,
) -> HashSet<Event> {
let mut overridden_events = HashSet::default();

events.retain(|_, event| {
// We only provide the timestamp of the Wildcard Update if the Wildcard Update belongs
// in this SubInterval.
//
// Only then do we need to compare its timestamp with the timestamp of the Events
// contained in the SubInterval.
if event.timestamp() >= wildcard_ts {
// Very specific scenario: we are processing a Wildcard Delete that should have been
// applied before another Wildcard Update.
//
// With an example, we had the events:
// - put "a = 1" @t0
// - put "** = 42" @t2
//
// That leads the Event in the Replication Log associated to "a" to be:
// - timestamp = @t2
// - timestamp_last_non_wildcard_update = @t0
//
// And now we receive:
// - delete "**" @t1 (@t0 < @t1 < @t2)
//
// As the Wildcard Delete should have arrived before the Wildcard Update (put "** =
// 42"), "a" should have been deleted and the Wildcard Update Put not applied.
//
// These `if` check that very specific scenario. Basically we should only retain the
// Event if its `timestamp_last_non_wildcard_update` exists and is greater than the
// timestamp of the Wildcard Delete.
if wildcard_kind == SampleKind::Delete && event.action() == &Action::Put {
if let Some(timestamp_last_non_wildcard_update) =
event.timestamp_last_non_wildcard_update
{
if timestamp_last_non_wildcard_update > *wildcard_ts {
return true;
}
}
} else {
return true;
}
}

let full_event_key_expr = match event.action() {
// We do not want to override deleted Events, either with another Wildcard Update or
// with a Wildcard Delete.
Action::Delete => return true,
Action::Put => match crate::prefix(prefix, event.stripped_key.as_ref()) {
Ok(full_ke) => full_ke,
Err(e) => {
tracing::error!(
"Internal error while attempting to prefix < {:?} > with < {:?} >: {e:?}",
event.stripped_key,
prefix
);
return true;
}
},
Action::WildcardPut(wildcard_ke) | Action::WildcardDelete(wildcard_ke) => {
wildcard_ke.clone()
}
};

if wildcard_ke.includes(&full_event_key_expr) {
// A Wildcard Update cannot override a Wildcard Delete. A Wildcard Delete can only be
// overridden by another Wildcard Delete.
//
// The opposite is not true: a Wildcard Delete can override a Wildcard Update.
if wildcard_kind == SampleKind::Put && matches!(event.action, Action::WildcardDelete(_))
{
return true;
}

overridden_events.insert(event.clone());
tracing::trace!("(Wildcard Update) REMOVING < {event:?} >");
return false;
}

true
});

overridden_events
}
Loading

0 comments on commit 902f984

Please sign in to comment.