Skip to content

Commit

Permalink
WIP: refactor(storage-manager): overriding_wild_update without get
Browse files Browse the repository at this point in the history
- [ ] Unit test: changes in #1499 will simplify adding Wildcard Updates

The previous implementation was always performing a `get` on the Storage
when checking if a received publication could be overridden by a
Wildcard Update.

This commit changes that behaviour and removes the call to the
Storage. In addition to potentially creating more network exchanges,
this was also generating "warning" logs during the alignment process
when Wildcard Updates are involved.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Oct 8, 2024
1 parent 0985806 commit 13b54fd
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub enum StorageMessage {

pub(crate) type LatestUpdates = HashMap<Option<OwnedKeyExpr>, Event>;

#[derive(Clone)]
#[derive(Default, Clone)]
pub(crate) struct CacheLatest {
pub(crate) latest_updates: Arc<RwLock<LatestUpdates>>,
pub(crate) replication_log: Option<Arc<RwLock<LogLatest>>>,
Expand Down
62 changes: 23 additions & 39 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,48 +431,28 @@ impl StorageService {
key_expr: &OwnedKeyExpr,
timestamp: &Timestamp,
) -> Option<Update> {
// check wild card store for any futuristic update
let wildcards = self.wildcard_updates.read().await;
let mut ts = timestamp;
let mut update = None;

let prefix = self.configuration.strip_prefix.as_ref();

for node in wildcards.intersecting_keys(key_expr) {
let weight = wildcards.weight_at(&node);
if weight.is_some() && weight.unwrap().data.timestamp > *ts {
// if the key matches a wild card update, check whether it was saved in storage
// remember that wild card updates change only existing keys
let stripped_key = match crate::strip_prefix(prefix, &key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
break;
}
};
let mut storage = self.storage.lock().await;
match storage.get(stripped_key, "").await {
Ok(stored_data) => {
for entry in stored_data {
if entry.timestamp > *ts {
return None;
}
}
}
Err(e) => {
tracing::warn!(
"Storage '{}' raised an error fetching a query on key {} : {}",
self.name,
key_expr,
e
);
ts = &weight.unwrap().data.timestamp;
update = Some(weight.unwrap().clone());
let mut ts = *timestamp;

let update = wildcards
.intersecting_keys(key_expr)
.fold(None, |acc, node| {
if let Some(update) = wildcards.weight_at(&node) {
// NOTE: As an optimisation for the Replication, we also want to retrieve
// Wildcard Updates that have exactly the same timestamp.
//
// If the Aligner detects that an Event is missing and that Event is overridden
// by a Wildcard Update, we can avoid retrieving its associated payload if we
// have already retrieved the Wildcard Update.
if update.data.timestamp >= ts {
ts = update.data.timestamp;
return Some(node);
}
}
}
}
update
acc
});

update.and_then(|node| wildcards.weight_at(&node)).cloned()
}

/// Returns a guard over the cache if the provided [Timestamp] is more recent than what is kept
Expand Down Expand Up @@ -732,3 +712,7 @@ impl Timed for GarbageCollectionEvent {
tracing::trace!("End garbage collection of obsolete data-infos");
}
}

#[cfg(test)]
#[path = "./tests/service.tests.rs"]
mod tests;
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::Arc;

use tokio::sync::{Mutex, RwLock};
use zenoh::{
internal::Value,
key_expr::{
keyexpr_tree::{KeBoxTree, KeyedSetProvider, NonWild, UnknownWildness},
OwnedKeyExpr,
},
time::Timestamp,
Result as ZResult,
};
use zenoh_backend_traits::{
config::{GarbageCollectionConfig, StorageConfig},
Capability, Storage, StorageInsertionResult, StoredData,
};

use super::{StorageService, Update};
use crate::storages_mgt::CacheLatest;

struct DummyStorage;

#[async_trait::async_trait]
impl Storage for DummyStorage {
fn get_admin_status(&self) -> serde_json::Value {
todo!()
}

async fn put(
&mut self,
_key: Option<OwnedKeyExpr>,
_value: Value,
_timestamp: Timestamp,
) -> ZResult<StorageInsertionResult> {
todo!()
}

async fn delete(
&mut self,
_key: Option<OwnedKeyExpr>,
_timestamp: Timestamp,
) -> ZResult<StorageInsertionResult> {
todo!()
}

async fn get(
&mut self,
_key: Option<OwnedKeyExpr>,
_parameters: &str,
) -> ZResult<Vec<StoredData>> {
todo!()
}

async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>> {
todo!()
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_overriding_wild_update() {
let _storage_service = StorageService {
session: Arc::new(zenoh::open(zenoh::Config::default()).await.unwrap()),
configuration: StorageConfig {
name: "test".to_string(),
key_expr: OwnedKeyExpr::new("test/**").unwrap(),
complete: true,
strip_prefix: Some(OwnedKeyExpr::new("test").unwrap()),
volume_id: "test-volume".to_string(),
volume_cfg: serde_json::json!({}),
garbage_collection_config: GarbageCollectionConfig::default(),
replication: None,
},
name: "test-storage".to_string(),
storage: Arc::new(Mutex::new(Box::new(DummyStorage {}) as Box<dyn Storage>)),
capability: Capability {
persistence: zenoh_backend_traits::Persistence::Volatile,
history: zenoh_backend_traits::History::Latest,
},
tombstones: Arc::new(RwLock::new(
KeBoxTree::<Timestamp, NonWild, KeyedSetProvider>::default(),
)),
wildcard_updates: Arc::new(RwLock::new(KeBoxTree::<
Update,
UnknownWildness,
KeyedSetProvider,
>::default())),
cache_latest: CacheLatest::default(),
};
}

0 comments on commit 13b54fd

Please sign in to comment.