
Merge branch 'master' into google_drive
IlyaFaer committed Mar 1, 2024
2 parents f4ce2d1 + 7c15404 commit dd75eb2
Showing 66 changed files with 1,200 additions and 377 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/get_changed_sources.yml
@@ -28,7 +28,7 @@ jobs:
raw_changed_sources=$(
git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.event.pull_request.head.sha }} \
- | grep '^sources/[^._]' || true
+ | grep -E '^sources/[^._]|^tests/[^._]' || true
)
echo $raw_changed_sources
@@ -37,10 +37,11 @@ jobs:
echo "No changed sources. Skipping tests."
else
changed_sources=$(echo "$raw_changed_sources" \
- | sed -nE 's/^sources\/([^/]+)\/.*$/\1/p; s/^sources\/(.*)_pipeline\.py$/\1/p' \
+ | sed -nE 's/^sources\/([^/]+)\/.*$/\1/p; s/^sources\/(.*)_pipeline\.py$/\1/p; s/^tests\/([^/]+)\/.*$/\1/p' \
| sort -u \
| tr '\n' ' '
)
- echo "Changed sources: $changed_sources"
+ echo "Changed sources or sources tests: $changed_sources"
fi
echo "sources_list=$changed_sources" >> $GITHUB_OUTPUT
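The sed patterns above map changed file paths to source names: one name per package directory, pipeline script, or (new in this commit) test directory. For local debugging, the same extraction can be approximated in Python; `changed_source_names` is a hypothetical helper for illustration, not part of the repo:

```python
import re

# Regexes mirroring the three sed expressions in the workflow step
# (an approximation; the CI pipes `git diff --name-only` through sed -nE).
PATTERNS = [
    re.compile(r"^sources/([^/]+)/.*$"),         # files inside a source package
    re.compile(r"^sources/(.*)_pipeline\.py$"),  # top-level pipeline scripts
    re.compile(r"^tests/([^/]+)/.*$"),           # the newly added tests/ pattern
]

def changed_source_names(paths):
    """Map changed file paths to the sorted source names whose tests should run."""
    names = set()
    for path in paths:
        for pattern in PATTERNS:
            match = pattern.match(path)
            if match:
                names.add(match.group(1))
    return sorted(names)

print(changed_source_names([
    "sources/airtable/__init__.py",
    "sources/airtable_pipeline.py",
    "tests/airtable/test_airtable_source.py",
    "sources/asana_dlt/README.md",
]))  # → ['airtable', 'asana_dlt']
```

Note that the workflow's grep also pre-filters paths whose first component character is `.` or `_`; the sketch skips that step.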
16 changes: 9 additions & 7 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -12,7 +12,7 @@ packages = [{include = "sources"}]

[tool.poetry.dependencies]
python = ">=3.8.1,<3.13"
- dlt = {version = "0.4.3a0", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
+ dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}

[tool.poetry.group.dev.dependencies]
mypy = "1.6.1"
4 changes: 3 additions & 1 deletion pytest.ini
@@ -3,4 +3,6 @@ norecursedirs= .direnv .eggs build dist
addopts= -v --showlocals --durations 10
xfail_strict= true
log_cli= 1
log_cli_level= INFO
+ filterwarnings =
+     ignore:Deprecated call to `pkg_resources.declare_namespace
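The new `filterwarnings` entry uses pytest's `action:message-regex` syntax, so the deprecation noise from `pkg_resources.declare_namespace` is silenced during test runs. A runnable sketch of the equivalent filter via the stdlib `warnings` module (`is_suppressed` is an illustrative helper; escaping the dot is an assumption for regex hygiene — the unescaped dot in the ini entry matches the literal dot too):

```python
import warnings

def is_suppressed(message):
    """Return True if `message` would be silenced by the ignore filter (sketch)."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # record every warning by default
        # Programmatic equivalent of the pytest.ini entry; later filters are
        # prepended, so this "ignore" takes precedence over "always".
        warnings.filterwarnings(
            "ignore",
            message=r"Deprecated call to `pkg_resources\.declare_namespace",
        )
        warnings.warn(message, DeprecationWarning)
        return len(caught) == 0

print(is_suppressed("Deprecated call to `pkg_resources.declare_namespace('x')`"))
print(is_suppressed("some unrelated deprecation"))
```

The `message` argument is a regex matched against the start of the warning text, which is why the ini entry can stop mid-sentence.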
2 changes: 1 addition & 1 deletion sources/airtable/README.md
@@ -67,7 +67,7 @@ to the
1. You're now ready to run the pipeline! To get started, run the following command:

```bash
- python3 airtable_pipeline.py
+ python airtable_pipeline.py
```

1. Once the pipeline has finished running, you can verify that everything loaded correctly by using
19 changes: 11 additions & 8 deletions sources/airtable/__init__.py
@@ -1,13 +1,11 @@
"""Source that loads tables from Airtable.
Supports whitelisting of tables or loading of all tables from a specified base.
"""
- from typing import Optional, Iterable, Iterator, List, Dict, Any
+ from typing import Any, Dict, Iterable, Iterator, List, Optional

import dlt
- from dlt.sources import DltResource
from dlt.common.typing import TDataItem

import pyairtable
+ from dlt.sources import DltResource


@dlt.source
@@ -19,9 +17,13 @@ def airtable_source(
"""
Represents tables for a single Airtable base.
Args:
- base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
- table_names (Optional[List[str]]): A list of table IDs or table names to load. Unless specified otherwise, all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
- access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
+ base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser.
+     It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
+ table_names (Optional[List[str]]): A list of table IDs or table names to load.
+     Unless specified otherwise, all tables in the schema are loaded.
+     Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
+ access_token (str): The personal access token.
+     See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
"""
api = pyairtable.Api(access_token)
all_tables_url = api.build_url(f"meta/bases/{base_id}/tables")
@@ -43,7 +45,8 @@ def airtable_resource(
Represents a single airtable.
Args:
api (pyairtable.Api): The API connection object
- base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
+ base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser.
+     It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table (Dict[str, Any]): Metadata about an airtable, does not contain the actual records
"""
primary_key_id = table["primaryFieldId"]
2 changes: 1 addition & 1 deletion sources/airtable/requirements.txt
@@ -1,2 +1,2 @@
pyairtable~=2.1
- dlt>=0.3.17
+ dlt>=0.3.25
84 changes: 53 additions & 31 deletions sources/airtable_pipeline.py
@@ -1,16 +1,18 @@
- from typing import List
+ from typing import List, Dict, Any

import dlt

from airtable import airtable_source


- def load_entire_base(base_id: str) -> None:
+ def load_entire_base(base_id: str, resources_to_apply_hints: Dict[str, Any]) -> None:
"""
Loads all tables from the specified Airtable base.
Args:
base_id (str): The id of the base. Obtain it, e.g. from the URL in your web browser.
It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
+ resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.
Note:
- The base_id can either be passed directly or set up in ".dlt/config.toml".
@@ -23,6 +25,13 @@ def load_entire_base(base_id: str) -> None:
# Retrieve data from Airtable using airtable_source.
airtables = airtable_source(base_id=base_id)

+ # typing columns to silence warnings
+ for resource_name, field_names in resources_to_apply_hints.items():
+     for field_name in field_names:
+         airtables.resources[resource_name].apply_hints(
+             columns={field_name: {"name": field_name, "data_type": "text"}}
+         )

load_info = pipeline.run(airtables, write_disposition="replace")
print(load_info)

@@ -37,6 +46,7 @@ def load_select_tables_from_base_by_id(base_id: str, table_names: List[str]) ->
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
+ resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.
Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
@@ -60,7 +70,7 @@ def load_select_tables_from_base_by_id(base_id: str, table_names: List[str]) ->


def load_select_tables_from_base_by_name(
- base_id: str, table_names: List[str], resource_name: str, field_name: str
+ base_id: str, table_names: List[str], resources_to_apply_hints: Dict[str, Any]
) -> None:
"""
Loads specific table names from an Airtable base.
@@ -71,8 +81,7 @@ def load_select_tables_from_base_by_name(
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
- resource_name (str): The table name we want to apply hints.
- field_name (str): The table field name for which we want to apply hints.
+ resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.
Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
@@ -89,16 +98,19 @@ def load_select_tables_from_base_by_name(
table_names=table_names,
)

- airtables.resources[resource_name].apply_hints(
-     primary_key=field_name,
-     columns={field_name: {"data_type": "text"}},
- )
+ # typing columns to silence warnings
+ for resource_name, field_names in resources_to_apply_hints.items():
+     for field_name in field_names:
+         airtables.resources[resource_name].apply_hints(
+             columns={field_name: {"name": field_name, "data_type": "text"}}
+         )

load_info = pipeline.run(airtables, write_disposition="replace")
print(load_info)


def load_and_customize_write_disposition(
- base_id: str, table_names: List[str], resource_name: str, field_name: str
+ base_id: str, table_names: List[str], resources_to_apply_hints: Dict[str, Any]
) -> None:
"""
Loads data from a specific Airtable base's table with customized write disposition("merge") using field_name.
@@ -109,8 +121,8 @@ def load_and_customize_write_disposition(
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
- resource_name (str): The table name we want to apply hints.
- field_name (str): The table field name for which we want to apply hints.
+ resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.
Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
@@ -127,31 +139,41 @@ def load_and_customize_write_disposition(
base_id=base_id,
table_names=table_names,
)
- airtables.resources[resource_name].apply_hints(
-     primary_key=field_name,
-     columns={field_name: {"data_type": "text"}},
- )

+ # typing columns to silence warnings
+ for resource_name, field_names in resources_to_apply_hints.items():
+     for field_name in field_names:
+         airtables.resources[resource_name].apply_hints(
+             primary_key=field_name,
+             columns={field_name: {"name": field_name, "data_type": "text"}},
+             write_disposition="merge",
+         )

load_info = pipeline.run(airtables)
print(load_info)


if __name__ == "__main__":
- base_id_example = "Please set me up!"
- table_names_example = ["Please set me up!"]
- resource_name_to_apply_hints = "Please set me up!"
- field_name_example = "Please set me up!"
-
- load_entire_base(base_id_example)
- load_select_tables_from_base_by_id(base_id_example, table_names_example)
+ load_entire_base(
+     base_id="app7RlqvdoOmJm9XR",
+     resources_to_apply_hints={
+         "🎤 Speakers": ["Name"],
+         "📆 Schedule": ["Activity"],
+         "🪑 Attendees": ["Name"],
+         "💰 Budget": ["Item"],
+     },
+ )
+ load_select_tables_from_base_by_id(
+     base_id="app7RlqvdoOmJm9XR",
+     table_names=["tblKHM5s3AujfSbAH", "tbloBrS8PnoO63aMP"],
+ )
load_select_tables_from_base_by_name(
- base_id_example,
- table_names_example,
- resource_name_to_apply_hints,
- field_name_example,
+ "app7RlqvdoOmJm9XR",
+ table_names=["💰 Budget"],
+ resources_to_apply_hints={"💰 Budget": ["Item"]},
)
load_and_customize_write_disposition(
- base_id_example,
- table_names_example,
- resource_name_to_apply_hints,
- field_name_example,
+ base_id="appcChDyP0pZeC76v",
+ table_names=["tbl1sN4CpPv8pBll4"],
+ resources_to_apply_hints={"Sheet1": ["Name"]},
)
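Across these pipeline functions, the refactor replaces the single `resource_name`/`field_name` pair with a `resources_to_apply_hints` mapping from resource names to lists of field names. A minimal sketch of how such a mapping expands into the per-column hint dicts that the loops pass to `apply_hints` (pure Python, no dlt required; `build_column_hints` is a hypothetical helper, not part of the repo):

```python
def build_column_hints(resources_to_apply_hints):
    """Expand {resource: [field, ...]} into per-field column hint dicts,
    mirroring the nested loops in airtable_pipeline.py (illustration only)."""
    return {
        resource_name: {
            field_name: {"name": field_name, "data_type": "text"}
            for field_name in field_names
        }
        for resource_name, field_names in resources_to_apply_hints.items()
    }

hints = build_column_hints({"💰 Budget": ["Item"], "🎤 Speakers": ["Name"]})
print(hints["💰 Budget"])  # → {'Item': {'name': 'Item', 'data_type': 'text'}}
```

Pinning each untyped Airtable field to `data_type: "text"` is what silences dlt's "unknown data type" warnings, since empty Airtable columns give the schema inference nothing to work with.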
32 changes: 16 additions & 16 deletions sources/asana_dlt/README.md
@@ -16,10 +16,10 @@ Resources that can be loaded using this verified source are:

## Initialize the pipeline with Asana source
```bash
- dlt init asana_dlt bigquery
+ dlt init asana_dlt duckdb
```

- Here, we chose BigQuery as the destination. Alternatively, you can also choose redshift, duckdb, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).
+ Here, we chose DuckDB as the destination. Alternatively, you can also choose redshift, bigquery, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Grab Asana credentials

@@ -29,41 +29,41 @@ To grab the Asana credentials please refer to the [full documentation here.](htt

1. Open .dlt/secrets.toml.
2. Enter the access token:

```toml
[sources.asana_dlt]
access_token = "access_token" # please set me up!
```

3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Run the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:

```bash
pip install -r requirements.txt

```

2. You're now ready to run the pipeline! To get started, run the following command:

```bash
- python3 asana_dlt_pipeline.py
+ python asana_dlt_pipeline.py

```

3. Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:

```bash
dlt pipeline <pipeline_name> show
```
- Note that in the above command, replace `<pipeline_name>` with the name of your pipeline. For example, if you named your pipeline "asana," you would run:

+ Note that in the above command, replace `<pipeline_name>` with the name of your pipeline. For example, if you named your pipeline "asana" you would run:

```bash
dlt pipeline asana show
```


💡 To explore additional customizations for this pipeline, I recommend referring to the official Asana documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the Asana documentation in [Setup Guide: Asana](https://dlthub.com/docs/dlt-ecosystem/verified-sources/asana)
