Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Append/Overwrite API to accept snapshot properties #419

Merged
merged 13 commits into from
Mar 19, 2024
Merged
14 changes: 14 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,20 @@ table = table.transaction().remove_properties("abc").commit_transaction()
assert table.properties == {}
```

## Snapshot properties

Optionally, snapshot properties can be set while writing to a table using the `append` or `overwrite` API:

```python
tbl.append(df, snapshot_properties={"abc": "def"})

# or

tbl.overwrite(df, snapshot_properties={"abc": "def"})

assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
```

## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
Expand Down
33 changes: 22 additions & 11 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
name_mapping=self._table.name_mapping(),
)

def update_snapshot(self) -> UpdateSnapshot:
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
"""Create a new UpdateSnapshot to produce a new snapshot for the table.

Returns:
A new UpdateSnapshot
"""
return UpdateSnapshot(self, io=self._table.io)
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

def update_spec(self) -> UpdateSpec:
"""Create a new UpdateSpec to update the partitioning of the table.
Expand Down Expand Up @@ -1082,12 +1082,13 @@ def name_mapping(self) -> Optional[NameMapping]:
else:
return None

def append(self, df: pa.Table) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Shorthand API for appending a PyArrow table to the table.

Args:
df: The Arrow dataframe that will be appended to the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
import pyarrow as pa
Expand All @@ -1103,7 +1104,7 @@ def append(self, df: pa.Table) -> None:
_check_schema(self.schema(), other_schema=df.schema)

with self.transaction() as txn:
with txn.update_snapshot().fast_append() as update_snapshot:
with txn.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
Expand All @@ -1112,14 +1113,17 @@ def append(self, df: pa.Table) -> None:
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
def overwrite(
self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
) -> None:
"""
Shorthand for overwriting the table with a PyArrow table.

Args:
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
import pyarrow as pa
Expand All @@ -1138,7 +1142,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
_check_schema(self.schema(), other_schema=df.schema)

with self.transaction() as txn:
with txn.update_snapshot().overwrite() as update_snapshot:
with txn.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
Expand Down Expand Up @@ -2488,6 +2492,7 @@ def __init__(
transaction: Transaction,
io: FileIO,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
super().__init__(transaction)
self.commit_uuid = commit_uuid or uuid.uuid4()
Expand All @@ -2499,6 +2504,7 @@ def __init__(
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
)
self._added_data_files = []
self.snapshot_properties = snapshot_properties

def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
self._added_data_files.append(data_file)
Expand Down Expand Up @@ -2566,7 +2572,7 @@ def _write_delete_manifest() -> List[ManifestFile]:

return added_manifests.result() + delete_manifests.result() + existing_manifests.result()

def _summary(self) -> Summary:
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
ssc = SnapshotSummaryCollector()

for data_file in self._added_data_files:
Expand All @@ -2579,7 +2585,7 @@ def _summary(self) -> Summary:
)

return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build()),
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=self._operation == Operation.OVERWRITE,
)
Expand All @@ -2588,7 +2594,7 @@ def _commit(self) -> UpdatesAndRequirements:
new_manifests = self._manifests()
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

summary = self._summary()
summary = self._summary(self.snapshot_properties)

manifest_list_file_path = _generate_manifest_list_path(
location=self._transaction.table_metadata.location,
Expand Down Expand Up @@ -2703,13 +2709,17 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
class UpdateSnapshot:
_transaction: Transaction
_io: FileIO
_snapshot_properties: Dict[str, str]

def __init__(self, transaction: Transaction, io: FileIO) -> None:
def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None:
self._transaction = transaction
self._io = io
self._snapshot_properties = snapshot_properties

def fast_append(self) -> FastAppendFiles:
return FastAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io)
return FastAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def overwrite(self) -> OverwriteFiles:
return OverwriteFiles(
Expand All @@ -2718,6 +2728,7 @@ def overwrite(self) -> OverwriteFiles:
else Operation.APPEND,
transaction=self._transaction,
io=self._io,
snapshot_properties=self._snapshot_properties,
)


Expand Down
66 changes: 66 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
Expand Down Expand Up @@ -692,3 +693,68 @@ def test_commit_table_properties(
updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}


@mock_aws
def test_commit_append_table_snapshot_properties(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
    """Check that properties passed to ``append`` appear in the newest snapshot's summary."""
    catalog_properties = {"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}
    glue = GlueCatalog("glue", **catalog_properties)
    glue.create_namespace(namespace=database_name)
    tbl = glue.create_table(identifier=(database_name, table_name), schema=table_schema_simple)

    # Nothing has been committed yet, so the metadata file is still at version 0.
    assert glue._parse_metadata_version(tbl.metadata_location) == 0

    arrow_rows = pa.Table.from_pylist(
        [{"foo": "foo_val", "bar": 1, "baz": False}],
        schema=schema_to_pyarrow(table_schema_simple),
    )
    tbl.append(arrow_rows, snapshot_properties={"snapshot_prop_a": "test_prop_a"})

    # The append is expected to bump the metadata version by one.
    assert glue._parse_metadata_version(tbl.metadata_location) == 1

    latest_summary = tbl.metadata.snapshots[-1].summary
    assert latest_summary is not None
    assert latest_summary["snapshot_prop_a"] == "test_prop_a"


@mock_aws
def test_commit_overwrite_table_snapshot_properties(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
    """Check that ``overwrite`` records only its own snapshot properties, not those of a prior append."""
    catalog_properties = {"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}
    glue = GlueCatalog("glue", **catalog_properties)
    glue.create_namespace(namespace=database_name)
    tbl = glue.create_table(identifier=(database_name, table_name), schema=table_schema_simple)
    arrow_schema = schema_to_pyarrow(table_schema_simple)

    # Nothing has been committed yet, so the metadata file is still at version 0.
    assert glue._parse_metadata_version(tbl.metadata_location) == 0

    # First commit: append one row tagged with property "a".
    appended_rows = pa.Table.from_pylist(
        [{"foo": "foo_val", "bar": 1, "baz": False}],
        schema=arrow_schema,
    )
    tbl.append(appended_rows, snapshot_properties={"snapshot_prop_a": "test_prop_a"})

    assert glue._parse_metadata_version(tbl.metadata_location) == 1

    # Second commit: overwrite with a different row tagged with property "b".
    replacement_rows = pa.Table.from_pylist(
        [{"foo": "foo_val", "bar": 2, "baz": True}],
        schema=arrow_schema,
    )
    tbl.overwrite(replacement_rows, snapshot_properties={"snapshot_prop_b": "test_prop_b"})

    assert glue._parse_metadata_version(tbl.metadata_location) == 2

    latest_summary = tbl.metadata.snapshots[-1].summary
    assert latest_summary is not None
    # Property "a" belonged to the earlier append; looking it up on the
    # overwrite snapshot's summary is asserted to yield None.
    assert latest_summary["snapshot_prop_a"] is None
    assert latest_summary["snapshot_prop_b"] == "test_prop_b"