Skip to content

Commit

Permalink
dataflow prime
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuthan committed Oct 24, 2023
1 parent 3af71a8 commit e97b23b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 15 deletions.
5 changes: 1 addition & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ object Dependencies {
)

val beamDirectRunner = "org.apache.beam" % "beam-runners-direct-java" % "2.51.0"

val beamDataflowRunner = "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.51.0" excludeAll (
ExclusionRule(organization = "org.apache.beam", name = "beam-sdks-java-io-kafka")
)
val beamDataflowRunner = "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.51.0"

val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"

Expand Down
2 changes: 2 additions & 0 deletions project/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ object Settings {
Global / onChangedBuildSource := ReloadOnSourceChanges,
// experimental feature to speed up the build
updateOptions := updateOptions.value.withCachedResolution(true),
// required by beam-runners-google-cloud-dataflow-java
resolvers += "confluent" at "https://packages.confluent.io/maven/",
// use jcl-over-slf4j bridge instead of commons-logging
excludeDependencies += "commons-logging" % "commons-logging",
// enable XML report for codecov
Expand Down
8 changes: 5 additions & 3 deletions toll-application/run-batch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location "gs://$PROJECT-toll-application/templates/toll-application-batch.json" \
--region "$REGION" \
--staging-location "gs://$PROJECT-toll-application/staging/" \
--additional-experiments "use_runner_v2" \
--max-workers 1 \
--worker-machine-type "t2d-standard-1" \
--additional-experiments "enable_prime" \
--parameters effectiveDate="$DATE" \
--parameters entryTable="$PROJECT.toll_application.toll-booth-entry" \
--parameters exitTable="$PROJECT.toll_application.toll-booth-exit" \
Expand All @@ -22,3 +20,7 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \
--parameters totalVehicleTimesDiagnosticOneHourGapTable="$PROJECT.toll_application.total-vehicle-times-diagnostic-one-hour-gap" \
--parameters vehiclesWithExpiredRegistrationDailyTable="$PROJECT.toll_application.vehicles-with-expired-registration-daily" \
--parameters vehiclesWithExpiredRegistrationDiagnosticDailyTable="$PROJECT.toll_application.vehicles-with-expired-registration-diagnostic-daily" \

# --additional-experiments "use_runner_v2" \
# --max-workers 1 \
# --worker-machine-type "t2d-standard-1" \
11 changes: 7 additions & 4 deletions toll-application/run-streaming.sh
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
#!/bin/bash

PROJECT=playground-272019
REGION=europe-west1
REGION=europe-central2

gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location "gs://$PROJECT-toll-application/templates/toll-application-streaming.json" \
--region "$REGION" \
--staging-location "gs://$PROJECT-toll-application/staging/" \
--enable-streaming-engine \
--additional-experiments "use_runner_v2" \
--max-workers 1 \
--worker-machine-type "t2d-standard-1" \
--additional-experiments "enable_prime" \
--parameters entrySubscription="projects/$PROJECT/subscriptions/toll-booth-entry" \
--parameters entryDlq="gs://$PROJECT-toll-application/dlq/entry" \
--parameters exitSubscription="projects/$PROJECT/subscriptions/toll-booth-exit" \
Expand All @@ -22,5 +20,10 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \
--parameters totalVehicleTimesTable="$PROJECT.toll_application.total-vehicle-times" \
--parameters totalVehicleTimesDiagnosticTable="$PROJECT.toll_application.total-vehicle-times-diagnostic" \
--parameters vehiclesWithExpiredRegistrationTopic="projects/$PROJECT/topics/vehicle-registration" \
--parameters vehiclesWithExpiredRegistrationTable="$PROJECT.toll_application.vehicles-with-expired-registration" \
--parameters vehiclesWithExpiredRegistrationDiagnosticTable="$PROJECT.toll_application.vehicles-with-expired-registration-diagnostic" \
--parameters ioDiagnosticTable="$PROJECT.toll_application.io-diagnostic" \

# --additional-experiments "use_runner_v2" \
# --max-workers 1 \
# --worker-machine-type "t2d-standard-1" \
8 changes: 4 additions & 4 deletions toll-infrastructure/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ resource "google_bigquery_table" "total-vehicle-times-one-hour-gap-table" {
schema = file("${path.module}/schemas/total-vehicle-times.json")
}

resource "google_bigquery_table" "total-vehicle-times-one-hour-gap-diagnostic-table" {
table_id = "total-vehicle-times-one-hour-gap-diagnostic"
resource "google_bigquery_table" "total-vehicle-times-diagnostic-one-hour-gap-table" {
table_id = "total-vehicle-times-diagnostic-one-hour-gap"
dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
deletion_protection = false
require_partition_filter = true
Expand Down Expand Up @@ -267,8 +267,8 @@ resource "google_bigquery_table" "vehicles-with-expired-registration-daily-table
schema = file("${path.module}/schemas/vehicles-with-expired-registration.json")
}

resource "google_bigquery_table" "vehicles-with-expired-registration-daily-diagnostic-table" {
table_id = "vehicles-with-expired-registration-daily-diagnostic"
resource "google_bigquery_table" "vehicles-with-expired-registration-diagnostic-daily-table" {
table_id = "vehicles-with-expired-registration-diagnostic-daily"
dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
deletion_protection = false
require_partition_filter = true
Expand Down

0 comments on commit e97b23b

Please sign in to comment.