From 2e32eec858b50bed4e900a51b1dfb771491e32d1 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Mon, 12 Feb 2024 03:39:52 +0000 Subject: [PATCH 1/8] added test for snapshot properties --- tests/catalog/test_glue.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 270d2251ba..3d2ccd6e42 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -23,6 +23,7 @@ from moto import mock_aws from pyiceberg.catalog.glue import GlueCatalog +from pyiceberg.table.snapshots import Snapshot, Summary, Operation from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NamespaceNotEmptyError, @@ -639,3 +640,34 @@ def test_commit_table_properties( updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + +@mock_aws +def test_commit_table_snapshot_properties( + _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638573590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND, data={"test_a": "test_a"}), + schema_id=3, + ) + + transaction = table.transaction() + transaction.add_snapshot(new_snapshot) + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert test_catalog._parse_metadata_version(table.metadata_location) == 1 + assert updated_table_metadata.snapshots[-1].summary == new_snapshot.summary + From def79c8324c2e52106d2a1f989cf3b0c8908a7e8 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Tue, 13 Feb 2024 04:10:39 +0000 Subject: [PATCH 2/8] change append/overwrite to accept snapshot_properties --- pyiceberg/table/__init__.py | 16 ++++++++-------- tests/catalog/test_glue.py | 28 +++++++++++----------------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index a87435fcfb..f832476187 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -974,7 +974,7 @@ def name_mapping(self) -> NameMapping: else: return create_mapping_from_schema(self.schema()) - def append(self, df: pa.Table) -> None: + def append(self, df: pa.Table, **snapshot_properties) -> None: """ Append data to the table. @@ -1000,9 +1000,9 @@ def append(self, df: pa.Table) -> None: for data_file in data_files: merge.append_data_file(data_file) - merge.commit() + merge.commit(**snapshot_properties) - def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None: + def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, **snapshot_properties) -> None: """ Overwrite all the data in the table. @@ -1036,7 +1036,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T for data_file in data_files: merge.append_data_file(data_file) - merge.commit() + merge.commit(**snapshot_properties) def refs(self) -> Dict[str, SnapshotRef]: """Return the snapshot references in the table.""" @@ -2474,7 +2474,7 @@ def _fetch_existing_manifests() -> List[ManifestFile]: return added_manifests.result() + delete_manifests.result() + existing_manifests.result() - def _summary(self) -> Summary: + def _summary(self, **snapshot_properties) -> Summary: ssc = SnapshotSummaryCollector() for data_file in self._added_data_files: @@ -2483,16 +2483,16 @@ def _summary(self) -> Summary: previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id) if self._parent_snapshot_id is not None else None return update_snapshot_summaries( - summary=Summary(operation=self._operation, **ssc.build()), + summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties), previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, truncate_full_table=self._operation == Operation.OVERWRITE, ) - def commit(self) -> Snapshot: + def commit(self, **snapshot_properties) -> Snapshot: new_manifests = self._manifests() next_sequence_number = self._table.next_sequence_number() - summary = self._summary() + summary = self._summary(**snapshot_properties) manifest_list_file_path = _generate_manifest_list_path( location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index d6706b7faa..aa4edc6411 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -23,7 +23,6 @@ from moto import mock_aws from pyiceberg.catalog.glue import GlueCatalog -from pyiceberg.table.snapshots import Snapshot, Summary, Operation from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NamespaceNotEmptyError, @@ -33,6 +32,7 @@ NoSuchTableError, TableAlreadyExistsError, ) +from pyiceberg.io.pyarrow import schema_to_pyarrow from pyiceberg.schema import Schema from pyiceberg.types import IntegerType from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX @@ -675,31 +675,25 @@ def test_commit_table_properties( @mock_aws def test_commit_table_snapshot_properties( - _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str + _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str ) -> None: catalog_name = "glue" identifier = (database_name, table_name) test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) test_catalog.create_namespace(namespace=database_name) - table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested) + table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple) assert test_catalog._parse_metadata_version(table.metadata_location) == 0 - new_snapshot = Snapshot( - snapshot_id=25, - parent_snapshot_id=19, - sequence_number=200, - timestamp_ms=1602638573590, - manifest_list="s3:/a/b/c.avro", - summary=Summary(Operation.APPEND, data={"test_a": "test_a"}), - schema_id=3, + table.append( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 1, "baz": False}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_prop_a="test_prop_a", ) - transaction = table.transaction() - transaction.add_snapshot(new_snapshot) - transaction.commit_transaction() - updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.snapshots[-1].summary == new_snapshot.summary - + assert updated_table_metadata.snapshots[-1].summary.get("snapshot_prop_a") == "test_prop_a" + From ec60292054249b1207a03ac57804d67d776d8a22 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Tue, 13 Feb 2024 09:57:33 -0500 Subject: [PATCH 3/8] Update tests/catalog/test_glue.py Co-authored-by: Fokko Driesprong --- tests/catalog/test_glue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index aa4edc6411..29b874c736 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -695,5 +695,5 @@ def test_commit_table_snapshot_properties( updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.snapshots[-1].summary.get("snapshot_prop_a") == "test_prop_a" + assert updated_table_metadata.snapshots[-1].summary["snapshot_prop_a"] == "test_prop_a" From 07802fca933000bb667a4a0030736ab673972a6c Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Tue, 13 Feb 2024 10:03:51 -0500 Subject: [PATCH 4/8] Update pyiceberg/table/__init__.py Co-authored-by: Fokko Driesprong --- pyiceberg/table/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f832476187..1be9a51b27 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -974,7 +974,7 @@ def name_mapping(self) -> NameMapping: else: return create_mapping_from_schema(self.schema()) - def append(self, df: pa.Table, **snapshot_properties) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Append data to the table. From 7f45c8dc9681ae971a4b952d933728e4cb0ecb52 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Tue, 13 Feb 2024 15:52:48 +0000 Subject: [PATCH 5/8] updated docs,docstrings --- mkdocs/docs/api.md | 14 +++++++++++++ pyiceberg/table/__init__.py | 14 +++++++------ tests/catalog/test_glue.py | 40 +++++++++++++++++++++++++++++++++++-- 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 53801922fc..05f73aecb5 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -446,6 +446,20 @@ table = table.transaction().remove_properties("abc").commit_transaction() assert table.properties == {} ``` +## Snapshot properties + +Optionally, Snapshot properties can be set while writing to a table using `append` or `overwrite` API: + +```python +tbl.append(df, snapshot_properties={"abc": "def"}) + +or + +tbl.overwrite(df, snapshot_properties={"abc": "def"}) + +assert tbl.metadata.snapshots[-1].summary["abc"] == "def" +``` + ## Query the data To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 1be9a51b27..70027e7193 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -980,6 +980,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) Args: df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary """ try: import pyarrow as pa @@ -1000,9 +1001,9 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) for data_file in data_files: merge.append_data_file(data_file) - merge.commit(**snapshot_properties) + merge.commit(snapshot_properties) - def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, **snapshot_properties) -> None: + def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Overwrite all the data in the table. @@ -1010,6 +1011,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T df: The Arrow dataframe that will be used to overwrite the table overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite + snapshot_properties: Custom properties to be added to the snapshot summary """ try: import pyarrow as pa @@ -1036,7 +1038,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T for data_file in data_files: merge.append_data_file(data_file) - merge.commit(**snapshot_properties) + merge.commit(snapshot_properties) def refs(self) -> Dict[str, SnapshotRef]: """Return the snapshot references in the table.""" @@ -2474,7 +2476,7 @@ def _fetch_existing_manifests() -> List[ManifestFile]: return added_manifests.result() + delete_manifests.result() + existing_manifests.result() - def _summary(self, **snapshot_properties) -> Summary: + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: ssc = SnapshotSummaryCollector() for data_file in self._added_data_files: @@ -2488,11 +2490,11 @@ def _summary(self, **snapshot_properties) -> Summary: truncate_full_table=self._operation == Operation.OVERWRITE, ) - def commit(self, **snapshot_properties) -> Snapshot: + def commit(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Snapshot: new_manifests = self._manifests() next_sequence_number = self._table.next_sequence_number() - summary = self._summary(**snapshot_properties) + summary = self._summary(snapshot_properties) manifest_list_file_path = _generate_manifest_list_path( location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 29b874c736..451f4934bf 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -674,7 +674,7 @@ def test_commit_table_properties( assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} @mock_aws -def test_commit_table_snapshot_properties( +def test_commit_append_table_snapshot_properties( _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str ) -> None: catalog_name = "glue" @@ -690,10 +690,46 @@ def test_commit_table_snapshot_properties( [{"foo": "foo_val", "bar": 1, "baz": False}], schema=schema_to_pyarrow(table_schema_simple), ), - snapshot_prop_a="test_prop_a", + snapshot_properties={"snapshot_prop_a":"test_prop_a"}, ) updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 assert updated_table_metadata.snapshots[-1].summary["snapshot_prop_a"] == "test_prop_a" + +@mock_aws +def test_commit_overwrite_table_snapshot_properties( + _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + + table.append( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 1, "baz": False}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_a":"test_prop_a"}, + ) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 1 + + table.overwrite( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 2, "baz": True}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_b":"test_prop_b"}, + ) + + updated_table_metadata = table.metadata + assert test_catalog._parse_metadata_version(table.metadata_location) == 2 + print(updated_table_metadata.snapshots[-1].summary.additional_properties) + assert updated_table_metadata.snapshots[-1].summary["snapshot_prop_a"] is None + assert updated_table_metadata.snapshots[-1].summary["snapshot_prop_b"] == "test_prop_b" From 478627ba55c317129617ed3c9527a03d335a886e Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Wed, 14 Feb 2024 15:32:58 +0000 Subject: [PATCH 6/8] fix linting --- mkdocs/docs/api.md | 2 +- pyiceberg/table/__init__.py | 4 +++- tests/catalog/test_glue.py | 22 +++++++++++++--------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 05f73aecb5..d43d929c50 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -453,7 +453,7 @@ Optionally, Snapshot properties can be set while writing to a table using `appen ```python tbl.append(df, snapshot_properties={"abc": "def"}) -or +or tbl.overwrite(df, snapshot_properties={"abc": "def"}) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 70027e7193..ff01076c84 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1003,7 +1003,9 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) merge.commit(snapshot_properties) - def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def overwrite( + self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + ) -> None: """ Overwrite all the data in the table. diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 451f4934bf..78edd0e2ac 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -673,6 +673,7 @@ def test_commit_table_properties( assert test_catalog._parse_metadata_version(table.metadata_location) == 1 assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + @mock_aws def test_commit_append_table_snapshot_properties( _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str @@ -690,12 +691,15 @@ def test_commit_append_table_snapshot_properties( [{"foo": "foo_val", "bar": 1, "baz": False}], schema=schema_to_pyarrow(table_schema_simple), ), - snapshot_properties={"snapshot_prop_a":"test_prop_a"}, + snapshot_properties={"snapshot_prop_a": "test_prop_a"}, ) updated_table_metadata = table.metadata + summary = updated_table_metadata.snapshots[-1].summary assert test_catalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.snapshots[-1].summary["snapshot_prop_a"] == "test_prop_a" + assert summary is not None + assert summary["snapshot_prop_a"] == "test_prop_a" + @mock_aws def test_commit_overwrite_table_snapshot_properties( @@ -714,9 +718,9 @@ def test_commit_overwrite_table_snapshot_properties( [{"foo": "foo_val", "bar": 1, "baz": False}], schema=schema_to_pyarrow(table_schema_simple), ), - snapshot_properties={"snapshot_prop_a":"test_prop_a"}, + snapshot_properties={"snapshot_prop_a": "test_prop_a"}, ) - + assert test_catalog._parse_metadata_version(table.metadata_location) == 1 table.overwrite( @@ -724,12 +728,12 @@ def test_commit_overwrite_table_snapshot_properties( [{"foo": "foo_val", "bar": 2, "baz": True}], schema=schema_to_pyarrow(table_schema_simple), ), - snapshot_properties={"snapshot_prop_b":"test_prop_b"}, + snapshot_properties={"snapshot_prop_b": "test_prop_b"}, ) updated_table_metadata = table.metadata + summary = updated_table_metadata.snapshots[-1].summary assert test_catalog._parse_metadata_version(table.metadata_location) == 2 - print(updated_table_metadata.snapshots[-1].summary.additional_properties) - assert updated_table_metadata.snapshots[-1].summary["snapshot_prop_a"] is None - assert updated_table_metadata.snapshots[-1].summary["snapshot_prop_b"] == "test_prop_b" - + assert summary is not None + assert summary["snapshot_prop_a"] is None + assert summary["snapshot_prop_b"] == "test_prop_b" From ddfd751afbaf9937158a02b6fb9ba068cf57d994 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Wed, 14 Feb 2024 12:29:44 -0500 Subject: [PATCH 7/8] Update mkdocs/docs/api.md Co-authored-by: Fokko Driesprong --- mkdocs/docs/api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index d43d929c50..1f2c503dbe 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -453,7 +453,7 @@ Optionally, Snapshot properties can be set while writing to a table using `appen ```python tbl.append(df, snapshot_properties={"abc": "def"}) -or +# or tbl.overwrite(df, snapshot_properties={"abc": "def"}) From 76b33c8105e6dd57519d6796da2f1b39502860a8 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Tue, 19 Mar 2024 09:03:33 -0400 Subject: [PATCH 8/8] Update pyiceberg/table/__init__.py Co-authored-by: Fokko Driesprong --- pyiceberg/table/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6706baa4fa..39e66dd063 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2491,8 +2491,8 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, - snapshot_properties: Dict[str, str], commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4()