From 13b54fd4d668aa6515dd10141416f658078aa325 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Tue, 8 Oct 2024 11:35:31 +0200 Subject: [PATCH] WIP: refactor(storage-manager): `overriding_wild_update` without get - [ ] Unit test: changes in #1499 will simplify adding Wildcard Updates The previous implementation was always performing a `get` on the Storage when checking if a received publication could be overridden by a Wildcard Update. This commit changes that behaviour and removes the call to the Storage. In addition to potentially creating more network exchanges, this was also generating "warning" logs during the alignment process when Wildcard Updates are involved. * plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs * plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs * plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs Signed-off-by: Julien Loudet --- .../src/storages_mgt/mod.rs | 2 +- .../src/storages_mgt/service.rs | 62 ++++------- .../src/storages_mgt/tests/service.tests.rs | 103 ++++++++++++++++++ 3 files changed, 127 insertions(+), 40 deletions(-) create mode 100644 plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 3e5a2e1f09..77cd935ebf 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -33,7 +33,7 @@ pub enum StorageMessage { pub(crate) type LatestUpdates = HashMap, Event>; -#[derive(Clone)] +#[derive(Default, Clone)] pub(crate) struct CacheLatest { pub(crate) latest_updates: Arc>, pub(crate) replication_log: Option>>, diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index 8ae3d77634..e8f1b43c4f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -431,48 +431,28 @@ impl StorageService { key_expr: &OwnedKeyExpr, timestamp: &Timestamp, ) -> Option { - // check wild card store for any futuristic update let wildcards = self.wildcard_updates.read().await; - let mut ts = timestamp; - let mut update = None; - - let prefix = self.configuration.strip_prefix.as_ref(); - - for node in wildcards.intersecting_keys(key_expr) { - let weight = wildcards.weight_at(&node); - if weight.is_some() && weight.unwrap().data.timestamp > *ts { - // if the key matches a wild card update, check whether it was saved in storage - // remember that wild card updates change only existing keys - let stripped_key = match crate::strip_prefix(prefix, &key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - tracing::error!("{}", e); - break; - } - }; - let mut storage = self.storage.lock().await; - match storage.get(stripped_key, "").await { - Ok(stored_data) => { - for entry in stored_data { - if entry.timestamp > *ts { - return None; - } - } - } - Err(e) => { - tracing::warn!( - "Storage '{}' raised an error fetching a query on key {} : {}", - self.name, - key_expr, - e - ); - ts = &weight.unwrap().data.timestamp; - update = Some(weight.unwrap().clone()); + let mut ts = *timestamp; + + let update = wildcards + .intersecting_keys(key_expr) + .fold(None, |acc, node| { + if let Some(update) = wildcards.weight_at(&node) { + // NOTE: As an optimisation for the Replication, we also want to retrieve + // Wildcard Update that have exactly the same timestamp. + // + // If the Aligner detects that an Event is missing and that Event is overridden + // by a Wildcard Update, we can avoid retrieving its associated payload if we + // have already retrieved the Wildcard Update. + if update.data.timestamp >= ts { + ts = update.data.timestamp; + return Some(node); } } - } - } - update + acc + }); + + update.and_then(|node| wildcards.weight_at(&node)).cloned() } /// Returns a guard over the cache if the provided [Timestamp] is more recent than what is kept @@ -732,3 +712,7 @@ impl Timed for GarbageCollectionEvent { tracing::trace!("End garbage collection of obsolete data-infos"); } } + +#[cfg(test)] +#[path = "./tests/service.tests.rs"] +mod tests; diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs new file mode 100644 index 0000000000..ee3d48a02c --- /dev/null +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/tests/service.tests.rs @@ -0,0 +1,103 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::sync::Arc; + +use tokio::sync::{Mutex, RwLock}; +use zenoh::{ + internal::Value, + key_expr::{ + keyexpr_tree::{KeBoxTree, KeyedSetProvider, NonWild, UnknownWildness}, + OwnedKeyExpr, + }, + time::Timestamp, + Result as ZResult, +}; +use zenoh_backend_traits::{ + config::{GarbageCollectionConfig, StorageConfig}, + Capability, Storage, StorageInsertionResult, StoredData, +}; + +use super::{StorageService, Update}; +use crate::storages_mgt::CacheLatest; + +struct DummyStorage; + +#[async_trait::async_trait] +impl Storage for DummyStorage { + fn get_admin_status(&self) -> serde_json::Value { + todo!() + } + + async fn put( + &mut self, + _key: Option, + _value: Value, + _timestamp: Timestamp, + ) -> ZResult { + todo!() + } + + async fn delete( + &mut self, + _key: Option, + _timestamp: Timestamp, + ) -> ZResult { + todo!() + } + + async fn get( + &mut self, + _key: Option, + _parameters: &str, + ) -> ZResult> { + todo!() + } + + async fn get_all_entries(&self) -> ZResult, Timestamp)>> { + todo!() + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_overriding_wild_update() { + let _storage_service = StorageService { + session: Arc::new(zenoh::open(zenoh::Config::default()).await.unwrap()), + configuration: StorageConfig { + name: "test".to_string(), + key_expr: OwnedKeyExpr::new("test/**").unwrap(), + complete: true, + strip_prefix: Some(OwnedKeyExpr::new("test").unwrap()), + volume_id: "test-volume".to_string(), + volume_cfg: serde_json::json!({}), + garbage_collection_config: GarbageCollectionConfig::default(), + replication: None, + }, + name: "test-storage".to_string(), + storage: Arc::new(Mutex::new(Box::new(DummyStorage {}) as Box)), + capability: Capability { + persistence: zenoh_backend_traits::Persistence::Volatile, + history: zenoh_backend_traits::History::Latest, + }, + tombstones: Arc::new(RwLock::new( + KeBoxTree::::default(), + )), + wildcard_updates: Arc::new(RwLock::new(KeBoxTree::< + Update, + UnknownWildness, + KeyedSetProvider, + >::default())), + cache_latest: CacheLatest::default(), + }; +}