From b9f56e6ad77b4b9868e719e68319d3da976412bf Mon Sep 17 00:00:00 2001 From: Raymond Cheng Date: Fri, 31 May 2024 12:20:16 -0700 Subject: [PATCH] feat: build indices for mart models (#1567) * Allows the mart model author to specify which indices to build * Adds a few indices in `marts/directory` * Adds a README to bq2cloudsql * Adds an .env.example in monorepo root * Adds a utility to read the index config from the mart model and create the indices on copy Note: This will not work on incremental mode in bq2cloudsql --- .env.example | 25 +++++++++++ warehouse/bq2cloudsql/README.md | 41 +++++++++++++++++++ warehouse/bq2cloudsql/cloudsql.py | 25 ++++++++--- warehouse/bq2cloudsql/script.py | 29 +++++++++---- warehouse/bq2cloudsql/synchronizer.py | 13 +++++- .../directory/artifacts_by_project_v1.sql | 7 +++- .../models/marts/directory/artifacts_v1.sql | 7 +++- .../models/marts/directory/collections_v1.sql | 6 ++- .../models/marts/directory/contracts_v1.sql | 5 ++- .../directory/projects_by_collection_v1.sql | 7 +++- .../models/marts/directory/projects_v1.sql | 6 ++- .../dbt/models/marts/directory/users_v1.sql | 6 ++- 12 files changed, 155 insertions(+), 22 deletions(-) create mode 100644 .env.example create mode 100644 warehouse/bq2cloudsql/README.md diff --git a/.env.example b/.env.example new file mode 100644 index 000000000..08c68a64e --- /dev/null +++ b/.env.example @@ -0,0 +1,25 @@ +# .env +## This .env file is mostly used for Python data ops + +## Google Cloud setup +GOOGLE_PROJECT_ID= +# You will need to generate Google application credentials +# Note: You can use your gcloud auth credentials +GOOGLE_APPLICATION_CREDENTIALS= +# Used for data transfer between databases +CLOUDSTORAGE_BUCKET_NAME= +# Used for storing all BigQuery data in the dbt pipeline +BIGQUERY_DATASET_ID= +# Used for Frontend/API-facing services +CLOUDSQL_REGION= +CLOUDSQL_INSTANCE_ID= +CLOUDSQL_DB_NAME= +CLOUDSQL_DB_PASSWORD= +CLOUDSQL_DB_USER= + +## Dagster Setup +# You may want to change the 
location of dagster home if you want it to survive resets +DAGSTER_HOME=/tmp/dagster-home +# This is used to put generated dbt profiles for dagster in a specific place +DAGSTER_DBT_TARGET_BASE_DIR=/tmp/dagster-home/generated-dbt +DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 diff --git a/warehouse/bq2cloudsql/README.md b/warehouse/bq2cloudsql/README.md new file mode 100644 index 000000000..ab67e7d34 --- /dev/null +++ b/warehouse/bq2cloudsql/README.md @@ -0,0 +1,41 @@ +# bq2cloudsql + +This utility script copies the final mart models from the end of +the dbt pipeline into a Google CloudSQL postgres database for +serving the API / frontend. + +The general strategy is to dump the models into CSV files +on GCS, then import them into the Postgres database +into temporary tables. When the copy is done, we drop the old tables +and rename the fresh import. + +Note: Because we only maintain a single database, this operation can +severely load the production server during copies. + +## Setup + +Copy `.env.example` to `.env` in the root of the monorepo. +Populate the values there. + +Then, from the monorepo root, install the Python dependencies + +```bash +poetry install +``` + +## Running + +Run the copy with + +```bash +poetry run bq2cloudsql +``` + +Note: this command is defined in the root `pyproject.toml` + +## Development + +For now, we've been developing / testing this on the production servers. +In the future, we should create staging and development databases for this. + +You can copy only select tables by slicing `table_sync_configs` in `./script.py`. 
diff --git a/warehouse/bq2cloudsql/cloudsql.py b/warehouse/bq2cloudsql/cloudsql.py index a7dc878ee..c06bdeb6d 100644 --- a/warehouse/bq2cloudsql/cloudsql.py +++ b/warehouse/bq2cloudsql/cloudsql.py @@ -4,18 +4,15 @@ import pg8000.native import sqlalchemy -from sqlalchemy import Table, Column, MetaData +from sqlalchemy import Table, Column, MetaData, text from google.cloud.sql.connector import Connector from googleapiclient.errors import HttpError from googleapiclient.discovery import build -from typing import Callable, List - +from typing import Callable, List, Dict, Optional connector = Connector() - pp = pprint.PrettyPrinter(indent=4) - def get_connection( project_id: str, region: str, @@ -121,6 +118,24 @@ def ensure_table(self, table_name: str, columns: List[Column]): Table(table_name, metadata, *columns) metadata.create_all(self.sql_conn) + def build_index(self, table_name: str, index_names: Optional[Dict[str, List[str]]]): + print(f"Building indices for {table_name}:") + + # Check that the table exists + if not sqlalchemy.inspect(self.sql_conn).has_table(table_name): + raise Exception(f"Failed to build indices for {table_name}: the table does not exist") + elif index_names is None: + print(f"No indices found for {table_name}") + return + + for key in index_names: + column_names = index_names[key] + print(f"- creating Index({key}) as {column_names}") + with self.begin() as conn: + sql_str = f"CREATE INDEX IF NOT EXISTS {key} ON {table_name}({','.join(column_names)})" + print(sql_str) + conn.execute(text(sql_str)) + def import_csv(self, csv_uri: str, table: str, columns: None | List[str] = None): print("importing into %s" % table) csv_import_options = dict(table=table) diff --git a/warehouse/bq2cloudsql/script.py b/warehouse/bq2cloudsql/script.py index 611c48ba9..5022ae90e 100644 --- a/warehouse/bq2cloudsql/script.py +++ b/warehouse/bq2cloudsql/script.py @@ -10,8 +10,9 @@ from dotenv import load_dotenv - def table_sync_config_from_dbt_marts(target: str) -> 
List[TableSyncConfig]: + # Run dbt to get model meta + print("Loading config from dbt mart models...") dbt = dbtRunner() r: dbtRunnerResult = dbt.invoke( [ @@ -30,29 +31,41 @@ def table_sync_config_from_dbt_marts(target: str) -> List[TableSyncConfig]: raise Exception("dbt listing failed") if not isinstance(r.result, list): raise Exception("Unexpected response from dbt") + + # Load the model metadata + print("Results from loading dbt model config:") model_configs = map(lambda a: json.loads(a), r.result) sync_configs: List[TableSyncConfig] = [] + skipped_tables: List[str] = [] for model_config in model_configs: config = model_config.get("config", {}) meta = config.get("meta", {}) if not meta.get("sync_to_db", False): - print("Skipping %s" % model_config["name"]) + skipped_tables.append(model_config["name"]) continue - print(model_config["name"]) + print(f"Queuing {model_config['name']}") sync_configs.append( TableSyncConfig( TableSyncMode.OVERWRITE, model_config["name"], model_config["name"], + meta.get("index"), ) ) + print(f"Skipping {skipped_tables}") return sync_configs - +# Main function def run(): load_dotenv() - bq = bigquery.Client() - storage_client = storage.Client() + print("Running bq2cloudsql...") + # Initialize clients + bq = bigquery.Client( + project=os.environ.get("GOOGLE_PROJECT_ID") + ) + storage_client = storage.Client( + project=os.environ.get("GOOGLE_PROJECT_ID") + ) cloudsql = CloudSQLClient.connect( os.environ.get("GOOGLE_PROJECT_ID"), os.environ.get("CLOUDSQL_REGION"), @@ -62,9 +75,10 @@ def run(): os.environ.get("CLOUDSQL_DB_NAME"), ) - # Automtically discover dbt marts + # Automatically discover dbt marts table_sync_configs = table_sync_config_from_dbt_marts(os.environ.get("DBT_TARGET")) + # Run sync synchronizer = BigQueryCloudSQLSynchronizer( bq, storage_client, @@ -75,3 +89,4 @@ def run(): os.environ.get("CLOUDSTORAGE_BUCKET_NAME"), ) synchronizer.sync() + print("...done") \ No newline at end of file diff --git 
a/warehouse/bq2cloudsql/synchronizer.py b/warehouse/bq2cloudsql/synchronizer.py index 70ddb2b4c..a86e73da6 100644 --- a/warehouse/bq2cloudsql/synchronizer.py +++ b/warehouse/bq2cloudsql/synchronizer.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from datetime import datetime import textwrap -from typing import List, Dict, Union, Tuple +from typing import List, Dict, Union, Tuple, Optional import pendulum from sqlalchemy import ( @@ -75,12 +75,17 @@ class TableSyncMode(Enum): OVERWRITE = 3 +# There is a TableSyncConfig per table that we want to copy @dataclass class TableSyncConfig: + # Whether it's incremental or overwrite mode: TableSyncMode + # The name of the table in source database source_table: str + # The name of the table in destination database destination_table: str - + # index_name => list of column names to index + index: Optional[Dict[str, List[str]]] class BigQueryCloudSQLSynchronizer(object): def __init__( @@ -262,8 +267,12 @@ def commit_table_for_overwrite( config: TableSyncConfig, last_partition_date: pendulum.DateTime | None, ): + # Build the index in the temporary table + self._cloudsql.build_index(temp_dest_table, config.index) + # Creates a transaction to rename the new table to the old table. 
# Currently if there are schema changes those changes are simply forced + print(f"Moving {temp_dest_table} to {config.destination_table}") with self._cloudsql.begin() as conn: update_data = dict(last_sync_at=pendulum.now("UTC"), is_incremental=False) insert_stmt = insert(self._sync_state_table).values( diff --git a/warehouse/dbt/models/marts/directory/artifacts_by_project_v1.sql b/warehouse/dbt/models/marts/directory/artifacts_by_project_v1.sql index 3388e797f..45499870b 100644 --- a/warehouse/dbt/models/marts/directory/artifacts_by_project_v1.sql +++ b/warehouse/dbt/models/marts/directory/artifacts_by_project_v1.sql @@ -1,6 +1,11 @@ {{ config(meta = { - 'sync_to_db': True + 'sync_to_db': True, + 'index': { + 'idx_project_id': ["project_id"], + 'idx_project_name': ["project_source", "project_namespace", "project_name"], + 'idx_artifact_id': ["artifact_id"], + } }) }} diff --git a/warehouse/dbt/models/marts/directory/artifacts_v1.sql b/warehouse/dbt/models/marts/directory/artifacts_v1.sql index b58ef983c..2f728f440 100644 --- a/warehouse/dbt/models/marts/directory/artifacts_v1.sql +++ b/warehouse/dbt/models/marts/directory/artifacts_v1.sql @@ -1,7 +1,10 @@ {{ config(meta = { - 'sync_to_db': True - + 'sync_to_db': True, + 'index': { + 'idx_artifact_id': ["artifact_id"], + 'idx_artifact_name': ["artifact_source", "artifact_namespace", "artifact_name"], + } }) }} diff --git a/warehouse/dbt/models/marts/directory/collections_v1.sql b/warehouse/dbt/models/marts/directory/collections_v1.sql index 5572de6c5..8088fb259 100644 --- a/warehouse/dbt/models/marts/directory/collections_v1.sql +++ b/warehouse/dbt/models/marts/directory/collections_v1.sql @@ -1,6 +1,10 @@ {{ config(meta = { - 'sync_to_db': True + 'sync_to_db': True, + 'index': { + 'idx_collection_id': ["collection_id"], + 'idx_collection_name': ["collection_source", "collection_namespace", "collection_name"], + } }) }} diff --git a/warehouse/dbt/models/marts/directory/contracts_v1.sql 
b/warehouse/dbt/models/marts/directory/contracts_v1.sql index c22f748c2..8d82c0cab 100644 --- a/warehouse/dbt/models/marts/directory/contracts_v1.sql +++ b/warehouse/dbt/models/marts/directory/contracts_v1.sql @@ -1,6 +1,9 @@ {{ config(meta = { - 'sync_to_db': True + 'sync_to_db': True, + 'index': { + 'idx_deployer': ["root_deployer_address"], + } }) }} diff --git a/warehouse/dbt/models/marts/directory/projects_by_collection_v1.sql b/warehouse/dbt/models/marts/directory/projects_by_collection_v1.sql index 21f44f811..d015597ee 100644 --- a/warehouse/dbt/models/marts/directory/projects_by_collection_v1.sql +++ b/warehouse/dbt/models/marts/directory/projects_by_collection_v1.sql @@ -1,6 +1,11 @@ {{ config(meta = { - 'sync_to_db': True + 'sync_to_db': True, + 'index': { + 'idx_collection_id': ["collection_id"], + 'idx_collection_name': ["collection_source", "collection_namespace", "collection_name"], + 'idx_project_id': ["project_id"], + } }) }} select diff --git a/warehouse/dbt/models/marts/directory/projects_v1.sql b/warehouse/dbt/models/marts/directory/projects_v1.sql index 937812737..8259f70bd 100644 --- a/warehouse/dbt/models/marts/directory/projects_v1.sql +++ b/warehouse/dbt/models/marts/directory/projects_v1.sql @@ -1,6 +1,10 @@ {{ config(meta = { - 'sync_to_db': True + 'sync_to_db': True, + 'index': { + 'idx_project_id': ["project_id"], + 'idx_project_name': ["project_source", "project_namespace", "project_name"], + } }) }} diff --git a/warehouse/dbt/models/marts/directory/users_v1.sql b/warehouse/dbt/models/marts/directory/users_v1.sql index 620796e28..ee6946e2d 100644 --- a/warehouse/dbt/models/marts/directory/users_v1.sql +++ b/warehouse/dbt/models/marts/directory/users_v1.sql @@ -1,6 +1,10 @@ {{ config(meta = { - 'sync_to_db': True + 'sync_to_db': True, + 'index': { + 'idx_user_id': ["user_id"], + 'idx_source_id': ["user_source", "user_source_id"], + } }) }}