feat: build indices for mart models (#1567)
* Allows the mart model author to specify which indices to build
* Adds a few indices in `marts/directory`
* Adds a README to bq2cloudsql
* Adds a `.env.example` in the monorepo root
* Adds a utility to read the index config from the mart model and create
  the indices on copy
Note: This will not work in incremental mode in bq2cloudsql
ryscheng authored May 31, 2024
1 parent a5f9b7c commit b9f56e6
Showing 12 changed files with 155 additions and 22 deletions.
25 changes: 25 additions & 0 deletions .env.example
@@ -0,0 +1,25 @@
# .env
## This .env file is mostly used for Python data ops

## Google Cloud setup
GOOGLE_PROJECT_ID=
# You will need to generate Google application credentials
# Note: You can use your gcloud auth credentials
GOOGLE_APPLICATION_CREDENTIALS=<path-to-valid-gcp-creds>
# Used for data transfer between databases
CLOUDSTORAGE_BUCKET_NAME=
# Used for storing all BigQuery data in the dbt pipeline
BIGQUERY_DATASET_ID=
# Used for Frontend/API-facing services
CLOUDSQL_REGION=
CLOUDSQL_INSTANCE_ID=
CLOUDSQL_DB_NAME=
CLOUDSQL_DB_PASSWORD=
CLOUDSQL_DB_USER=

## Dagster Setup
# You may want to change the location of dagster home if you want it to survive resets
DAGSTER_HOME=/tmp/dagster-home
# This is used to put generated dbt profiles for dagster in a specific place
DAGSTER_DBT_TARGET_BASE_DIR=/tmp/dagster-home/generated-dbt
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1
41 changes: 41 additions & 0 deletions warehouse/bq2cloudsql/README.md
@@ -0,0 +1,41 @@
# bq2cloudsql

This utility script copies the final mart models from the end of
the dbt pipeline into a Google CloudSQL Postgres database that
serves the API / frontend.

The general strategy is to dump the models into CSV files
on GCS, then import them into temporary tables in the
Postgres database. When the copy is done, we drop the old tables
and rename the fresh imports into place.

Note: Because we only maintain a single database, this operation can
put significant load on the production server during copies.
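
In rough outline, the swap step looks like the following sketch (hypothetical table names and connection URL; the actual logic lives in `synchronizer.py` and `cloudsql.py`):

```python
import sqlalchemy
from sqlalchemy import text

# Hypothetical connection URL for illustration only; the real client
# connects through the Cloud SQL connector using the .env values.
engine = sqlalchemy.create_engine("postgresql+pg8000://user:pass@host/db")

temp_table = "projects_v1__temp"  # freshly imported copy (hypothetical name)
dest_table = "projects_v1"        # table served to the API / frontend

# One transaction, so readers never observe a missing table
with engine.begin() as conn:
    conn.execute(text(f"DROP TABLE IF EXISTS {dest_table}"))
    conn.execute(text(f"ALTER TABLE {temp_table} RENAME TO {dest_table}"))
```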

## Setup

Copy `.env.example` to `.env` in the root of the monorepo.
Populate the values there.

Then, from the monorepo root, install the Python dependencies:

```bash
poetry install
```

## Running

Run the copy with:

```bash
poetry run bq2cloudsql
```

Note: this command is defined in the root `pyproject.toml`.

## Development

For now, we've been developing and testing this on the production servers.
In the future, we should create staging and development databases for this.

You can copy only select tables by slicing `table_sync_configs` in `./script.py`.
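
For example (a hypothetical edit inside `run()`; adjust the slice to the tables you care about):

```python
# Discover all mart models as usual...
table_sync_configs = table_sync_config_from_dbt_marts(os.environ.get("DBT_TARGET"))

# ...then keep only the first two while testing (hypothetical slice).
table_sync_configs = table_sync_configs[:2]
```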
25 changes: 20 additions & 5 deletions warehouse/bq2cloudsql/cloudsql.py
@@ -4,18 +4,15 @@

import pg8000.native
import sqlalchemy
from sqlalchemy import Table, Column, MetaData
from sqlalchemy import Table, Column, MetaData, text
from google.cloud.sql.connector import Connector
from googleapiclient.errors import HttpError
from googleapiclient.discovery import build
from typing import Callable, List

from typing import Callable, List, Dict, Optional

connector = Connector()

pp = pprint.PrettyPrinter(indent=4)


def get_connection(
    project_id: str,
    region: str,
@@ -121,6 +118,24 @@ def ensure_table(self, table_name: str, columns: List[Column]):
        Table(table_name, metadata, *columns)
        metadata.create_all(self.sql_conn)

    def build_index(self, table_name: str, index_names: Optional[Dict[str, List[str]]]):
        print(f"Building indices for {table_name}:")

        # Check that the table exists
        if not sqlalchemy.inspect(self.sql_conn).has_table(table_name):
            raise Exception(f"Failed to build indices for {table_name}: the table does not exist")
        elif index_names is None:
            print(f"No indices found for {table_name}")
            return

        for key, column_names in index_names.items():
            print(f"- creating Index({key}) as {column_names}")
            with self.begin() as conn:
                sql_str = f"CREATE INDEX IF NOT EXISTS {key} ON {table_name}({','.join(column_names)})"
                print(sql_str)
                conn.execute(text(sql_str))

    def import_csv(self, csv_uri: str, table: str, columns: None | List[str] = None):
        print("importing into %s" % table)
        csv_import_options = dict(table=table)
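
As an illustration of the new `build_index` utility above: the index config from a mart model maps to one `CREATE INDEX IF NOT EXISTS` statement per entry. A minimal sketch, assuming a connected `CloudSQLClient` instance named `cloudsql` and an existing `projects_v1` table:

```python
# index_name => list of column names, as read from the model's meta block
index = {
    "idx_project_id": ["project_id"],
    "idx_project_name": ["project_source", "project_namespace", "project_name"],
}

cloudsql.build_index("projects_v1", index)
# Executes, per entry:
#   CREATE INDEX IF NOT EXISTS idx_project_id ON projects_v1(project_id)
#   CREATE INDEX IF NOT EXISTS idx_project_name
#     ON projects_v1(project_source,project_namespace,project_name)
```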
29 changes: 22 additions & 7 deletions warehouse/bq2cloudsql/script.py
@@ -10,8 +10,9 @@

from dotenv import load_dotenv


def table_sync_config_from_dbt_marts(target: str) -> List[TableSyncConfig]:
    # Run dbt to get model meta
    print("Loading config from dbt mart models...")
    dbt = dbtRunner()
    r: dbtRunnerResult = dbt.invoke(
        [
@@ -30,29 +31,41 @@ def table_sync_config_from_dbt_marts(target: str) -> List[TableSyncConfig]:
raise Exception("dbt listing failed")
if not isinstance(r.result, list):
raise Exception("Unexpected response from dbt")

# Load the model metadata
print("Results from loading dbt model config:")
model_configs = map(lambda a: json.loads(a), r.result)
sync_configs: List[TableSyncConfig] = []
skipped_tables: List[str] = []
for model_config in model_configs:
config = model_config.get("config", {})
meta = config.get("meta", {})
if not meta.get("sync_to_db", False):
print("Skipping %s" % model_config["name"])
skipped_tables.append(model_config["name"])
continue
print(model_config["name"])
print(f"Queuing {model_config["name"]}")
sync_configs.append(
TableSyncConfig(
TableSyncMode.OVERWRITE,
model_config["name"],
model_config["name"],
meta.get("index"),
)
)
print(f"Skipping {skipped_tables}")
return sync_configs


# Main function
def run():
    load_dotenv()
    bq = bigquery.Client()
    storage_client = storage.Client()
    print("Running bq2cloudsql...")
    # Initialize clients
    bq = bigquery.Client(
        project=os.environ.get("GOOGLE_PROJECT_ID")
    )
    storage_client = storage.Client(
        project=os.environ.get("GOOGLE_PROJECT_ID")
    )
    cloudsql = CloudSQLClient.connect(
        os.environ.get("GOOGLE_PROJECT_ID"),
        os.environ.get("CLOUDSQL_REGION"),
Expand All @@ -62,9 +75,10 @@ def run():
os.environ.get("CLOUDSQL_DB_NAME"),
)

    # Automtically discover dbt marts
    # Automatically discover dbt marts
    table_sync_configs = table_sync_config_from_dbt_marts(os.environ.get("DBT_TARGET"))

    # Run sync
    synchronizer = BigQueryCloudSQLSynchronizer(
        bq,
        storage_client,
Expand All @@ -75,3 +89,4 @@ def run():
os.environ.get("CLOUDSTORAGE_BUCKET_NAME"),
)
synchronizer.sync()
print("...done")
13 changes: 11 additions & 2 deletions warehouse/bq2cloudsql/synchronizer.py
@@ -4,7 +4,7 @@
from dataclasses import dataclass
from datetime import datetime
import textwrap
from typing import List, Dict, Union, Tuple
from typing import List, Dict, Union, Tuple, Optional

import pendulum
from sqlalchemy import (
@@ -75,12 +75,17 @@ class TableSyncMode(Enum):
    OVERWRITE = 3


# There is one TableSyncConfig per table that we want to copy
@dataclass
class TableSyncConfig:
    # Whether to sync by incremental append or full overwrite
    mode: TableSyncMode
    # The name of the table in the source database
    source_table: str
    # The name of the table in the destination database
    destination_table: str

    # index_name => list of column names to index
    index: Optional[Dict[str, List[str]]]

class BigQueryCloudSQLSynchronizer(object):
    def __init__(
@@ -262,8 +267,12 @@ def commit_table_for_overwrite(
        config: TableSyncConfig,
        last_partition_date: pendulum.DateTime | None,
    ):
        # Build the indices on the temporary table
        self._cloudsql.build_index(temp_dest_table, config.index)

        # Create a transaction to rename the new table over the old table.
        # Currently, if there are schema changes, those changes are simply forced.
        print(f"Moving {temp_dest_table} to {config.destination_table}")
        with self._cloudsql.begin() as conn:
            update_data = dict(last_sync_at=pendulum.now("UTC"), is_incremental=False)
            insert_stmt = insert(self._sync_state_table).values(
@@ -1,6 +1,11 @@
{{
  config(meta = {
    'sync_to_db': True
    'sync_to_db': True,
    'index': {
      'idx_project_id': ["project_id"],
      'idx_project_name': ["project_source", "project_namespace", "project_name"],
      'idx_artifact_id': ["artifact_id"],
    }
  })
}}

7 changes: 5 additions & 2 deletions warehouse/dbt/models/marts/directory/artifacts_v1.sql
@@ -1,7 +1,10 @@
{{
  config(meta = {
    'sync_to_db': True

    'sync_to_db': True,
    'index': {
      'idx_artifact_id': ["artifact_id"],
      'idx_artifact_name': ["artifact_source", "artifact_namespace", "artifact_name"],
    }
  })
}}

6 changes: 5 additions & 1 deletion warehouse/dbt/models/marts/directory/collections_v1.sql
@@ -1,6 +1,10 @@
{{
  config(meta = {
    'sync_to_db': True
    'sync_to_db': True,
    'index': {
      'idx_collection_id': ["collection_id"],
      'idx_collection_name': ["collection_source", "collection_namespace", "collection_name"],
    }
  })
}}

5 changes: 4 additions & 1 deletion warehouse/dbt/models/marts/directory/contracts_v1.sql
@@ -1,6 +1,9 @@
{{
  config(meta = {
    'sync_to_db': True
    'sync_to_db': True,
    'index': {
      'idx_deployer': ["root_deployer_address"],
    }
  })
}}

@@ -1,6 +1,11 @@
{{
  config(meta = {
    'sync_to_db': True
    'sync_to_db': True,
    'index': {
      'idx_collection_id': ["collection_id"],
      'idx_collection_name': ["collection_source", "collection_namespace", "collection_name"],
      'idx_project_id': ["project_id"],
    }
  })
}}
select
6 changes: 5 additions & 1 deletion warehouse/dbt/models/marts/directory/projects_v1.sql
@@ -1,6 +1,10 @@
{{
  config(meta = {
    'sync_to_db': True
    'sync_to_db': True,
    'index': {
      'idx_project_id': ["project_id"],
      'idx_project_name': ["project_source", "project_namespace", "project_name"],
    }
  })
}}

6 changes: 5 additions & 1 deletion warehouse/dbt/models/marts/directory/users_v1.sql
@@ -1,6 +1,10 @@
{{
  config(meta = {
    'sync_to_db': True
    'sync_to_db': True,
    'index': {
      'idx_user_id': ["user_id"],
      'idx_source_id': ["user_source", "user_source_id"],
    }
  })
}}

