From e6b0d360e1a08abe26eae38f52cce0930ba9a253 Mon Sep 17 00:00:00 2001
From: Piotr Skrydalewicz
Date: Fri, 15 Nov 2024 15:13:26 +0100
Subject: [PATCH] More tests

Rework MockCatalog to store zero-argument callables instead of Table
instances, so a table lookup can raise. Add unit tests verifying that
expected pyiceberg exceptions (NoSuchPropertyException,
NoSuchIcebergTableError) are reported as warnings while unexpected
exceptions are reported as failures, and that the remaining tables are
still ingested either way. Also drop a commented-out faulthandler
leftover from the Iceberg source.
---
 .../ingestion/source/iceberg/iceberg.py       |   3 -
 metadata-ingestion/tests/unit/test_iceberg.py | 193 +++++++++++++++++-
 2 files changed, 182 insertions(+), 14 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
index 31ddb86bed87a..a72d09bc75f78 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -84,9 +84,6 @@
     logging.WARNING
 )
 
-# allows for printing better stacks in case of a crash, doesn't work well with pytest
-# faulthandler.enable()
-
 
 @platform_name("Iceberg")
 @support_status(SupportStatus.TESTING)
diff --git a/metadata-ingestion/tests/unit/test_iceberg.py b/metadata-ingestion/tests/unit/test_iceberg.py
index 1286543440d48..f0d4863d46f57 100644
--- a/metadata-ingestion/tests/unit/test_iceberg.py
+++ b/metadata-ingestion/tests/unit/test_iceberg.py
@@ -1,10 +1,12 @@
 import uuid
 from decimal import Decimal
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
+from unittest import TestCase
 from unittest.mock import patch
 
 import pytest
 from pydantic import ValidationError
+from pyiceberg.exceptions import NoSuchIcebergTableError, NoSuchPropertyException
 from pyiceberg.io.pyarrow import PyArrowFileIO
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
@@ -528,7 +530,7 @@ def test_avro_decimal_bytes_nullable() -> None:
 
 
 class MockCatalog:
-    def __init__(self, tables: Dict[str, Dict[str, Table]]):
+    def __init__(self, tables: Dict[str, Dict[str, Callable[[], Table]]]):
         """
         :param tables: Dictionary containing namespaces as keys and dictionaries
         containing names of tables (keys) and
@@ -543,14 +545,7 @@ def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
         return [(namespace, table) for table in self.tables[namespace].keys()]
 
     def load_table(self, dataset_path: Tuple[str, str]) -> Table:
-        return self.tables[dataset_path[0]][dataset_path[1]]
-
-
-def get_mock_catalog(tables):
-    def _get_catalog():
-        return MockCatalog(tables)
-
-    return _get_catalog
+        return self.tables[dataset_path[0]][dataset_path[1]]()
 
 
 def test_proper_run_with_multiple_namespaces() -> None:
@@ -558,7 +553,7 @@ def test_proper_run_with_multiple_namespaces() -> None:
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
-                "table1": Table(
+                "table1": lambda: Table(
                     identifier=("namespaceA", "table1"),
                     metadata=TableMetadataV2(
                         partition_specs=[PartitionSpec(spec_id=0)],
@@ -587,3 +582,179 @@
             snapshot.urn
             == "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)"
         )
+
+
+def test_handle_expected_exceptions() -> None:
+    source = with_iceberg_source(processing_threads=3)
+
+    def _raise_no_such_property_exception():
+        raise NoSuchPropertyException()
+
+    def _raise_no_such_table_exception():
+        raise NoSuchIcebergTableError()
+
+    mock_catalog = MockCatalog(
+        {
+            "namespaceA": {
+                "table1": lambda: Table(
+                    identifier=("namespaceA", "table1"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table1",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table1",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table2": lambda: Table(
+                    identifier=("namespaceA", "table2"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table2",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table2",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table3": lambda: Table(
+                    identifier=("namespaceA", "table3"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table3",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table3",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table4": lambda: Table(
+                    identifier=("namespaceA", "table4"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table4",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table4",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table5": _raise_no_such_property_exception,
+                "table6": _raise_no_such_table_exception,
+            }
+        }
+    )
+    with patch(
+        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
+    ) as get_catalog:
+        get_catalog.return_value = mock_catalog
+        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
+        assert len(wu) == 4
+        urns = []
+        for unit in wu:
+            assert isinstance(unit.metadata, MetadataChangeEvent)
+            assert isinstance(unit.metadata.proposedSnapshot, DatasetSnapshotClass)
+            urns.append(unit.metadata.proposedSnapshot.urn)
+        TestCase().assertCountEqual(
+            urns,
+            [
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
+            ],
+        )
+        assert source.report.warnings.total_elements == 2
+        assert source.report.failures.total_elements == 0
+        assert source.report.tables_scanned == 4
+
+
+def test_handle_unexpected_exceptions() -> None:
+    source = with_iceberg_source(processing_threads=3)
+
+    def _raise_exception():
+        raise Exception()
+
+    mock_catalog = MockCatalog(
+        {
+            "namespaceA": {
+                "table1": lambda: Table(
+                    identifier=("namespaceA", "table1"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table1",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table1",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table2": lambda: Table(
+                    identifier=("namespaceA", "table2"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table2",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table2",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table3": lambda: Table(
+                    identifier=("namespaceA", "table3"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table3",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table3",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table4": lambda: Table(
+                    identifier=("namespaceA", "table4"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table4",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table4",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table5": _raise_exception,
+            }
+        }
+    )
+    with patch(
+        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
+    ) as get_catalog:
+        get_catalog.return_value = mock_catalog
+        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
+        assert len(wu) == 4
+        urns = []
+        for unit in wu:
+            assert isinstance(unit.metadata, MetadataChangeEvent)
+            assert isinstance(unit.metadata.proposedSnapshot, DatasetSnapshotClass)
+            urns.append(unit.metadata.proposedSnapshot.urn)
+        TestCase().assertCountEqual(
+            urns,
+            [
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
+            ],
+        )
+        assert source.report.warnings.total_elements == 0
+        assert source.report.failures.total_elements == 1
+        assert source.report.tables_scanned == 4
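
--
Note, not part of the patch: the test rework hinges on MockCatalog
holding zero-argument callables (Callable[[], Table]) rather than Table
instances, so load_table() can either build a table or raise at lookup
time. A minimal, self-contained sketch of that pattern; FakeTable and
FakeLoadError are hypothetical stand-ins for the pyiceberg types, not
names from the patch:

    from typing import Callable, Dict

    class FakeTable:
        """Hypothetical stand-in for pyiceberg's Table."""
        def __init__(self, name: str) -> None:
            self.name = name

    class FakeLoadError(Exception):
        """Hypothetical stand-in for a catalog load failure."""

    def _raise_load_error() -> FakeTable:
        raise FakeLoadError("simulated catalog failure")

    # Values are factories, so a lookup can either build a table or
    # raise, mirroring MockCatalog.load_table in the patch above.
    tables: Dict[str, Callable[[], FakeTable]] = {
        "good_table": lambda: FakeTable("good_table"),
        "bad_table": _raise_load_error,
    }

    def load_table(name: str) -> FakeTable:
        # Calling the stored factory is the moment a failure can surface.
        return tables[name]()

    if __name__ == "__main__":
        print(load_table("good_table").name)
        try:
            load_table("bad_table")
        except FakeLoadError as exc:
            print(f"load failed as intended: {exc}")

Assuming a standard pytest setup, the new tests can be selected with,
e.g., `pytest tests/unit/test_iceberg.py -k handle` from the
metadata-ingestion directory.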