Skip to content

Commit

Permalink
Move get_manifests_schema and get_all_manifests_schema to InspectTable class
Browse files Browse the repository at this point in the history
  • Loading branch information
soumya-ghosh committed Oct 28, 2024
1 parent 73e9bc8 commit 044512e
Showing 1 changed file with 35 additions and 37 deletions.
72 changes: 35 additions & 37 deletions pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,41 +33,6 @@
from pyiceberg.table import Table


def get_manifests_schema() -> "pa.Schema":
    """Build the PyArrow schema describing one row of the manifests table.

    Every top-level column is declared non-nullable. The
    ``partition_summaries`` column is a list of structs in which only
    ``contains_null`` is required; the remaining struct members are nullable.

    Returns:
        The assembled ``pa.Schema``.
    """
    # Imported lazily so pyarrow is only needed when this is actually called.
    import pyarrow as pa

    # Struct describing the summary of a single partition field.
    summary_fields = [
        pa.field("contains_null", pa.bool_(), nullable=False),
        pa.field("contains_nan", pa.bool_(), nullable=True),
        pa.field("lower_bound", pa.string(), nullable=True),
        pa.field("upper_bound", pa.string(), nullable=True),
    ]
    summary_struct = pa.struct(summary_fields)

    # Leading identification columns, in their final schema order.
    columns = [
        pa.field("content", pa.int8(), nullable=False),
        pa.field("path", pa.string(), nullable=False),
        pa.field("length", pa.int64(), nullable=False),
        pa.field("partition_spec_id", pa.int32(), nullable=False),
        pa.field("added_snapshot_id", pa.int64(), nullable=False),
    ]

    # The six file counters all share the same int32, non-nullable shape.
    counter_names = (
        "added_data_files_count",
        "existing_data_files_count",
        "deleted_data_files_count",
        "added_delete_files_count",
        "existing_delete_files_count",
        "deleted_delete_files_count",
    )
    columns.extend(pa.field(counter, pa.int32(), nullable=False) for counter in counter_names)

    columns.append(pa.field("partition_summaries", pa.list_(summary_struct), nullable=False))
    return pa.schema(columns)


def get_all_manifests_schema() -> "pa.Schema":
    """Build the PyArrow schema for the all-manifests table.

    The result is the schema from ``get_manifests_schema()`` with one extra
    non-nullable ``reference_snapshot_id`` (int64) column appended at the end.

    Returns:
        The extended ``pa.Schema``.
    """
    # Imported lazily so pyarrow is only needed when this is actually called.
    import pyarrow as pa

    base_schema = get_manifests_schema()
    reference_snapshot_field = pa.field("reference_snapshot_id", pa.int64(), nullable=False)
    return base_schema.append(reference_snapshot_field)


class InspectTable:
tbl: Table

Expand Down Expand Up @@ -362,6 +327,39 @@ def update_partitions_map(
schema=table_schema,
)

def _get_manifests_schema(self) -> "pa.Schema":
    """Return the PyArrow schema describing one row of the manifests table.

    All top-level columns are non-nullable. ``partition_summaries`` is a list
    of structs whose ``contains_null`` member is required while the other
    members are nullable.

    Returns:
        The assembled ``pa.Schema``.
    """
    # Imported lazily so pyarrow is only needed when this is actually called.
    import pyarrow as pa

    # (name, type, nullable) for each member of the partition summary struct.
    summary_layout = (
        ("contains_null", pa.bool_(), False),
        ("contains_nan", pa.bool_(), True),
        ("lower_bound", pa.string(), True),
        ("upper_bound", pa.string(), True),
    )
    summary_struct = pa.struct([
        pa.field(member, dtype, nullable=optional) for member, dtype, optional in summary_layout
    ])

    # (name, type) for each top-level column, listed in final schema order.
    column_layout = (
        ("content", pa.int8()),
        ("path", pa.string()),
        ("length", pa.int64()),
        ("partition_spec_id", pa.int32()),
        ("added_snapshot_id", pa.int64()),
        ("added_data_files_count", pa.int32()),
        ("existing_data_files_count", pa.int32()),
        ("deleted_data_files_count", pa.int32()),
        ("added_delete_files_count", pa.int32()),
        ("existing_delete_files_count", pa.int32()),
        ("deleted_delete_files_count", pa.int32()),
        ("partition_summaries", pa.list_(summary_struct)),
    )
    return pa.schema([pa.field(column, dtype, nullable=False) for column, dtype in column_layout])

def _get_all_manifests_schema(self) -> "pa.Schema":
    """Return the PyArrow schema for the all-manifests table.

    The result is the schema from ``self._get_manifests_schema()`` with one
    extra non-nullable ``reference_snapshot_id`` (int64) column appended at
    the end.

    Returns:
        The extended ``pa.Schema``.
    """
    # Imported lazily so pyarrow is only needed when this is actually called.
    import pyarrow as pa

    base_schema = self._get_manifests_schema()
    reference_snapshot_field = pa.field("reference_snapshot_id", pa.int64(), nullable=False)
    return base_schema.append(reference_snapshot_field)

def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
import pyarrow as pa

Expand Down Expand Up @@ -426,7 +424,7 @@ def _partition_summaries_to_rows(

return pa.Table.from_pylist(
manifests,
schema=get_all_manifests_schema() if is_all_manifests_table else get_manifests_schema(),
schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
)

def manifests(self) -> "pa.Table":
Expand Down Expand Up @@ -612,7 +610,7 @@ def all_manifests(self) -> "pa.Table":

snapshots = self.tbl.snapshots()
if not snapshots:
return pa.Table.from_pylist([], schema=get_all_manifests_schema())
return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())

executor = ExecutorFactory.get_or_create()
manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
Expand Down

0 comments on commit 044512e

Please sign in to comment.