diff --git a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/bigquery/BigQueryParams.scala b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/bigquery/BigQueryParams.scala index ccadbdfa..bfcaf268 100644 --- a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/bigquery/BigQueryParams.scala +++ b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/bigquery/BigQueryParams.scala @@ -51,7 +51,7 @@ object RowRestriction { read.withRowRestriction(sql) } - case class DateColumnRestriction(columnName: String, localDate: LocalDate) extends RowRestriction { + case class TimestampColumnRestriction(columnName: String, localDate: LocalDate) extends RowRestriction { override def configure[T](read: TypedRead[T]): TypedRead[T] = read.withRowRestriction(s"TIMESTAMP_TRUNC($columnName, DAY) = '$localDate'") } diff --git a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnostic.scala b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnostic.scala index a6d2e037..a1399052 100644 --- a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnostic.scala +++ b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnostic.scala @@ -1,9 +1,11 @@ package org.mkuthan.streamprocessing.infrastructure.common import com.spotify.scio.bigquery.types.BigQueryType +import com.spotify.scio.values.SCollection import org.joda.time.Instant +import org.mkuthan.streamprocessing.shared.scio.syntax._ import org.mkuthan.streamprocessing.shared.scio.SumByKey case class IoDiagnostic(id: String, reason: String, count: Long = 1) { @@ -15,7 +17,7 @@ case class IoDiagnostic(id: String, reason: String, count: Long = 1) { object IoDiagnostic { @BigQueryType.toTable - case class Raw(created_at: Instant, id: String, reason: String, count: Long) + case class Record(created_at: Instant, id: String, reason: String, count: Long) implicit val sumByKey: SumByKey[IoDiagnostic] = SumByKey.create( @@ -23,8 +25,11 @@ object IoDiagnostic { plusFn = (x, y) => x.copy(count = x.count + y.count) ) - def toRaw[T](diagnostic: IoDiagnostic, createdAt: Instant): Raw = - Raw( + def union(first: SCollection[IoDiagnostic], others: SCollection[IoDiagnostic]*): SCollection[IoDiagnostic] = + first.unionInGlobalWindow(others: _*) + + def toRecord[T](diagnostic: IoDiagnostic, createdAt: Instant): Record = + Record( created_at = createdAt, id = diagnostic.id, reason = diagnostic.reason, diff --git a/stream-processing-shared/src/main/scala/org/mkuthan/streamprocessing/shared/json/JsonSerde.scala b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/json/JsonSerde.scala similarity index 95% rename from stream-processing-shared/src/main/scala/org/mkuthan/streamprocessing/shared/json/JsonSerde.scala rename to stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/json/JsonSerde.scala index 005f18cf..0cdb8f7c 100644 --- a/stream-processing-shared/src/main/scala/org/mkuthan/streamprocessing/shared/json/JsonSerde.scala +++ b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/json/JsonSerde.scala @@ -1,4 +1,4 @@ -package org.mkuthan.streamprocessing.shared.json +package org.mkuthan.streamprocessing.infrastructure.json import scala.util.Try diff --git a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionSyntax.scala b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionSyntax.scala index 0175b3db..014406f2 100644 --- a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionSyntax.scala +++ b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionSyntax.scala @@ -10,11 +10,11 @@ import com.spotify.scio.values.SCollection import org.mkuthan.streamprocessing.infrastructure.common.IoDiagnostic import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier +import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde import org.mkuthan.streamprocessing.infrastructure.pubsub.JsonWriteConfiguration import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubDeadLetter import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubTopic import org.mkuthan.streamprocessing.shared.common.Message -import org.mkuthan.streamprocessing.shared.json.JsonSerde private[syntax] trait PubsubSCollectionSyntax { diff --git a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubScioContextSyntax.scala b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubScioContextSyntax.scala index 257e37c9..5ee02f5f 100644 --- a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubScioContextSyntax.scala +++ b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubScioContextSyntax.scala @@ -12,21 +12,23 @@ import com.spotify.scio.values.SCollection import com.spotify.scio.ScioContext import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier +import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde import org.mkuthan.streamprocessing.infrastructure.pubsub.JsonReadConfiguration import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubDeadLetter import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubSubscription import org.mkuthan.streamprocessing.shared.common.Message -import org.mkuthan.streamprocessing.shared.json.JsonSerde private[syntax] trait PubsubScioContextSyntax { + type PubsubResult[T] = Either[PubsubDeadLetter[T], Message[T]] + implicit class PubsubScioContextOps(private val self: ScioContext) { def subscribeJsonFromPubsub[T <: AnyRef: Coder: ClassTag]( id: IoIdentifier[T], subscription: PubsubSubscription[T], configuration: JsonReadConfiguration = JsonReadConfiguration() - ): SCollection[Either[PubsubDeadLetter[T], Message[T]]] = { + ): SCollection[PubsubResult[T]] = { val io = PubsubIO .readMessagesWithAttributes() .pipe(read => configuration.configure(read)) diff --git a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionSyntax.scala b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionSyntax.scala index ccf2bcad..e8d5cfe2 100644 --- a/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionSyntax.scala +++ b/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionSyntax.scala @@ -9,9 +9,9 @@ import com.spotify.scio.coders.Coder import com.spotify.scio.values.SCollection import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier +import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde import org.mkuthan.streamprocessing.infrastructure.storage.StorageBucket import org.mkuthan.streamprocessing.infrastructure.storage.StorageConfiguration -import org.mkuthan.streamprocessing.shared.json.JsonSerde private[syntax] trait StorageSCollectionSyntax { implicit class StorageSCollectionOps[T <: AnyRef: Coder](private val self: SCollection[T]) { diff --git a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/IntegrationTestFixtures.scala b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/IntegrationTestFixtures.scala index 7631b946..91d371bf 100644 --- a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/IntegrationTestFixtures.scala +++ b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/IntegrationTestFixtures.scala @@ -8,7 +8,7 @@ import org.joda.time.Instant import org.joda.time.LocalDate import org.scalacheck._ -import org.mkuthan.streamprocessing.shared.json.JsonSerde +import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde import org.mkuthan.streamprocessing.test.common.JodaTimeArbitrary trait IntegrationTestFixtures extends JodaTimeArbitrary { diff --git a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnosticTest.scala b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnosticTest.scala index 5b49b52b..cd710d83 100644 --- a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnosticTest.scala +++ b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/common/IoDiagnosticTest.scala @@ -30,7 +30,7 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers with IntegrationTestScioContext with BigQueryContext { - val ioDiagnosticType = BigQueryType[IoDiagnostic.Raw] + val ioDiagnosticType = BigQueryType[IoDiagnostic.Record] val anyIoDiagnostic = IoDiagnostic( id = "any-id", @@ -62,10 +62,10 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers sc .testUnbounded(ioDiagnostics) .sumByKeyInFixedWindow(windowDuration = tenMinutes, windowOptions = windowOptions) - .mapWithTimestamp(IoDiagnostic.toRaw) + .mapWithTimestamp(IoDiagnostic.toRecord) .writeUnboundedToBigQuery( - IoIdentifier[IoDiagnostic.Raw]("any-id"), - BigQueryTable[IoDiagnostic.Raw](s"$projectId:$datasetName.$tableName") + IoIdentifier[IoDiagnostic.Record]("any-id"), + BigQueryTable[IoDiagnostic.Record](s"$projectId:$datasetName.$tableName") ) val run = sc.run() @@ -74,8 +74,8 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers val results = readTable(datasetName, tableName).map(ioDiagnosticType.fromAvro) results should contain.only( - IoDiagnostic.toRaw(ioDiagnostic1.copy(count = 3L), instant), - IoDiagnostic.toRaw(ioDiagnostic2.copy(count = 2L), instant) + IoDiagnostic.toRecord(ioDiagnostic1.copy(count = 3L), instant), + IoDiagnostic.toRecord(ioDiagnostic2.copy(count = 2L), instant) ) } @@ -100,10 +100,10 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers sc .testBounded(ioDiagnostics) .sumByKeyInFixedWindow(windowDuration = tenMinutes, windowOptions = windowOptions) - .mapWithTimestamp(IoDiagnostic.toRaw) + .mapWithTimestamp(IoDiagnostic.toRecord) .writeBoundedToBigQuery( - IoIdentifier[IoDiagnostic.Raw]("any-id"), - BigQueryPartition.hourly[IoDiagnostic.Raw](s"$projectId:$datasetName.$tableName", localDateTime) + IoIdentifier[IoDiagnostic.Record]("any-id"), + BigQueryPartition.hourly[IoDiagnostic.Record](s"$projectId:$datasetName.$tableName", localDateTime) ) val run = sc.run() @@ -112,8 +112,8 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers val results = readTable(datasetName, tableName).map(ioDiagnosticType.fromAvro) results should contain.only( - IoDiagnostic.toRaw(ioDiagnostic1.copy(count = 3L), instant), - IoDiagnostic.toRaw(ioDiagnostic2.copy(count = 2L), instant) + IoDiagnostic.toRecord(ioDiagnostic1.copy(count = 3L), instant), + IoDiagnostic.toRecord(ioDiagnostic2.copy(count = 2L), instant) ) } diff --git a/stream-processing-shared/src/test/scala/org/mkuthan/streamprocessing/shared/json/JsonSerdeTest.scala b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/json/JsonSerdeTest.scala similarity index 98% rename from stream-processing-shared/src/test/scala/org/mkuthan/streamprocessing/shared/json/JsonSerdeTest.scala rename to stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/json/JsonSerdeTest.scala index 12da917e..93a0d514 100644 --- a/stream-processing-shared/src/test/scala/org/mkuthan/streamprocessing/shared/json/JsonSerdeTest.scala +++ b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/json/JsonSerdeTest.scala @@ -1,4 +1,4 @@ -package org.mkuthan.streamprocessing.shared.json +package org.mkuthan.streamprocessing.infrastructure.json import com.softwaremill.diffx.generic.auto._ import com.softwaremill.diffx.scalatest.DiffShouldMatcher._ diff --git a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionOpsTest.scala b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionOpsTest.scala index 2b62e702..38b660b6 100644 --- a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionOpsTest.scala +++ b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/pubsub/syntax/PubsubSCollectionOpsTest.scala @@ -9,9 +9,9 @@ import org.scalatest.tags.Slow import org.mkuthan.streamprocessing.infrastructure._ import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier +import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubTopic import org.mkuthan.streamprocessing.shared.common.Message -import org.mkuthan.streamprocessing.shared.json.JsonSerde import org.mkuthan.streamprocessing.test.gcp.GcpTestPatience import org.mkuthan.streamprocessing.test.gcp.PubSubClient._ import org.mkuthan.streamprocessing.test.gcp.PubsubContext diff --git a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionOpsTest.scala b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionOpsTest.scala index 626b313a..da124ecd 100644 --- a/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionOpsTest.scala +++ b/stream-processing-infrastructure/src/test/scala/org/mkuthan/streamprocessing/infrastructure/storage/syntax/StorageSCollectionOpsTest.scala @@ -16,12 +16,12 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.tags.Slow import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier +import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde import org.mkuthan.streamprocessing.infrastructure.storage.NumShards import org.mkuthan.streamprocessing.infrastructure.storage.StorageBucket import org.mkuthan.streamprocessing.infrastructure.storage.StorageConfiguration import org.mkuthan.streamprocessing.infrastructure.IntegrationTestFixtures import org.mkuthan.streamprocessing.infrastructure.IntegrationTestFixtures.SampleClass -import org.mkuthan.streamprocessing.shared.json.JsonSerde import org.mkuthan.streamprocessing.test.gcp.GcpTestPatience import org.mkuthan.streamprocessing.test.gcp.StorageClient._ import org.mkuthan.streamprocessing.test.gcp.StorageContext diff --git a/stream-processing-test/src/main/scala/org/mkuthan/streamprocessing/test/scio/TimestampedMatchers.scala b/stream-processing-test/src/main/scala/org/mkuthan/streamprocessing/test/scio/TimestampedMatchers.scala index 0d4bf49b..a44e9030 100644 --- a/stream-processing-test/src/main/scala/org/mkuthan/streamprocessing/test/scio/TimestampedMatchers.scala +++ b/stream-processing-test/src/main/scala/org/mkuthan/streamprocessing/test/scio/TimestampedMatchers.scala @@ -55,7 +55,17 @@ trait TimestampedMatchers extends InstantSyntax { time: String, value: T ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] = - containValue((value, time.toInstant)) + containValueAtTime(time.toInstant, value) + + /** + * Assert that the SCollection contains the provided element at given time without making assumptions about other + * elements in the collection. + */ + def containValueAtTime[T: Coder: Eq]( + time: Instant, + value: T + ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] = + containValue((value, time)) /** * Assert that the SCollection contains a single provided element at given time. @@ -81,6 +91,15 @@ trait TimestampedMatchers extends InstantSyntax { def containInAnyOrderAtTime[T: Coder: Eq]( time: String, value: Iterable[T] + ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] = + containInAnyOrderAtTime(time.toInstant, value) + + /** + * Assert that the SCollection contains the provided elements at given time. + */ + def containInAnyOrderAtTime[T: Coder: Eq]( + time: Instant, + value: Iterable[T] ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] = containInAnyOrder(value.map(v => (v, time.toInstant))) diff --git a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJob.scala b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJob.scala index 94ff0030..5290b96b 100644 --- a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJob.scala +++ b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJob.scala @@ -1,12 +1,12 @@ package org.mkuthan.streamprocessing.toll.application.batch +import com.spotify.scio.values.WindowOptions import com.spotify.scio.ContextAndArgs import org.joda.time.Duration import org.mkuthan.streamprocessing.infrastructure._ import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction -import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction.PartitionDateRestriction import org.mkuthan.streamprocessing.infrastructure.bigquery.StorageReadConfiguration import org.mkuthan.streamprocessing.shared._ import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothEntry @@ -24,6 +24,10 @@ object TollBatchJob extends TollBatchJobIo { private val OneDay = Duration.standardDays(1) + private val TwoDays = Duration.standardDays(1) + + private val DefaultWindowOptions = WindowOptions() + def main(mainArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(mainArgs) @@ -34,7 +38,7 @@ object TollBatchJob extends TollBatchJobIo { EntryTableIoId, config.entryTable, StorageReadConfiguration().withRowRestriction( - RowRestriction.DateColumnRestriction(TollBoothEntry.PartitioningColumnName, config.effectiveDate) + RowRestriction.TimestampColumnRestriction(TollBoothEntry.PartitioningColumnName, config.effectiveDate) ) ) val boothEntries = TollBoothEntry.decodeRecord(boothEntryRecords) @@ -43,7 +47,7 @@ object TollBatchJob extends TollBatchJobIo { ExitTableIoId, config.exitTable, StorageReadConfiguration().withRowRestriction( - RowRestriction.DateColumnRestriction(TollBoothExit.PartitioningColumnName, config.effectiveDate) + RowRestriction.TimestampColumnRestriction(TollBoothExit.PartitioningColumnName, config.effectiveDate) ) ) val boothExits = TollBoothExit.decodeRecord(boothExitRecords) @@ -54,19 +58,19 @@ object TollBatchJob extends TollBatchJobIo { VehicleRegistrationTableIoId, config.vehicleRegistrationTable, StorageReadConfiguration().withRowRestriction( - PartitionDateRestriction(config.effectiveDate) + RowRestriction.PartitionDateRestriction(config.effectiveDate) ) ) val vehicleRegistrations = VehicleRegistration.decodeRecord(vehicleRegistrationRecords, config.effectiveDate) // calculate tool booth stats - val boothStatsHourly = TollBoothStats.calculateInFixedWindow(boothEntries, OneHour) + val boothStatsHourly = TollBoothStats.calculateInFixedWindow(boothEntries, OneHour, DefaultWindowOptions) TollBoothStats .encode(boothStatsHourly) .writeBoundedToBigQuery(EntryStatsHourlyTableIoId, config.entryStatsHourlyPartition) - val boothStatsDaily = TollBoothStats.calculateInFixedWindow(boothEntries, OneDay) + val boothStatsDaily = TollBoothStats.calculateInFixedWindow(boothEntries, OneDay, DefaultWindowOptions) TollBoothStats .encode(boothStatsDaily) .writeBoundedToBigQuery(EntryStatsDailyTableIoId, config.entryStatsDailyPartition) @@ -88,7 +92,13 @@ object TollBatchJob extends TollBatchJobIo { // calculate vehicles with expired registrations val (vehiclesWithExpiredRegistration, vehiclesWithExpiredRegistrationDiagnostic) = - VehiclesWithExpiredRegistration.calculateInFixedWindow(boothEntries, vehicleRegistrations, OneDay) + VehiclesWithExpiredRegistration.calculateWithTemporalJoin( + boothEntries, + vehicleRegistrations, + leftWindowDuration = OneDay, + rightWindowDuration = TwoDays, + windowOptions = DefaultWindowOptions + ) VehiclesWithExpiredRegistration .encodeRecord(vehiclesWithExpiredRegistration) .writeBoundedToBigQuery( diff --git a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJob.scala b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJob.scala index c34e12f8..ba15d1c9 100644 --- a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJob.scala +++ b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJob.scala @@ -11,7 +11,6 @@ import com.spotify.scio.values.WindowOptions import com.spotify.scio.ContextAndArgs import org.joda.time.Duration -import org.joda.time.LocalDate import org.mkuthan.streamprocessing.infrastructure._ import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction.PartitionDateRestriction @@ -33,6 +32,15 @@ object TollStreamingJob extends TollStreamingJobIo { private val TenMinutes = Duration.standardMinutes(10) + private val TwoDays = Duration.standardDays(2) + + private val DefaultWindowOptions = WindowOptions( + trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()), + allowedLateness = Duration.ZERO, + accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES, + onTimeBehavior = Window.OnTimeBehavior.FIRE_IF_NON_EMPTY + ) + private val DeadLetterWindowOptions = WindowOptions( trigger = Repeatedly.forever( AfterFirst.of( @@ -45,13 +53,6 @@ object TollStreamingJob extends TollStreamingJobIo { onTimeBehavior = Window.OnTimeBehavior.FIRE_IF_NON_EMPTY ) - private val DiagnosticWindowOptions = WindowOptions( - trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()), - allowedLateness = Duration.ZERO, - accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES, - onTimeBehavior = Window.OnTimeBehavior.FIRE_IF_NON_EMPTY - ) - def main(mainArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(mainArgs) @@ -113,7 +114,7 @@ object TollStreamingJob extends TollStreamingJobIo { .writeUnboundedToStorageAsJson(VehicleRegistrationDlqBucketIoId, config.vehicleRegistrationDlq) // calculate tool booth stats - val boothStats = TollBoothStats.calculateInFixedWindow(boothEntries, TenMinutes) + val boothStats = TollBoothStats.calculateInFixedWindow(boothEntries, TenMinutes, DefaultWindowOptions) val tollBoothStatsDlq = TollBoothStats .encode(boothStats) .writeUnboundedToBigQuery(EntryStatsTableIoId, config.entryStatsTable) @@ -126,20 +127,26 @@ object TollStreamingJob extends TollStreamingJobIo { .writeUnboundedToBigQuery(TotalVehicleTimeTableIoId, config.totalVehicleTimeTable) totalVehicleTimesDiagnostic - .sumByKeyInFixedWindow(windowDuration = TenMinutes, windowOptions = DiagnosticWindowOptions) + .sumByKeyInFixedWindow(windowDuration = TenMinutes, windowOptions = DefaultWindowOptions) .mapWithTimestamp(TotalVehicleTimeDiagnostic.toRecord) .writeUnboundedToBigQuery(TotalVehicleTimeDiagnosticTableIoId, config.totalVehicleTimeDiagnosticTable) // calculate vehicles with expired registrations val (vehiclesWithExpiredRegistration, vehiclesWithExpiredRegistrationDiagnostic) = - VehiclesWithExpiredRegistration.calculateInFixedWindow(boothEntries, vehicleRegistrations, TenMinutes) + VehiclesWithExpiredRegistration.calculateWithTemporalJoin( + boothEntries, + vehicleRegistrations, + leftWindowDuration = TenMinutes, + rightWindowDuration = TwoDays, + windowOptions = DefaultWindowOptions + ) VehiclesWithExpiredRegistration .encodeMessage(vehiclesWithExpiredRegistration) .publishJsonToPubSub(VehiclesWithExpiredRegistrationTopicIoId, config.vehiclesWithExpiredRegistrationTopic) vehiclesWithExpiredRegistrationDiagnostic - .sumByKeyInFixedWindow(windowDuration = TenMinutes, windowOptions = DiagnosticWindowOptions) + .sumByKeyInFixedWindow(windowDuration = TenMinutes, windowOptions = DefaultWindowOptions) .mapWithTimestamp(VehiclesWithExpiredRegistrationDiagnostic.toRecord) .writeUnboundedToBigQuery( VehiclesWithExpiredRegistrationDiagnosticTableIoId, @@ -147,7 +154,7 @@ object TollStreamingJob extends TollStreamingJobIo { ) // dead letters diagnostic - val ioDiagnostics = sc.unionInGlobalWindow( + val ioDiagnostics = IoDiagnostic.union( boothEntryMessagesDlq.toDiagnostic(EntrySubscriptionIoId), boothExitMessagesDlq.toDiagnostic(ExitSubscriptionIoId), vehicleRegistrationMessagesDlq.toDiagnostic(VehicleRegistrationSubscriptionIoId), @@ -156,8 +163,8 @@ object TollStreamingJob extends TollStreamingJobIo { ) ioDiagnostics - .sumByKeyInFixedWindow(windowDuration = TenMinutes, windowOptions = DiagnosticWindowOptions) - .mapWithTimestamp(IoDiagnostic.toRaw) + .sumByKeyInFixedWindow(windowDuration = TenMinutes, windowOptions = DefaultWindowOptions) + .mapWithTimestamp(IoDiagnostic.toRecord) .writeUnboundedToBigQuery(DiagnosticTableIoId, config.diagnosticTable) val _ = sc.run() diff --git a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobConfig.scala b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobConfig.scala index 777a2044..493c90ea 100644 --- a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobConfig.scala +++ b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobConfig.scala @@ -32,7 +32,7 @@ case class TollStreamingJobConfig( totalVehicleTimeDiagnosticTable: BigQueryTable[TotalVehicleTimeDiagnostic.Record], vehiclesWithExpiredRegistrationTopic: PubsubTopic[VehiclesWithExpiredRegistration.Payload], vehiclesWithExpiredRegistrationDiagnosticTable: BigQueryTable[VehiclesWithExpiredRegistrationDiagnostic.Record], - diagnosticTable: BigQueryTable[IoDiagnostic.Raw] + diagnosticTable: BigQueryTable[IoDiagnostic.Record] ) object TollStreamingJobConfig { diff --git a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobIo.scala b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobIo.scala index 0f4aff61..4880e25b 100644 --- a/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobIo.scala +++ b/toll-application/src/main/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobIo.scala @@ -14,7 +14,7 @@ import org.mkuthan.streamprocessing.toll.domain.vehicle.VehiclesWithExpiredRegis trait TollStreamingJobIo extends DiagnosticIo with RegistrationIo with TollBoothIo with VehicleIo trait DiagnosticIo { - val DiagnosticTableIoId: IoIdentifier[IoDiagnostic.Raw] = + val DiagnosticTableIoId: IoIdentifier[IoDiagnostic.Record] = IoIdentifier("diagnostic-table-id") } diff --git a/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJobTest.scala b/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJobTest.scala index ce8f34d1..11ef83d1 100644 --- a/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJobTest.scala +++ b/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJobTest.scala @@ -67,7 +67,7 @@ class TollBatchJobTest extends AnyFlatSpec with Matchers // calculate vehicles with expired registrations .output(CustomIO[VehiclesWithExpiredRegistration.Record](VehiclesWithExpiredRegistrationDailyTableIoId.id)) { results => - val createdAt = Instant.parse("2014-09-10T23:59:59.999Z") + val createdAt = Instant.parse("2014-09-10T12:01:00Z") results should containInAnyOrder(Seq( anyVehicleWithExpiredRegistrationRecord(createdAt, anyVehicleRegistrationRecord.id) )) diff --git a/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobTest.scala b/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobTest.scala index 29a33ac9..dcee947e 100644 --- a/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobTest.scala +++ b/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobTest.scala @@ -8,6 +8,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.mkuthan.streamprocessing.infrastructure.common.IoDiagnostic +import org.mkuthan.streamprocessing.infrastructure.pubsub.syntax._ import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubDeadLetter import org.mkuthan.streamprocessing.shared.common.DeadLetter import org.mkuthan.streamprocessing.shared.common.Message @@ -27,9 +28,6 @@ class TollStreamingJobTest extends AnyFlatSpec with Matchers with TollJobFixtures with TollStreamingJobIo { - // TODO: move to the infra module, but where? - type PubsubResult[T] = Either[PubsubDeadLetter[T], Message[T]] - "Toll job" should "run in the streaming mode" in { JobTest[TollStreamingJob.type] .args( @@ -89,8 +87,8 @@ class TollStreamingJobTest extends AnyFlatSpec with Matchers CustomIO[PubsubResult[VehicleRegistration.Payload]](VehicleRegistrationSubscriptionIoId.id), unboundedTestCollectionOf[PubsubResult[VehicleRegistration.Payload]] .addElementsAtTime( - anyVehicleRegistrationPayload.registration_time, - Right(Message(anyVehicleRegistrationPayload)) + anyVehicleRegistrationMessage.attributes(VehicleRegistration.TimestampAttribute), + Right(anyVehicleRegistrationMessage) ) // TODO: add invalid message and check dead letter .advanceWatermarkToInfinity().testStream @@ -124,9 +122,9 @@ class TollStreamingJobTest extends AnyFlatSpec with Matchers // calculate vehicles with expired registrations .output(CustomIO[Message[VehiclesWithExpiredRegistration.Payload]](VehiclesWithExpiredRegistrationTopicIoId.id)) { results => - val createdAt = Instant.parse("2014-09-10T12:09:59.999Z") + val createdAt = Instant.parse("2014-09-10T12:01:00Z") // entry time results should containInAnyOrder(Seq( - anyVehicleWithExpiredRegistrationMessage(createdAt, anyVehicleRegistrationPayload.id), + anyVehicleWithExpiredRegistrationMessage(createdAt, anyVehicleRegistrationMessage.payload.id), anyVehicleWithExpiredRegistrationMessage(createdAt, anyVehicleRegistrationRecord.id) )) } @@ -137,7 +135,7 @@ class TollStreamingJobTest extends AnyFlatSpec with Matchers // TODO results should beEmpty } - .output(CustomIO[IoDiagnostic.Raw](DiagnosticTableIoId.id)) { results => + .output(CustomIO[IoDiagnostic.Record](DiagnosticTableIoId.id)) { results => // toll booth entry and toll booth exit results should haveSize(2) } diff --git a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStats.scala b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStats.scala index 4e959a71..9df9630a 100644 --- a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStats.scala +++ b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStats.scala @@ -2,6 +2,7 @@ package org.mkuthan.streamprocessing.toll.domain.booth import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.values.SCollection +import com.spotify.scio.values.WindowOptions import org.joda.time.Duration import org.joda.time.Instant @@ -45,10 +46,14 @@ object TollBoothStats { ) ) - def calculateInFixedWindow(input: SCollection[TollBoothEntry], duration: Duration): SCollection[TollBoothStats] = + def calculateInFixedWindow( + input: SCollection[TollBoothEntry], + windowDuration: Duration, + windowOptions: WindowOptions + ): SCollection[TollBoothStats] = input .map(fromBoothEntry) - .sumByKeyInFixedWindow(duration) + .sumByKeyInFixedWindow(windowDuration = windowDuration, windowOptions = windowOptions) def encode(input: SCollection[TollBoothStats]): SCollection[Record] = input.mapWithTimestamp { case (r, t) => diff --git a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistration.scala b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistration.scala index d42c0a05..d0b3d49d 100644 --- a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistration.scala +++ b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistration.scala @@ -34,7 +34,6 @@ object VehicleRegistration { case class Payload( id: String, - registration_time: String, license_plate: String, expired: String ) @@ -66,6 +65,7 @@ object VehicleRegistration { private def fromMessage(message: Message[Payload]): Either[DeadLetter[Payload], VehicleRegistration] = { val payload = message.payload + val attributes = message.attributes try { val expired = payload.expired.toInt @@ -73,7 +73,7 @@ object VehicleRegistration { val vehicleRegistration = VehicleRegistration( id = VehicleRegistrationId(payload.id), - registrationTime = Instant.parse(payload.registration_time), + registrationTime = Instant.parse(attributes(TimestampAttribute)), licensePlate = LicensePlate(payload.license_plate), expired = expired != 0 ) diff --git a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTime.scala b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTime.scala index 497793a6..ea5957b7 100644 --- a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTime.scala +++ b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTime.scala @@ -51,8 +51,7 @@ object TotalVehicleTime { case (boothEntry, Some(boothExit)) => Right(toTotalVehicleTime(boothEntry, boothExit)) case (boothEntry, None) => - val diagnosticReason = "Missing TollBoothExit to calculate TotalVehicleTime" - Left(TotalVehicleTimeDiagnostic(boothEntry.id, diagnosticReason)) + Left(TotalVehicleTimeDiagnostic(boothEntry.id, TotalVehicleTimeDiagnostic.MissingTollBoothExit)) } results.unzip diff --git a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeDiagnostic.scala b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeDiagnostic.scala index d5c2fbb0..58e9fe0e 100644 --- a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeDiagnostic.scala +++ b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeDiagnostic.scala @@ -15,6 +15,9 @@ case class TotalVehicleTimeDiagnostic(tollBothId: TollBoothId, reason: String, c } object TotalVehicleTimeDiagnostic { + + val MissingTollBoothExit = "Missing TollBoothExit to calculate TotalVehicleTime" + @BigQueryType.toTable case class Record(created_at: Instant, toll_both_id: String, reason: String, count: Long = 1L) diff --git a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistration.scala b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistration.scala index a90df331..e086d968 100644 --- a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistration.scala +++ b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistration.scala @@ -48,29 +48,24 @@ object VehiclesWithExpiredRegistration { license_plate: String ) - def calculateInFixedWindow( + def calculateWithTemporalJoin( boothEntries: SCollection[TollBoothEntry], vehicleRegistrations: SCollection[VehicleRegistration], - duration: Duration + leftWindowDuration: Duration, + rightWindowDuration: Duration, + windowOptions: WindowOptions ): (SCollection[VehiclesWithExpiredRegistration], SCollection[VehiclesWithExpiredRegistrationDiagnostic]) = { - val windowOptions = WindowOptions( - trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()), - allowedLateness = Duration.ZERO, - accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES, - onTimeBehavior = Window.OnTimeBehavior.FIRE_IF_NON_EMPTY - ) - val boothEntriesByLicensePlate = boothEntries .keyBy(_.licensePlate) .withFixedWindows( - duration = duration, + duration = leftWindowDuration, options = windowOptions ) val vehicleRegistrationByLicensePlate = vehicleRegistrations .keyBy(_.licensePlate) .withFixedWindows( - duration = Duration.standardDays(2), // historical data from today and the previous day + duration = rightWindowDuration, options = windowOptions ) @@ -81,13 +76,16 @@ object VehiclesWithExpiredRegistration { case (boothEntry, Some(vehicleRegistration)) if vehicleRegistration.expired => Right(toVehiclesWithExpiredRegistration(boothEntry, vehicleRegistration)) case (boothEntry, Some(vehicleRegistration)) if !vehicleRegistration.expired => - val diagnosticReason = "Vehicle registration is not expired" - Left(VehiclesWithExpiredRegistrationDiagnostic(boothEntry.id, diagnosticReason)) + Left(VehiclesWithExpiredRegistrationDiagnostic( + boothEntry.id, + VehiclesWithExpiredRegistrationDiagnostic.NotExpired + )) case (boothEntry, None) => - val diagnosticReason = "Missing vehicle registration" - Left(VehiclesWithExpiredRegistrationDiagnostic(boothEntry.id, diagnosticReason)) + Left(VehiclesWithExpiredRegistrationDiagnostic( + boothEntry.id, + VehiclesWithExpiredRegistrationDiagnostic.MissingRegistration + )) } - .distinct // materialize window results.unzip } diff --git a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationDiagnostic.scala b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationDiagnostic.scala index c16104fe..ba755402 100644 --- a/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationDiagnostic.scala +++ b/toll-domain/src/main/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationDiagnostic.scala @@ -15,6 +15,10 @@ case class VehiclesWithExpiredRegistrationDiagnostic(tollBothId: TollBoothId, re } object VehiclesWithExpiredRegistrationDiagnostic { + + val NotExpired = "Vehicle registration is not expired" + val MissingRegistration = "Missing vehicle registration" + @BigQueryType.toTable case class Record(created_at: Instant, toll_both_id: String, reason: String, count: Long = 1L) diff --git a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStatsTest.scala b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStatsTest.scala index e7d41dba..cf96e8fd 100644 --- a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStatsTest.scala +++ b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/booth/TollBoothStatsTest.scala @@ -1,5 +1,7 @@ package org.mkuthan.streamprocessing.toll.domain.booth +import com.spotify.scio.values.WindowOptions + import org.joda.time.Duration import org.joda.time.Instant import org.scalatest.flatspec.AnyFlatSpec @@ -16,12 +18,15 @@ class TollBoothStatsTest extends AnyFlatSpec with Matchers private val FiveMinutes = Duration.standardMinutes(5) + private val DefaultWindowOptions = WindowOptions() + behavior of "TollBoothStats" it should "calculate statistics in fixed window" in runWithScioContext { sc => val tollBoothId1 = TollBoothId("1") val tollBoothId2 = TollBoothId("2") + // TODO: use toInstant from stream-processing-test val tollBoothEntry1Time = Instant.parse("2014-09-10T12:01:00.000Z") val tollBoothEntry1 = anyTollBoothEntry.copy( id = tollBoothId1, @@ -49,7 +54,7 @@ class TollBoothStatsTest extends AnyFlatSpec with Matchers .addElementsAtTime(tollBoothEntry3.entryTime, tollBoothEntry3) .advanceWatermarkToInfinity() - val results = calculateInFixedWindow(sc.testBounded(inputs), FiveMinutes) + val results = calculateInFixedWindow(sc.testBounded(inputs), FiveMinutes, DefaultWindowOptions) results.withTimestamp should inOnTimePane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { containInAnyOrderAtTime( @@ -76,11 +81,11 @@ class TollBoothStatsTest extends AnyFlatSpec with Matchers it should "encode into record" in runWithScioContext { sc => val recordTimestamp = Instant.parse("2014-09-10T12:04:59.999Z") - val inputs = unboundedTestCollectionOf[TollBoothStats] + val inputs = boundedTestCollectionOf[TollBoothStats] .addElementsAtTime(recordTimestamp, anyTollBoothStats) .advanceWatermarkToInfinity() - val results = encode(sc.testUnbounded(inputs)) + val results = encode(sc.testBounded(inputs)) results should containSingleValue(anyTollBoothStatsRecord.copy(created_at = recordTimestamp)) } } diff --git a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationFixture.scala b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationFixture.scala index 2f21dbe9..7edad7d5 100644 --- a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationFixture.scala +++ b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationFixture.scala @@ -3,41 +3,51 @@ package org.mkuthan.streamprocessing.toll.domain.registration import org.joda.time.Instant import org.mkuthan.streamprocessing.shared.common.DeadLetter +import org.mkuthan.streamprocessing.shared.common.Message import org.mkuthan.streamprocessing.toll.domain.common.LicensePlate trait VehicleRegistrationFixture { - final val anyVehicleRegistrationPayload = VehicleRegistration.Payload( - id = "1", - registration_time = "2014-09-10T11:59:00Z", // before toll booth entry - license_plate = "JNB 7001", - expired = "1" + private final val messageTimestamp = "2014-09-10T11:59:00Z" + private final val defaultLicensePlate = "JNB 7001" + private final val defaultExpired = "1" + + final val anyVehicleRegistrationMessage = Message( + VehicleRegistration.Payload( + id = "1", + license_plate = defaultLicensePlate, + expired = defaultExpired + ), + Map(VehicleRegistration.TimestampAttribute -> messageTimestamp) ) - final val vehicleRegistrationPayloadInvalid = anyVehicleRegistrationPayload.copy(license_plate = "") + final val vehicleRegistrationMessageInvalid = Message( + anyVehicleRegistrationMessage.payload.copy(license_plate = ""), + anyVehicleRegistrationMessage.attributes + ) final val vehicleRegistrationDecodingError = DeadLetter[VehicleRegistration.Payload]( - data = vehicleRegistrationPayloadInvalid, + data = vehicleRegistrationMessageInvalid.payload, error = "requirement failed: Licence plate number is empty" ) final val anyVehicleRegistrationRecord = VehicleRegistration.Record( id = "2", - license_plate = anyVehicleRegistrationPayload.license_plate, - expired = anyVehicleRegistrationPayload.expired.toInt + license_plate = defaultLicensePlate, + expired = defaultExpired.toInt ) final val anyVehicleRegistrationUpdate = VehicleRegistration( - id = VehicleRegistrationId(anyVehicleRegistrationPayload.id), - registrationTime = Instant.parse(anyVehicleRegistrationPayload.registration_time), - licensePlate = LicensePlate(anyVehicleRegistrationPayload.license_plate), - expired = anyVehicleRegistrationPayload.expired == "1" + id = VehicleRegistrationId(anyVehicleRegistrationMessage.payload.id), + registrationTime = Instant.parse("2014-09-10T11:59:00Z"), // before toll booth entry + licensePlate = LicensePlate(defaultLicensePlate), + expired = defaultExpired == "1" ) final val anyVehicleRegistrationHistory = VehicleRegistration( id = VehicleRegistrationId(anyVehicleRegistrationRecord.id), registrationTime = Instant.parse("2014-09-09T00:00:00.000Z"), // the previous day - licensePlate = LicensePlate(anyVehicleRegistrationRecord.license_plate), - expired = anyVehicleRegistrationRecord.expired == 1 + licensePlate = LicensePlate(defaultLicensePlate), + expired = defaultExpired == "1" ) } diff --git a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationTest.scala b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationTest.scala index ee22063e..03faddac 100644 --- a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationTest.scala +++ b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/registration/VehicleRegistrationTest.scala @@ -17,7 +17,10 @@ class VehicleRegistrationTest extends AnyFlatSpec with Matchers it should "decode valid message into VehicleRegistration" in runWithScioContext { sc => val input = unboundedTestCollectionOf[Message[VehicleRegistration.Payload]] - .addElementsAtTime(anyVehicleRegistrationPayload.registration_time, Message(anyVehicleRegistrationPayload)) + .addElementsAtTime( + anyVehicleRegistrationMessage.attributes(VehicleRegistration.TimestampAttribute), + anyVehicleRegistrationMessage + ) .advanceWatermarkToInfinity() val (results, dlq) = decodeMessage(sc.testUnbounded(input)) @@ -32,8 +35,8 @@ class VehicleRegistrationTest extends AnyFlatSpec with Matchers val run = runWithScioContext { sc => val input = unboundedTestCollectionOf[Message[VehicleRegistration.Payload]] .addElementsAtTime( - vehicleRegistrationPayloadInvalid.registration_time, - Message(vehicleRegistrationPayloadInvalid) + vehicleRegistrationMessageInvalid.attributes(VehicleRegistration.TimestampAttribute), + vehicleRegistrationMessageInvalid ) .advanceWatermarkToInfinity() diff --git a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeTest.scala b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeTest.scala index 7b55c019..223e25cc 100644 --- a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeTest.scala +++ b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/TotalVehicleTimeTest.scala @@ -86,7 +86,7 @@ class TotalVehicleTimeTest extends AnyFlatSpec with Matchers diagnostic.withTimestamp should inOnTimePane("2014-09-10T12:03:01Z", "2014-09-10T12:08:01Z") { containSingleValueAtTime( "2014-09-10T12:08:00.999Z", - TotalVehicleTimeDiagnostic(tollBoothId, "Missing TollBoothExit to calculate TotalVehicleTime", 1) + TotalVehicleTimeDiagnostic(tollBoothId, TotalVehicleTimeDiagnostic.MissingTollBoothExit, 1) ) } } diff --git a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationFixture.scala b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationFixture.scala index c388b6c4..91756e0e 100644 --- a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationFixture.scala +++ b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationFixture.scala @@ -44,11 +44,11 @@ trait VehiclesWithExpiredRegistrationFixture { final val vehicleWithNotExpiredRegistrationDiagnostic = VehiclesWithExpiredRegistrationDiagnostic( tollBothId = TollBoothId("1"), - reason = "Vehicle registration is not expired" + reason = VehiclesWithExpiredRegistrationDiagnostic.NotExpired ) final val vehicleWithMissingRegistrationDiagnostic = VehiclesWithExpiredRegistrationDiagnostic( tollBothId = TollBoothId("1"), - reason = "Missing vehicle registration" + reason = VehiclesWithExpiredRegistrationDiagnostic.MissingRegistration ) } diff --git a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationTest.scala b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationTest.scala index 8c29cf95..2eaf164c 100644 --- a/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationTest.scala +++ b/toll-domain/src/test/scala/org/mkuthan/streamprocessing/toll/domain/vehicle/VehiclesWithExpiredRegistrationTest.scala @@ -1,5 +1,7 @@ package org.mkuthan.streamprocessing.toll.domain.vehicle +import com.spotify.scio.values.WindowOptions + import org.joda.time.Duration import org.joda.time.Instant import org.scalatest.flatspec.AnyFlatSpec @@ -21,6 +23,9 @@ class VehiclesWithExpiredRegistrationTest extends AnyFlatSpec with Matchers import VehiclesWithExpiredRegistration._ private val FiveMinutes = Duration.standardMinutes(5) + private val TwoDays = Duration.standardDays(2) + + private val DefaultWindowOptions = WindowOptions() behavior of "VehiclesWithExpiredRegistration" @@ -41,11 +46,17 @@ class VehiclesWithExpiredRegistrationTest extends AnyFlatSpec with Matchers .advanceWatermarkToInfinity() val (results, diagnostics) = - calculateInFixedWindow(sc.testBounded(boothEntries), sc.testBounded(vehicleRegistrations), FiveMinutes) + calculateWithTemporalJoin( + sc.testBounded(boothEntries), + sc.testBounded(vehicleRegistrations), + FiveMinutes, + TwoDays, + DefaultWindowOptions + ) - results.withTimestamp should inOnTimePane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { + results.withTimestamp should inOnlyPane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { containInAnyOrderAtTime( - "2014-09-10T12:04:59.999Z", + anyTollBoothEntry.entryTime, Seq( anyVehicleWithExpiredRegistration(anyVehicleRegistrationHistory.id), anyVehicleWithExpiredRegistration(anyVehicleRegistrationUpdate.id) @@ -73,18 +84,24 @@ class VehiclesWithExpiredRegistrationTest extends AnyFlatSpec with Matchers .advanceWatermarkToInfinity() val (results, diagnostics) = - calculateInFixedWindow(sc.testBounded(boothEntries), sc.testBounded(vehicleRegistrations), FiveMinutes) + calculateWithTemporalJoin( + sc.testBounded(boothEntries), + sc.testBounded(vehicleRegistrations), + FiveMinutes, + TwoDays, + DefaultWindowOptions + ) - results.withTimestamp should inOnTimePane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { + results.withTimestamp should inOnlyPane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { containSingleValueAtTime( - "2014-09-10T12:04:59.999Z", + anyTollBoothEntry.entryTime, anyVehicleWithExpiredRegistration(anyVehicleRegistrationHistory.id) ) } - diagnostics.withTimestamp should inOnTimePane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { + diagnostics.withTimestamp should inOnlyPane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { containSingleValueAtTime( - "2014-09-10T12:04:59.999Z", + anyTollBoothEntry.entryTime, vehicleWithNotExpiredRegistrationDiagnostic ) } @@ -109,13 +126,19 @@ class VehiclesWithExpiredRegistrationTest extends AnyFlatSpec with Matchers .advanceWatermarkToInfinity() val (results, diagnostics) = - calculateInFixedWindow(sc.testBounded(boothEntries), sc.testBounded(vehicleRegistrations), FiveMinutes) + calculateWithTemporalJoin( + sc.testBounded(boothEntries), + sc.testBounded(vehicleRegistrations), + FiveMinutes, + TwoDays, + DefaultWindowOptions + ) results should beEmpty - diagnostics.withTimestamp should inOnTimePane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { + diagnostics.withTimestamp should inOnlyPane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") { containSingleValueAtTime( - "2014-09-10T12:04:59.999Z", + anyTollBoothEntry.entryTime, vehicleWithMissingRegistrationDiagnostic ) }