diff --git a/sources/arrow_pandas/__init__.py b/sources/arrow_pandas/__init__.py
new file mode 100644
index 000000000..366d0db62
--- /dev/null
+++ b/sources/arrow_pandas/__init__.py
@@ -0,0 +1,3 @@
+"""
+This source provides a simple starting point for loading data from arrow tables or pandas dataframes
+"""
diff --git a/sources/arrow_pandas/example_resources.py b/sources/arrow_pandas/example_resources.py
new file mode 100644
index 000000000..6e793ae0d
--- /dev/null
+++ b/sources/arrow_pandas/example_resources.py
@@ -0,0 +1,52 @@
+import dlt
+
+from typing import Generator
+
+from dlt.common import pendulum
+import pandas as pd
+import pyarrow as pa
+
+
+@dlt.resource(name="orders", write_disposition="append")
+def orders() -> Generator[pd.DataFrame, None, None]:
+    """Yield the example orders as a pandas dataframe (resource "orders", append disposition)."""
+    # this is example data; in a real resource you would read it from a file
+    # or receive it from another library
+    EXAMPLE_ORDERS_DATA_FRAME = pd.DataFrame(
+        data={
+            "order_id": [1, 2, 3],
+            "customer_id": [1, 2, 3],
+            "ordered_at": [
+                pendulum.DateTime(2021, 1, 1, 4, 5, 6),
+                pendulum.DateTime(2021, 1, 3, 4, 5, 6),
+                pendulum.DateTime(2021, 1, 6, 4, 5, 6),
+            ],
+            "order_amount": [100.0, 200.0, 300.0],
+        }
+    )
+
+    # dataframes can be yielded directly from a resource
+    yield EXAMPLE_ORDERS_DATA_FRAME
+
+
+@dlt.resource(
+    name="customers",
+    write_disposition="merge",
+    primary_key="customer_id",
+    merge_key="customer_id",
+)
+def customers() -> Generator[pa.Table, None, None]:
+    """Yield the example customers as an arrow table (resource "customers", merged on "customer_id")."""
+    # this is example data; in a real resource you would read it from a file
+    # or receive it from another library
+    EXAMPLE_CUSTOMERS_DATA_FRAME = pd.DataFrame(
+        data={
+            "customer_id": [1, 2, 3],
+            "name": ["Alice", "Bob", "Charlie"],
+            "age": [25, 30, 35],
+        }
+    )
+
+    # arrow tables can be yielded as well; the conversion here is only for
+    # demonstration, normally you would just yield the dataframe you already have
+    yield pa.Table.from_pandas(EXAMPLE_CUSTOMERS_DATA_FRAME)
diff --git a/sources/arrow_pandas/requirements.txt b/sources/arrow_pandas/requirements.txt
new file mode 100644
index 000000000..106c03471
--- /dev/null
+++ b/sources/arrow_pandas/requirements.txt
@@ -0,0 +1,3 @@
+pandas>=2.0.0
+pyarrow>=12.0.0
+dlt>=0.4.0
diff --git a/sources/arrow_pandas_pipeline.py b/sources/arrow_pandas_pipeline.py
new file mode 100644
index 000000000..d1891cf0b
--- /dev/null
+++ b/sources/arrow_pandas_pipeline.py
@@ -0,0 +1,14 @@
+"""Very simple pipeline, to be used as a starting point for pandas or arrow pipelines.
+
+"""
+
+import dlt
+from arrow_pandas.example_resources import orders, customers
+
+
+if __name__ == "__main__":
+    pipeline = dlt.pipeline(
+        "orders_pipeline", destination="duckdb", dataset_name="orders_dataset"
+    )
+    # run both resources
+    pipeline.run([orders, customers])
diff --git a/tests/arrow_pandas/__init__.py b/tests/arrow_pandas/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/arrow_pandas/test_arrow_pandas_source.py b/tests/arrow_pandas/test_arrow_pandas_source.py
new file mode 100644
index 000000000..ee2982964
--- /dev/null
+++ b/tests/arrow_pandas/test_arrow_pandas_source.py
@@ -0,0 +1,20 @@
+import dlt
+import pytest
+
+from sources.arrow_pandas.example_resources import orders, customers
+from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts
+
+
+@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
+def test_example_resources(destination_name: str) -> None:
+    """Simple test for the example resources."""
+
+    p = dlt.pipeline("orders_pipeline", destination=destination_name, full_refresh=True)
+    orders.apply_hints(incremental=dlt.sources.incremental("ordered_at"))
+
+    # run pipeline
+    info = p.run([orders(), customers()])
+
+    # check that the data was loaded
+    assert_load_info(info)
+    assert load_table_counts(p, "orders", "customers") == {"orders": 3, "customers": 3}