From 4a465686c58ff31f0cd0d787f012253c045d7a42 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 15 Jan 2025 17:56:58 +0300 Subject: [PATCH] Put ProtocolID and ChunkDescriptor into SHM metadata: less wire overhead and less structure sizes for SHM --- commons/zenoh-codec/src/core/shm.rs | 14 +---------- commons/zenoh-codec/tests/codec.rs | 2 -- commons/zenoh-shm/src/api/provider/chunk.rs | 1 + .../src/api/provider/shm_provider.rs | 25 ++++++++++++++++--- commons/zenoh-shm/src/header/chunk_header.rs | 10 +++++++- commons/zenoh-shm/src/lib.rs | 18 ++++++------- commons/zenoh-shm/src/reader.rs | 24 +++++++++++++----- io/zenoh-transport/src/shm.rs | 4 +-- 8 files changed, 61 insertions(+), 37 deletions(-) diff --git a/commons/zenoh-codec/src/core/shm.rs b/commons/zenoh-codec/src/core/shm.rs index 15cd4c8b2d..0408053679 100644 --- a/commons/zenoh-codec/src/core/shm.rs +++ b/commons/zenoh-codec/src/core/shm.rs @@ -70,15 +70,11 @@ where fn write(self, writer: &mut W, x: &ShmBufInfo) -> Self::Output { let ShmBufInfo { - data_descriptor, - shm_protocol, data_len, metadata, generation, } = x; - self.write(&mut *writer, data_descriptor)?; - self.write(&mut *writer, shm_protocol)?; self.write(&mut *writer, *data_len)?; self.write(&mut *writer, metadata)?; self.write(&mut *writer, generation)?; @@ -139,19 +135,11 @@ where type Error = DidntRead; fn read(self, reader: &mut R) -> Result { - let data_descriptor = self.read(&mut *reader)?; - let shm_protocol = self.read(&mut *reader)?; let data_len = self.read(&mut *reader)?; let metadata = self.read(&mut *reader)?; let generation = self.read(&mut *reader)?; - let shm_info = ShmBufInfo::new( - data_descriptor, - shm_protocol, - data_len, - metadata, - generation, - ); + let shm_info = ShmBufInfo::new(data_len, metadata, generation); Ok(shm_info) } } diff --git a/commons/zenoh-codec/tests/codec.rs b/commons/zenoh-codec/tests/codec.rs index ac27fb868a..af39d9bbc6 100644 --- a/commons/zenoh-codec/tests/codec.rs +++ b/commons/zenoh-codec/tests/codec.rs @@ -368,8 +368,6 @@ fn codec_shm_info() { run!(ShmBufInfo, { let mut rng = rand::thread_rng(); ShmBufInfo::new( - ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()), - rng.gen(), rng.gen(), MetadataDescriptor { id: rng.gen(), diff --git a/commons/zenoh-shm/src/api/provider/chunk.rs b/commons/zenoh-shm/src/api/provider/chunk.rs index fe7d0d5cb6..6d8ea8e884 100644 --- a/commons/zenoh-shm/src/api/provider/chunk.rs +++ b/commons/zenoh-shm/src/api/provider/chunk.rs @@ -19,6 +19,7 @@ use crate::api::common::types::{ChunkID, SegmentID}; /// Uniquely identifies the particular chunk within particular segment #[zenoh_macros::unstable_doc] #[derive(Clone, Debug, PartialEq, Eq)] +#[stabby::stabby] pub struct ChunkDescriptor { pub segment: SegmentID, pub chunk: ChunkID, diff --git a/commons/zenoh-shm/src/api/provider/shm_provider.rs b/commons/zenoh-shm/src/api/provider/shm_provider.rs index bb9211716e..eb01351c72 100644 --- a/commons/zenoh-shm/src/api/provider/shm_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider.rs @@ -908,6 +908,26 @@ where allocated_metadata: AllocatedMetadataDescriptor, confirmed_metadata: ConfirmedDescriptor, ) -> ShmBufInner { + // write additional metadata + // chunk descriptor + allocated_metadata + .header() + .segment + .store(chunk.descriptor.segment, Ordering::Relaxed); + allocated_metadata + .header() + .chunk + .store(chunk.descriptor.chunk, Ordering::Relaxed); + allocated_metadata + .header() + .len + .store(chunk.descriptor.len.into(), Ordering::Relaxed); + // protocol + allocated_metadata + .header() + .protocol + .store(self.id.id(), Ordering::Relaxed); + // add watchdog to validator GLOBAL_VALIDATOR .read() @@ -915,12 +935,9 @@ where // Create buffer's info let info = ShmBufInfo::new( - chunk.descriptor.clone(), - self.id.id(), len, MetadataDescriptor::from(&confirmed_metadata.owned), - confirmed_metadata - .owned + allocated_metadata .header() .generation .load(Ordering::SeqCst), diff --git a/commons/zenoh-shm/src/header/chunk_header.rs b/commons/zenoh-shm/src/header/chunk_header.rs index c5eb11bb7c..abc3f25990 100644 --- a/commons/zenoh-shm/src/header/chunk_header.rs +++ b/commons/zenoh-shm/src/header/chunk_header.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use std::sync::atomic::{AtomicBool, AtomicU32}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize}; // Chunk header #[stabby::stabby] @@ -25,4 +25,12 @@ pub struct ChunkHeaderType { pub refcount: AtomicU32, pub watchdog_invalidated: AtomicBool, pub generation: AtomicU32, + + /// Protocol identifier for particular SHM implementation + pub protocol: AtomicU32, + + /// The data chunk descriptor + pub segment: AtomicU32, + pub chunk: AtomicU32, + pub len: AtomicUsize, } diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index b8303a5763..01b5315559 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -26,7 +26,7 @@ use std::{ }, }; -use api::{common::types::ProtocolID, provider::chunk::ChunkDescriptor}; +use api::common::types::ProtocolID; use metadata::descriptor::MetadataDescriptor; use watchdog::confirmator::ConfirmedDescriptor; use zenoh_buffers::ZSliceBuffer; @@ -64,10 +64,6 @@ pub mod watchdog; /// This that can be serialized and can be used to retrieve the [`ShmBufInner`] in a remote process. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ShmBufInfo { - /// The data chunk descriptor - pub data_descriptor: ChunkDescriptor, - /// Protocol identifier for particular SHM implementation - pub shm_protocol: ProtocolID, /// Actual data length /// NOTE: data_descriptor's len is >= of this len and describes the actual memory length /// dedicated in shared memory segment for this particular buffer. @@ -81,15 +77,11 @@ pub struct ShmBufInfo { impl ShmBufInfo { pub fn new( - data_descriptor: ChunkDescriptor, - shm_protocol: ProtocolID, data_len: NonZeroUsize, metadata: MetadataDescriptor, generation: u32, ) -> ShmBufInfo { ShmBufInfo { - data_descriptor, - shm_protocol, data_len, metadata, generation, @@ -125,6 +117,14 @@ impl std::fmt::Debug for ShmBufInner { } impl ShmBufInner { + pub fn protocol(&self) -> ProtocolID { + self.metadata + .owned + .header() + .protocol + .load(Ordering::Relaxed) + } + pub fn len(&self) -> NonZeroUsize { self.info.data_len } diff --git a/commons/zenoh-shm/src/reader.rs b/commons/zenoh-shm/src/reader.rs index 99d19df913..6751779070 100644 --- a/commons/zenoh-shm/src/reader.rs +++ b/commons/zenoh-shm/src/reader.rs @@ -23,6 +23,7 @@ use crate::{ client_storage::ShmClientStorage, common::types::{ProtocolID, SegmentID}, }, + header::chunk_header::ChunkHeaderType, metadata::subscription::GLOBAL_METADATA_SUBSCRIPTION, watchdog::confirmator::GLOBAL_CONFIRMATOR, ShmBufInfo, ShmBufInner, @@ -46,7 +47,7 @@ impl ShmReader { Self { client_storage } } - pub fn read_shmbuf(&self, info: &ShmBufInfo) -> ZResult { + pub fn read_shmbuf(&self, info: ShmBufInfo) -> ZResult { // Read does not increment the reference count as it is assumed // that the sender of this buffer has incremented it for us. @@ -54,10 +55,17 @@ impl ShmReader { // attach to the watchdog before doing other things let confirmed_metadata = Arc::new(GLOBAL_CONFIRMATOR.read().add(metadata)); - let segment = self.ensure_data_segment(info)?; + let segment = self.ensure_data_segment(confirmed_metadata.owned.header())?; + let buf = segment.map( + confirmed_metadata + .owned + .header() + .chunk + .load(std::sync::atomic::Ordering::Relaxed), + )?; let shmb = ShmBufInner { metadata: confirmed_metadata, - buf: segment.map(info.data_descriptor.chunk)?, + buf, info: info.clone(), }; @@ -68,8 +76,11 @@ impl ShmReader { } } - fn ensure_data_segment(&self, info: &ShmBufInfo) -> ZResult> { - let id = GlobalDataSegmentID::new(info.shm_protocol, info.data_descriptor.segment); + fn ensure_data_segment(&self, header: &ChunkHeaderType) -> ZResult> { + let id = GlobalDataSegmentID::new( + header.protocol.load(std::sync::atomic::Ordering::Relaxed), + header.segment.load(std::sync::atomic::Ordering::Relaxed), + ); // fastest path: try to get access to already mounted SHM segment // read lock allows concurrent execution of multiple requests @@ -99,7 +110,8 @@ impl ShmReader { // (common case) mount a new segment and add it to the map std::collections::hash_map::Entry::Vacant(vacant) => { - let new_segment = client.attach(info.data_descriptor.segment)?; + let new_segment = + client.attach(header.segment.load(std::sync::atomic::Ordering::Relaxed))?; Ok(vacant.insert(new_segment).clone()) } } diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index c562e47135..35aa2c7396 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -293,7 +293,7 @@ fn to_shm_partner( let mut res = false; for zs in zbuf.zslices_mut() { if let Some(shmb) = zs.downcast_ref::() { - if partner_shm_cfg.supports_protocol(shmb.info.shm_protocol) { + if partner_shm_cfg.supports_protocol(shmb.protocol()) { *zs = shmbuf_to_shminfo(shmb)?; res = true; } else { @@ -331,7 +331,7 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<() let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; // Mount shmbuf - let smb = shmr.read_shmbuf(&shmbinfo)?; + let smb = shmr.read_shmbuf(shmbinfo)?; // Replace the content of the slice *zslice = smb.into();