From e7d8473c692f54b8ecbd3ad5f6b1b9b5c165a5ec Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Sun, 27 Oct 2024 01:24:55 +0000 Subject: [PATCH 1/5] added partial loading example --- .github/workflows/test_doc_snippets.yml | 2 +- .../examples/partial_loading/.dlt/config.toml | 2 + .../partial_loading/.dlt/example.secrets.toml | 3 + docs/examples/partial_loading/__init__.py | 0 .../partial_loading/partial_loading.py | 179 ++++++++++++++++++ .../examples/partial_loading/requirements.txt | 1 + 6 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 docs/examples/partial_loading/.dlt/config.toml create mode 100644 docs/examples/partial_loading/.dlt/example.secrets.toml create mode 100644 docs/examples/partial_loading/__init__.py create mode 100644 docs/examples/partial_loading/partial_loading.py create mode 100644 docs/examples/partial_loading/requirements.txt diff --git a/.github/workflows/test_doc_snippets.yml b/.github/workflows/test_doc_snippets.yml index e6d58376ba..ae06a72df9 100644 --- a/.github/workflows/test_doc_snippets.yml +++ b/.github/workflows/test_doc_snippets.yml @@ -91,7 +91,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres -E lancedb --with docs,sentry-sdk --without airflow + run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres -E lancedb --with docs,sentry-sdk --without airflow -E s3 - name: create secrets.toml for examples run: pwd && echo "$DLT_SECRETS_TOML" > docs/examples/.dlt/secrets.toml diff --git a/docs/examples/partial_loading/.dlt/config.toml b/docs/examples/partial_loading/.dlt/config.toml new file mode 100644 index 0000000000..dad6cffd19 --- /dev/null +++ b/docs/examples/partial_loading/.dlt/config.toml @@ -0,0 +1,2 @@ +[destination.filesystem] 
+bucket_url="s3://dlt-ci-test-bucket" \ No newline at end of file diff --git a/docs/examples/partial_loading/.dlt/example.secrets.toml b/docs/examples/partial_loading/.dlt/example.secrets.toml new file mode 100644 index 0000000000..811614e687 --- /dev/null +++ b/docs/examples/partial_loading/.dlt/example.secrets.toml @@ -0,0 +1,3 @@ +[destination.filesystem.credentials] +aws_access_key_id = "" # copy the access key here +aws_secret_access_key = "" # copy the secret access key here \ No newline at end of file diff --git a/docs/examples/partial_loading/__init__.py b/docs/examples/partial_loading/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/examples/partial_loading/partial_loading.py b/docs/examples/partial_loading/partial_loading.py new file mode 100644 index 0000000000..c8c6d7e4c6 --- /dev/null +++ b/docs/examples/partial_loading/partial_loading.py @@ -0,0 +1,179 @@ +""" +--- +title: Load data to Filesystem destination with partial replace using backfill management +description: Load chess game data from Chess.com into a filesystem destination, while deleting old backfill files. +keywords: [incremental loading, REST API, dlt, chess.com, data pipeline, backfill management, filesystem] +--- + +This script interacts with the Chess.com REST API to extract game data for a specific user on a monthly basis. +The script retrieves game data for a specified time range, and when additional data is loaded for a different time range, +it automatically handles de-duplication by deleting any previously loaded files for overlapping time ranges. + +We'll learn: + +- How to configure a [REST API source](../dlt-ecosystem/verified-sources/rest_api/basic.md) using + the `dlt` library. +- How to manage and delete old backfill files for de-duplication. +- How to use [Filesystem](../dlt-ecosystem/destinations/filesystem.md) as a destination for storing extracted data. 
+""" + +import os +import re +from dlt.common import pendulum as p +from typing import Dict, List, Iterator + +import dlt +from dlt.sources import DltResource +from dlt.common.pipeline import LoadInfo +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient +from dlt.sources.rest_api import RESTAPIConfig, rest_api_source + + +@dlt.source +def chess_com_source(username: str, months: List[Dict[str, str]]) -> Iterator[DltResource]: + """ + Configures and yields resources to fetch chess game data for a given user across specified months. + + Args: + username (str): Chess.com username to fetch games for. + months (List[Dict[str, str]]): List of dictionaries containing 'year' and 'month' keys. + + Yields: + dlt.resource: Resource objects containing fetched game data. + """ + resources = [] + for month in months: + year = month["year"] + month_str = month["month"] + # Configure REST API endpoint for the specific month + config = RESTAPIConfig( + client={"base_url": "https://api.chess.com/pub/"}, # Base URL for Chess.com API + resources=[ + { + "name": f"chess_com_games_{year}_{month_str}", # Unique resource name + "endpoint": { + "path": f"player/{username}/games/{year}/{month_str}", # API endpoint path + }, + "primary_key": ["url"], # Primary key to prevent duplicates + } + ], + ) + + # Fetch data from the API using the configured REST API source + data = list(rest_api_source(config)) + + # Ensure the fetched data is a list + if isinstance(data, dict): + data = [data] # Wrap single dict in a list + + if data: + # Create a resource with write_disposition='append' to add new data + resource = dlt.resource( + data, + name=f"chess_com_games_{year}_{month_str}", + write_disposition="append", # Append new data without overwriting + ) + resources.append(resource) + else: + # Inform if no data was returned for the specified month + print(f"No data returned for {username} in {year}-{month_str}") + + # Yield all configured resources + for resource in resources: + 
yield resource + + +def generate_months( + start_year: int, start_month: int, end_year: int, end_month: int +) -> Iterator[Dict[str, str]]: + """ + Generates a list of months between the start and end dates. + + Args: + start_year (int): Starting year. + start_month (int): Starting month. + end_year (int): Ending year. + end_month (int): Ending month. + + Yields: + Dict[str, str]: Dictionary containing 'year' and 'month' as strings. + """ + start_date = p.datetime(start_year, start_month, 1) + end_date = p.datetime(end_year, end_month, 1) + current_date = start_date + while current_date <= end_date: + yield {"year": str(current_date.year), "month": f"{current_date.month:02d}"} + # Move to the next month + if current_date.month == 12: + current_date = current_date.replace(year=current_date.year + 1, month=1) + else: + current_date = current_date.replace(month=current_date.month + 1) + + +def delete_old_backfills(load_info: LoadInfo, p: dlt.Pipeline, table_name: str) -> None: + """ + Deletes old backfill files that do not match the current load ID to maintain data integrity. + + Args: + load_info (LoadInfo): Information about the current load. + p (dlt.Pipeline): The dlt pipeline instance. + table_name (str): Name of the table to clean up backfills for. 
+ """ + # Fetch current load id + load_id = load_info.loads_ids[0] + pattern = re.compile(rf"{load_id}") # Compile regex pattern for the current load ID + + # Initialize the filesystem client + fs_client: FilesystemClient = p.destination.client( # type: ignore + p.default_schema, initial_config=p._get_destination_client_initial_config(p.destination) + ) + + # Construct the table directory path + table_dir = os.path.join(fs_client.dataset_path, table_name) + + # Check if the table directory exists + if fs_client.fs_client.exists(table_dir): + # Traverse the table directory + for root, _dirs, files in fs_client.fs_client.walk(table_dir, maxdepth=None): + for file in files: + # Construct the full file path + file_path = os.path.join(root, file) + # If the file does not match the current load ID, delete it + if not pattern.search(file_path): + try: + fs_client.fs_client.rm(file_path) # Remove the old backfill file + print(f"Deleted old backfill file: {file_path}") + except Exception as e: + print(f"Error deleting file {file_path}: {e}") + else: + # Inform if the table directory does not exist + print(f"Table directory does not exist: {table_dir}") + + +def load_chess_data(): + """ + Sets up and runs the dlt pipeline to load chess game data, then manages backfills. 
+ """ + # Initialize the dlt pipeline with filesystem destination + pipeline = dlt.pipeline( + pipeline_name="chess_com_data", destination="filesystem", dataset_name="chess_games" + ) + + # Generate the list of months for the desired date range + months = list(generate_months(2023, 1, 2023, 12)) + + # Create the source with all specified months + source = chess_com_source("MagnusCarlsen", months) + + # Run the pipeline to fetch and load data + info = pipeline.run(source) + print(info) + + # After the run, delete old backfills for each table to maintain data consistency + for month in months: + table_name = f"chess_com_games_{month['year']}_{month['month']}" + delete_old_backfills(info, pipeline, table_name) + + +if __name__ == "__main__": + load_chess_data() \ No newline at end of file diff --git a/docs/examples/partial_loading/requirements.txt b/docs/examples/partial_loading/requirements.txt new file mode 100644 index 0000000000..e89e448f62 --- /dev/null +++ b/docs/examples/partial_loading/requirements.txt @@ -0,0 +1 @@ +dlt[s3] \ No newline at end of file From e08c45cd5038a1253bbf5f374fa57d07f2806ee0 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Sun, 27 Oct 2024 01:33:20 +0000 Subject: [PATCH 2/5] Updated formatting --- docs/examples/partial_loading/partial_loading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/partial_loading/partial_loading.py b/docs/examples/partial_loading/partial_loading.py index c8c6d7e4c6..2999ad827d 100644 --- a/docs/examples/partial_loading/partial_loading.py +++ b/docs/examples/partial_loading/partial_loading.py @@ -176,4 +176,4 @@ def load_chess_data(): if __name__ == "__main__": - load_chess_data() \ No newline at end of file + load_chess_data() From aa3835409be1fbc2fe2c401e766842038c34885c Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Tue, 5 Nov 2024 04:06:17 +0000 Subject: [PATCH 3/5] Updated --- 
docs/examples/partial_loading/partial_loading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/partial_loading/partial_loading.py b/docs/examples/partial_loading/partial_loading.py index 2999ad827d..c8c6d7e4c6 100644 --- a/docs/examples/partial_loading/partial_loading.py +++ b/docs/examples/partial_loading/partial_loading.py @@ -176,4 +176,4 @@ def load_chess_data(): if __name__ == "__main__": - load_chess_data() + load_chess_data() \ No newline at end of file From 52868055e0f5c9c5e90e8b7127c44ea34d23a948 Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Tue, 5 Nov 2024 05:01:02 +0000 Subject: [PATCH 4/5] Updated --- docs/examples/partial_loading/partial_loading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/partial_loading/partial_loading.py b/docs/examples/partial_loading/partial_loading.py index c8c6d7e4c6..2999ad827d 100644 --- a/docs/examples/partial_loading/partial_loading.py +++ b/docs/examples/partial_loading/partial_loading.py @@ -176,4 +176,4 @@ def load_chess_data(): if __name__ == "__main__": - load_chess_data() \ No newline at end of file + load_chess_data() From d9473c7d4f6f0be178635eb0899b57823f65097f Mon Sep 17 00:00:00 2001 From: dat-a-man <98139823+dat-a-man@users.noreply.github.com> Date: Tue, 5 Nov 2024 12:52:21 +0000 Subject: [PATCH 5/5] Updated the logic according to comment --- .../partial_loading/partial_loading.py | 40 +++++-------------- 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/docs/examples/partial_loading/partial_loading.py b/docs/examples/partial_loading/partial_loading.py index 2999ad827d..23b9545eb4 100644 --- a/docs/examples/partial_loading/partial_loading.py +++ b/docs/examples/partial_loading/partial_loading.py @@ -26,7 +26,7 @@ from dlt.sources import DltResource from dlt.common.pipeline import LoadInfo from dlt.destinations.impl.filesystem.filesystem import FilesystemClient -from 
dlt.sources.rest_api import RESTAPIConfig, rest_api_source +from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources @dlt.source @@ -39,48 +39,28 @@ def chess_com_source(username: str, months: List[Dict[str, str]]) -> Iterator[Dl months (List[Dict[str, str]]): List of dictionaries containing 'year' and 'month' keys. Yields: - dlt.resource: Resource objects containing fetched game data. + dlt.Resource: Resource objects containing fetched game data. """ - resources = [] for month in months: year = month["year"] month_str = month["month"] # Configure REST API endpoint for the specific month - config = RESTAPIConfig( - client={"base_url": "https://api.chess.com/pub/"}, # Base URL for Chess.com API - resources=[ + config: RESTAPIConfig = { + "client": { + "base_url": "https://api.chess.com/pub/", # Base URL for Chess.com API + }, + "resources": [ { "name": f"chess_com_games_{year}_{month_str}", # Unique resource name + "write_disposition": "append", "endpoint": { "path": f"player/{username}/games/{year}/{month_str}", # API endpoint path }, "primary_key": ["url"], # Primary key to prevent duplicates } ], - ) - - # Fetch data from the API using the configured REST API source - data = list(rest_api_source(config)) - - # Ensure the fetched data is a list - if isinstance(data, dict): - data = [data] # Wrap single dict in a list - - if data: - # Create a resource with write_disposition='append' to add new data - resource = dlt.resource( - data, - name=f"chess_com_games_{year}_{month_str}", - write_disposition="append", # Append new data without overwriting - ) - resources.append(resource) - else: - # Inform if no data was returned for the specified month - print(f"No data returned for {username} in {year}-{month_str}") - - # Yield all configured resources - for resource in resources: - yield resource + } + yield from rest_api_resources(config) def generate_months(