Skip to content

Commit

Permalink
implement consistent open/import/log_file behavior for all cases
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Nov 5, 2024
1 parent 3886323 commit 1f4c61a
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 48 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5376,6 +5376,7 @@ dependencies = [
"re_types",
"tempfile",
"thiserror",
"uuid",
"walkdir",
]

Expand Down Expand Up @@ -6415,6 +6416,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"uuid",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_data_loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ once_cell.workspace = true
parking_lot.workspace = true
rayon.workspace = true
thiserror.workspace = true
uuid.workspace = true
walkdir.workspace = true

[target.'cfg(not(any(target_arch = "wasm32")))'.dependencies]
Expand Down
37 changes: 29 additions & 8 deletions crates/store/re_data_loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ pub struct DataLoaderSettings {
/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
pub opened_store_id: Option<re_log_types::StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
pub force_store_info: bool,

/// What should the logged entity paths be prefixed with?
pub entity_path_prefix: Option<EntityPath>,

Expand All @@ -79,6 +85,7 @@ impl DataLoaderSettings {
opened_application_id: Default::default(),
store_id: store_id.into(),
opened_store_id: Default::default(),
force_store_info: false,
entity_path_prefix: Default::default(),
timepoint: Default::default(),
}
Expand All @@ -91,6 +98,7 @@ impl DataLoaderSettings {
opened_application_id,
store_id,
opened_store_id,
force_store_info: _,
entity_path_prefix,
timepoint,
} = self;
Expand Down Expand Up @@ -150,6 +158,8 @@ impl DataLoaderSettings {
}
}

pub type DataLoaderName = String;

/// A [`DataLoader`] loads data from a file path and/or a file's contents.
///
/// Files can be loaded in 3 different ways:
Expand Down Expand Up @@ -205,8 +215,8 @@ impl DataLoaderSettings {
pub trait DataLoader: Send + Sync {
/// Name of the [`DataLoader`].
///
/// Doesn't need to be unique.
fn name(&self) -> String;
/// Should be globally unique.
fn name(&self) -> DataLoaderName;

/// Loads data from a file on the local filesystem and sends it to `tx`.
///
Expand Down Expand Up @@ -314,20 +324,31 @@ impl DataLoaderError {
/// most convenient for them, whether it is raw components, arrow chunks or even
/// full-on [`LogMsg`]s.
pub enum LoadedData {
Chunk(re_log_types::StoreId, Chunk),
ArrowMsg(re_log_types::StoreId, ArrowMsg),
LogMsg(LogMsg),
Chunk(DataLoaderName, re_log_types::StoreId, Chunk),
ArrowMsg(DataLoaderName, re_log_types::StoreId, ArrowMsg),
LogMsg(DataLoaderName, LogMsg),
}

impl LoadedData {
/// Returns the name of the [`DataLoader`] that generated this data.
#[inline]
pub fn data_loader_name(&self) -> &DataLoaderName {
match self {
Self::Chunk(name, ..) | Self::ArrowMsg(name, ..) | Self::LogMsg(name, ..) => name,
}
}

/// Pack the data into a [`LogMsg`].
#[inline]
pub fn into_log_msg(self) -> ChunkResult<LogMsg> {
match self {
Self::Chunk(store_id, chunk) => Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?)),
Self::Chunk(_name, store_id, chunk) => {
Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
}

Self::ArrowMsg(store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),
Self::ArrowMsg(_name, store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),

Self::LogMsg(msg) => Ok(msg),
Self::LogMsg(_name, msg) => Ok(msg),
}
}
}
Expand Down
49 changes: 33 additions & 16 deletions crates/store/re_data_loader/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ahash::{HashMap, HashMapExt};
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;

use crate::{DataLoaderError, LoadedData};
use crate::{DataLoader, DataLoaderError, LoadedData, RrdLoader};

// ---

Expand Down Expand Up @@ -37,7 +37,7 @@ pub fn load_from_path(

let rx = load(settings, path, None)?;

send(settings.clone(), file_source, path.to_owned(), rx, tx);
send(settings.clone(), file_source, rx, tx);

Ok(())
}
Expand All @@ -64,7 +64,7 @@ pub fn load_from_file_contents(

let data = load(settings, filepath, Some(contents))?;

send(settings.clone(), file_source, filepath.to_owned(), data, tx);
send(settings.clone(), file_source, data, tx);

Ok(())
}
Expand All @@ -73,21 +73,20 @@ pub fn load_from_file_contents(

/// Prepares an adequate [`re_log_types::StoreInfo`] [`LogMsg`] given the input.
pub(crate) fn prepare_store_info(
application_id: re_log_types::ApplicationId,
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
) -> LogMsg {
re_tracing::profile_function!(path.display().to_string());
re_tracing::profile_function!();

use re_log_types::SetStoreInfo;

let app_id = re_log_types::ApplicationId(path.display().to_string());
let store_source = re_log_types::StoreSource::File { file_source };

LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
application_id,
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
Expand Down Expand Up @@ -263,14 +262,19 @@ pub(crate) fn load(
pub(crate) fn send(
settings: crate::DataLoaderSettings,
file_source: FileSource,
path: std::path::PathBuf,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
) {
spawn({
re_tracing::profile_function!();

let mut store_info_tracker: HashMap<re_log_types::StoreId, bool> = HashMap::new();
#[derive(Default, Debug)]
struct Tracked {
is_rrd_or_rbl: bool,
already_has_store_info: bool,
}

let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();

let tx = tx.clone();
move || {
Expand All @@ -280,6 +284,7 @@ pub(crate) fn send(
// poll the channel in any case so as to make sure that the data producer
// doesn't get stuck.
for data in rx_loader {
let data_loader_name = data.data_loader_name().clone();
let msg = match data.into_log_msg() {
Ok(msg) => {
let store_info = match &msg {
Expand All @@ -293,7 +298,10 @@ pub(crate) fn send(
};

if let Some((store_id, store_info_created)) = store_info {
*store_info_tracker.entry(store_id).or_default() |= store_info_created;
let tracked = store_info_tracker.entry(store_id).or_default();
tracked.is_rrd_or_rbl =
*data_loader_name == RrdLoader::name(&RrdLoader);
tracked.already_has_store_info |= store_info_created;
}

msg
Expand All @@ -306,16 +314,25 @@ pub(crate) fn send(
tx.send(msg).ok();
}

for (store_id, store_info_already_created) in store_info_tracker {
for (store_id, tracked) in store_info_tracker {
let is_a_preexisting_recording =
Some(&store_id) == settings.opened_store_id.as_ref();

if store_info_already_created || is_a_preexisting_recording {
continue;
}
// Never try to send custom store info for RRDs and RBLs, they always have their own, and
// it's always right.
let should_force_store_info = !tracked.is_rrd_or_rbl && settings.force_store_info;

let should_send_new_store_info = should_force_store_info
|| (!tracked.already_has_store_info && !is_a_preexisting_recording);

let store_info = prepare_store_info(&store_id, file_source.clone(), &path);
tx.send(store_info).ok();
if should_send_new_store_info {
let app_id = settings
.opened_application_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string().into());
let store_info = prepare_store_info(app_id, &store_id, file_source.clone());
tx.send(store_info).ok();
}
}

tx.quit(None).ok();
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_data_loader/src/loader_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl DataLoader for ArchetypeLoader {
.clone()
.unwrap_or_else(|| settings.store_id.clone());
for row in rows {
let data = LoadedData::Chunk(store_id.clone(), row);
let data = LoadedData::Chunk(Self::name(&Self), store_id.clone(), row);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_data_loader/src/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use ahash::HashMap;
use once_cell::sync::Lazy;

use crate::LoadedData;
use crate::{DataLoader, LoadedData};

// ---

Expand Down Expand Up @@ -321,7 +321,7 @@ fn decode_and_stream<R: std::io::Read>(
}
};

let data = LoadedData::LogMsg(msg);
let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use re_log_encoding::decoder::Decoder;
use crossbeam::channel::Receiver;
use re_log_types::{ApplicationId, StoreId};

use crate::LoadedData;
use crate::{DataLoader as _, LoadedData};

// ---

Expand Down Expand Up @@ -192,7 +192,7 @@ fn decode_and_stream<R: std::io::Read>(
msg
};

let data = LoadedData::LogMsg(msg);
let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
3 changes: 3 additions & 0 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ impl DataSource {
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
force_store_info: file_source.force_store_info(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_path(&settings, file_source, &path, &tx)
Expand Down Expand Up @@ -206,6 +207,7 @@ impl DataSource {
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
force_store_info: file_source.force_store_info(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_file_contents(
Expand Down Expand Up @@ -275,6 +277,7 @@ fn test_data_source_from_uri() {
let file_source = FileSource::DragAndDrop {
recommended_application_id: None,
recommended_recording_id: None,
force_store_info: false,
};

for uri in file {
Expand Down
31 changes: 31 additions & 0 deletions crates/store/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,21 +413,39 @@ pub enum FileSource {
DragAndDrop {
/// The [`ApplicationId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_application_id: Option<ApplicationId>,

/// The [`StoreId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_recording_id: Option<StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
#[cfg_attr(feature = "serde", serde(skip))]
force_store_info: bool,
},

FileDialog {
/// The [`ApplicationId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_application_id: Option<ApplicationId>,

/// The [`StoreId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_recording_id: Option<StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
#[cfg_attr(feature = "serde", serde(skip))]
force_store_info: bool,
},

Sdk,
Expand Down Expand Up @@ -463,6 +481,19 @@ impl FileSource {
Self::Cli | Self::Sdk => None,
}
}

#[inline]
pub fn force_store_info(&self) -> bool {
match self {
Self::FileDialog {
force_store_info, ..
}
| Self::DragAndDrop {
force_store_info, ..
} => *force_store_info,
Self::Cli | Self::Sdk => false,
}
}
}

/// The source of a recording or blueprint.
Expand Down
1 change: 1 addition & 0 deletions crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ impl RecordingStream {
opened_application_id: None,
store_id: store_info.store_id.clone(),
opened_store_id: None,
force_store_info: false,
entity_path_prefix,
timepoint: (!static_).then(|| {
self.with(|inner| {
Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
serde-wasm-bindgen.workspace = true
thiserror.workspace = true
uuid.workspace = true
web-time.workspace = true
wgpu.workspace = true

Expand Down
Loading

0 comments on commit 1f4c61a

Please sign in to comment.