From 515c1b7f70d4c17918ffd209cb8fadfa7a37b385 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 14 Aug 2023 17:25:08 +0200 Subject: [PATCH] adds retries to google sheets --- .../test_on_local_destinations_forks.yml | 26 +++++++-- CONTRIBUTING.md | 5 ++ sources/google_sheets/__init__.py | 14 ++--- sources/google_sheets/helpers/api_calls.py | 56 ++++++++++++++++++- 4 files changed, 86 insertions(+), 15 deletions(-) diff --git a/.github/workflows/test_on_local_destinations_forks.yml b/.github/workflows/test_on_local_destinations_forks.yml index 1d548c7a5..c78139b03 100644 --- a/.github/workflows/test_on_local_destinations_forks.yml +++ b/.github/workflows/test_on_local_destinations_forks.yml @@ -20,7 +20,7 @@ jobs: # if: ${{ github.event.pull_request.head.repo.fork }} run_loader: - name: test on local postgres and duckdb + name: test on local postgres and duckdb on forks needs: get_changed_sources if: needs.get_changed_sources.outputs.sources_list != '' strategy: @@ -31,6 +31,26 @@ jobs: shell: bash runs-on: "ubuntu-latest" + # Service containers to run with `container-job` + services: + # Label used to access the service container + postgres: + # Docker Hub image + image: postgres + # Provide the password for postgres + env: + POSTGRES_DB: dlt_data + POSTGRES_USER: loader + POSTGRES_PASSWORD: loader + ports: + - 5432:5432 + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + steps: - name: Check out uses: actions/checkout@master @@ -61,9 +81,7 @@ jobs: - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > sources/.dlt/secrets.toml - # run: pwd && echo "$DESTINATIONS_SECRETS" > sources/.dlt/secrets.toml && echo "$SOURCES_SECRETS" >> sources/.dlt/secrets.toml - # - name: Setup upterm session - # uses: lhotari/action-upterm@v1 + - run: | sources_list="${{ needs.get_changed_sources.outputs.sources_list }}" test_paths=$(echo "$sources_list" | awk '{for(i=1;i<=NF;i++) printf "tests/%s ", $i}') diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 57eb7ce37..991bd8e01 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -285,6 +285,11 @@ ALL_DESTINATIONS='["postgres"]' pytest tests/chess There's also `make test-local` command that will run all the tests on `duckdb` and `postgres`. +## Running tests on CI + + + + ## Advanced topics ### Ensuring the correct Python version diff --git a/sources/google_sheets/__init__.py b/sources/google_sheets/__init__.py index 512ae512d..2abab572e 100644 --- a/sources/google_sheets/__init__.py +++ b/sources/google_sheets/__init__.py @@ -27,6 +27,7 @@ def google_spreadsheet( ] = dlt.secrets.value, get_sheets: bool = False, get_named_ranges: bool = True, + max_api_retries: int = 5, ) -> Iterable[DltResource]: """ The source for the dlt pipeline. It returns the following resources: @@ -43,12 +44,13 @@ def google_spreadsheet( Defaults to False. get_named_ranges (bool, optional): If True, load all the named ranges inside the spreadsheet into the database. Defaults to True. + max_api_retries (int, optional): Max number of retires to google sheets API. Actual behavior is internal to google client. Yields: Iterable[DltResource]: List of dlt resources. """ # authenticate to the service using the helper function - service = api_auth(credentials) + service = api_auth(credentials, max_api_retries=max_api_retries) # get spreadsheet id from url or id spreadsheet_id = get_spreadsheet_id(spreadsheet_url_or_id) all_range_names = set(range_names or []) @@ -107,14 +109,8 @@ def google_spreadsheet( metadata_table[-1]["skipped"] = False range_data.append((name, parsed_range, meta_range, values)) - meta_values = ( - service.spreadsheets() - .get( - spreadsheetId=spreadsheet_id, - ranges=[str(data[2]) for data in range_data], - includeGridData=True, - ) - .execute() + meta_values = api_calls.get_meta_for_ranges( + service, spreadsheet_id, [str(data[2]) for data in range_data] ) for name, parsed_range, _, values in range_data: logger.info(f"Processing range {parsed_range} with name {name}") diff --git a/sources/google_sheets/helpers/api_calls.py b/sources/google_sheets/helpers/api_calls.py index b81dddc50..06e756482 100644 --- a/sources/google_sheets/helpers/api_calls.py +++ b/sources/google_sheets/helpers/api_calls.py @@ -1,11 +1,13 @@ """Contains helper functions to extract data from spreadsheet API""" from typing import Any, List, Tuple +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception from dlt.common.exceptions import MissingDependencyException from dlt.common.typing import DictStrAny from dlt.sources.credentials import GcpCredentials, GcpOAuthCredentials +from dlt.sources.helpers.requests.retry import DEFAULT_RETRY_STATUS from .data_processing import ParsedRange, trim_range_top_left @@ -15,12 +17,39 @@ raise MissingDependencyException("Google API Client", ["google-api-python-client"]) -def api_auth(credentials: GcpCredentials) -> Resource: +def is_retry_status_code(exception: BaseException) -> bool: + """Retry condition on HttpError""" + from googleapiclient.errors import HttpError # type: ignore + + # print(f"RETRY ON {str(HttpError)} = {isinstance(exception, HttpError) and exception.resp.status in DEFAULT_RETRY_STATUS}") + # if isinstance(exception, HttpError): + # print(exception.resp.status) + # print(DEFAULT_RETRY_STATUS) + return ( + isinstance(exception, HttpError) + and exception.resp.status in DEFAULT_RETRY_STATUS + ) + + +retry_deco = retry( + # Retry if it's a rate limit error (HTTP 429) + retry=retry_if_exception(is_retry_status_code), + # Use exponential backoff for the waiting time between retries, starting with 5 seconds + wait=wait_exponential(multiplier=1.5, min=5, max=120), + # Stop retrying after 10 attempts + stop=stop_after_attempt(10), + # Print out the retrying details + reraise=True, +) + + +def api_auth(credentials: GcpCredentials, max_api_retries: int) -> Resource: """ Uses GCP credentials to authenticate with Google Sheets API. Args: credentials (GcpCredentials): Credentials needed to log in to GCP. + max_api_retries (int): Max number of retires to google sheets API. Actual behavior is internal to google client. Returns: Resource: Object needed to make API calls to Google Sheets API. @@ -28,10 +57,32 @@ def api_auth(credentials: GcpCredentials) -> Resource: if isinstance(credentials, GcpOAuthCredentials): credentials.auth("https://www.googleapis.com/auth/spreadsheets.readonly") # Build the service object for Google sheets api. - service = build("sheets", "v4", credentials=credentials.to_native_credentials()) + service = build( + "sheets", + "v4", + credentials=credentials.to_native_credentials(), + num_retries=max_api_retries, + ) return service +@retry_deco +def get_meta_for_ranges( + service: Resource, spreadsheet_id: str, range_names: List[str] +) -> Any: + """Retrieves `spreadsheet_id` cell metadata for `range_names`""" + return ( + service.spreadsheets() + .get( + spreadsheetId=spreadsheet_id, + ranges=range_names, + includeGridData=True, + ) + .execute() + ) + + +@retry_deco def get_known_range_names( spreadsheet_id: str, service: Resource ) -> Tuple[List[str], List[str], str]: @@ -52,6 +103,7 @@ def get_known_range_names( return sheet_names, named_ranges, title +@retry_deco def get_data_for_ranges( service: Resource, spreadsheet_id: str, range_names: List[str] ) -> List[Tuple[str, ParsedRange, ParsedRange, List[List[Any]]]]: