From e97b23b3ca746ade69da3ccf1e004c1e961489cd Mon Sep 17 00:00:00 2001 From: Marcin Kuthan Date: Tue, 24 Oct 2023 21:02:05 +0200 Subject: [PATCH] dataflow prime --- project/Dependencies.scala | 5 +---- project/Settings.scala | 2 ++ toll-application/run-batch.sh | 8 +++++--- toll-application/run-streaming.sh | 11 +++++++---- toll-infrastructure/main.tf | 8 ++++---- 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 529e70e7..1f718409 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,10 +12,7 @@ object Dependencies { ) val beamDirectRunner = "org.apache.beam" % "beam-runners-direct-java" % "2.51.0" - - val beamDataflowRunner = "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.51.0" excludeAll ( - ExclusionRule(organization = "org.apache.beam", name = "beam-sdks-java-io-kafka") - ) + val beamDataflowRunner = "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.51.0" val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5" diff --git a/project/Settings.scala b/project/Settings.scala index 52cbcb59..5adc5bf1 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -38,6 +38,8 @@ object Settings { Global / onChangedBuildSource := ReloadOnSourceChanges, // experimental feature to speed up the build updateOptions := updateOptions.value.withCachedResolution(true), + // required by beam-runners-google-cloud-dataflow-java + resolvers += "confluent" at "https://packages.confluent.io/maven/", // use jcl-over-slf4j bridge instead of common-logging excludeDependencies += "commons-logging" % "commons-logging", // enable XML report for codecov diff --git a/toll-application/run-batch.sh b/toll-application/run-batch.sh index 465fd7c6..147002c3 100755 --- a/toll-application/run-batch.sh +++ b/toll-application/run-batch.sh @@ -9,9 +9,7 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \ --template-file-gcs-location "gs://$PROJECT-toll-application/templates/toll-application-batch.json" \ --region "$REGION" \ --staging-location "gs://$PROJECT-toll-application/staging/" \ - --additional-experiments "use_runner_v2" \ - --max-workers 1 \ - --worker-machine-type "t2d-standard-1" \ + --additional-experiments "enable_prime" \ --parameters effectiveDate="$DATE" \ --parameters entryTable="$PROJECT.toll_application.toll-booth-entry" \ --parameters exitTable="$PROJECT.toll_application.toll-booth-exit" \ @@ -22,3 +20,7 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \ --parameters totalVehicleTimesDiagnosticOneHourGapTable="$PROJECT.toll_application.total-vehicle-times-diagnostic-one-hour-gap" \ --parameters vehiclesWithExpiredRegistrationDailyTable="$PROJECT.toll_application.vehicles-with-expired-registration-daily" \ --parameters vehiclesWithExpiredRegistrationDiagnosticDailyTable="$PROJECT.toll_application.vehicles-with-expired-registration-diagnostic-daily" \ + + # --additional-experiments "use_runner_v2" \ + # --max-workers 1 \ + # --worker-machine-type "t2d-standard-1" \ diff --git a/toll-application/run-streaming.sh b/toll-application/run-streaming.sh index 45fca7f0..865cb944 100755 --- a/toll-application/run-streaming.sh +++ b/toll-application/run-streaming.sh @@ -1,16 +1,14 @@ #!/bin/bash PROJECT=playground-272019 -REGION=europe-west1 +REGION=europe-central2 gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \ --template-file-gcs-location "gs://$PROJECT-toll-application/templates/toll-application-streaming.json" \ --region "$REGION" \ --staging-location "gs://$PROJECT-toll-application/staging/" \ --enable-streaming-engine \ - --additional-experiments "use_runner_v2" \ - --max-workers 1 \ - --worker-machine-type "t2d-standard-1" \ + --additional-experiments "enable_prime" \ --parameters entrySubscription="projects/$PROJECT/subscriptions/toll-booth-entry" \ --parameters entryDlq="gs://$PROJECT-toll-application/dlq/entry" \ --parameters exitSubscription="projects/$PROJECT/subscriptions/toll-booth-exit" \ @@ -22,5 +20,10 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \ --parameters totalVehicleTimesTable="$PROJECT.toll_application.total-vehicle-times" \ --parameters totalVehicleTimesDiagnosticTable="$PROJECT.toll_application.total-vehicle-times-diagnostic" \ --parameters vehiclesWithExpiredRegistrationTopic="projects/$PROJECT/topics/vehicle-registration" \ + --parameters vehiclesWithExpiredRegistrationTable="$PROJECT.toll_application.vehicles-with-expired-registration" \ --parameters vehiclesWithExpiredRegistrationDiagnosticTable="$PROJECT.toll_application.vehicles-with-expired-registration-diagnostic" \ --parameters ioDiagnosticTable="$PROJECT.toll_application.io-diagnostic" \ + + # --additional-experiments "use_runner_v2" \ + # --max-workers 1 \ + # --worker-machine-type "t2d-standard-1" \ \ No newline at end of file diff --git a/toll-infrastructure/main.tf b/toll-infrastructure/main.tf index 645f21be..b67d7edc 100644 --- a/toll-infrastructure/main.tf +++ b/toll-infrastructure/main.tf @@ -207,8 +207,8 @@ resource "google_bigquery_table" "total-vehicle-times-one-hour-gap-table" { schema = file("${path.module}/schemas/total-vehicle-times.json") } -resource "google_bigquery_table" "total-vehicle-times-one-hour-gap-diagnostic-table" { - table_id = "total-vehicle-times-one-hour-gap-diagnostic" +resource "google_bigquery_table" "total-vehicle-times-diagnostic-one-hour-gap-table" { + table_id = "total-vehicle-times-diagnostic-one-hour-gap" dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id deletion_protection = false require_partition_filter = true @@ -267,8 +267,8 @@ resource "google_bigquery_table" "vehicles-with-expired-registration-daily-table schema = file("${path.module}/schemas/vehicles-with-expired-registration.json") } -resource "google_bigquery_table" "vehicles-with-expired-registration-daily-diagnostic-table" { - table_id = "vehicles-with-expired-registration-daily-diagnostic" +resource "google_bigquery_table" "vehicles-with-expired-registration-diagnostic-daily-table" { + table_id = "vehicles-with-expired-registration-diagnostic-daily" dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id deletion_protection = false require_partition_filter = true