Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

fix: Fix possible empty snapshot #144

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions icelake/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use crate::error::Result;
use crate::types::{
DataFile, DataFileFormat, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter,
ManifestMetadata, ManifestStatus, ManifestWriter, Snapshot,
DataFile, DataFileFormat, ManifestContentType, ManifestEntry, ManifestFile, ManifestList,
ManifestListWriter, ManifestMetadata, ManifestStatus, ManifestWriter, Snapshot,
};
use crate::Table;
use opendal::Operator;
Expand Down Expand Up @@ -142,11 +142,16 @@ impl<'a> Transaction<'a> {
let manifest_list_entry = writer.write(manifest_file).await?;

// Load existing manifest list
let mut manifest_list = cur_metadata
.current_snapshot()?
.load_manifest_list(&ctx.io)
.await?;
manifest_list.entries.push(manifest_list_entry);
let manifest_list = match cur_metadata.current_snapshot()? {
Some(s) => {
let mut ret = s.load_manifest_list(&ctx.io).await?;
ret.entries.push(manifest_list_entry);
ret
}
None => ManifestList {
entries: vec![manifest_list_entry],
},
};

let manifest_list_path = Transaction::manifest_list_path(&mut ctx, next_snapshot_id);
// Writing manifest list
Expand All @@ -164,14 +169,20 @@ impl<'a> Transaction<'a> {
format!("{}/{manifest_list_path}", cur_metadata.location)
};

let cur_snapshot = cur_metadata.current_snapshot()?;
let mut new_snapshot = cur_snapshot.clone();
let mut new_snapshot = match cur_metadata.current_snapshot()? {
Some(cur_snapshot) => {
let mut new_snapshot = cur_snapshot.clone();
new_snapshot.parent_snapshot_id = Some(cur_snapshot.snapshot_id);
new_snapshot
}
None => Snapshot::default(),
};
new_snapshot.snapshot_id = next_snapshot_id;
new_snapshot.parent_snapshot_id = Some(cur_snapshot.snapshot_id);
new_snapshot.sequence_number = next_seq_number;
new_snapshot.timestamp_ms =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as i64;
new_snapshot.manifest_list = manifest_list_path;
new_snapshot.schema_id = Some(cur_metadata.current_schema_id as i64);

// TODO: Add operations
Ok(new_snapshot)
Expand Down
33 changes: 18 additions & 15 deletions icelake/src/types/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{Error, Table};

pub(crate) const UNASSIGNED_SEQ_NUM: i64 = -1;
const MAIN_BRANCH: &str = "main";
const EMPTY_SNAPSHOT_ID: i64 = -1;

/// All data types are either primitives or nested types, which are maps, lists, or structs.
#[derive(Debug, PartialEq, Clone, Eq)]
Expand Down Expand Up @@ -1632,7 +1633,7 @@ impl ToString for DataFileFormat {
}

/// Snapshot of contains all data of a table at a point in time.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, Default)]
pub struct Snapshot {
/// A unique long ID
pub snapshot_id: i64,
Expand Down Expand Up @@ -1926,22 +1927,24 @@ impl TableMetadata {
}

/// Current schema.
pub fn current_snapshot(&self) -> Result<&Snapshot> {
pub fn current_snapshot(&self) -> Result<Option<&Snapshot>> {
if let (Some(snapshots), Some(snapshot_id)) = (&self.snapshots, self.current_snapshot_id) {
snapshots
.iter()
.find(|s| s.snapshot_id == snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::IcebergDataInvalid,
format!("Snapshot id {snapshot_id} not found!"),
)
})
} else {
Err(Error::new(
ErrorKind::IcebergDataInvalid,
"Current snapshot missing!",
if snapshot_id == EMPTY_SNAPSHOT_ID {
return Ok(None);
}
Ok(Some(
snapshots
.iter()
.find(|s| s.snapshot_id == snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::IcebergDataInvalid,
format!("Snapshot id {snapshot_id} not found!"),
)
})?,
))
} else {
Ok(None)
}
}

Expand Down
10 changes: 0 additions & 10 deletions tests/integration/python/init.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from pyspark.sql import SparkSession
import csv
import argparse


Expand All @@ -24,15 +23,6 @@ def check(args):
print(f"Executing sql: {sql}")
spark.sql(sql)

with open(args.file, newline='') as insert_csv_file:
inserts = ", ".join([f"""({row[0]}, "{row[1]}", {row[2]})""" for row in csv.reader(insert_csv_file)])
sql = f"""
INSERT INTO s1.t1 VALUES {inserts}
"""
print(f"Executing sql: {sql}")
spark.sql(sql)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Test icelake with spark')
parser.add_argument('-s', dest='sparkurl', type=str, help='Spark remote url')
Expand Down
Loading