Commit 73e9bc8

Add all_manifests metadata table with tests

soumya-ghosh committed Oct 20, 2024
1 parent ff3a249 commit 73e9bc8

Showing 2 changed files with 153 additions and 28 deletions.
89 changes: 61 additions & 28 deletions pyiceberg/table/inspect.py
@@ -17,13 +17,14 @@
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.singleton import _convert_to_hashable_type

if TYPE_CHECKING:
@@ -32,6 +33,41 @@
from pyiceberg.table import Table


def get_manifests_schema() -> "pa.Schema":
import pyarrow as pa

partition_summary_schema = pa.struct([
pa.field("contains_null", pa.bool_(), nullable=False),
pa.field("contains_nan", pa.bool_(), nullable=True),
pa.field("lower_bound", pa.string(), nullable=True),
pa.field("upper_bound", pa.string(), nullable=True),
])

manifest_schema = pa.schema([
pa.field("content", pa.int8(), nullable=False),
pa.field("path", pa.string(), nullable=False),
pa.field("length", pa.int64(), nullable=False),
pa.field("partition_spec_id", pa.int32(), nullable=False),
pa.field("added_snapshot_id", pa.int64(), nullable=False),
pa.field("added_data_files_count", pa.int32(), nullable=False),
pa.field("existing_data_files_count", pa.int32(), nullable=False),
pa.field("deleted_data_files_count", pa.int32(), nullable=False),
pa.field("added_delete_files_count", pa.int32(), nullable=False),
pa.field("existing_delete_files_count", pa.int32(), nullable=False),
pa.field("deleted_delete_files_count", pa.int32(), nullable=False),
pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False),
])
return manifest_schema


def get_all_manifests_schema() -> "pa.Schema":
import pyarrow as pa

all_manifests_schema = get_manifests_schema()
all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
return all_manifests_schema


class InspectTable:
tbl: Table

@@ -326,31 +362,9 @@ def update_partitions_map(
schema=table_schema,
)

def manifests(self) -> "pa.Table":
def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
import pyarrow as pa

partition_summary_schema = pa.struct([
pa.field("contains_null", pa.bool_(), nullable=False),
pa.field("contains_nan", pa.bool_(), nullable=True),
pa.field("lower_bound", pa.string(), nullable=True),
pa.field("upper_bound", pa.string(), nullable=True),
])

manifest_schema = pa.schema([
pa.field("content", pa.int8(), nullable=False),
pa.field("path", pa.string(), nullable=False),
pa.field("length", pa.int64(), nullable=False),
pa.field("partition_spec_id", pa.int32(), nullable=False),
pa.field("added_snapshot_id", pa.int64(), nullable=False),
pa.field("added_data_files_count", pa.int32(), nullable=False),
pa.field("existing_data_files_count", pa.int32(), nullable=False),
pa.field("deleted_data_files_count", pa.int32(), nullable=False),
pa.field("added_delete_files_count", pa.int32(), nullable=False),
pa.field("existing_delete_files_count", pa.int32(), nullable=False),
pa.field("deleted_delete_files_count", pa.int32(), nullable=False),
pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False),
])

def _partition_summaries_to_rows(
spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
) -> List[Dict[str, Any]]:
@@ -386,11 +400,11 @@ def _partition_summaries_to_rows(

specs = self.tbl.metadata.specs()
manifests = []
if snapshot := self.tbl.metadata.current_snapshot():
if snapshot:
for manifest in snapshot.manifests(self.tbl.io):
is_data_file = manifest.content == ManifestContent.DATA
is_delete_file = manifest.content == ManifestContent.DELETES
manifests.append({
manifest_row = {
"content": manifest.content,
"path": manifest.manifest_path,
"length": manifest.manifest_length,
Expand All @@ -405,13 +419,19 @@ def _partition_summaries_to_rows(
"partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
if manifest.partitions
else [],
})
}
if is_all_manifests_table:
manifest_row["reference_snapshot_id"] = snapshot.snapshot_id
manifests.append(manifest_row)

return pa.Table.from_pylist(
manifests,
schema=manifest_schema,
schema=get_all_manifests_schema() if is_all_manifests_table else get_manifests_schema(),
)

def manifests(self) -> "pa.Table":
return self._generate_manifests_table(self.tbl.current_snapshot())

def metadata_log_entries(self) -> "pa.Table":
import pyarrow as pa

@@ -586,3 +606,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":

def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})

def all_manifests(self) -> "pa.Table":
import pyarrow as pa

snapshots = self.tbl.snapshots()
if not snapshots:
return pa.Table.from_pylist([], schema=get_all_manifests_schema())

executor = ExecutorFactory.get_or_create()
manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
)
return pa.concat_tables(manifests_by_snapshots)
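
For orientation (not part of the commit): a minimal usage sketch of the new inspection call, assuming a hypothetical catalog named "default" and an existing table "default.my_table". Unlike `manifests()`, which reads only the current snapshot's manifest list, `all_manifests()` unions the manifests of every snapshot and tags each row with the referencing `reference_snapshot_id`:

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table names, for illustration only.
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Current snapshot only vs. all snapshots (one row set per snapshot,
# distinguished by the new reference_snapshot_id column).
current = tbl.inspect.manifests()
everything = tbl.inspect.all_manifests()

# Count manifest entries per referencing snapshot with PyArrow.
print(everything.group_by("reference_snapshot_id").aggregate([("path", "count")]))
```

Per the diff above, the per-snapshot tables are built concurrently via `ExecutorFactory.get_or_create()` and stitched together with `pa.concat_tables`; since `executor.map` yields results in input order, rows follow the order of `tbl.snapshots()`.
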
92 changes: 92 additions & 0 deletions tests/integration/test_inspect_table.py
@@ -846,3 +846,95 @@ def inspect_files_asserts(df: pa.Table) -> None:
inspect_files_asserts(files_df)
inspect_files_asserts(data_files_df)
inspect_files_asserts(delete_files_df)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
identifier = "default.table_metadata_all_manifests"
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

spark.sql(
f"""
CREATE TABLE {identifier} (
id int,
data string
)
PARTITIONED BY (data)
TBLPROPERTIES ('write.update.mode'='merge-on-read',
'write.delete.mode'='merge-on-read')
"""
)
tbl = session_catalog.load_table(identifier)

# check all_manifests when there are no snapshots
lhs = tbl.inspect.all_manifests().to_pandas()
rhs = spark.table(f"{identifier}.all_manifests").toPandas()
assert lhs.empty
assert rhs.empty

spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")

spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")

spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")

spark.sql(f"DELETE FROM {identifier} WHERE id = 2")

spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')")

df = tbl.inspect.all_manifests()

assert df.column_names == [
"content",
"path",
"length",
"partition_spec_id",
"added_snapshot_id",
"added_data_files_count",
"existing_data_files_count",
"deleted_data_files_count",
"added_delete_files_count",
"existing_delete_files_count",
"deleted_delete_files_count",
"partition_summaries",
"reference_snapshot_id",
]

int_cols = [
"content",
"length",
"partition_spec_id",
"added_snapshot_id",
"added_data_files_count",
"existing_data_files_count",
"deleted_data_files_count",
"added_delete_files_count",
"existing_delete_files_count",
"deleted_delete_files_count",
"reference_snapshot_id",
]

for column in int_cols:
for value in df[column]:
assert isinstance(value.as_py(), int)

for value in df["path"]:
assert isinstance(value.as_py(), str)

for value in df["partition_summaries"]:
assert isinstance(value.as_py(), list)
for row in value:
assert isinstance(row["contains_null"].as_py(), bool)
assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
assert isinstance(row["upper_bound"].as_py(), (str, type(None)))

lhs = spark.table(f"{identifier}.all_manifests").toPandas()
rhs = df.to_pandas()
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
assert left == right, f"Difference in column {column}: {left} != {right}"
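
Outside the test harness, the same cross-check can be run by hand; a sketch assuming the test's `spark` session, `session_catalog` fixture, and table name (column order is not asserted here, only the column set and row count):

```python
# Sketch only: reuses the fixtures and identifier from the test above.
identifier = "default.table_metadata_all_manifests"
spark_df = spark.table(f"{identifier}.all_manifests").toPandas()
pyiceberg_df = session_catalog.load_table(identifier).inspect.all_manifests().to_pandas()

assert set(spark_df.columns) == set(pyiceberg_df.columns)
assert len(spark_df) == len(pyiceberg_df)
```
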
