Commit e6b0d36: "More tests"
skrydal committed Nov 15, 2024 (1 parent: 8149340)
Showing 2 changed files with 182 additions and 14 deletions.
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -84,9 +84,6 @@
     logging.WARNING
 )
 
-# allows for printing better stacks in case of a crash, doesn't work well with pytest
-# faulthandler.enable()
-
 
 @platform_name("Iceberg")
 @support_status(SupportStatus.TESTING)
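Aside on the three deleted lines above: Python's standard-library faulthandler module, once enabled, dumps each thread's traceback on hard crashes (SIGSEGV and similar fatal signals), which is what the removed comment referred to. pytest ships its own built-in faulthandler integration, enabled by default, so the commented-out call was dead code under test runs. For reference, a minimal sketch of the standard-library API (nothing DataHub-specific assumed):

import faulthandler

# Dump Python tracebacks when the process receives a fatal signal
# (SIGSEGV, SIGFPE, SIGABRT, SIGBUS, SIGILL).
faulthandler.enable()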
193 changes: 182 additions & 11 deletions metadata-ingestion/tests/unit/test_iceberg.py
@@ -1,10 +1,12 @@
 import uuid
 from decimal import Decimal
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
 from unittest import TestCase
 from unittest.mock import patch
 
 import pytest
 from pydantic import ValidationError
+from pyiceberg.exceptions import NoSuchIcebergTableError, NoSuchPropertyException
 from pyiceberg.io.pyarrow import PyArrowFileIO
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
@@ -528,7 +530,7 @@ def test_avro_decimal_bytes_nullable() -> None:
 
 
 class MockCatalog:
-    def __init__(self, tables: Dict[str, Dict[str, Table]]):
+    def __init__(self, tables: Dict[str, Dict[str, Callable[[], Table]]]):
         """
         :param tables: Dictionary containing namespaces as keys and dictionaries containing names of tables (keys) and
@@ -543,22 +545,15 @@ def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
         return [(namespace, table) for table in self.tables[namespace].keys()]
 
     def load_table(self, dataset_path: Tuple[str, str]) -> Table:
-        return self.tables[dataset_path[0]][dataset_path[1]]
-
-
-def get_mock_catalog(tables):
-    def _get_catalog():
-        return MockCatalog(tables)
-
-    return _get_catalog
+        return self.tables[dataset_path[0]][dataset_path[1]]()


 def test_proper_run_with_multiple_namespaces() -> None:
     source = with_iceberg_source(processing_threads=3)
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
-                "table1": Table(
+                "table1": lambda: Table(
                     identifier=("namespaceA", "table1"),
                     metadata=TableMetadataV2(
                         partition_specs=[PartitionSpec(spec_id=0)],
@@ -587,3 +582,179 @@ def test_proper_run_with_multiple_namespaces() -> None:
             snapshot.urn
             == "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)"
         )


+def test_handle_expected_exceptions() -> None:
+    source = with_iceberg_source(processing_threads=3)
+
+    def _raise_no_such_property_exception():
+        raise NoSuchPropertyException()
+
+    def _raise_no_such_table_exception():
+        raise NoSuchIcebergTableError()
+
+    mock_catalog = MockCatalog(
+        {
+            "namespaceA": {
+                "table1": lambda: Table(
+                    identifier=("namespaceA", "table1"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table1",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table1",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table2": lambda: Table(
+                    identifier=("namespaceA", "table2"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table2",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table2",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table3": lambda: Table(
+                    identifier=("namespaceA", "table3"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table3",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table3",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table4": lambda: Table(
+                    identifier=("namespaceA", "table4"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table4",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table4",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table5": _raise_no_such_property_exception,
+                "table6": _raise_no_such_table_exception,
+            }
+        }
+    )
+    with patch(
+        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
+    ) as get_catalog:
+        get_catalog.return_value = mock_catalog
+        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
+        assert len(wu) == 4
+        urns = []
+        for unit in wu:
+            assert isinstance(unit.metadata, MetadataChangeEvent)
+            assert isinstance(unit.metadata.proposedSnapshot, DatasetSnapshotClass)
+            urns.append(unit.metadata.proposedSnapshot.urn)
+        TestCase().assertCountEqual(
+            urns,
+            [
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
+            ],
+        )
+        assert source.report.warnings.total_elements == 2
+        assert source.report.failures.total_elements == 0
+        assert source.report.tables_scanned == 4


+def test_handle_unexpected_exceptions() -> None:
+    source = with_iceberg_source(processing_threads=3)
+
+    def _raise_exception():
+        raise Exception()
+
+    mock_catalog = MockCatalog(
+        {
+            "namespaceA": {
+                "table1": lambda: Table(
+                    identifier=("namespaceA", "table1"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table1",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table1",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table2": lambda: Table(
+                    identifier=("namespaceA", "table2"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table2",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table2",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table3": lambda: Table(
+                    identifier=("namespaceA", "table3"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table3",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table3",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table4": lambda: Table(
+                    identifier=("namespaceA", "table4"),
+                    metadata=TableMetadataV2(
+                        partition_specs=[PartitionSpec(spec_id=0)],
+                        location="s3://abcdefg/namespaceA/table4",
+                        last_column_id=0,
+                        schemas=[Schema(schema_id=0)],
+                    ),
+                    metadata_location="s3://abcdefg/namespaceA/table4",
+                    io=PyArrowFileIO(),
+                    catalog=None,
+                ),
+                "table5": _raise_exception,
+            }
+        }
+    )
+    with patch(
+        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
+    ) as get_catalog:
+        get_catalog.return_value = mock_catalog
+        wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
+        assert len(wu) == 4
+        urns = []
+        for unit in wu:
+            assert isinstance(unit.metadata, MetadataChangeEvent)
+            assert isinstance(unit.metadata.proposedSnapshot, DatasetSnapshotClass)
+            urns.append(unit.metadata.proposedSnapshot.urn)
+        TestCase().assertCountEqual(
+            urns,
+            [
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
+                "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
+            ],
+        )
+        assert source.report.warnings.total_elements == 0
+        assert source.report.failures.total_elements == 1
+        assert source.report.tables_scanned == 4
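
For context on the pattern the new tests rely on: MockCatalog now stores zero-argument callables instead of ready Table objects, so table construction, and any simulated failure, is deferred to the moment load_table invokes the factory. That is what lets a single mock drive the success path, the expected Iceberg exceptions (surfacing as two warnings above), and an arbitrary exception (surfacing as one failure). A minimal self-contained sketch of the idea; FakeTable, FakeLoadError, and LazyCatalog are illustrative names, not part of the DataHub codebase:

from typing import Callable, Dict


class FakeTable:
    def __init__(self, name: str) -> None:
        self.name = name


class FakeLoadError(Exception):
    pass


class LazyCatalog:
    # Values are zero-argument factories: construction (and any failure)
    # is deferred until load_table calls the factory, as in MockCatalog above.
    def __init__(self, tables: Dict[str, Callable[[], FakeTable]]) -> None:
        self.tables = tables

    def load_table(self, name: str) -> FakeTable:
        return self.tables[name]()  # per-table failures surface here


def _broken_table() -> FakeTable:
    raise FakeLoadError("simulated load failure")


catalog = LazyCatalog({"good": lambda: FakeTable("good"), "bad": _broken_table})
assert catalog.load_table("good").name == "good"
try:
    catalog.load_table("bad")
except FakeLoadError:
    pass  # a real test would assert this is reported as a warning or failure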
