diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 3e5a2e1f09..77cd935ebf 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -33,7 +33,7 @@ pub enum StorageMessage { pub(crate) type LatestUpdates = HashMap, Event>; -#[derive(Clone)] +#[derive(Default, Clone)] pub(crate) struct CacheLatest { pub(crate) latest_updates: Arc>, pub(crate) replication_log: Option>>, diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index 8ae3d77634..e8f1b43c4f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -431,48 +431,28 @@ impl StorageService { key_expr: &OwnedKeyExpr, timestamp: &Timestamp, ) -> Option { - // check wild card store for any futuristic update let wildcards = self.wildcard_updates.read().await; - let mut ts = timestamp; - let mut update = None; - - let prefix = self.configuration.strip_prefix.as_ref(); - - for node in wildcards.intersecting_keys(key_expr) { - let weight = wildcards.weight_at(&node); - if weight.is_some() && weight.unwrap().data.timestamp > *ts { - // if the key matches a wild card update, check whether it was saved in storage - // remember that wild card updates change only existing keys - let stripped_key = match crate::strip_prefix(prefix, &key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - tracing::error!("{}", e); - break; - } - }; - let mut storage = self.storage.lock().await; - match storage.get(stripped_key, "").await { - Ok(stored_data) => { - for entry in stored_data { - if entry.timestamp > *ts { - return None; - } - } - } - Err(e) => { - tracing::warn!( - "Storage '{}' raised an error fetching a query on key {} : {}", - self.name, - key_expr, - e - ); - ts = &weight.unwrap().data.timestamp; - update = Some(weight.unwrap().clone()); + let mut ts = *timestamp; + + let update = wildcards + .intersecting_keys(key_expr) + .fold(None, |acc, node| { + if let Some(update) = wildcards.weight_at(&node) { + // NOTE: As an optimisation for the Replication, we also want to retrieve + // Wildcard Update that have exactly the same timestamp. + // + // If the Aligner detects that an Event is missing and that Event is overridden + // by a Wildcard Update, we can avoid retrieving its associated payload if we + // have already retrieved the Wildcard Update. + if update.data.timestamp >= ts { + ts = update.data.timestamp; + return Some(node); } } - } - } - update + acc + }); + + update.and_then(|node| wildcards.weight_at(&node)).cloned() } /// Returns a guard over the cache if the provided [Timestamp] is more recent than what is kept @@ -732,3 +712,7 @@ impl Timed for GarbageCollectionEvent { tracing::trace!("End garbage collection of obsolete data-infos"); } } + +#[cfg(test)] +#[path = "./tests/service.tests.rs"] +mod tests; diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs new file mode 100644 index 0000000000..ee3d48a02c --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs @@ -0,0 +1,103 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::sync::Arc; + +use tokio::sync::{Mutex, RwLock}; +use zenoh::{ + internal::Value, + key_expr::{ + keyexpr_tree::{KeBoxTree, KeyedSetProvider, NonWild, UnknownWildness}, + OwnedKeyExpr, + }, + time::Timestamp, + Result as ZResult, +}; +use zenoh_backend_traits::{ + config::{GarbageCollectionConfig, StorageConfig}, + Capability, Storage, StorageInsertionResult, StoredData, +}; + +use super::{StorageService, Update}; +use crate::storages_mgt::CacheLatest; + +struct DummyStorage; + +#[async_trait::async_trait] +impl Storage for DummyStorage { + fn get_admin_status(&self) -> serde_json::Value { + todo!() + } + + async fn put( + &mut self, + _key: Option, + _value: Value, + _timestamp: Timestamp, + ) -> ZResult { + todo!() + } + + async fn delete( + &mut self, + _key: Option, + _timestamp: Timestamp, + ) -> ZResult { + todo!() + } + + async fn get( + &mut self, + _key: Option, + _parameters: &str, + ) -> ZResult> { + todo!() + } + + async fn get_all_entries(&self) -> ZResult, Timestamp)>> { + todo!() + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_overriding_wild_update() { + let _storage_service = StorageService { + session: Arc::new(zenoh::open(zenoh::Config::default()).await.unwrap()), + configuration: StorageConfig { + name: "test".to_string(), + key_expr: OwnedKeyExpr::new("test/**").unwrap(), + complete: true, + strip_prefix: Some(OwnedKeyExpr::new("test").unwrap()), + volume_id: "test-volume".to_string(), + volume_cfg: serde_json::json!({}), + garbage_collection_config: GarbageCollectionConfig::default(), + replication: None, + }, + name: "test-storage".to_string(), + storage: Arc::new(Mutex::new(Box::new(DummyStorage {}) as Box)), + capability: Capability { + persistence: zenoh_backend_traits::Persistence::Volatile, + history: zenoh_backend_traits::History::Latest, + }, + tombstones: Arc::new(RwLock::new( + KeBoxTree::::default(), + )), + wildcard_updates: Arc::new(RwLock::new(KeBoxTree::< + Update, + UnknownWildness, + KeyedSetProvider, + >::default())), + cache_latest: CacheLatest::default(), + }; +}