Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

D#/add pandas pyarrow verified source #384

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sources/arrow_pandas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
This source provides a simple starting point for loading data from arrow tables or pandas dataframes
"""
50 changes: 50 additions & 0 deletions sources/arrow_pandas/example_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import dlt

from typing import Generator

from dlt.common import pendulum
import pandas as pd
import pyarrow as pa


@dlt.resource(name="orders", write_disposition="append")
def orders() -> Generator[pd.DataFrame, None, None]:
# this is example data, you will get this from somewhere on your resource function
EXAMPLE_ORDERS_DATA_FRAME = pd.DataFrame(
data={
"order_id": [1, 2, 3],
"customer_id": [1, 2, 3],
"ordered_at": [
pendulum.DateTime(2021, 1, 1, 4, 5, 6),
pendulum.DateTime(2021, 1, 3, 4, 5, 6),
pendulum.DateTime(2021, 1, 6, 4, 5, 6),
],
"order_amount": [100.0, 200.0, 300.0],
}
)

# we can yield dataframes here, you will usually read them from a file or
# receive them from another library
yield EXAMPLE_ORDERS_DATA_FRAME


@dlt.resource(
name="customers",
write_disposition="merge",
primary_key="customer_id",
merge_key="customer_id",
)
def customers() -> Generator[pd.DataFrame, None, None]:
# we can yield arrow tables here, you will usually read them from a file or
# receive them from another library
EXAMPLE_CUSTOMERS_DATA_FRAME = pd.DataFrame(
data={
"customer_id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
}
)

# here we convert our dataframe to an arrow table, usually you would just yield the
# dataframe if you have it, this is for demonstration purposes
yield pa.Table.from_pandas(EXAMPLE_CUSTOMERS_DATA_FRAME)
2 changes: 2 additions & 0 deletions sources/arrow_pandas/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pandas>=2.0.0
dlt>=0.4.0
14 changes: 14 additions & 0 deletions sources/arrow_pandas_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Very simple pipeline, to be used as a starting point for pandas or arrow pipelines.

"""

import dlt
from arrow_pandas.example_resources import orders, customers


if __name__ == "__main__":
pipeline = dlt.pipeline(
"orders_pipeline", destination="duckdb", dataset_name="orders_dataset"
)
# run both resources
pipeline.run([orders, customers])
Empty file added tests/arrow_pandas/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions tests/arrow_pandas/test_arrow_pandas_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import dlt
import pytest

from sources.arrow_pandas.example_resources import orders, customers
from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_example_resources(destination_name: str) -> None:
"""Simple test for the example resources."""

p = dlt.pipeline("orders_pipeline", destination=destination_name, full_refresh=True)
orders.apply_hints(incremental=dlt.sources.incremental("ordered_at"))

# run pipeline
info = p.run([orders(), customers()])

# check that the data was loaded
assert_load_info(info)
assert load_table_counts(p, "orders", "customers") == {"orders": 3, "customers": 3}
Loading