Skip to content

Commit

Permalink
Batch version of the pipeline (#102)
Browse files Browse the repository at this point in the history
* Initial commit for batch version of the pipeline

* Read toll booth entries and exits from bigquery

* Calculate TollBoothStats and TotalVehicleTime in batch

* Batch calculates hourly and daily stats instead of ten minutes granularity from streaming part

* Resolves TODOs in batch job test

* Better vehicle registration aligned to batch and streaming

* Vehicles with expired registrations in batch

* Diagnostic in batch

* Fix formatting

* Fix formatting
  • Loading branch information
mkuthan authored Sep 21, 2023
1 parent 9f2ca0e commit cba4677
Show file tree
Hide file tree
Showing 51 changed files with 1,077 additions and 474 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import sbt._
import Dependencies._
import Settings._

addCommandAlias("check", "clean; scalafmtAll; scapegoat; testOnly -- -l org.scalatest.tags.Slow")

lazy val root = (project in file("."))
.settings(
name := "stream-processing",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation
import com.spotify.scio.coders.Coder

import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier

private[bigquery] object BigQueryDeadLetterEncoderDoFn {
private type In = BigQueryStorageApiInsertError
private type Out[T] = BigQueryDeadLetter[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import scala.jdk.CollectionConverters._
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write

import org.joda.time.LocalDate

sealed trait BigQueryReadParam {
def configure[T](read: TypedRead[T]): TypedRead[T]
}
Expand Down Expand Up @@ -48,6 +50,16 @@ object RowRestriction {
override def configure[T](read: TypedRead[T]): TypedRead[T] =
read.withRowRestriction(sql)
}

case class DateColumnRestriction(columnName: String, localDate: LocalDate) extends RowRestriction {
override def configure[T](read: TypedRead[T]): TypedRead[T] =
read.withRowRestriction(s"TIMESTAMP_TRUNC($columnName, DAY) = '$localDate'")
}

case class PartitionDateRestriction(localDate: LocalDate) extends RowRestriction {
override def configure[T](read: TypedRead[T]): TypedRead[T] =
read.withRowRestriction(s"_PARTITIONDATE = '$localDate'")
}
}

sealed trait SelectedFields extends BigQueryReadParam
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class BigQuerySCollectionOpsTest extends AnyFlatSpec with Matchers
withTable(datasetName, SampleClassBigQuerySchema) { tableName =>
val sampleObjects = boundedTestCollectionOf[SampleClass]
.addElementsAtMinimumTime(SampleObject1, SampleObject2)
.build()
.advanceWatermarkToInfinity()

sc
.testBounded(sampleObjects)
Expand All @@ -61,7 +61,7 @@ class BigQuerySCollectionOpsTest extends AnyFlatSpec with Matchers
val localDateTime = LocalDateTime.parse("2023-06-15T14:00:00")
val sampleObjects = boundedTestCollectionOf[SampleClass]
.addElementsAtMinimumTime(SampleObject1, SampleObject2)
.build()
.advanceWatermarkToInfinity()

sc
.testBounded(sampleObjects)
Expand All @@ -88,7 +88,7 @@ class BigQuerySCollectionOpsTest extends AnyFlatSpec with Matchers
val localDate = LocalDate.parse("2023-06-15")
val sampleObjects = boundedTestCollectionOf[SampleClass]
.addElementsAtMinimumTime(SampleObject1, SampleObject2)
.build()
.advanceWatermarkToInfinity()

sc
.testBounded(sampleObjects)
Expand All @@ -115,7 +115,7 @@ class BigQuerySCollectionOpsTest extends AnyFlatSpec with Matchers
val invalidObject = SampleObject1.copy(instantField = Instant.ofEpochMilli(Long.MaxValue))
val sampleObjects = boundedTestCollectionOf[SampleClass]
.addElementsAtMinimumTime(invalidObject)
.build()
.advanceWatermarkToInfinity()

sc
.testBounded(sampleObjects)
Expand Down Expand Up @@ -177,8 +177,6 @@ class BigQuerySCollectionOpsTest extends AnyFlatSpec with Matchers
BigQueryTable[SampleClass](s"$projectId:$datasetName.$tableName")
)

results.debug()

val resultsSink = InMemorySink(results)

val run = sc.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers
val ioDiagnostics = boundedTestCollectionOf[IoDiagnostic]
.addElementsAtTime("2023-06-15T12:01:00Z", ioDiagnostic1, ioDiagnostic1, ioDiagnostic2)
.addElementsAtTime("2023-06-15T12:02:00Z", ioDiagnostic1, ioDiagnostic2)
.build()
.advanceWatermarkToInfinity()

sc
.testBounded(ioDiagnostics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class SCollectionEitherOpsTest extends AnyFlatSpec
it should "unzip Either" in runWithScioContext { sc =>
val collection = boundedTestCollectionOf[Either[String, String]]
.addElementsAtMinimumTime(Right("r1"), Left("l1"), Right("r2"), Left("l2"), Right("r3"))
.build()
.advanceWatermarkToInfinity()

val (right, left) = sc
.testBounded(collection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ class SCollectionOpsTest extends AnyFlatSpec

it should "union in global window" in runWithScioContext { sc =>
val collection1 = boundedTestCollectionOf[String]
.addElementsAtMinimumTime("one").build()
.addElementsAtMinimumTime("one").advanceWatermarkToInfinity()
val collection2 = boundedTestCollectionOf[String]
.addElementsAtMinimumTime("two", "three").build()
.addElementsAtMinimumTime("two", "three").advanceWatermarkToInfinity()
val collection3 = boundedTestCollectionOf[String]
.build()
.advanceWatermarkToInfinity()

val results = sc
.testBounded(collection1)
Expand All @@ -38,7 +38,7 @@ class SCollectionOpsTest extends AnyFlatSpec
val element = "any element"
val collection = boundedTestCollectionOf[String]
.addElementsAtTime(instant, element)
.build()
.advanceWatermarkToInfinity()

val results = sc.testBounded(collection)
.mapWithTimestamp { case (e, i) => e + i.toString }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SCollectionSumByKeyOpsTest extends AnyFlatSpec
.addElementsAtTime("12:01:00", sample2)
.addElementsAtTime("12:02:00", sample1)
.addElementsAtTime("12:10:00", sample2)
.build()
.advanceWatermarkToInfinity()

val results = sc.testBounded(collection).sumByKeyInFixedWindow(Duration.standardMinutes(10))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class ScioContextOpsTest extends AnyFlatSpec

it should "union in global window" in runWithScioContext { sc =>
val collection1 = boundedTestCollectionOf[String]
.addElementsAtMinimumTime("one").build()
.addElementsAtMinimumTime("one").advanceWatermarkToInfinity()
val collection2 = boundedTestCollectionOf[String]
.addElementsAtMinimumTime("two", "three").build()
.addElementsAtMinimumTime("two", "three").advanceWatermarkToInfinity()
val collection3 = boundedTestCollectionOf[String]
.build()
.advanceWatermarkToInfinity()

val results = sc.unionInGlobalWindow(
sc.testBounded(collection1).withGlobalWindow(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object BoundedTestCollection extends InstantSyntax {
Builder(timestampedElements)
}

def build(): BoundedTestCollection[T] = {
def advanceWatermarkToInfinity(): BoundedTestCollection[T] = {
import scala.jdk.CollectionConverters._

val coder = CoderMaterializer.beamWithDefault(Coder[T])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,16 @@ trait TimestampedMatchers extends InstantSyntax {
time: String,
value: T
): SingleMatcher[SCollection[(T, Instant)], (T, Instant)] =
containSingleValue((value, time.toInstant))
containSingleValueAtTime(time.toInstant, value)

/**
* Assert that the SCollection contains a single provided element at given time.
*/
def containSingleValueAtTime[T: Coder: Eq](
time: Instant,
value: T
): SingleMatcher[SCollection[(T, Instant)], (T, Instant)] =
containSingleValue((value, time))

/**
* Assert that the SCollection contains the provided elements at given time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class BoundedTestCollectionTest extends AnyFlatSpec with Matchers with TestScioC
.addElementsAtMinimumTime("first", "second", "third")
.addElementsAtTime(anyTime, "fourth", "fifth", "sixth")
.addElementsAtTime(Instant.parse(anyTime), "seventh")
.build()
.advanceWatermarkToInfinity()

val results = sc.testBounded(boundedTestCollection)
results.withTimestamp should containInAnyOrderAtTime[String](Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package org.mkuthan.streamprocessing.toll.application.batch

import com.spotify.scio.ContextAndArgs

import org.joda.time.Duration

import org.mkuthan.streamprocessing.infrastructure._
import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction
import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction.PartitionDateRestriction
import org.mkuthan.streamprocessing.infrastructure.bigquery.StorageReadConfiguration
import org.mkuthan.streamprocessing.shared._
import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothEntry
import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothExit
import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothStats
import org.mkuthan.streamprocessing.toll.domain.registration.VehicleRegistration
import org.mkuthan.streamprocessing.toll.domain.vehicle.TotalVehicleTime
import org.mkuthan.streamprocessing.toll.domain.vehicle.TotalVehicleTimeDiagnostic
import org.mkuthan.streamprocessing.toll.domain.vehicle.VehiclesWithExpiredRegistration
import org.mkuthan.streamprocessing.toll.domain.vehicle.VehiclesWithExpiredRegistrationDiagnostic

object TollBatchJob extends TollBatchJobIo {

private val OneHour = Duration.standardHours(1)

private val OneDay = Duration.standardDays(1)

def main(mainArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(mainArgs)

val config = TollBatchJobConfig.parse(args)

// read toll booth entries and toll booth exists
val boothEntryRecords = sc.readFromBigQuery(
EntryTableIoId,
config.entryTable,
StorageReadConfiguration().withRowRestriction(
RowRestriction.DateColumnRestriction(TollBoothEntry.PartitioningColumnName, config.effectiveDate)
)
)
val boothEntries = TollBoothEntry.decodeRecord(boothEntryRecords)

val boothExitRecords = sc.readFromBigQuery(
ExitTableIoId,
config.exitTable,
StorageReadConfiguration().withRowRestriction(
RowRestriction.DateColumnRestriction(TollBoothExit.PartitioningColumnName, config.effectiveDate)
)
)
val boothExits = TollBoothExit.decodeRecord(boothExitRecords)

// read vehicle registrations
val vehicleRegistrationRecords =
sc.readFromBigQuery(
VehicleRegistrationTableIoId,
config.vehicleRegistrationTable,
StorageReadConfiguration().withRowRestriction(
PartitionDateRestriction(config.effectiveDate)
)
)

val vehicleRegistrations = VehicleRegistration.decodeRecord(vehicleRegistrationRecords, config.effectiveDate)

// calculate tool booth stats
val boothStatsHourly = TollBoothStats.calculateInFixedWindow(boothEntries, OneHour)
TollBoothStats
.encode(boothStatsHourly)
.writeBoundedToBigQuery(EntryStatsHourlyTableIoId, config.entryStatsHourlyPartition)

val boothStatsDaily = TollBoothStats.calculateInFixedWindow(boothEntries, OneDay)
TollBoothStats
.encode(boothStatsDaily)
.writeBoundedToBigQuery(EntryStatsDailyTableIoId, config.entryStatsDailyPartition)

// calculate total vehicle times
val (totalVehicleTimes, totalVehicleTimesDiagnostic) =
TotalVehicleTime.calculateInSessionWindow(boothEntries, boothExits, OneHour)
TotalVehicleTime
.encodeRecord(totalVehicleTimes)
.writeBoundedToBigQuery(TotalVehicleTimeOneHourGapTableIoId, config.totalVehicleTimeOneHourGapPartition)

totalVehicleTimesDiagnostic
.sumByKeyInFixedWindow(windowDuration = OneDay)
.mapWithTimestamp(TotalVehicleTimeDiagnostic.toRecord)
.writeBoundedToBigQuery(
TotalVehicleTimeDiagnosticOneHourGapTableIoId,
config.totalVehicleTimeDiagnosticOneHourGapTable
)

// calculate vehicles with expired registrations
val (vehiclesWithExpiredRegistration, vehiclesWithExpiredRegistrationDiagnostic) =
VehiclesWithExpiredRegistration.calculateInFixedWindow(boothEntries, vehicleRegistrations, OneDay)
VehiclesWithExpiredRegistration
.encodeRecord(vehiclesWithExpiredRegistration)
.writeBoundedToBigQuery(
VehiclesWithExpiredRegistrationDailyTableIoId,
config.vehiclesWithExpiredRegistrationDailyPartition
)

vehiclesWithExpiredRegistrationDiagnostic
.sumByKeyInFixedWindow(windowDuration = OneDay)
.mapWithTimestamp(VehiclesWithExpiredRegistrationDiagnostic.toRecord)
.writeBoundedToBigQuery(
VehiclesWithExpiredRegistrationDiagnosticDailyTableIoId,
config.vehiclesWithExpiredRegistrationDiagnosticDailyPartition
)

val _ = sc.run()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.mkuthan.streamprocessing.toll.application.batch

import com.spotify.scio.Args

import org.joda.time.LocalDate

import org.mkuthan.streamprocessing.infrastructure.bigquery.BigQueryPartition
import org.mkuthan.streamprocessing.infrastructure.bigquery.BigQueryTable
import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothEntry
import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothExit
import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothStats
import org.mkuthan.streamprocessing.toll.domain.registration.VehicleRegistration
import org.mkuthan.streamprocessing.toll.domain.vehicle.TotalVehicleTime
import org.mkuthan.streamprocessing.toll.domain.vehicle.TotalVehicleTimeDiagnostic
import org.mkuthan.streamprocessing.toll.domain.vehicle.VehiclesWithExpiredRegistration
import org.mkuthan.streamprocessing.toll.domain.vehicle.VehiclesWithExpiredRegistrationDiagnostic

case class TollBatchJobConfig(
effectiveDate: LocalDate,
entryTable: BigQueryTable[TollBoothEntry.Record],
exitTable: BigQueryTable[TollBoothExit.Record],
vehicleRegistrationTable: BigQueryTable[VehicleRegistration.Record],
entryStatsHourlyPartition: BigQueryPartition[TollBoothStats.Record],
entryStatsDailyPartition: BigQueryPartition[TollBoothStats.Record],
vehiclesWithExpiredRegistrationDailyPartition: BigQueryPartition[VehiclesWithExpiredRegistration.Record],
vehiclesWithExpiredRegistrationDiagnosticDailyPartition: BigQueryPartition[
VehiclesWithExpiredRegistrationDiagnostic.Record
],
totalVehicleTimeOneHourGapPartition: BigQueryPartition[TotalVehicleTime.Record],
totalVehicleTimeDiagnosticOneHourGapTable: BigQueryPartition[TotalVehicleTimeDiagnostic.Record]
)

object TollBatchJobConfig {
def parse(args: Args): TollBatchJobConfig = {
val effectiveDate = LocalDate.parse(args.required("effectiveDate"))
TollBatchJobConfig(
effectiveDate = effectiveDate,
entryTable = BigQueryTable(args.required("entryTable")),
exitTable = BigQueryTable(args.required("exitTable")),
vehicleRegistrationTable = BigQueryTable(args.required("vehicleRegistrationTable")),
entryStatsHourlyPartition = BigQueryPartition.daily(args.required("entryStatsHourlyTable"), effectiveDate),
entryStatsDailyPartition = BigQueryPartition.daily(args.required("entryStatsDailyTable"), effectiveDate),
vehiclesWithExpiredRegistrationDailyPartition =
BigQueryPartition.daily(args.required("vehiclesWithExpiredRegistrationDailyTable"), effectiveDate),
vehiclesWithExpiredRegistrationDiagnosticDailyPartition =
BigQueryPartition.daily(args.required("vehiclesWithExpiredRegistrationDiagnosticDailyTable"), effectiveDate),
totalVehicleTimeOneHourGapPartition =
BigQueryPartition.daily(args.required("totalVehicleTimeOneHourGapTable"), effectiveDate),
totalVehicleTimeDiagnosticOneHourGapTable =
BigQueryPartition.daily(args.required("totalVehicleTimeDiagnosticOneHourGapTable"), effectiveDate)
)
}
}
Loading

0 comments on commit cba4677

Please sign in to comment.