Skip to content

Commit

Permalink
Merge branch 'main' into emilk/cargo-machete
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Oct 9, 2024
2 parents 9579a7b + 9e8011a commit c6a7356
Show file tree
Hide file tree
Showing 61 changed files with 1,464 additions and 2,705 deletions.
1 change: 0 additions & 1 deletion ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ Update instructions:
| re_entity_db | In-memory storage of Rerun entities |
| re_query | Querying data in the re_chunk_store |
| re_dataframe | The Rerun public data APIs. |
| re_dataframe2 | The Rerun public data APIs. |
| re_types | The built-in Rerun data types, component types, and archetypes. |
| re_types_blueprint | The core traits and types that power Rerun's Blueprint sub-system. |
| re_log_encoding | Helpers for encoding and transporting Rerun log messages |
Expand Down
22 changes: 2 additions & 20 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5102,22 +5102,6 @@ dependencies = [
[[package]]
name = "re_dataframe"
version = "0.19.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"itertools 0.13.0",
"re_arrow2",
"re_chunk",
"re_chunk_store",
"re_log_types",
"re_query",
"re_tracing",
"re_types",
]

[[package]]
name = "re_dataframe2"
version = "0.19.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
Expand All @@ -5127,12 +5111,10 @@ dependencies = [
"re_arrow2",
"re_chunk",
"re_chunk_store",
"re_log",
"re_log_types",
"re_query",
"re_tracing",
"re_types",
"re_types_core",
"similar-asserts",
"unindent",
]
Expand Down Expand Up @@ -5602,7 +5584,7 @@ dependencies = [
"egui_table",
"itertools 0.13.0",
"re_chunk_store",
"re_dataframe2",
"re_dataframe",
"re_format",
"re_log",
"re_log_types",
Expand Down Expand Up @@ -6314,7 +6296,7 @@ dependencies = [
"re_build_tools",
"re_chunk",
"re_chunk_store",
"re_dataframe2",
"re_dataframe",
"re_log",
"re_log_types",
"re_memory",
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ re_chunk_store = { path = "crates/store/re_chunk_store", version = "=0.19.0-alph
re_data_loader = { path = "crates/store/re_data_loader", version = "=0.19.0-alpha.1", default-features = false }
re_data_source = { path = "crates/store/re_data_source", version = "=0.19.0-alpha.1", default-features = false }
re_dataframe = { path = "crates/store/re_dataframe", version = "=0.19.0-alpha.1", default-features = false }
re_dataframe2 = { path = "crates/store/re_dataframe2", version = "=0.19.0-alpha.1", default-features = false }
re_entity_db = { path = "crates/store/re_entity_db", version = "=0.19.0-alpha.1", default-features = false }
re_format_arrow = { path = "crates/store/re_format_arrow", version = "=0.19.0-alpha.1", default-features = false }
re_log_encoding = { path = "crates/store/re_log_encoding", version = "=0.19.0-alpha.1", default-features = false }
Expand Down
220 changes: 14 additions & 206 deletions crates/store/re_chunk_store/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@

use std::collections::{BTreeMap, BTreeSet};

use ahash::HashSet;
use arrow2::{
array::ListArray as ArrowListArray,
datatypes::{DataType as ArrowDatatype, Field as ArrowField},
};
use itertools::Itertools as _;

use re_chunk::{LatestAtQuery, TimelineName};
use re_chunk::TimelineName;
use re_log_types::ResolvedTimeRange;
use re_log_types::{EntityPath, TimeInt, Timeline};
use re_log_types::{EntityPathFilter, ResolvedTimeRange};
use re_types_core::{ArchetypeName, ComponentName};

use crate::ChunkStore;
Expand Down Expand Up @@ -340,7 +338,11 @@ impl ComponentColumnDescriptor {
#[inline]
pub fn to_arrow_field(&self) -> ArrowField {
ArrowField::new(
self.component_name.short_name().to_owned(),
format!(
"{}:{}",
self.entity_path,
self.component_name.short_name().to_owned()
),
self.returned_datatype(),
true, /* nullable */
)
Expand Down Expand Up @@ -501,134 +503,6 @@ pub struct ArchetypeFieldColumnSelector {
}
*/

// --- Queries ---

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum QueryExpression {
LatestAt(LatestAtQueryExpression),
Range(RangeQueryExpression),
}

impl From<LatestAtQueryExpression> for QueryExpression {
#[inline]
fn from(query: LatestAtQueryExpression) -> Self {
Self::LatestAt(query)
}
}

impl From<RangeQueryExpression> for QueryExpression {
#[inline]
fn from(query: RangeQueryExpression) -> Self {
Self::Range(query)
}
}

impl QueryExpression {
#[inline]
pub fn entity_path_filter(&self) -> &EntityPathFilter {
match self {
Self::LatestAt(query) => &query.entity_path_filter,
Self::Range(query) => &query.entity_path_filter,
}
}
}

impl std::fmt::Display for QueryExpression {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::LatestAt(query) => query.fmt(f),
Self::Range(query) => query.fmt(f),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LatestAtQueryExpression {
/// The entity path expression to query.
///
/// Example: `world/camera/**`
pub entity_path_filter: EntityPathFilter,

/// The timeline to query.
///
/// Example: `frame`.
pub timeline: Timeline,

/// The time at which to query.
///
/// Example: `18`.
pub at: TimeInt,
}

impl std::fmt::Display for LatestAtQueryExpression {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
entity_path_filter,
timeline,
at,
} = self;

f.write_fmt(format_args!(
"latest state for '{}' at {} on {:?}",
entity_path_filter.iter_expressions().join(", "),
timeline.typ().format_utc(*at),
timeline.name(),
))
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RangeQueryExpression {
/// The entity path expression to query.
///
/// Example: `world/camera/**`
pub entity_path_filter: EntityPathFilter,

/// The timeline to query.
///
/// Example `frame`
pub timeline: Timeline,

/// The time range to query.
pub time_range: ResolvedTimeRange,

/// The point-of-view of the query, as described by its [`ComponentColumnDescriptor`].
///
/// In a range query results, each non-null value of the point-of-view component column
/// will generate a row in the result.
///
/// Note that a component can be logged multiple times at the same timestamp (e.g. something
/// happened multiple times during a single frame), in which case the results will contain
/// multiple rows at a given timestamp.
//
// TODO(cmc): issue for multi-pov support
pub pov: ComponentColumnSelector,
//
// TODO(cmc): custom join policy support
}

impl std::fmt::Display for RangeQueryExpression {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
entity_path_filter,
timeline,
time_range,
pov,
} = self;

f.write_fmt(format_args!(
"{} ranging {}..={} on {:?} as seen from {pov}",
entity_path_filter.iter_expressions().join(", "),
timeline.typ().format_utc(time_range.min()),
timeline.typ().format_utc(time_range.max()),
timeline.name(),
))
}
}

// --- Queries v2 ---

/// Specifies how null values should be filled in the returned dataframe.
Expand Down Expand Up @@ -701,9 +575,9 @@ pub type IndexRange = ResolvedTimeRange;
/// ```
//
// TODO(cmc): ideally we'd like this to be the same type as the one used in the blueprint, possibly?
// TODO(cmc): Get rid of all re_dataframe (as opposed to re_dataframe2) stuff and rename this.
// TODO(cmc): Get rid of all the legacy dataframe query stuff (the pre-rename `re_dataframe` APIs) and rename this.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueryExpression2 {
pub struct QueryExpression {
/// The subset of the database that the query will run on: a set of [`EntityPath`]s and their
/// associated [`ComponentName`]s.
///
Expand Down Expand Up @@ -733,7 +607,7 @@ pub struct QueryExpression2 {
/// Only rows where at least 1 of the view-contents contains non-null data within that range will be kept in
/// the final dataset.
///
/// This is ignored if [`QueryExpression2::using_index_values`] is set.
/// This is ignored if [`QueryExpression::using_index_values`] is set.
///
/// Example: `ResolvedTimeRange(10, 20)`.
pub filtered_index_range: Option<IndexRange>,
Expand All @@ -743,7 +617,7 @@ pub struct QueryExpression2 {
/// Only rows where at least 1 column contains non-null data at these specific values will be kept
/// in the final dataset.
///
/// This is ignored if [`QueryExpression2::using_index_values`] is set.
/// This is ignored if [`QueryExpression::using_index_values`] is set.
///
/// Example: `[TimeInt(12), TimeInt(14)]`.
pub filtered_index_values: Option<BTreeSet<IndexValue>>,
Expand All @@ -757,8 +631,8 @@ pub struct QueryExpression2 {
///
/// The order of the samples will be respected in the final result.
///
/// If [`QueryExpression2::using_index_values`] is set, it overrides both [`QueryExpression2::filtered_index_range`]
/// and [`QueryExpression2::filtered_index_values`].
/// If [`QueryExpression::using_index_values`] is set, it overrides both [`QueryExpression::filtered_index_range`]
/// and [`QueryExpression::filtered_index_values`].
///
/// Example: `[TimeInt(12), TimeInt(14)]`.
pub using_index_values: Option<BTreeSet<IndexValue>>,
Expand Down Expand Up @@ -789,7 +663,7 @@ pub struct QueryExpression2 {
pub selection: Option<Vec<ColumnSelector>>,
}

impl QueryExpression2 {
impl QueryExpression {
#[inline]
pub fn new(index: impl Into<Timeline>) -> Self {
let index = index.into();
Expand Down Expand Up @@ -969,72 +843,6 @@ impl ChunkStore {
.collect()
}

/// Returns the filtered schema for the given query expression.
///
/// This will only include columns which may contain non-empty values from the perspective of
/// the query semantics.
///
/// The order of the columns is guaranteed to be in a specific order:
/// * first, the control columns in lexical order (`RowId`);
/// * second, the time columns in lexical order (`frame_nr`, `log_time`, ...);
/// * third, the component columns in lexical order (`Color`, `Radius`, ...).
///
/// This does not run a full-blown query, but rather just inspects `Chunk`-level metadata,
/// which can lead to false positives, but makes this very cheap to compute.
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
re_tracing::profile_function!(format!("{query:?}"));

// First, grab the full schema and filters out every entity path that isn't covered by the query.
let schema = self
.schema()
.into_iter()
.filter(|descr| {
descr.entity_path().map_or(true, |entity_path| {
query.entity_path_filter().matches(entity_path)
})
})
.collect_vec();

// Then, discard any column descriptor which cannot possibly have data for the given query.
//
// TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice.
// TODO(jleibs): This filtering actually seems incorrect. This operation should be based solely
// on the timeline,
let mut filtered_out = HashSet::default();
for column_descr in &schema {
let ColumnDescriptor::Component(descr) = column_descr else {
continue;
};

match query {
QueryExpression::LatestAt(query) => {
let q = LatestAtQuery::new(query.timeline, query.at);
if self
.latest_at_relevant_chunks(&q, &descr.entity_path, descr.component_name)
.is_empty()
{
filtered_out.insert(column_descr.clone());
}
}

QueryExpression::Range(query) => {
let q = LatestAtQuery::new(query.timeline, query.time_range.max());
if self
.latest_at_relevant_chunks(&q, &descr.entity_path, descr.component_name)
.is_empty()
{
filtered_out.insert(column_descr.clone());
}
}
}
}

schema
.into_iter()
.filter(|descr| !filtered_out.contains(descr))
.collect()
}

/// Returns the filtered schema for the given [`ViewContentsSelector`].
///
/// The order of the columns is guaranteed to be in a specific order:
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ impl ChunkStore {

let Self {
id: _,
info: _,
config: _,
type_registry: _,
chunks_per_chunk_id,
Expand Down
5 changes: 2 additions & 3 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ mod writes;

pub use self::dataframe::{
ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector, Index,
IndexRange, IndexValue, JoinEncoding, LatestAtQueryExpression, QueryExpression,
QueryExpression2, RangeQueryExpression, SparseFillStrategy, TimeColumnDescriptor,
TimeColumnSelector, ViewContentsSelector,
IndexRange, IndexValue, JoinEncoding, QueryExpression, SparseFillStrategy,
TimeColumnDescriptor, TimeColumnSelector, ViewContentsSelector,
};
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
Expand Down
Loading

0 comments on commit c6a7356

Please sign in to comment.