Skip to content

Commit

Permalink
updates docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Aug 7, 2023
1 parent 37235cc commit e79f482
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 20 deletions.
52 changes: 51 additions & 1 deletion sources/google_sheets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,61 @@ When source detects any problems with headers or table layout **it will issue a
**date time** and **date** types are also recognized and this happens via additional metadata that is retrieved for the first row.

## Passing the spreadsheet id/url and explicit range names
You can use either the URL of your spreadsheet, which you can copy from the browser, e.g.
```
https://docs.google.com/spreadsheets/d/1VTtCiYgxjAwcIw7UM1_BSaxC3rzIpr0HwXZwd2OlPD4/edit?usp=sharing
```
or spreadsheet id (which is a part of the url)
```
1VTtCiYgxjAwcIw7UM1_BSaxC3rzIpr0HwXZwd2OlPD4
```
Typically, you pass it directly to the `google_spreadsheet` function.

**passing ranges**

You can pass explicit ranges to the `google_spreadsheet`:
1. sheet names
2. named ranges
3. any range in Google Sheets A1 notation, e.g. **sheet 1!A1:B7**


## The `spreadsheet_info` table
This table is repopulated after every load and keeps the information on loaded ranges:
* id of the spreadsheet
* name of the range as passed to the source
* string representation of the loaded range
* the range above in parsed representation

## Running on Airflow (and some under the hood information)
Internally, the source loads all the data immediately in `google_spreadsheet`, before the pipeline is executed in `run`. No matter how many ranges you request, we make just two calls to the API to retrieve data. This works very well with typical scripts that create a dlt source with `google_spreadsheet` and then run it with `pipeline.run`.

In the case of Airflow, the source is created and executed separately. In a typical configuration, where the runner is a separate machine, **this will load the data twice**.

**Moreover, you should not use `scc` decomposition in our Airflow helper**. It will create an instance of the source for each requested range in order to run a task that corresponds to it! Following our [Airflow deployment guide](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer#2-modify-dag-file), this is how you should use `tasks.add_run` on `PipelineTasksGroup`:
```python
@dag(
schedule_interval='@daily',
start_date=pendulum.datetime(2023, 2, 1),
catchup=False,
max_active_runs=1,
default_args=default_task_args
)
def get_named_ranges():
tasks = PipelineTasksGroup("get_named_ranges", use_data_folder=False, wipe_local_data=True)

# import your source from pipeline script
from google_sheets import google_spreadsheet

pipeline = dlt.pipeline(
pipeline_name="get_named_ranges",
dataset_name="named_ranges_data",
destination='bigquery',
)

# do not use decompose to run `google_spreadsheet` in single task
tasks.add_run(pipeline, google_spreadsheet("1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580"), decompose="none", trigger_rule="all_done", retries=0, provide_context=True)
```

## Setup credentials
We recommend to use service account for any production deployments.
[We recommend using a service account for any production deployment](https://dlthub.com/docs/dlt-ecosystem/verified-sources/google_sheets#google-sheets-api-authentication)

7 changes: 5 additions & 2 deletions sources/google_sheets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Loads Google Sheets data from tabs, named and explicit ranges. Contains the main source functions."""

from typing import Any, List, Sequence, Union, Iterable
from typing import Sequence, Union, Iterable

import dlt
from dlt.common import logger
Expand Down Expand Up @@ -149,5 +149,8 @@ def google_spreadsheet(
write_disposition="replace",
)
yield dlt.resource(
metadata_table, write_disposition="replace", name="spreadsheet_info"
metadata_table,
write_disposition="merge",
name="spreadsheet_info",
merge_key="spreadsheet_id",
)
34 changes: 20 additions & 14 deletions sources/google_sheets_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@

def load_pipeline_with_ranges() -> None:
"""
Does a full pipeline run. Will load all ranges in config.toml. The dlt config also contains the spreadsheet url or id that data will be loaded from.
Loads explicitly passed ranges
"""
pipeline = dlt.pipeline(
pipeline_name="google_sheets_pipeline",
destination="postgres",
destination="duckdb",
full_refresh=False,
dataset_name="test",
)
data = google_spreadsheet(
"https://docs.google.com/spreadsheets/d/1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580/edit#gid=0",
range_names=["NamedRange1", "Sheet 1", "Sheet 1!A1:D4"],
get_sheets=False,
get_named_ranges=False,
)
Expand All @@ -23,44 +24,49 @@ def load_pipeline_with_ranges() -> None:

def load_pipeline_with_sheets() -> None:
"""
Does a pipeline run. Will load all the sheets in the spreadsheet, but it will not load any of the named ranges in the spreadsheet. Will also load all the ranges given in config.
The dlt config also contains the spreadsheet url or id that data will be loaded from.
Does a pipeline run. Will load all the sheets in the spreadsheet, but it will not load any of the named ranges in the spreadsheet.
"""
pipeline = dlt.pipeline(
pipeline_name="google_sheets_pipeline",
destination="postgres",
destination="duckdb",
full_refresh=False,
dataset_name="sample_google_sheet_data",
)
data = google_spreadsheet(get_sheets=True, get_named_ranges=False)
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
get_sheets=True,
get_named_ranges=False,
)
info = pipeline.run(data)
print(info)


def load_pipeline_with_named_ranges() -> None:
"""
Does a pipeline run. Will not load the sheets in the spreadsheet, but it will load all the named ranges in the spreadsheet. Will also load all the ranges given in config.
The dlt config also contains the spreadsheet url or id that data will be loaded from.
Does a pipeline run. Will not load the sheets in the spreadsheet, but it will load all the named ranges in the spreadsheet.
"""
pipeline = dlt.pipeline(
pipeline_name="google_sheets_pipeline",
destination="postgres",
destination="duckdb",
full_refresh=False,
dataset_name="sample_google_sheet_data",
)
data = google_spreadsheet(get_sheets=False, get_named_ranges=True)
data = google_spreadsheet(
"1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580",
get_sheets=False,
get_named_ranges=True,
)
info = pipeline.run(data)
print(info)


def load_pipeline_with_sheets_and_ranges() -> None:
"""
Does a pipeline run. Will load all the sheets in the spreadsheet and all the named ranges in the spreadsheet. Will also load all the ranges given in config.
The dlt config also contains the spreadsheet url or id that data will be loaded from.
Does a pipeline run. Will load all the sheets in the spreadsheet and all the named ranges in the spreadsheet.
"""
pipeline = dlt.pipeline(
pipeline_name="google_sheets_pipeline",
destination="postgres",
destination="duckdb",
full_refresh=True,
dataset_name="sample_google_sheet_data",
)
Expand All @@ -74,4 +80,4 @@ def load_pipeline_with_sheets_and_ranges() -> None:


if __name__ == "__main__":
load_pipeline_with_sheets_and_ranges()
load_pipeline_with_ranges()
9 changes: 6 additions & 3 deletions tests/google_sheets/test_google_sheets_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ def test_two_overlapping_tables(destination_name) -> None:
# assert first column
assert_query_data(
pipeline,
"SELECT col_1 FROM two_tables ORDER BY col_1 ASC",
list(range(10, 21)) + [None] * 11,
"SELECT col_1 FROM two_tables ORDER BY col_1 NULLS FIRST",
[None] * 11 + list(range(10, 21)),
)
# assert first overlapped column
assert_query_data(
Expand Down Expand Up @@ -446,8 +446,10 @@ def test_explicit_named_range(destination_name) -> None:
("test5", 5, 1.05, True),
("test6", 6, 1.06, True),
]

# perform queries to check data inside
with pipeline.sql_client() as c:
quoted_range = c.capabilities.escape_identifier("range")
# columns are auto named - we hit a middle of a table with this range
sql_query = f"SELECT col_1, col_2, col_3, col_4 FROM {table_name_db};"
with c.execute_query(sql_query) as cur:
Expand All @@ -456,10 +458,11 @@ def test_explicit_named_range(destination_name) -> None:
for i in range(len(rows)):
processed_row = _row_helper(rows[i], destination_name)
assert processed_row == expected_rows[i]

# check spreadsheet info
assert_query_data(
pipeline,
"SELECT range FROM spreadsheet_info ORDER BY range ASC",
f"SELECT {quoted_range} FROM spreadsheet_info ORDER BY {quoted_range} ASC",
["empty!ZY1:AAA4", "more_data!A4:D7"],
)

Expand Down

0 comments on commit e79f482

Please sign in to comment.