Consistent open/import/log_file behaviors in all common scenarios #7966

Open · wants to merge 1 commit into base: main
2 changes: 2 additions & 0 deletions Cargo.lock
@@ -5376,6 +5376,7 @@ dependencies = [
"re_types",
"tempfile",
"thiserror",
"uuid",
"walkdir",
]

@@ -6415,6 +6416,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"uuid",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
1 change: 1 addition & 0 deletions crates/store/re_data_loader/Cargo.toml
@@ -43,6 +43,7 @@ once_cell.workspace = true
parking_lot.workspace = true
rayon.workspace = true
thiserror.workspace = true
uuid.workspace = true
walkdir.workspace = true

[target.'cfg(not(any(target_arch = "wasm32")))'.dependencies]
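The `uuid` dependency added here is what later lets `load_file.rs` synthesize a fallback application ID when none was recommended by the viewer. A one-line sketch, assuming the `From<String>` conversion for `ApplicationId` that the diff itself relies on:

    // Sketch only: fall back to a random application ID (see load_file.rs below).
    let app_id: re_log_types::ApplicationId = uuid::Uuid::new_v4().to_string().into();
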
37 changes: 29 additions & 8 deletions crates/store/re_data_loader/src/lib.rs
@@ -64,6 +64,12 @@ pub struct DataLoaderSettings {
/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
pub opened_store_id: Option<re_log_types::StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
pub force_store_info: bool,

/// What should the logged entity paths be prefixed with?
pub entity_path_prefix: Option<EntityPath>,

@@ -79,6 +85,7 @@ impl DataLoaderSettings {
opened_application_id: Default::default(),
store_id: store_id.into(),
opened_store_id: Default::default(),
force_store_info: false,
entity_path_prefix: Default::default(),
timepoint: Default::default(),
}
@@ -91,6 +98,7 @@ impl DataLoaderSettings {
opened_application_id,
store_id,
opened_store_id,
force_store_info: _,
entity_path_prefix,
timepoint,
} = self;
@@ -150,6 +158,8 @@ impl DataLoaderSettings {
}
}

pub type DataLoaderName = String;

/// A [`DataLoader`] loads data from a file path and/or a file's contents.
///
/// Files can be loaded in 3 different ways:
@@ -205,8 +215,8 @@ impl DataLoaderSettings {
pub trait DataLoader: Send + Sync {
/// Name of the [`DataLoader`].
///
/// Doesn't need to be unique.
fn name(&self) -> String;
/// Should be globally unique.
fn name(&self) -> DataLoaderName;

/// Loads data from a file on the local filesystem and sends it to `tx`.
///
@@ -314,20 +324,31 @@ impl DataLoaderError {
/// most convenient for them, whether it is raw components, arrow chunks or even
/// full-on [`LogMsg`]s.
pub enum LoadedData {
Chunk(re_log_types::StoreId, Chunk),
ArrowMsg(re_log_types::StoreId, ArrowMsg),
LogMsg(LogMsg),
Chunk(DataLoaderName, re_log_types::StoreId, Chunk),
ArrowMsg(DataLoaderName, re_log_types::StoreId, ArrowMsg),
LogMsg(DataLoaderName, LogMsg),
}

impl LoadedData {
/// Returns the name of the [`DataLoader`] that generated this data.
#[inline]
pub fn data_loader_name(&self) -> &DataLoaderName {
match self {
Self::Chunk(name, ..) | Self::ArrowMsg(name, ..) | Self::LogMsg(name, ..) => name,
}
}

/// Pack the data into a [`LogMsg`].
#[inline]
pub fn into_log_msg(self) -> ChunkResult<LogMsg> {
match self {
Self::Chunk(store_id, chunk) => Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?)),
Self::Chunk(_name, store_id, chunk) => {
Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
}

Self::ArrowMsg(store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),
Self::ArrowMsg(_name, store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),

Self::LogMsg(msg) => Ok(msg),
Self::LogMsg(_name, msg) => Ok(msg),
}
}
}
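For context, a minimal sketch of the widened `LoadedData` API; the loader name and `some_log_msg` are hypothetical stand-ins, not values from this diff:

    // Sketch only: every LoadedData now carries the name of the DataLoader that produced it.
    let data = LoadedData::LogMsg("MyCustomLoader".to_owned(), some_log_msg);
    assert_eq!(data.data_loader_name().as_str(), "MyCustomLoader");
    // Repacking into a LogMsg drops the name again; LogMsg variants pass through unchanged.
    let _msg = data.into_log_msg().unwrap();
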
49 changes: 33 additions & 16 deletions crates/store/re_data_loader/src/load_file.rs
@@ -4,7 +4,7 @@ use ahash::{HashMap, HashMapExt};
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;

use crate::{DataLoaderError, LoadedData};
use crate::{DataLoader, DataLoaderError, LoadedData, RrdLoader};

// ---

@@ -37,7 +37,7 @@ pub fn load_from_path(

let rx = load(settings, path, None)?;

send(settings.clone(), file_source, path.to_owned(), rx, tx);
send(settings.clone(), file_source, rx, tx);

Ok(())
}
@@ -64,7 +64,7 @@ pub fn load_from_file_contents(

let data = load(settings, filepath, Some(contents))?;

send(settings.clone(), file_source, filepath.to_owned(), data, tx);
send(settings.clone(), file_source, data, tx);

Ok(())
}
@@ -73,21 +73,20 @@ pub fn load_from_file_contents(

/// Prepares an adequate [`re_log_types::StoreInfo`] [`LogMsg`] given the input.
pub(crate) fn prepare_store_info(
application_id: re_log_types::ApplicationId,
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
) -> LogMsg {
re_tracing::profile_function!(path.display().to_string());
re_tracing::profile_function!();

use re_log_types::SetStoreInfo;

let app_id = re_log_types::ApplicationId(path.display().to_string());
let store_source = re_log_types::StoreSource::File { file_source };

LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
application_id,
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
@@ -263,14 +262,19 @@ pub(crate) fn load(
pub(crate) fn send(
settings: crate::DataLoaderSettings,
file_source: FileSource,
path: std::path::PathBuf,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
) {
spawn({
re_tracing::profile_function!();

let mut store_info_tracker: HashMap<re_log_types::StoreId, bool> = HashMap::new();
#[derive(Default, Debug)]
struct Tracked {
is_rrd_or_rbl: bool,
already_has_store_info: bool,
}

let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();

let tx = tx.clone();
move || {
@@ -280,6 +284,7 @@ pub(crate) fn send(
// poll the channel in any case so as to make sure that the data producer
// doesn't get stuck.
for data in rx_loader {
let data_loader_name = data.data_loader_name().clone();
let msg = match data.into_log_msg() {
Ok(msg) => {
let store_info = match &msg {
@@ -293,7 +298,10 @@ pub(crate) fn send(
};

if let Some((store_id, store_info_created)) = store_info {
*store_info_tracker.entry(store_id).or_default() |= store_info_created;
let tracked = store_info_tracker.entry(store_id).or_default();
tracked.is_rrd_or_rbl =
*data_loader_name == RrdLoader::name(&RrdLoader);
tracked.already_has_store_info |= store_info_created;
}

msg
@@ -306,16 +314,25 @@ pub(crate) fn send(
tx.send(msg).ok();
}

for (store_id, store_info_already_created) in store_info_tracker {
for (store_id, tracked) in store_info_tracker {
let is_a_preexisting_recording =
Some(&store_id) == settings.opened_store_id.as_ref();

if store_info_already_created || is_a_preexisting_recording {
continue;
}
// Never try to send custom store info for RRDs and RBLs: they always have their own, and
// it's always right.
let should_force_store_info = !tracked.is_rrd_or_rbl && settings.force_store_info;

let should_send_new_store_info = should_force_store_info
|| (!tracked.already_has_store_info && !is_a_preexisting_recording);

let store_info = prepare_store_info(&store_id, file_source.clone(), &path);
tx.send(store_info).ok();
if should_send_new_store_info {
let app_id = settings
.opened_application_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string().into());
let store_info = prepare_store_info(app_id, &store_id, file_source.clone());
tx.send(store_info).ok();
}
}

tx.quit(None).ok();
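The new branching in `send` boils down to the predicate below; a standalone restatement with simplified names, not the actual function in the crate:

    // Standalone restatement of the decision above (names simplified).
    fn should_send_new_store_info(
        is_rrd_or_rbl: bool,              // data came from the RrdLoader (.rrd / .rbl)
        already_has_store_info: bool,     // the loader already emitted its own SetStoreInfo
        is_a_preexisting_recording: bool, // store_id matches the recording opened in the viewer
        force_store_info: bool,           // DataLoaderSettings::force_store_info
    ) -> bool {
        // RRDs and RBLs always carry their own (correct) store info, so never force one for them.
        let should_force = !is_rrd_or_rbl && force_store_info;
        should_force || (!already_has_store_info && !is_a_preexisting_recording)
    }
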
2 changes: 1 addition & 1 deletion crates/store/re_data_loader/src/loader_archetype.rs
@@ -138,7 +138,7 @@ impl DataLoader for ArchetypeLoader {
.clone()
.unwrap_or_else(|| settings.store_id.clone());
for row in rows {
let data = LoadedData::Chunk(store_id.clone(), row);
let data = LoadedData::Chunk(Self::name(&Self), store_id.clone(), row);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
4 changes: 2 additions & 2 deletions crates/store/re_data_loader/src/loader_external.rs
@@ -6,7 +6,7 @@ use std::{
use ahash::HashMap;
use once_cell::sync::Lazy;

use crate::LoadedData;
use crate::{DataLoader, LoadedData};

// ---

@@ -321,7 +321,7 @@ fn decode_and_stream<R: std::io::Read>(
}
};

let data = LoadedData::LogMsg(msg);
let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
17 changes: 13 additions & 4 deletions crates/store/re_data_loader/src/loader_rrd.rs
@@ -4,7 +4,7 @@ use re_log_encoding::decoder::Decoder;
use crossbeam::channel::Receiver;
use re_log_types::{ApplicationId, StoreId};

use crate::LoadedData;
use crate::{DataLoader as _, LoadedData};

// ---

@@ -130,12 +130,21 @@ impl crate::DataLoader for RrdLoader {
},
};

// * We never want to patch blueprints' store IDs, only their app IDs.
// * We never use import semantics at all for .rrd files.
let forced_application_id = if extension == "rbl" {
settings.opened_application_id.as_ref()
} else {
None
};
let forced_recording_id = None;

decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
settings.opened_store_id.as_ref(),
forced_application_id,
forced_recording_id,
);

Ok(())
@@ -192,7 +201,7 @@ fn decode_and_stream<R: std::io::Read>(
msg
};

let data = LoadedData::LogMsg(msg);
let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
3 changes: 3 additions & 0 deletions crates/store/re_data_source/src/data_source.rs
@@ -178,6 +178,7 @@ impl DataSource {
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
force_store_info: file_source.force_store_info(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_path(&settings, file_source, &path, &tx)
@@ -206,6 +207,7 @@ impl DataSource {
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
force_store_info: file_source.force_store_info(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_file_contents(
@@ -275,6 +277,7 @@ fn test_data_source_from_uri() {
let file_source = FileSource::DragAndDrop {
recommended_application_id: None,
recommended_recording_id: None,
force_store_info: false,
};

for uri in file {
31 changes: 31 additions & 0 deletions crates/store/re_log_types/src/lib.rs
@@ -413,21 +413,39 @@ pub enum FileSource {
DragAndDrop {
/// The [`ApplicationId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_application_id: Option<ApplicationId>,

/// The [`StoreId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_recording_id: Option<StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
#[cfg_attr(feature = "serde", serde(skip))]
force_store_info: bool,
},

FileDialog {
/// The [`ApplicationId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_application_id: Option<ApplicationId>,

/// The [`StoreId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_recording_id: Option<StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
#[cfg_attr(feature = "serde", serde(skip))]
force_store_info: bool,
},

Sdk,
@@ -463,6 +481,19 @@ impl FileSource {
Self::Cli | Self::Sdk => None,
}
}

#[inline]
pub fn force_store_info(&self) -> bool {
match self {
Self::FileDialog {
force_store_info, ..
}
| Self::DragAndDrop {
force_store_info, ..
} => *force_store_info,
Self::Cli | Self::Sdk => false,
}
}
}

/// The source of a recording or blueprint.
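A small usage sketch of the new accessor; the field values are illustrative only:

    // Illustrative values, matching the fields introduced above.
    let source = re_log_types::FileSource::FileDialog {
        recommended_application_id: None,
        recommended_recording_id: None,
        force_store_info: true,
    };
    assert!(source.force_store_info());
    // CLI- and SDK-sourced data never forces a store info.
    assert!(!re_log_types::FileSource::Sdk.force_store_info());
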
1 change: 1 addition & 0 deletions crates/top/re_sdk/src/recording_stream.rs
@@ -1259,6 +1259,7 @@ impl RecordingStream {
opened_application_id: None,
store_id: store_info.store_id.clone(),
opened_store_id: None,
force_store_info: false,
entity_path_prefix,
timepoint: (!static_).then(|| {
self.with(|inner| {