Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add argument to ManifestWriter to specify where data is written #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 28 additions & 24 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,36 +253,40 @@ mod tests {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();
let manifest_file_output = self.next_manifest_file();

// Write data files
let data_file_manifest = ManifestWriter::new(
self.next_manifest_file(),
manifest_file_output.location().to_string(),
current_snapshot.snapshot_id(),
vec![],
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build()],
))
.write(
manifest_file_output,
Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build()],
),
)
.await
.unwrap();

Expand Down
122 changes: 63 additions & 59 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,72 +980,76 @@ mod tests {
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();
let manifest_file_output = self.next_manifest_file();

// Write data files
let data_file_manifest = ManifestWriter::new(
self.next_manifest_file(),
manifest_file_output.location().to_string(),
current_snapshot.snapshot_id(),
vec![],
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.write(
manifest_file_output,
Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap(),
)
.build(),
],
))
vec![
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap(),
)
.build(),
],
),
)
.await
.unwrap();

Expand Down
41 changes: 27 additions & 14 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Manifest {

/// A manifest writer.
pub struct ManifestWriter {
output: OutputFile,
manifest_path: String,

snapshot_id: i64,

Expand All @@ -133,9 +133,9 @@ pub struct ManifestWriter {

impl ManifestWriter {
/// Create a new manifest writer.
pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Vec<u8>) -> Self {
pub fn new(manifest_path: String, snapshot_id: i64, key_metadata: Vec<u8>) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a difference between this new manifest_path and what is contained within the location for an OutputFile already?

In my mind the manifest path "metadata" and where the serialisation actually occurs (e.g. in S3) will always be the same. I.e. we write to s3://metadata/.../v1.table-metadata.json, which is both the place where it is serialised to a file and a metadata pointer of "where is this file"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my mind the manifest path "metadata" and where the serialisation actually occurs (e.g. in S3) will always be the same. I.e. we write to s3://metadata/.../v1.table-metadata.json, which is both the place where it is serialised to a file and a metadata pointer of "where is this file"

So - yes. As part of the spec that is a requirement; and location is both where the data gets serialized to and is included in the returned ManifestFile.

The problem here is that iceberg-rust enforces that by passing in the OutputFile to a new ManifestWriter; which the writer then puts in the ManifestFile returned by writer.write(manifest) this is the coupling of serialization and building. Because of this, the user cannot build a metadata object (create the Rust type of that object) with location = /tmp/foo.avro and serialize it (write that Rust type as bytes) to /tmp/bar.avro. The iceberg-rust crate handles that for you. So if I pass in an OutputFile with location /tmp/foo.avro to a ManifestWriter that's where the ManifestFile is being serialized to whether I like it or not.

This is why we can't use ObjectStore at the moment. If we were to build/serialize the bytes on a node A and then write those bytes to node B, and query them from node B; the queries will fail because the metadata that has been written to node B will point to metadata on node A. Because that's where the building/serialization took place. And those take place together because the iceberg-rust crate has coupled them together.

IMO, Building/serialization needs to be be made completely separate in the iceberg-rust crate. As an example, for ManifestFile you would ideally have some method, say ManifestFile::to_bytes(self) -> Bytes that consumes the ManifestFile and serializes it. Where those bytes go is entirely up to the user, _but the user is required to uphold the invariant that where those bytes get written to match the manifest_path for the ManifestFile that got serialized. However the maintainers seem keen on preserving that coupling; but want to change the FileIO type to be a trait so that users can provide their own object_store/storage implementations.

Part of me wonders if we should try to convince them to separate things out better; but I'm not sure how worthy that is given that a FileIO trait would allow us to implement that trait for Arc<dyn ObjectStore>, which should meet the design goal of using ObjectStore.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that iceberg-rust enforces that by passing in the OutputFile to a new ManifestWriter

Note: This PR removes that enforcement - hence why I think having a discussion about it first on this PR is important.

Self {
output,
manifest_path,
snapshot_id,
added_files: 0,
added_rows: 0,
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ManifestWriter {
}

/// Write a manifest.
pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
pub async fn write(mut self, dest: OutputFile, manifest: Manifest) -> Result<ManifestFile> {
// Create the avro writer
let partition_type = manifest
.metadata
Expand Down Expand Up @@ -299,13 +299,13 @@ impl ManifestWriter {

let content = avro_writer.into_inner()?;
let length = content.len();
self.output.write(Bytes::from(content)).await?;
dest.write(Bytes::from(content)).await?;

let partition_summary =
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());

Ok(ManifestFile {
manifest_path: self.output.location().to_string(),
manifest_path: self.manifest_path,
manifest_length: length as i64,
partition_spec_id: manifest.metadata.partition_spec.spec_id(),
content: manifest.metadata.content,
Expand Down Expand Up @@ -1621,7 +1621,9 @@ mod tests {
]
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

test_manifest_read_write(manifest, writer).await;
}
Expand Down Expand Up @@ -1787,7 +1789,9 @@ mod tests {
})],
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

let res = test_manifest_read_write(manifest, writer).await;

Expand Down Expand Up @@ -1854,8 +1858,13 @@ mod tests {
})],
};

let writer =
|output_file: OutputFile| ManifestWriter::new(output_file, 2966623707104393227, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(
output_file.location().to_string(),
2966623707104393227,
vec![],
)
};

test_manifest_read_write(manifest, writer).await;
}
Expand Down Expand Up @@ -1940,7 +1949,9 @@ mod tests {
]
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

let entry = test_manifest_read_write(manifest, writer).await;

Expand Down Expand Up @@ -2014,7 +2025,9 @@ mod tests {
})],
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

let (avro_bytes, _) = write_manifest(&manifest, writer).await;

Expand Down Expand Up @@ -2107,8 +2120,8 @@ mod tests {
let path = temp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let writer = writer_builder(output_file);
let res = writer.write(manifest.clone()).await.unwrap();
let writer = writer_builder(output_file.clone());
let res = writer.write(output_file, manifest.clone()).await.unwrap();

// Verify manifest
(fs::read(path).expect("read_file must succeed"), res)
Expand Down