Skip to content

Commit

Permalink
Add sample counts redoing (#1357)
Browse files Browse the repository at this point in the history
* add sample counts to glam desktop dag

* modified the extract function to be lean

* modified the extract function to be lean

Co-authored-by: Alekhya <[email protected]>
  • Loading branch information
alekhyamoz and Alekhya authored Sep 13, 2021
1 parent 915a78e commit a7059ec
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 11 deletions.
34 changes: 33 additions & 1 deletion dags/glam.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@
dag=dag,
)

# ETL task that builds the glam_sample_counts_v1 table for the GLAM desktop DAG.
glam_sample_counts = bigquery_etl_query(
    task_id="glam_sample_counts",
    destination_table="glam_sample_counts_v1",
    dataset_id=dataset_id,
    project_id=project_id,
    owner="[email protected]",
    # No date partition: the table is rebuilt in full each run (see --replace),
    # with the logical date passed in explicitly as a query parameter instead.
    date_partition_parameter=None,
    parameters=("submission_date:DATE:{{ds}}",),
    arguments=("--replace",),
    dag=dag,
)
client_scalar_probe_counts = gke_command(
task_id="client_scalar_probe_counts",
command=[
Expand Down Expand Up @@ -256,13 +268,29 @@
"extract_user_counts",
default_args,
dag.schedule_interval,
dataset_id
dataset_id,
"user_counts",
"counts"
),
task_id="extract_user_counts",
executor=get_default_executor(),
dag=dag
)

# SubDag that extracts the sample-counts table to GCS; mirrors the
# extract_user_counts wiring above but with sample-specific prefixes.
sample_counts_subdag = extract_user_counts(
    GLAM_DAG,
    "extract_sample_counts",
    default_args,
    dag.schedule_interval,
    dataset_id,
    "sample_counts",
    "sample-counts",
)
extract_sample_counts = SubDagOperator(
    subdag=sample_counts_subdag,
    task_id="extract_sample_counts",
    executor=get_default_executor(),
    dag=dag,
)

extracts_per_channel = SubDagOperator(
subdag=extracts_subdag(
Expand Down Expand Up @@ -300,15 +328,19 @@

# Task ordering for the GLAM desktop DAG (`>>` declares a downstream dependency).

# Histogram aggregates feed the bucket counts and both count tables.
clients_histogram_aggregates >> clients_histogram_bucket_counts
clients_histogram_aggregates >> glam_user_counts
clients_histogram_aggregates >> glam_sample_counts


clients_histogram_bucket_counts >> clients_histogram_probe_counts
clients_histogram_probe_counts >> histogram_percentiles

# User counts also depend on the scalar aggregates.
clients_scalar_aggregates >> glam_user_counts

# Each counts table must be materialized before its extract subdag runs.
glam_user_counts >> extract_counts
glam_sample_counts >> extract_sample_counts

# Per-channel extracts run only after every upstream table is built.
extract_counts >> extracts_per_channel
extract_sample_counts >> extracts_per_channel
client_scalar_probe_counts >> extracts_per_channel
scalar_percentiles >> extracts_per_channel
histogram_percentiles >> extracts_per_channel
21 changes: 11 additions & 10 deletions dags/glam_subdags/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,19 @@ def extract_user_counts(
child_dag_name,
default_args,
schedule_interval,
dataset_id
dataset_id,
task_prefix,
file_prefix
):

bq_extract_table="glam_{}_extract_v1".format(task_prefix)
dag = DAG(
dag_id="{}.{}".format(parent_dag_name, child_dag_name),
default_args=default_args,
schedule_interval=schedule_interval,
)

bq_extract_table = "glam_user_counts_extract_v1"

etl_query = bigquery_etl_query(
task_id="glam_user_counts_extract",
task_id="glam_{}_extract".format(task_prefix),
destination_table=bq_extract_table,
dataset_id=dataset_id,
project_id=project_id,
Expand All @@ -135,18 +136,18 @@ def extract_user_counts(
)

gcs_delete = GoogleCloudStorageDeleteOperator(
task_id="glam_gcs_delete_count_extracts",
task_id="glam_gcs_delete_{}_extracts".format(task_prefix),
bucket_name=glam_bucket,
prefix="glam-extract-firefox-counts",
prefix="glam-extract-firefox-{}".format(file_prefix),
google_cloud_storage_conn_id=gcp_conn.gcp_conn_id,
dag=dag,
)

gcs_destination = "gs://{}/glam-extract-firefox-counts.csv".format(
glam_bucket
gcs_destination = "gs://{}/glam-extract-firefox-{}.csv".format(
glam_bucket, file_prefix
)
bq2gcs = BigQueryToCloudStorageOperator(
task_id="glam_extract_user_counts_to_csv",
task_id="glam_extract_{}_to_csv".format(task_prefix),
source_project_dataset_table="{}.{}.{}".format(
project_id, dataset_id, bq_extract_table
),
Expand Down

0 comments on commit a7059ec

Please sign in to comment.