Skip to content

Commit

Permalink
add pandas arrow verified sources
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Feb 29, 2024
1 parent eddfd3a commit bf816b6
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 0 deletions.
3 changes: 3 additions & 0 deletions sources/arrow_pandas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
This source provides a simple starting point for loading data from arrow tables or pandas dataframes
"""
51 changes: 51 additions & 0 deletions sources/arrow_pandas/example_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import dlt

from typing import Generator

from dlt.common import pendulum
import pandas as pd
import pyarrow as pa

# this is example data, you will get this from somewhere on your resource function
EXAMPLE_ORDERS_DATA_FRAME = pd.DataFrame(
{
"order_id": [1, 2, 3],
"customer_id": [1, 2, 3],
"ordered_at": [
pendulum.DateTime(2021, 1, 1, 4, 5, 6),
pendulum.DateTime(2021, 1, 3, 4, 5, 6),
pendulum.DateTime(2021, 1, 6, 4, 5, 6),
],
"order_amount": [100.0, 200.0, 300.0],
}
)

EXAMPLE_CUSTOMERS_DATA_FRAME = pd.DataFrame(
{
"customer_id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
}
)


@dlt.resource(name="orders", write_disposition="append")
def orders() -> Generator[pd.DataFrame, None, None]:
# we can yield dataframes here, you will usually read them from a file or
# receive them from another library
yield EXAMPLE_ORDERS_DATA_FRAME


@dlt.resource(
name="customers",
write_disposition="merge",
primary_key="customer_id",
merge_key="customer_id",
)
def customers() -> Generator[pd.DataFrame, None, None]:
# we can yield arrow tables here, you will usually read them from a file or
# receive them from another library

# here we convert our dataframe to an arrow table, usually you would just yield the
# dataframe if you have it, this is for demonstration purposes
yield pa.Table.from_pandas(EXAMPLE_ORDERS_DATA_FRAME)
2 changes: 2 additions & 0 deletions sources/arrow_pandas/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pandas = "^2.0.0"
dlt>=0.4.0
12 changes: 12 additions & 0 deletions sources/arrow_pandas_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Very simple pipeline, to be used as a starting point for pandas or arrow pipelines.
"""

import dlt
from arrow_pandas.example_resources import orders, customers


if __name__ == "__main__":
pipeline = dlt.pipeline("orders_pipeline", destination="duckdb")
# run both resources
pipeline.run([orders, customers])
Empty file added tests/arrow_pandas/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions tests/arrow_pandas/test_arrow_pandas_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import dlt
import pytest

from sources.arrow_pandas.example_resources import orders, customers
from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_example_resources(destination_name: str) -> None:
"""Simple test for the example resources."""

p = dlt.pipeline("orders_pipeline", destination=destination_name, full_refresh=True)
orders.apply_hints(incremental=dlt.sources.incremental("ordered_at"))

# run pipeline
info = p.run([orders(), customers()])

# check that the data was loaded
assert_load_info(info)
assert load_table_counts(p, "orders", "customers") == {"orders": 3, "customers": 3}

0 comments on commit bf816b6

Please sign in to comment.