DENG 946 - Add a gcp util to handle the BQ DQ checks (#1735)
* Add gcp util function to handle dq checks

* Fix CI issues

* rename destination table to source table

* Incorporate feedback

alekhyamoz authored Jun 28, 2023
1 parent 7fb8f66 commit e8ec66c
Showing 1 changed file with 62 additions and 1 deletion.
63 changes: 62 additions & 1 deletion dags/utils/gcp.py
@@ -93,7 +93,6 @@ def export_to_parquet(
    arguments = []

    with models.DAG(dag_id=dag_prefix + dag_name, default_args=default_args) as dag:

        create_dataproc_cluster = DataprocCreateClusterOperator(
            task_id="create_dataproc_cluster",
            cluster_name=cluster_name,
@@ -334,6 +333,68 @@ def bigquery_etl_copy_deduplicate(
    )


def bigquery_dq_check(
    source_table,
    dataset_id,
    task_id,
    parameters=(),
    project_id="moz-fx-data-shared-prod",
    gcp_conn_id="google_cloud_airflow_gke",
    gke_project_id=GCP_PROJECT_ID,
    gke_location="us-west1",
    gke_cluster_name="workloads-prod-v1",
    gke_namespace="default",
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    date_partition_parameter="submission_date",
    **kwargs,
):
"""
Run `bqetl check run` to run data quality checks against BigQuery table.
:param str destination_table: [Required] BigQuery destination table the DQ checks are run on
:param str dataset_id: [Required] BigQuery default dataset id
:param str task_id: [ [Required] ID for the task
:param Optional[str] sql_file_path: Optional override for path to the
SQL query file to run
:param Tuple[str] parameters: Parameters passed to bq query
:param Optional[str] project_id: BigQuery default project id
:param str gcp_conn_id: Airflow connection id for GCP access
:param str gke_project_id: GKE cluster project id
:param str gke_location: GKE cluster location
:param str gke_cluster_name: GKE cluster name
:param str gke_namespace: GKE cluster namespace
:param str docker_image: docker image to use
:param Optional[str] date_partition_parameter: Parameter for indicating destination
partition to generate, if None
destination should be whole table
rather than partition
:param Dict[str, Any] kwargs: Additional keyword arguments for
GKEPodOperator
:return: GKEPodOperator
"""
kwargs["task_id"] = kwargs.get("task_id", task_id)
kwargs["name"] = kwargs.get("name", task_id.replace("_", "-"))
    source_table_no_partition = (
        source_table.split("$")[0] if source_table is not None else None
    )
    if date_partition_parameter is not None:
        parameters += (date_partition_parameter + ":DATE:{{ds}}",)

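    # Assumes the bigquery-etl layout where checks live alongside the query
    # under sql/<project>/<dataset>/<table>/.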
    sql_file_path = f"sql/{project_id}/{dataset_id}/{source_table_no_partition}"
    args = ["script/bqetl", "check", "run", sql_file_path]
    return GKEPodOperator(
        gcp_conn_id=gcp_conn_id,
        project_id=gke_project_id,
        location=gke_location,
        cluster_name=gke_cluster_name,
        namespace=gke_namespace,
        image=docker_image,
        arguments=args + ["--parameter=" + parameter for parameter in parameters],
        **kwargs,
    )


def bigquery_xcom_query(
    destination_table,
    dataset_id,

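For context, a hypothetical sketch of wiring the new helper into a DAG (the DAG id, table name, and upstream ETL task are invented for illustration; `bigquery_etl_query` is the existing query helper in this module, and `default_args` is assumed to be defined as in the surrounding DAG files):

with models.DAG("bqetl_example", default_args=default_args) as dag:
    # Upstream task that (re)generates the table; names are illustrative only.
    etl_task = bigquery_etl_query(
        task_id="example_table__v1",
        destination_table="example_table_v1",
        dataset_id="telemetry_derived",
    )

    # With the defaults above, this runs
    # `bqetl check run sql/moz-fx-data-shared-prod/telemetry_derived/example_table_v1
    #  --parameter=submission_date:DATE:{{ds}}` in a GKE pod.
    dq_check = bigquery_dq_check(
        task_id="checks__example_table__v1",
        source_table="example_table_v1",
        dataset_id="telemetry_derived",
    )

    etl_task >> dq_check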