Patch summary (text before the first "diff --git" header is ignored by git apply / patch):

* project/Dependencies.scala: beam-runners-google-cloud-dataflow-java 2.6.0 -> 2.50.0.
* project/Settings.scala: adds the Confluent Maven resolver.
* toll-application/run-*.sh: passes --additional-experiments="use_runner_v2" and --max-workers=1.
* toll-infrastructure/main.tf: moves the BigQuery dataset above the tables, sets
  deletion_protection = false on every table, and adds new table resources.

Review corrections applied to this patch:

* main.tf: table_id of "vehicles-with-expired-registration-daily-diagnostic-table" is now
  "vehicles-with-expired-registration-daily-diagnostic" (was "...-diagnostic-daily"), matching
  the resource name and the vehiclesWithExpiredRegistrationDailyDiagnosticTable parameter
  in run-batch.sh.
* run-batch.sh: vehiclesWithExpiredRegistrationDailyTable now points at
  "vehicles-with-expired-registration-daily" (was "...-diagnostic-daily"), the table this
  patch provisions with the vehicles-with-expired-registration.json schema.

NOTE(review): indentation and blank context lines were reconstructed from the hunk line
counts and common formatting conventions; confirm against the target tree (or apply with
whitespace-tolerant options). The post-image index hashes of run-batch.sh and main.tf
predate the two corrections above.

diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index f90c3887..9d31c71c 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -22,8 +22,12 @@ jobs:
       - uses: dflook/terraform-apply@v1
         with:
           path: toll-infrastructure
+      # TODO: fix IAM and deploy flex templates
       # - uses: "google-github-actions/setup-gcloud@v1"
       # - run: ./deploy-batch.sh
       #   working-directory: toll-application
       # - run: ./deploy-streaming.sh
       #   working-directory: toll-application
+
+      # TODO: run streaming job
+      # TODO: deploy DAG for batch job
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 63537035..8db12c22 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -6,7 +6,7 @@ object Dependencies {
   val scioTest = "com.spotify" %% "scio-test" % "0.13.3"
 
   val beamDirectRunner = "org.apache.beam" % "beam-runners-direct-java" % "2.50.0"
-  val beamDataflowRunner = "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.6.0"
+  val beamDataflowRunner = "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.50.0"
 
   val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
 
diff --git a/project/Settings.scala b/project/Settings.scala
index df7c9b63..94f16ab4 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -38,6 +38,8 @@ object Settings {
     Global / onChangedBuildSource := ReloadOnSourceChanges,
     // experimental feature to speed up the build
     updateOptions := updateOptions.value.withCachedResolution(true),
+    // required by beam-runners-google-cloud-dataflow-java
+    resolvers += "confluent" at "https://packages.confluent.io/maven/",
     // use jcl-over-slf4j bridge instead of common-logging
     excludeDependencies += "commons-logging" % "commons-logging",
     // enable XML report for codecov
diff --git a/toll-application/deploy-batch.sh b/toll-application/deploy-batch.sh
index 7e3e51a6..962940f5 100755
--- a/toll-application/deploy-batch.sh
+++ b/toll-application/deploy-batch.sh
@@ -8,4 +8,4 @@ gcloud dataflow flex-template build gs://$PROJECT-toll-application/templates/tol
   --sdk-language "JAVA" \
   --flex-template-base-image JAVA17 \
   --jar "target/scala-2.13/toll-application.jar" \
-  --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.mkuthan.streamprocessing.toll.application.batch.TollBatchJob"
+  --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.mkuthan.streamprocessing.toll.application.batch.TollBatchJob" \
diff --git a/toll-application/deploy-streaming.sh b/toll-application/deploy-streaming.sh
index da1a6334..082f0224 100755
--- a/toll-application/deploy-streaming.sh
+++ b/toll-application/deploy-streaming.sh
@@ -8,4 +8,4 @@ gcloud dataflow flex-template build gs://$PROJECT-toll-application/templates/tol
   --sdk-language "JAVA" \
   --flex-template-base-image JAVA17 \
   --jar "target/scala-2.13/toll-application.jar" \
-  --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.mkuthan.streamprocessing.toll.application.streaming.TollStreamingJob"
+  --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.mkuthan.streamprocessing.toll.application.streaming.TollStreamingJob" \
diff --git a/toll-application/run-batch.sh b/toll-application/run-batch.sh
index 5cde41af..570341ab 100755
--- a/toll-application/run-batch.sh
+++ b/toll-application/run-batch.sh
@@ -8,6 +8,8 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \
   --template-file-gcs-location "gs://$PROJECT-toll-application/templates/toll-application-batch.json" \
   --region "$REGION" \
   --staging-location="gs://$PROJECT-toll-application/staging/" \
+  --additional-experiments="use_runner_v2" \
+  --max-workers=1 \
   --parameters effectiveDate="$DATE" \
   --parameters entryTable="$PROJECT.toll_application.toll-booth-entry" \
   --parameters exitTable="$PROJECT.toll_application.toll-booth-exit" \
@@ -18,4 +20,3 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \
   --parameters totalVehicleTimesOneHourGapDiagnosticTable="$PROJECT.toll_application.total-vehicle-times-one-hour-gap-diagnostic" \
-  --parameters vehiclesWithExpiredRegistrationDailyTable="$PROJECT.toll_application.vehicles-with-expired-registration-diagnostic-daily" \
+  --parameters vehiclesWithExpiredRegistrationDailyTable="$PROJECT.toll_application.vehicles-with-expired-registration-daily" \
   --parameters vehiclesWithExpiredRegistrationDailyDiagnosticTable="$PROJECT.toll_application.vehicles-with-expired-registration-daily-diagnostic" \
-
diff --git a/toll-application/run-streaming.sh b/toll-application/run-streaming.sh
index 4c70dba7..7ff132d8 100755
--- a/toll-application/run-streaming.sh
+++ b/toll-application/run-streaming.sh
@@ -8,6 +8,8 @@ gcloud dataflow flex-template run "toll-application-`date +%Y%m%d-%H%M%S`" \
   --region "$REGION" \
   --staging-location="gs://$PROJECT-toll-application/staging/" \
   --enable-streaming-engine \
+  --additional-experiments="use_runner_v2" \
+  --max-workers=1 \
   --parameters entrySubscription="projects/$PROJECT/subscriptions/toll-booth-entry" \
   --parameters entryDlq="gs://$PROJECT-toll-application/dlq/entry" \
   --parameters exitSubscription="projects/$PROJECT/subscriptions/toll-booth-exit" \
diff --git a/toll-infrastructure/main.tf b/toll-infrastructure/main.tf
index c3f0ab2c..30a30e00 100644
--- a/toll-infrastructure/main.tf
+++ b/toll-infrastructure/main.tf
@@ -40,6 +40,10 @@ resource "google_storage_bucket" "toll-application-bucket" {
   location = "EU"
 }
 
+resource "google_bigquery_dataset" "toll-application-dataset" {
+  dataset_id = "toll_application"
+}
+
 resource "google_pubsub_topic" "toll-booth-entry-topic" {
   name = "toll-booth-entry"
 }
@@ -49,6 +53,15 @@ resource "google_pubsub_subscription" "toll-booth-entry-subscription" {
   topic = google_pubsub_topic.toll-booth-entry-topic.id
 }
 
+resource "google_bigquery_table" "toll-booth-entry-table" {
+  table_id            = "toll-booth-entry"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/toll-booth-entry.json")
+}
+
 resource "google_pubsub_topic" "toll-booth-exit-topic" {
   name = "toll-booth-exit"
 }
@@ -58,6 +71,15 @@ resource "google_pubsub_subscription" "toll-booth-exit-subscription" {
   topic = google_pubsub_topic.toll-booth-exit-topic.id
 }
 
+resource "google_bigquery_table" "toll-booth-exit-table" {
+  table_id            = "toll-booth-exit"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/toll-booth-exit.json")
+}
+
 resource "google_pubsub_topic" "vehicle-registration-topic" {
   name = "vehicle-registration"
 }
@@ -67,37 +89,73 @@ resource "google_pubsub_subscription" "vehicle-registration-subscription" {
   topic = google_pubsub_topic.vehicle-registration-topic.id
 }
 
-resource "google_bigquery_dataset" "toll-application-dataset" {
-  dataset_id = "toll_application"
-}
-
 resource "google_bigquery_table" "vehicle-registration-table" {
-  table_id   = "vehicle-registration"
-  dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
+  table_id            = "vehicle-registration"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
 
   # TODO: define schema
   schema = file("${path.module}/schemas/vehicle-registration.json")
 }
 
 resource "google_bigquery_table" "toll-booth-entry-stats-table" {
-  table_id   = "toll-booth-entry-stats"
-  dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
+  table_id            = "toll-booth-entry-stats"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/toll-booth-entry-stats.json")
+}
+
+resource "google_bigquery_table" "toll-booth-entry-stats-hourly-table" {
+  table_id            = "toll-booth-entry-stats-hourly"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/toll-booth-entry-stats.json")
+}
+
+resource "google_bigquery_table" "toll-booth-entry-stats-daily-table" {
+  table_id            = "toll-booth-entry-stats-daily"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
 
   # TODO: define schema
   schema = file("${path.module}/schemas/toll-booth-entry-stats.json")
 }
 
 resource "google_bigquery_table" "total-vehicle-times-table" {
-  table_id   = "total-vehicle-times"
-  dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
+  table_id            = "total-vehicle-times"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
 
   # TODO: define schema
   schema = file("${path.module}/schemas/total-vehicle-times.json")
 }
 
 resource "google_bigquery_table" "total-vehicle-times-diagnostic-table" {
-  table_id   = "total-vehicle-times-diagnostic"
-  dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
+  table_id            = "total-vehicle-times-diagnostic"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/toll-booth-diagnostic.json")
+}
+
+resource "google_bigquery_table" "total-vehicle-times-one-hour-gap-table" {
+  table_id            = "total-vehicle-times-one-hour-gap"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/total-vehicle-times.json")
+}
+
+resource "google_bigquery_table" "total-vehicle-times-one-hour-gap-diagnostic-table" {
+  table_id            = "total-vehicle-times-one-hour-gap-diagnostic"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
 
   # TODO: define schema
   schema = file("${path.module}/schemas/toll-booth-diagnostic.json")
@@ -108,16 +166,36 @@ resource "google_pubsub_topic" "vehicles-with-expired-registration-topic" {
 }
 
 resource "google_bigquery_table" "vehicles-with-expired-registration-diagnostic-table" {
-  table_id   = "vehicles-with-expired-registration-diagnostic"
-  dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
+  table_id            = "vehicles-with-expired-registration-diagnostic"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/toll-booth-diagnostic.json")
+}
+
+resource "google_bigquery_table" "vehicles-with-expired-registration-daily-table" {
+  table_id            = "vehicles-with-expired-registration-daily"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
+
+  # TODO: define schema
+  schema = file("${path.module}/schemas/vehicles-with-expired-registration.json")
+}
+
+resource "google_bigquery_table" "vehicles-with-expired-registration-daily-diagnostic-table" {
+  table_id            = "vehicles-with-expired-registration-daily-diagnostic"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
 
   # TODO: define schema
   schema = file("${path.module}/schemas/toll-booth-diagnostic.json")
 }
 
 resource "google_bigquery_table" "io-diagnostic-table" {
-  table_id   = "io-diagnostic"
-  dataset_id = google_bigquery_dataset.toll-application-dataset.dataset_id
+  table_id            = "io-diagnostic"
+  dataset_id          = google_bigquery_dataset.toll-application-dataset.dataset_id
+  deletion_protection = false
 
   # TODO: define schema
   schema = file("${path.module}/schemas/io-diagnostic.json")
diff --git a/toll-infrastructure/schemas/toll-booth-entry.json b/toll-infrastructure/schemas/toll-booth-entry.json
new file mode 100644
index 00000000..d4bd31a0
--- /dev/null
+++ b/toll-infrastructure/schemas/toll-booth-entry.json
@@ -0,0 +1,10 @@
+[
+  {
+    "name": "permalink",
+    "type": "STRING"
+  },
+  {
+    "name": "state",
+    "type": "STRING"
+  }
+]
\ No newline at end of file
diff --git a/toll-infrastructure/schemas/toll-booth-exit.json b/toll-infrastructure/schemas/toll-booth-exit.json
new file mode 100644
index 00000000..d4bd31a0
--- /dev/null
+++ b/toll-infrastructure/schemas/toll-booth-exit.json
@@ -0,0 +1,10 @@
+[
+  {
+    "name": "permalink",
+    "type": "STRING"
+  },
+  {
+    "name": "state",
+    "type": "STRING"
+  }
+]
\ No newline at end of file
diff --git a/toll-infrastructure/schemas/vehicles-with-expired-registration.json b/toll-infrastructure/schemas/vehicles-with-expired-registration.json
new file mode 100644
index 00000000..d4bd31a0
--- /dev/null
+++ b/toll-infrastructure/schemas/vehicles-with-expired-registration.json
@@ -0,0 +1,10 @@
+[
+  {
+    "name": "permalink",
+    "type": "STRING"
+  },
+  {
+    "name": "state",
+    "type": "STRING"
+  }
+]
\ No newline at end of file