Skip to content

Commit

Permalink
Put ProtocolID and ChunkDescriptor into SHM metadata: less wire overh…
Browse files Browse the repository at this point in the history
…ead and less structure sizes for SHM
  • Loading branch information
yellowhatter committed Jan 15, 2025
1 parent 95265b0 commit 4a46568
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 37 deletions.
14 changes: 1 addition & 13 deletions commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,11 @@ where

fn write(self, writer: &mut W, x: &ShmBufInfo) -> Self::Output {
let ShmBufInfo {
data_descriptor,
shm_protocol,
data_len,
metadata,
generation,
} = x;

self.write(&mut *writer, data_descriptor)?;
self.write(&mut *writer, shm_protocol)?;
self.write(&mut *writer, *data_len)?;
self.write(&mut *writer, metadata)?;
self.write(&mut *writer, generation)?;
Expand Down Expand Up @@ -139,19 +135,11 @@ where
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<ShmBufInfo, Self::Error> {
let data_descriptor = self.read(&mut *reader)?;
let shm_protocol = self.read(&mut *reader)?;
let data_len = self.read(&mut *reader)?;
let metadata = self.read(&mut *reader)?;
let generation = self.read(&mut *reader)?;

let shm_info = ShmBufInfo::new(
data_descriptor,
shm_protocol,
data_len,
metadata,
generation,
);
let shm_info = ShmBufInfo::new(data_len, metadata, generation);
Ok(shm_info)
}
}
2 changes: 0 additions & 2 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,6 @@ fn codec_shm_info() {
run!(ShmBufInfo, {
let mut rng = rand::thread_rng();
ShmBufInfo::new(
ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
rng.gen(),
MetadataDescriptor {
id: rng.gen(),
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/api/provider/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::api::common::types::{ChunkID, SegmentID};
/// Uniquely identifies the particular chunk within particular segment
#[zenoh_macros::unstable_doc]
#[derive(Clone, Debug, PartialEq, Eq)]
#[stabby::stabby]
pub struct ChunkDescriptor {
pub segment: SegmentID,
pub chunk: ChunkID,
Expand Down
25 changes: 21 additions & 4 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,19 +908,36 @@ where
allocated_metadata: AllocatedMetadataDescriptor,
confirmed_metadata: ConfirmedDescriptor,
) -> ShmBufInner {
// write additional metadata
// chunk descriptor
allocated_metadata
.header()
.segment
.store(chunk.descriptor.segment, Ordering::Relaxed);
allocated_metadata
.header()
.chunk
.store(chunk.descriptor.chunk, Ordering::Relaxed);
allocated_metadata
.header()
.len
.store(chunk.descriptor.len.into(), Ordering::Relaxed);
// protocol
allocated_metadata
.header()
.protocol
.store(self.id.id(), Ordering::Relaxed);

// add watchdog to validator
GLOBAL_VALIDATOR
.read()
.add(confirmed_metadata.owned.clone());

// Create buffer's info
let info = ShmBufInfo::new(
chunk.descriptor.clone(),
self.id.id(),
len,
MetadataDescriptor::from(&confirmed_metadata.owned),
confirmed_metadata
.owned
allocated_metadata
.header()
.generation
.load(Ordering::SeqCst),
Expand Down
10 changes: 9 additions & 1 deletion commons/zenoh-shm/src/header/chunk_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize};

// Chunk header
#[stabby::stabby]
Expand All @@ -25,4 +25,12 @@ pub struct ChunkHeaderType {
pub refcount: AtomicU32,
pub watchdog_invalidated: AtomicBool,
pub generation: AtomicU32,

/// Protocol identifier for particular SHM implementation
pub protocol: AtomicU32,

/// The data chunk descriptor
pub segment: AtomicU32,
pub chunk: AtomicU32,
pub len: AtomicUsize,
}
18 changes: 9 additions & 9 deletions commons/zenoh-shm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{
},
};

use api::{common::types::ProtocolID, provider::chunk::ChunkDescriptor};
use api::common::types::ProtocolID;
use metadata::descriptor::MetadataDescriptor;
use watchdog::confirmator::ConfirmedDescriptor;
use zenoh_buffers::ZSliceBuffer;
Expand Down Expand Up @@ -64,10 +64,6 @@ pub mod watchdog;
/// This that can be serialized and can be used to retrieve the [`ShmBufInner`] in a remote process.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShmBufInfo {
/// The data chunk descriptor
pub data_descriptor: ChunkDescriptor,
/// Protocol identifier for particular SHM implementation
pub shm_protocol: ProtocolID,
/// Actual data length
/// NOTE: data_descriptor's len is >= of this len and describes the actual memory length
/// dedicated in shared memory segment for this particular buffer.
Expand All @@ -81,15 +77,11 @@ pub struct ShmBufInfo {

impl ShmBufInfo {
pub fn new(
data_descriptor: ChunkDescriptor,
shm_protocol: ProtocolID,
data_len: NonZeroUsize,
metadata: MetadataDescriptor,
generation: u32,
) -> ShmBufInfo {
ShmBufInfo {
data_descriptor,
shm_protocol,
data_len,
metadata,
generation,
Expand Down Expand Up @@ -125,6 +117,14 @@ impl std::fmt::Debug for ShmBufInner {
}

impl ShmBufInner {
pub fn protocol(&self) -> ProtocolID {
self.metadata
.owned
.header()
.protocol
.load(Ordering::Relaxed)
}

pub fn len(&self) -> NonZeroUsize {
self.info.data_len
}
Expand Down
24 changes: 18 additions & 6 deletions commons/zenoh-shm/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
client_storage::ShmClientStorage,
common::types::{ProtocolID, SegmentID},
},
header::chunk_header::ChunkHeaderType,
metadata::subscription::GLOBAL_METADATA_SUBSCRIPTION,
watchdog::confirmator::GLOBAL_CONFIRMATOR,
ShmBufInfo, ShmBufInner,
Expand All @@ -46,18 +47,25 @@ impl ShmReader {
Self { client_storage }
}

pub fn read_shmbuf(&self, info: &ShmBufInfo) -> ZResult<ShmBufInner> {
pub fn read_shmbuf(&self, info: ShmBufInfo) -> ZResult<ShmBufInner> {
// Read does not increment the reference count as it is assumed
// that the sender of this buffer has incremented it for us.

let metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&info.metadata)?;
// attach to the watchdog before doing other things
let confirmed_metadata = Arc::new(GLOBAL_CONFIRMATOR.read().add(metadata));

let segment = self.ensure_data_segment(info)?;
let segment = self.ensure_data_segment(confirmed_metadata.owned.header())?;
let buf = segment.map(
confirmed_metadata
.owned
.header()
.chunk
.load(std::sync::atomic::Ordering::Relaxed),
)?;
let shmb = ShmBufInner {
metadata: confirmed_metadata,
buf: segment.map(info.data_descriptor.chunk)?,
buf,
info: info.clone(),
};

Expand All @@ -68,8 +76,11 @@ impl ShmReader {
}
}

fn ensure_data_segment(&self, info: &ShmBufInfo) -> ZResult<Arc<dyn ShmSegment>> {
let id = GlobalDataSegmentID::new(info.shm_protocol, info.data_descriptor.segment);
fn ensure_data_segment(&self, header: &ChunkHeaderType) -> ZResult<Arc<dyn ShmSegment>> {
let id = GlobalDataSegmentID::new(
header.protocol.load(std::sync::atomic::Ordering::Relaxed),
header.segment.load(std::sync::atomic::Ordering::Relaxed),
);

// fastest path: try to get access to already mounted SHM segment
// read lock allows concurrent execution of multiple requests
Expand Down Expand Up @@ -99,7 +110,8 @@ impl ShmReader {

// (common case) mount a new segment and add it to the map
std::collections::hash_map::Entry::Vacant(vacant) => {
let new_segment = client.attach(info.data_descriptor.segment)?;
let new_segment =
client.attach(header.segment.load(std::sync::atomic::Ordering::Relaxed))?;
Ok(vacant.insert(new_segment).clone())
}
}
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ fn to_shm_partner<ShmCfg: PartnerShmConfig>(
let mut res = false;
for zs in zbuf.zslices_mut() {
if let Some(shmb) = zs.downcast_ref::<ShmBufInner>() {
if partner_shm_cfg.supports_protocol(shmb.info.shm_protocol) {
if partner_shm_cfg.supports_protocol(shmb.protocol()) {
*zs = shmbuf_to_shminfo(shmb)?;
res = true;
} else {
Expand Down Expand Up @@ -331,7 +331,7 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<()
let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?;

// Mount shmbuf
let smb = shmr.read_shmbuf(&shmbinfo)?;
let smb = shmr.read_shmbuf(shmbinfo)?;

// Replace the content of the slice
*zslice = smb.into();
Expand Down

0 comments on commit 4a46568

Please sign in to comment.