Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Append/Overwrite API to accept snapshot properties #419

Merged
merged 13 commits into from
Mar 19, 2024
Merged
14 changes: 14 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,20 @@ table = table.transaction().remove_properties("abc").commit_transaction()
assert table.properties == {}
```

## Snapshot properties

Optionally, snapshot properties can be set while writing to a table using the `append` or `overwrite` API:

```python
tbl.append(df, snapshot_properties={"abc": "def"})

# or

tbl.overwrite(df, snapshot_properties={"abc": "def"})

assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
```

## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
Expand Down
49 changes: 31 additions & 18 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,12 +1001,13 @@ def name_mapping(self) -> NameMapping:
else:
return create_mapping_from_schema(self.schema())

def append(self, df: pa.Table) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Shorthand API for appending a PyArrow table to the table.

Args:
df: The Arrow dataframe that will be appended to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
import pyarrow as pa
Expand All @@ -1019,21 +1020,27 @@ def append(self, df: pa.Table) -> None:
if len(self.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")

with self.update_snapshot().fast_append() as update_snapshot:
Fokko marked this conversation as resolved.
Show resolved Hide resolved
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
for data_file in data_files:
update_snapshot.append_data_file(data_file)
merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)

def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
for data_file in data_files:
merge.append_data_file(data_file)

merge.commit(snapshot_properties)

def overwrite(
self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
) -> None:
"""
Shorthand for overwriting the table with a PyArrow table.

Args:
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
import pyarrow as pa
Expand All @@ -1049,12 +1056,18 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
if len(self.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")

with self.update_snapshot().overwrite() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
for data_file in data_files:
update_snapshot.append_data_file(data_file)
merge = _MergingSnapshotProducer(
operation=Operation.OVERWRITE if self.current_snapshot() is not None else Operation.APPEND,
table=self,
)

# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(self, df=df)
for data_file in data_files:
merge.append_data_file(data_file)

merge.commit(snapshot_properties)

def refs(self) -> Dict[str, SnapshotRef]:
"""Return the snapshot references in the table."""
Expand Down Expand Up @@ -2456,7 +2469,7 @@ def _write_delete_manifest() -> List[ManifestFile]:

return added_manifests.result() + delete_manifests.result() + existing_manifests.result()

def _summary(self) -> Summary:
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
ssc = SnapshotSummaryCollector()

for data_file in self._added_data_files:
Expand All @@ -2465,16 +2478,16 @@ def _summary(self) -> Summary:
previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None

return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build()),
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=self._operation == Operation.OVERWRITE,
)

def commit(self) -> Snapshot:
def commit(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Snapshot:
new_manifests = self._manifests()
next_sequence_number = self._table.next_sequence_number()

summary = self._summary()
summary = self._summary(snapshot_properties)

manifest_list_file_path = _generate_manifest_list_path(
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
Expand Down
66 changes: 66 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
Expand Down Expand Up @@ -671,3 +672,68 @@ def test_commit_table_properties(
updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}


@mock_aws
def test_commit_append_table_snapshot_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)

assert test_catalog._parse_metadata_version(table.metadata_location) == 0

table.append(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 1, "baz": False}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
)

updated_table_metadata = table.metadata
summary = updated_table_metadata.snapshots[-1].summary
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert summary is not None
assert summary["snapshot_prop_a"] == "test_prop_a"


@mock_aws
def test_commit_overwrite_table_snapshot_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)

assert test_catalog._parse_metadata_version(table.metadata_location) == 0

table.append(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 1, "baz": False}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
)

assert test_catalog._parse_metadata_version(table.metadata_location) == 1

table.overwrite(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 2, "baz": True}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_b": "test_prop_b"},
)

updated_table_metadata = table.metadata
summary = updated_table_metadata.snapshots[-1].summary
assert test_catalog._parse_metadata_version(table.metadata_location) == 2
assert summary is not None
assert summary["snapshot_prop_a"] is None
assert summary["snapshot_prop_b"] == "test_prop_b"
Loading