Skip to content

Commit

Permalink
MVP
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Jan 24, 2025
1 parent 6751838 commit e327470
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 51 deletions.
80 changes: 56 additions & 24 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";
/// Name of the top-level log field that carries a [`Sidecar`] action.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SIDECAR_NAME: &str = "sidecar";

/// Lazily initialized schema consisting of a single top-level field, named by [`ADD_NAME`],
/// whose type is derived from `Option<Add>`.
static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());
Expand All @@ -58,6 +60,7 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
Option::<Cdc>::get_struct_field(CDC_NAME),
Option::<Sidecar>::get_struct_field(SIDECAR_NAME),
// We don't support the following actions yet
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
])
Expand Down Expand Up @@ -511,6 +514,17 @@ pub struct SetTransaction {
pub last_updated: Option<i64>,
}

/// A `sidecar` action: a reference to a sidecar file together with its size, modification time
/// and optional tags.
#[derive(Debug, Clone, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Sidecar {
/// Path of the sidecar file, relative to the `_delta_log/_sidecars` directory.
pub path: String,
/// Size of the sidecar file in bytes.
pub size_in_bytes: i64,
/// Modification time of the sidecar file.
// NOTE(review): presumably milliseconds since the Unix epoch, like other log actions — confirm.
pub modification_time: i64,
/// Optional string-to-string tags attached to this action.
pub tags: Option<HashMap<String, String>>,
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -637,8 +651,8 @@ mod tests {
fn test_cdc_schema() {
let schema = get_log_schema()
.project(&[CDC_NAME])
.expect("Couldn't get remove field");
let expected = Arc::new(StructType::new([StructField::nullable(
.expect("Couldn't get cdc field");
let expected = Arc::new(StructType::new([StructField::new(
"cdc",
StructType::new([
StructField::not_null("path", DataType::STRING),
Expand All @@ -654,6 +668,24 @@ mod tests {
assert_eq!(schema, expected);
}

/// Projecting the log schema onto the `sidecar` column must yield a nullable struct holding
/// the three required sidecar fields plus the tags field.
#[test]
fn test_sidecar_schema() {
    let projected = get_log_schema()
        .project(&[SIDECAR_NAME])
        .expect("Couldn't get sidecar field");

    // Fields expected inside the `sidecar` struct, in schema order.
    let sidecar_fields = StructType::new([
        StructField::new("path", DataType::STRING, false),
        StructField::new("sizeInBytes", DataType::LONG, false),
        StructField::new("modificationTime", DataType::LONG, false),
        tags_field(),
    ]);
    // The action column itself is nullable.
    let sidecar_column = StructField::new("sidecar", sidecar_fields, true);
    let expected = Arc::new(StructType::new([sidecar_column]));

    assert_eq!(projected, expected);
}

#[test]
fn test_transaction_schema() {
let schema = get_log_schema()
Expand Down Expand Up @@ -738,26 +770,26 @@ mod tests {
}
}

/// A protocol that lists the `V2Checkpoint` feature must be rejected for reading, whether it
/// is built with a first argument of 3 or of 4.
#[test]
fn test_v2_checkpoint_unsupported() {
    // Same construction and assertion for each value of the first `try_new` argument.
    for reader_version in [3, 4] {
        let protocol = Protocol::try_new(
            reader_version,
            7,
            Some([ReaderFeatures::V2Checkpoint]),
            Some([ReaderFeatures::V2Checkpoint]),
        )
        .unwrap();
        assert!(protocol.ensure_read_supported().is_err());
    }
}
// #[test]
// fn test_v2_checkpoint_unsupported() {
// let protocol = Protocol::try_new(
// 3,
// 7,
// Some([ReaderFeatures::V2Checkpoint]),
// Some([ReaderFeatures::V2Checkpoint]),
// )
// .unwrap();
// assert!(protocol.ensure_read_supported().is_ok());

// let protocol = Protocol::try_new(
// 4,
// 7,
// Some([ReaderFeatures::V2Checkpoint]),
// Some([ReaderFeatures::V2Checkpoint]),
// )
// .unwrap();
// assert!(protocol.ensure_read_supported().is_err());
// }

#[test]
fn test_ensure_read_supported() {
Expand All @@ -777,7 +809,7 @@ mod tests {
Some(&empty_features),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
3,
Expand All @@ -795,7 +827,7 @@ mod tests {
Some([WriterFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol {
min_reader_version: 1,
Expand Down
55 changes: 52 additions & 3 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::{DeltaResult, Error};
use super::deletion_vector::DeletionVectorDescriptor;
use super::schemas::ToSchema as _;
use super::{
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME,
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
};

#[derive(Default)]
Expand Down Expand Up @@ -356,7 +356,7 @@ impl RowVisitor for CdcVisitor {
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of an Add action
// Since path column is required, use it to detect presence of a cdc action
if let Some(path) = getters[0].get_opt(i, "cdc.path")? {
self.cdcs.push(Self::visit_cdc(i, path, getters)?);
}
Expand Down Expand Up @@ -444,6 +444,55 @@ impl RowVisitor for SetTransactionVisitor {
}
}

/// Row visitor that accumulates every [`Sidecar`] action it encounters.
// NOTE(review): `allow(unused)` presumably silences dead-code warnings while callers are still
// being wired up — confirm whether it is still needed.
#[allow(unused)]
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct SidecarVisitor {
/// Sidecar actions collected so far, in the order they were visited.
pub(crate) sidecars: Vec<Sidecar>,
}

impl SidecarVisitor {
    /// Assembles a [`Sidecar`] for the row at `row_index`.
    ///
    /// `path` is the already-extracted `sidecar.path` value. The remaining columns are read
    /// from `getters[1]` (`sizeInBytes`), `getters[2]` (`modificationTime`) and `getters[3]`
    /// (`tags`, optional). Any getter failure is returned to the caller.
    #[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
    #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
    fn visit_sidecar<'a>(
        row_index: usize,
        path: String,
        getters: &[&'a dyn GetData<'a>],
    ) -> DeltaResult<Sidecar> {
        // Read the columns in getter order so the first failing column is the one reported.
        let size_in_bytes: i64 = getters[1].get(row_index, "sidecar.sizeInBytes")?;
        let modification_time: i64 = getters[2].get(row_index, "sidecar.modificationTime")?;
        let tags = getters[3].get_opt(row_index, "sidecar.tags")?;

        Ok(Sidecar {
            path,
            size_in_bytes,
            modification_time,
            tags,
        })
    }
}

impl RowVisitor for SidecarVisitor {
    /// Returns the leaf column names and types of the [`Sidecar`] schema rooted at
    /// [`SIDECAR_NAME`]; the list is built on first use and cached.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME));
        NAMES_AND_TYPES.as_ref()
    }

    /// Appends one [`Sidecar`] to `self.sidecars` for every row whose `sidecar.path` is present.
    ///
    /// Fails with an internal error unless exactly four getters are supplied.
    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        let getter_count = getters.len();
        require!(
            getter_count == 4,
            Error::InternalError(format!(
                "Wrong number of SidecarVisitor getters: {}",
                getter_count
            ))
        );
        for row in 0..row_count {
            // `path` is a required column, so a missing value means the row has no sidecar action.
            let Some(path) = getters[0].get_opt(row, "sidecar.path")? else {
                continue;
            };
            let sidecar = Self::visit_sidecar(row, path, getters)?;
            self.sidecars.push(sidecar);
        }
        Ok(())
    }
}

/// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such
/// that the first element contains the `storageType` element of the deletion vector.
pub(crate) fn visit_deletion_vector_at<'a>(
Expand Down
109 changes: 105 additions & 4 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit
//! files.
use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::actions::visitors::SidecarVisitor;
use crate::actions::{
get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, METADATA_NAME, PROTOCOL_NAME,
SIDECAR_NAME,
};
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta, FileSystemClient,
ParquetHandler, RowVisitor, Version,
};
use itertools::Itertools;
use std::collections::HashMap;
Expand Down Expand Up @@ -218,14 +223,109 @@ impl LogSegment {
.iter()
.map(|f| f.location.clone())
.collect();

let file_type = if !self.checkpoint_parts.is_empty() {
self.checkpoint_parts[0].file_type.clone()
} else {
LogPathFileType::Unknown
};

let handler = engine.get_parquet_handler();
let log_root_ref = self.log_root.clone();

let checkpoint_stream = engine
.get_parquet_handler()
.read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
.map_ok(|batch| (batch, false));
.read_parquet_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
.flat_map(move |checkpoint_batch_result| {
match checkpoint_batch_result {
Ok(checkpoint_batch) => {
let stream: Box<
dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send,
> = if let LogPathFileType::UuidCheckpoint(_uuid) = &file_type {
if checkpoint_read_schema.contains(SIDECAR_NAME) {
// If the checkpoint is a V2 checkpoint AND the schema contains the sidecar column,
// we need to read the sidecar files and return the iterator of sidecar actions

// Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol)
// In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is.

// Flatten the new batches returned. The new batches could be:
// - the checkpoint batch itself if no sidecar actions are present
// - 1 or more sidecar batch that are referenced by the checkpoint batch
let iterator = Self::process_checkpoint_batch(
handler.clone(),
log_root_ref.clone(),
checkpoint_batch,
);
match iterator {
Ok(iter) => Box::new(iter.map_ok(|batch| (batch, false))),
Err(e) => Box::new(std::iter::once(Err(e))),
}
} else {
// Chain the checkpoint batch as is if we do not need to extract sidecar actions
Box::new(std::iter::once(Ok((checkpoint_batch, false))))
}
} else {
// Chain the checkpoint batch as is if not a UUID checkpoint batch
Box::new(std::iter::once(Ok((checkpoint_batch, false))))
};
stream
}
Err(e) => {
// Chain the error if any
Box::new(std::iter::once(Err(e)))
}
}
});

Ok(commit_stream.chain(checkpoint_stream))
}

/// Expands one checkpoint batch into the batches that should be replayed in its place.
///
/// The batch is scanned with a [`SidecarVisitor`]. If it contains no sidecar actions, the batch
/// itself is returned as a single-element iterator. Otherwise every sidecar action is turned
/// into a [`FileMeta`] under `log_root` and the referenced files are read through
/// `parquet_handler` with the schema from `get_log_add_schema()` and no predicate; in that case
/// the original batch is not part of the returned iterator.
///
/// Errors from visiting the batch, from building a sidecar location, or from starting the
/// parquet read are returned to the caller.
// NOTE(review): when sidecars are present, any other actions in `batch` are not yielded —
// confirm that callers do not rely on them.
fn process_checkpoint_batch(
parquet_handler: Arc<dyn ParquetHandler>,
log_root: Url,
batch: Box<dyn EngineData>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> {
let mut visitor = SidecarVisitor::default();

// Collect every sidecar action found in this batch.
visitor.visit_rows_of(batch.as_ref())?;

// Convert the sidecar actions to file metadata. The result is kept as a `Result` so that a
// conversion failure is only surfaced in the branch below that actually reads the files.
let sidecar_files: Result<Vec<_>, _> = visitor
.sidecars
.iter()
.map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root))
.collect();

// Sidecar files are read with the add-action schema.
let sidecar_read_schema = get_log_add_schema().clone();

// If sidecars exist, read the sidecar files and return the iterator over their batches;
// otherwise hand the checkpoint batch back unchanged.
if !visitor.sidecars.is_empty() {
return parquet_handler.read_parquet_files(&sidecar_files?, sidecar_read_schema, None);
} else {
return Ok(Box::new(std::iter::once(Ok(batch))));
}
}

// Helper function to convert a single sidecar action to a FileMeta
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> Result<FileMeta, Error> {
let location = log_root
.join("_sidecars/")
.map_err(Error::from)?
.join(&sidecar.path)
.map_err(Error::from)?;
Ok(FileMeta {
location,
last_modified: sidecar.modification_time,
size: sidecar.size_in_bytes as usize,
})
}

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
let data_batches = self.replay_for_metadata(engine)?;
Expand Down Expand Up @@ -296,6 +396,7 @@ fn list_log_files(
Err(_) => true,
}))
}

/// List all commit and checkpoint files with versions above the provided `start_version` (inclusive).
/// If successful, this returns a tuple `(ascending_commit_files, checkpoint_parts)` of type
/// `(Vec<ParsedLogPath>, Vec<ParsedLogPath>)`. The commit files are guaranteed to be sorted in
Expand Down
Loading

0 comments on commit e327470

Please sign in to comment.