propagate changes in all libs and binaries
teh-cmc committed Oct 28, 2024
1 parent 1db26ff commit 5255dfb
Showing 47 changed files with 253 additions and 243 deletions.
9 changes: 5 additions & 4 deletions crates/store/re_dataframe/src/engine.rs
@@ -1,5 +1,5 @@
use re_chunk::{EntityPath, TransportChunk};
-use re_chunk_store::{ChunkStore, ColumnDescriptor, QueryExpression};
+use re_chunk_store::{ChunkStoreHandle, ColumnDescriptor, QueryExpression};
use re_log_types::EntityPathFilter;
use re_query::QueryCache;

@@ -30,7 +30,7 @@ pub type RecordBatch = TransportChunk;
// got to go. But for that we need to generally introduce `ChunkStoreHandle` and `QueryCacheHandle`
// first, and this is not as straightforward as it seems.
pub struct QueryEngine<'a> {
-pub store: &'a ChunkStore,
+pub store: ChunkStoreHandle,
pub cache: &'a QueryCache,
}

@@ -45,7 +45,7 @@ impl QueryEngine<'_> {
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
#[inline]
pub fn schema(&self) -> Vec<ColumnDescriptor> {
-self.store.schema()
+self.store.read().schema()
}

/// Returns the filtered schema for the given [`QueryExpression`].
@@ -55,7 +55,7 @@ impl QueryEngine<'_> {
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
#[inline]
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
-self.store.schema_for_query(query)
+self.store.read().schema_for_query(query)
}

/// Starts a new query by instantiating a [`QueryHandle`].
@@ -71,6 +71,7 @@ impl QueryEngine<'_> {
filter: &'a EntityPathFilter,
) -> impl Iterator<Item = EntityPath> + 'a {
self.store
+.read()
.all_entities()
.into_iter()
.filter(|entity_path| filter.matches(entity_path))
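For orientation before the remaining files: the pattern this commit propagates replaces plain `&ChunkStore` references with a shared, lock-guarded store handle. Below is a minimal sketch of what such a handle could look like. This is a hypothetical reconstruction inferred from the `.read()`, `.write_arc()`, and `.clone()` calls in these diffs, not the actual `re_chunk_store` source, and it assumes parking_lot (whose `arc_lock` feature provides the `write_arc` used further down in `entity_db.rs`):

    // Hypothetical sketch; the real `ChunkStoreHandle` may differ in detail.
    use std::sync::Arc;

    use parking_lot::{RwLock, RwLockReadGuard};

    pub struct ChunkStore; // stand-in for the real store type

    /// A cheaply clonable, shared, lock-guarded reference to a `ChunkStore`.
    #[derive(Clone)]
    pub struct ChunkStoreHandle(Arc<RwLock<ChunkStore>>);

    impl ChunkStoreHandle {
        pub fn new(store: ChunkStore) -> Self {
            Self(Arc::new(RwLock::new(store)))
        }

        /// Shared read access: the `.read()` seen at every query site above.
        pub fn read(&self) -> RwLockReadGuard<'_, ChunkStore> {
            self.0.read()
        }
    }

With a handle of this shape, `QueryEngine` can own its store (`pub store: ChunkStoreHandle`) instead of borrowing it for a lifetime, at the cost of an explicit `.read()` on every access.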
11 changes: 4 additions & 7 deletions crates/store/re_dataframe/src/query.rs
@@ -162,11 +162,13 @@ impl QueryHandle<'_> {
fn init_(&self) -> QueryHandleState {
re_tracing::profile_scope!("init");

+let store = self.engine.store.read();
+
// The timeline doesn't matter if we're running in static-only mode.
let filtered_index = self.query.filtered_index.unwrap_or_default();

// 1. Compute the schema for the query.
-let view_contents = self.engine.store.schema_for_query(&self.query);
+let view_contents = store.schema_for_query(&self.query);

// 2. Compute the schema of the selected contents.
//
@@ -327,7 +329,6 @@ impl QueryHandle<'_> {
re_chunk::LatestAtQuery::new(Timeline::default(), TimeInt::STATIC);

let results = self.engine.cache.latest_at(
-self.engine.store,
&query,
&descr.entity_path,
[descr.component_name],
@@ -586,10 +587,7 @@ impl QueryHandle<'_> {
//
// TODO(cmc): Going through the cache is very useful in a Viewer context, but
// not so much in an SDK context. Make it configurable.
-let results =
-self.engine
-.cache
-.range(self.engine.store, query, entity_path, component_names);
+let results = self.engine.cache.range(query, entity_path, component_names);

debug_assert!(
results.components.len() <= 1,
@@ -997,7 +995,6 @@ impl QueryHandle<'_> {
re_chunk::LatestAtQuery::new(state.filtered_index, *cur_index_value);

let results = self.engine.cache.latest_at(
-self.engine.store,
&query,
&descr.entity_path,
[descr.component_name],
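The recurring change in this file is that the cache methods stop taking the store as an argument. Judging from `QueryCache::new(data_store.clone())` in the next file, the cache now owns its own store handle. A heavily simplified, hypothetical sketch of that shape (the real `re_query::QueryCache` signatures also take entity paths, component names, and more):

    // Hypothetical, simplified signatures to illustrate the call-site change only.
    use std::sync::Arc;

    use parking_lot::RwLock;

    pub struct ChunkStore;
    pub struct LatestAtQuery;
    pub struct LatestAtResults;

    #[derive(Clone)]
    pub struct ChunkStoreHandle(Arc<RwLock<ChunkStore>>);

    pub struct QueryCache {
        /// The handle the cache was constructed with, which is what makes a
        /// per-call `store` argument redundant.
        store: ChunkStoreHandle,
    }

    impl QueryCache {
        pub fn new(store: ChunkStoreHandle) -> Self {
            Self { store }
        }

        /// Before: `cache.latest_at(engine.store, &query, ...)`.
        /// After: the cache locks the handle it already owns.
        pub fn latest_at(&self, _query: &LatestAtQuery) -> LatestAtResults {
            let _store = self.store.0.read(); // shared read access for the lookup
            LatestAtResults
        }
    }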
94 changes: 52 additions & 42 deletions crates/store/re_entity_db/src/entity_db.rs
@@ -5,8 +5,8 @@ use parking_lot::Mutex;

use re_chunk::{Chunk, ChunkResult, RowId, TimeInt};
use re_chunk_store::{
-ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreEvent, ChunkStoreSubscriber,
-GarbageCollectionOptions, GarbageCollectionTarget,
+ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreEvent, ChunkStoreHandle,
+ChunkStoreSubscriber, GarbageCollectionOptions, GarbageCollectionTarget,
};
use re_log_types::{
ApplicationId, EntityPath, EntityPathHash, LogMsg, ResolvedTimeRange, ResolvedTimeRangeF,
@@ -62,7 +62,7 @@ pub struct EntityDb {
tree: crate::EntityTree,

/// Stores all components for all entities for all timelines.
-data_store: ChunkStore,
+data_store: ChunkStoreHandle,

/// Query caches for the data in [`Self::data_store`].
query_caches: re_query::QueryCache,
@@ -76,8 +76,8 @@ impl EntityDb {
}

pub fn with_store_config(store_id: StoreId, store_config: ChunkStoreConfig) -> Self {
-let data_store = ChunkStore::new(store_id.clone(), store_config);
-let query_caches = re_query::QueryCache::new(&data_store);
+let data_store = ChunkStoreHandle::new(ChunkStore::new(store_id.clone(), store_config));
+let query_caches = re_query::QueryCache::new(data_store.clone());

Self {
data_source: None,
@@ -99,11 +99,6 @@ impl EntityDb {
&self.tree
}

-#[inline]
-pub fn data_store(&self) -> &ChunkStore {
-&self.data_store
-}
-
pub fn store_info_msg(&self) -> Option<&SetStoreInfo> {
self.set_store_info.as_ref()
}
@@ -123,7 +118,7 @@

pub fn query_engine(&self) -> re_dataframe::QueryEngine<'_> {
re_dataframe::QueryEngine {
-store: self.store(),
+store: self.store().clone(),
cache: self.query_caches(),
}
}
@@ -141,7 +136,7 @@
component_names: impl IntoIterator<Item = re_types_core::ComponentName>,
) -> re_query::LatestAtResults {
self.query_caches()
-.latest_at(self.store(), query, entity_path, component_names)
+.latest_at(query, entity_path, component_names)
}

/// Get the latest index and value for a given dense [`re_types_core::Component`].
@@ -160,7 +155,7 @@
) -> Option<((TimeInt, RowId), C)> {
let results = self
.query_caches()
-.latest_at(self.store(), query, entity_path, [C::name()]);
+.latest_at(query, entity_path, [C::name()]);
results
.component_mono()
.map(|value| (results.index(), value))
@@ -182,7 +177,7 @@
) -> Option<((TimeInt, RowId), C)> {
let results = self
.query_caches()
-.latest_at(self.store(), query, entity_path, [C::name()]);
+.latest_at(query, entity_path, [C::name()]);
results
.component_mono_quiet()
.map(|value| (results.index(), value))
@@ -208,7 +203,7 @@
}

#[inline]
-pub fn store(&self) -> &ChunkStore {
+pub fn store(&self) -> &ChunkStoreHandle {
&self.data_store
}

@@ -218,8 +213,8 @@
}

#[inline]
-pub fn store_id(&self) -> &StoreId {
-self.data_store.id()
+pub fn store_id(&self) -> StoreId {
+self.data_store.read().id()
}

/// If this entity db is the result of a clone, which store was it cloned from?
@@ -264,14 +259,14 @@

#[inline]
pub fn num_rows(&self) -> u64 {
-self.data_store.stats().total().num_rows
+self.data_store.read().stats().total().num_rows
}

/// Return the current `ChunkStoreGeneration`. This can be used to determine whether the
/// database has been modified since the last time it was queried.
#[inline]
pub fn generation(&self) -> re_chunk_store::ChunkStoreGeneration {
-self.data_store.generation()
+self.data_store.read().generation()
}

#[inline]
@@ -324,7 +319,7 @@
pub fn add(&mut self, msg: &LogMsg) -> Result<Vec<ChunkStoreEvent>, Error> {
re_tracing::profile_function!();

-debug_assert_eq!(msg.store_id(), self.store_id());
+debug_assert_eq!(*msg.store_id(), self.store_id());

let store_events = match &msg {
LogMsg::SetStoreInfo(msg) => {
@@ -350,7 +345,10 @@
}

pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<Vec<ChunkStoreEvent>, Error> {
-let store_events = self.data_store.insert_chunk(chunk)?;
+// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
+let mut store = self.data_store.write_arc();
+
+let store_events = store.insert_chunk(chunk)?;

self.register_entity_path(chunk.entity_path());

Expand All @@ -367,8 +365,7 @@ impl EntityDb {

// It is possible for writes to trigger deletions: specifically in the case of
// overwritten static data leading to dangling chunks.
-self.tree
-.on_store_deletions(&self.data_store, &store_events);
+self.tree.on_store_deletions(&store, &store_events);

// We inform the stats last, since it measures e2e latency.
self.stats.on_events(&store_events);
Expand Down Expand Up @@ -433,15 +430,18 @@ impl EntityDb {
fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

-let (store_events, stats_diff) = self.data_store.gc(gc_options);
+// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
+let mut store = self.data_store.write_arc();
+
+let (store_events, stats_diff) = store.gc(gc_options);

re_log::trace!(
num_row_ids_dropped = store_events.len(),
size_bytes_dropped = re_format::format_bytes(stats_diff.total().total_size_bytes as _),
"purged datastore"
);

-self.on_store_deletions(&store_events);
+self.on_store_deletions(&store, &store_events);

store_events
}
@@ -454,8 +454,11 @@
timeline: &Timeline,
drop_range: ResolvedTimeRange,
) -> Vec<ChunkStoreEvent> {
-let store_events = self.data_store.drop_time_range(timeline, drop_range);
-self.on_store_deletions(&store_events);
+// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
+let mut store = self.data_store.write_arc();
+
+let store_events = store.drop_time_range(timeline, drop_range);
+self.on_store_deletions(&store, &store_events);
store_events
}

@@ -467,9 +470,12 @@
pub fn drop_entity_path(&mut self, entity_path: &EntityPath) {
re_tracing::profile_function!();

-let store_events = self.data_store.drop_entity_path(entity_path);
+// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
+let mut store = self.data_store.write_arc();
+
+let store_events = store.drop_entity_path(entity_path);

-self.on_store_deletions(&store_events);
+self.on_store_deletions(&store, &store_events);
}

/// Unconditionally drops all the data for a given [`EntityPath`] and all its children.
@@ -489,13 +495,13 @@
}
}

-fn on_store_deletions(&mut self, store_events: &[ChunkStoreEvent]) {
+fn on_store_deletions(&mut self, store: &ChunkStore, store_events: &[ChunkStoreEvent]) {
re_tracing::profile_function!();

self.times_per_timeline.on_events(store_events);
self.query_caches.on_events(store_events);
self.time_histogram_per_timeline.on_events(store_events);
-self.tree.on_store_deletions(&self.data_store, store_events);
+self.tree.on_store_deletions(store, store_events);
}

/// Key used for sorting recordings in the UI.
@@ -514,6 +520,8 @@
) -> impl Iterator<Item = ChunkResult<LogMsg>> + '_ {
re_tracing::profile_function!();

+let store = self.data_store.read();
+
let set_store_info_msg = self
.store_info_msg()
.map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone())));
Expand All @@ -526,8 +534,7 @@ impl EntityDb {
)
});

-let mut chunks: Vec<&Arc<Chunk>> = self
-.store()
+let mut chunks: Vec<Arc<Chunk>> = store
.iter_chunks()
.filter(move |chunk| {
let Some((timeline, time_range)) = time_filter else {
Expand All @@ -543,6 +550,7 @@ impl EntityDb {
|| time_range.contains(time_column.time_range().max())
})
})
+.cloned() // refcount
.collect();

// Try to roughly preserve the order of the chunks
@@ -606,7 +614,8 @@ impl EntityDb {
});
}

-for chunk in self.store().iter_chunks() {
+let store = self.data_store.read();
+for chunk in store.iter_chunks() {
new_db.add_chunk(&Arc::clone(chunk))?;
}

@@ -627,8 +636,9 @@ impl EntityDb {
};

let mut stats = ChunkStoreChunkStats::default();
+let store = self.data_store.read();
subtree.visit_children_recursively(|path| {
-stats += self.store().entity_stats_static(path);
+stats += store.entity_stats_static(path);
});

stats
@@ -649,8 +659,9 @@ impl EntityDb {
};

let mut stats = ChunkStoreChunkStats::default();
+let store = self.data_store.read();
subtree.visit_children_recursively(|path| {
-stats += self.store().entity_stats_on_timeline(path, timeline);
+stats += store.entity_stats_on_timeline(path, timeline);
});

stats
@@ -670,10 +681,9 @@ impl EntityDb {
return false;
};

+let store = self.data_store.read();
subtree
-.find_first_child_recursive(|path| {
-self.store().entity_has_data_on_timeline(timeline, path)
-})
+.find_first_child_recursive(|path| store.entity_has_data_on_timeline(timeline, path))
.is_some()
}

@@ -691,10 +701,10 @@ impl EntityDb {
return false;
};

+let store = self.data_store.read();
subtree
.find_first_child_recursive(|path| {
-self.store()
-.entity_has_temporal_data_on_timeline(timeline, path)
+store.entity_has_temporal_data_on_timeline(timeline, path)
})
.is_some()
}
@@ -704,7 +714,7 @@ impl re_types_core::SizeBytes for EntityDb {
#[inline]
fn heap_size_bytes(&self) -> u64 {
// TODO(emilk): size of entire EntityDb, including secondary indices etc
-self.data_store().stats().total().total_size_bytes
+self.data_store.read().stats().total().total_size_bytes
}
}

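On the `write_arc` NOTE comments repeated above: a plain `self.data_store.read()` or `.write()` guard borrows `self.data_store`, and therefore `self`, for as long as the guard lives, which conflicts with the `&mut self` calls that must run while the store is locked. An `ArcRwLockWriteGuard` owns an `Arc` refcount instead of a borrow, so `self` stays free for partial mutable borrows. A minimal, self-contained sketch of the same trick, with hypothetical stand-in types (requires parking_lot's `arc_lock` feature):

    use std::sync::Arc;

    use parking_lot::RwLock;

    struct Store {
        rows: u64,
    }

    struct Db {
        store: Arc<RwLock<Store>>,
        deletions_seen: u64,
    }

    impl Db {
        /// Needs `&mut self` *and* a reference into the locked store,
        /// like `EntityDb::on_store_deletions` in the diff above.
        fn on_store_deletions(&mut self, store: &Store) {
            self.deletions_seen += store.rows;
        }

        fn gc(&mut self) {
            // With `self.store.write()`, the guard would keep `self` borrowed
            // and the `&mut self` call below would fail to compile (E0502).
            // `write_arc` returns a guard that owns an Arc refcount instead.
            let mut store = self.store.write_arc();
            store.rows += 1;
            self.on_store_deletions(&store);
        }
    }

    fn main() {
        let mut db = Db {
            store: Arc::new(RwLock::new(Store { rows: 0 })),
            deletions_seen: 0,
        };
        db.gc();
        assert_eq!(db.deletions_seen, 1);
    }

The trade-off, visible in the diffs above as well, is that the write lock stays held across the follow-up bookkeeping until the guard drops at the end of the method.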
[Only the first 3 of the 47 changed files are rendered above; the remaining 44 were not loaded on this page.]
