Skip to content

Commit

Permalink
Optimize SHM internals by combining watchdog and header data together…
Browse files Browse the repository at this point in the history
… into sigle "Metadata" structure
  • Loading branch information
yellowhatter committed Jan 14, 2025
1 parent 2c889ba commit 4cb0b50
Show file tree
Hide file tree
Showing 26 changed files with 568 additions and 610 deletions.
55 changes: 10 additions & 45 deletions commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,18 @@ use zenoh_buffers::{
writer::{DidntWrite, Writer},
};
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
api::provider::chunk::ChunkDescriptor, metadata::descriptor::MetadataDescriptor, ShmBufInfo,
};

use crate::{RCodec, WCodec, Zenoh080};

impl<W> WCodec<&Descriptor, &mut W> for Zenoh080
impl<W> WCodec<&MetadataDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Descriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index_and_bitpos)?;
Ok(())
}
}

impl<W> WCodec<&HeaderDescriptor, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &HeaderDescriptor) -> Self::Output {
fn write(self, writer: &mut W, x: &MetadataDescriptor) -> Self::Output {
self.write(&mut *writer, x.id)?;
self.write(&mut *writer, x.index)?;
Ok(())
Expand Down Expand Up @@ -87,49 +73,30 @@ where
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
metadata,
generation,
} = x;

self.write(&mut *writer, data_descriptor)?;
self.write(&mut *writer, shm_protocol)?;
self.write(&mut *writer, *data_len)?;
self.write(&mut *writer, watchdog_descriptor)?;
self.write(&mut *writer, header_descriptor)?;
self.write(&mut *writer, metadata)?;
self.write(&mut *writer, generation)?;
Ok(())
}
}

impl<R> RCodec<Descriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<Descriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index_and_bitpos = self.read(&mut *reader)?;

Ok(Descriptor {
id,
index_and_bitpos,
})
}
}

impl<R> RCodec<HeaderDescriptor, &mut R> for Zenoh080
impl<R> RCodec<MetadataDescriptor, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<HeaderDescriptor, Self::Error> {
fn read(self, reader: &mut R) -> Result<MetadataDescriptor, Self::Error> {
let id = self.read(&mut *reader)?;
let index = self.read(&mut *reader)?;

Ok(HeaderDescriptor { id, index })
Ok(MetadataDescriptor { id, index })
}
}

Expand Down Expand Up @@ -175,16 +142,14 @@ where
let data_descriptor = self.read(&mut *reader)?;
let shm_protocol = self.read(&mut *reader)?;
let data_len = self.read(&mut *reader)?;
let watchdog_descriptor = self.read(&mut *reader)?;
let header_descriptor = self.read(&mut *reader)?;
let metadata = self.read(&mut *reader)?;
let generation = self.read(&mut *reader)?;

let shm_info = ShmBufInfo::new(
data_descriptor,
shm_protocol,
data_len,
watchdog_descriptor,
header_descriptor,
metadata,
generation,
);
Ok(shm_info)
Expand Down
9 changes: 2 additions & 7 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,7 @@ fn codec_encoding() {
#[test]
fn codec_shm_info() {
use zenoh_shm::{
api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor,
watchdog::descriptor::Descriptor, ShmBufInfo,
api::provider::chunk::ChunkDescriptor, metadata::descriptor::MetadataDescriptor, ShmBufInfo,
};

run!(ShmBufInfo, {
Expand All @@ -372,11 +371,7 @@ fn codec_shm_info() {
ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
rng.gen(),
Descriptor {
id: rng.gen(),
index_and_bitpos: rng.gen(),
},
HeaderDescriptor {
MetadataDescriptor {
id: rng.gen(),
index: rng.gen(),
},
Expand Down
114 changes: 34 additions & 80 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ use super::{
};
use crate::{
api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID},
header::{
allocated_descriptor::AllocatedHeaderDescriptor, descriptor::HeaderDescriptor,
storage::GLOBAL_HEADER_STORAGE,
metadata::{
allocated_descriptor::AllocatedMetadataDescriptor, descriptor::MetadataDescriptor,
storage::GLOBAL_METADATA_STORAGE,
},
watchdog::{
allocated_watchdog::AllocatedWatchdog,
confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
descriptor::Descriptor,
storage::GLOBAL_STORAGE,
validator::GLOBAL_VALIDATOR,
},
ShmBufInfo, ShmBufInner,
Expand All @@ -53,20 +50,14 @@ use crate::{
#[derive(Debug)]
struct BusyChunk {
descriptor: ChunkDescriptor,
header: AllocatedHeaderDescriptor,
_watchdog: AllocatedWatchdog,
metadata: AllocatedMetadataDescriptor,
}

impl BusyChunk {
fn new(
descriptor: ChunkDescriptor,
header: AllocatedHeaderDescriptor,
watchdog: AllocatedWatchdog,
) -> Self {
fn new(descriptor: ChunkDescriptor, metadata: AllocatedMetadataDescriptor) -> Self {
Self {
descriptor,
header,
_watchdog: watchdog,
metadata,
}
}
}
Expand Down Expand Up @@ -822,16 +813,10 @@ where
let len = len.try_into()?;

// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// wrap everything to ShmBufInner
let wrapped = self.wrap(
chunk,
len,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, len, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}

Expand All @@ -840,7 +825,7 @@ where
#[zenoh_macros::unstable_doc]
pub fn garbage_collect(&self) -> usize {
fn is_free_chunk(chunk: &BusyChunk) -> bool {
let header = chunk.header.descriptor.header();
let header = chunk.metadata.header();
if header.refcount.load(Ordering::SeqCst) != 0 {
return header.watchdog_invalidated.load(Ordering::SeqCst);
}
Expand Down Expand Up @@ -891,7 +876,7 @@ where
Policy: AllocPolicy,
{
// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// allocate data chunk
// Perform actions depending on the Policy
Expand All @@ -902,82 +887,57 @@ where
let chunk = Policy::alloc(layout, self)?;

// wrap allocated chunk to ShmBufInner
let wrapped = self.wrap(
chunk,
size,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}

fn alloc_resources() -> ZResult<(
AllocatedHeaderDescriptor,
AllocatedWatchdog,
ConfirmedDescriptor,
)> {
// allocate shared header
let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?;

// allocate watchdog
let allocated_watchdog = GLOBAL_STORAGE.read().allocate_watchdog()?;
fn alloc_resources() -> ZResult<(AllocatedMetadataDescriptor, ConfirmedDescriptor)> {
// allocate metadata
let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;

// add watchdog to confirmator
let confirmed_watchdog = GLOBAL_CONFIRMATOR
.read()
.add_owned(&allocated_watchdog.descriptor)?;
let confirmed_metadata = GLOBAL_CONFIRMATOR.read().add(allocated_metadata.clone());

Ok((allocated_header, allocated_watchdog, confirmed_watchdog))
Ok((allocated_metadata, confirmed_metadata))
}

fn wrap(
&self,
chunk: AllocatedChunk,
len: NonZeroUsize,
allocated_header: AllocatedHeaderDescriptor,
allocated_watchdog: AllocatedWatchdog,
confirmed_watchdog: ConfirmedDescriptor,
allocated_metadata: AllocatedMetadataDescriptor,
confirmed_metadata: ConfirmedDescriptor,
) -> ShmBufInner {
let header = allocated_header.descriptor.clone();
let descriptor = Descriptor::from(&allocated_watchdog.descriptor);

// add watchdog to validator
let c_header = header.clone();
GLOBAL_VALIDATOR.read().add(
allocated_watchdog.descriptor.clone(),
Box::new(move || {
c_header
.header()
.watchdog_invalidated
.store(true, Ordering::SeqCst);
}),
);
GLOBAL_VALIDATOR
.read()
.add(confirmed_metadata.owned.clone());

// Create buffer's info
let info = ShmBufInfo::new(
chunk.descriptor.clone(),
self.id.id(),
len,
descriptor,
HeaderDescriptor::from(&header),
header.header().generation.load(Ordering::SeqCst),
MetadataDescriptor::from(&confirmed_metadata.owned),
confirmed_metadata
.owned
.header()
.generation
.load(Ordering::SeqCst),
);

// Create buffer
let shmb = ShmBufInner {
header,
metadata: Arc::new(confirmed_metadata),
buf: chunk.data,
info,
watchdog: Arc::new(confirmed_watchdog),
};

// Create and store busy chunk
self.busy_list.lock().unwrap().push_back(BusyChunk::new(
chunk.descriptor,
allocated_header,
allocated_watchdog,
));
self.busy_list
.lock()
.unwrap()
.push_back(BusyChunk::new(chunk.descriptor, allocated_metadata));

shmb
}
Expand All @@ -998,7 +958,7 @@ where
Policy: AsyncAllocPolicy,
{
// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;
let (allocated_metadata, confirmed_metadata) = Self::alloc_resources()?;

// allocate data chunk
// Perform actions depending on the Policy
Expand All @@ -1009,13 +969,7 @@ where
let chunk = Policy::alloc_async(backend_layout, self).await?;

// wrap allocated chunk to ShmBufInner
let wrapped = self.wrap(
chunk,
size,
allocated_header,
allocated_watchdog,
confirmed_watchdog,
);
let wrapped = self.wrap(chunk, size, allocated_metadata, confirmed_metadata);
Ok(unsafe { ZShmMut::new_unchecked(wrapped) })
}
}
File renamed without changes.
Loading

0 comments on commit 4cb0b50

Please sign in to comment.