Notes with TODOs after pair review (#105)
* Note TODOs after pair review

* Resolve TODOs - part 1

* Resolve TODOs - part 2

* Resolve TODOs - part 3
mkuthan authored Sep 23, 2023
1 parent cba4677 commit 355fbfa
Showing 30 changed files with 206 additions and 115 deletions.
@@ -51,7 +51,7 @@ object RowRestriction {
       read.withRowRestriction(sql)
   }
 
-  case class DateColumnRestriction(columnName: String, localDate: LocalDate) extends RowRestriction {
+  case class TimestampColumnRestriction(columnName: String, localDate: LocalDate) extends RowRestriction {
     override def configure[T](read: TypedRead[T]): TypedRead[T] =
       read.withRowRestriction(s"TIMESTAMP_TRUNC($columnName, DAY) = '$localDate'")
   }
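The renamed TimestampColumnRestriction narrows a BigQuery Storage API read to a single day of a TIMESTAMP column. A minimal usage sketch, illustrative only and not taken from this commit; the column name and the typedRead value are hypothetical:

    import org.joda.time.LocalDate

    // Restricts the read to rows whose entry_time falls on 2023-09-23, i.e.
    // TIMESTAMP_TRUNC(entry_time, DAY) = '2023-09-23'
    val restriction = RowRestriction.TimestampColumnRestriction("entry_time", LocalDate.parse("2023-09-23"))
    val restrictedRead = restriction.configure(typedRead) // typedRead: TypedRead[T], assumed to exist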
@@ -1,9 +1,11 @@
 package org.mkuthan.streamprocessing.infrastructure.common
 
 import com.spotify.scio.bigquery.types.BigQueryType
+import com.spotify.scio.values.SCollection
 
 import org.joda.time.Instant
 
+import org.mkuthan.streamprocessing.shared.scio.syntax._
 import org.mkuthan.streamprocessing.shared.scio.SumByKey
 
 case class IoDiagnostic(id: String, reason: String, count: Long = 1) {
@@ -15,16 +17,19 @@ case class IoDiagnostic(id: String, reason: String, count: Long = 1) {
 
 object IoDiagnostic {
   @BigQueryType.toTable
-  case class Raw(created_at: Instant, id: String, reason: String, count: Long)
+  case class Record(created_at: Instant, id: String, reason: String, count: Long)
 
   implicit val sumByKey: SumByKey[IoDiagnostic] =
     SumByKey.create(
       keyFn = _.keyFields.mkString("|@|"),
       plusFn = (x, y) => x.copy(count = x.count + y.count)
     )
 
-  def toRaw[T](diagnostic: IoDiagnostic, createdAt: Instant): Raw =
-    Raw(
+  def union(first: SCollection[IoDiagnostic], others: SCollection[IoDiagnostic]*): SCollection[IoDiagnostic] =
+    first.unionInGlobalWindow(others: _*)
+
+  def toRecord[T](diagnostic: IoDiagnostic, createdAt: Instant): Record =
+    Record(
       created_at = createdAt,
       id = diagnostic.id,
       reason = diagnostic.reason,
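Besides the Raw to Record rename, the new union helper merges diagnostics from several IOs into one stream before they are summed and written. A rough usage sketch with hypothetical variable names, not part of this changeset:

    // entryDiagnostics and exitDiagnostics are assumed to be SCollection[IoDiagnostic] values
    val allDiagnostics = IoDiagnostic.union(entryDiagnostics, exitDiagnostics)
    // after sumByKeyInFixedWindow, each (diagnostic, timestamp) pair maps to a BigQuery row:
    // IoDiagnostic.toRecord(diagnostic, createdAt)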
@@ -1,4 +1,4 @@
-package org.mkuthan.streamprocessing.shared.json
+package org.mkuthan.streamprocessing.infrastructure.json
 
 import scala.util.Try
 
@@ -10,11 +10,11 @@ import com.spotify.scio.values.SCollection
 
 import org.mkuthan.streamprocessing.infrastructure.common.IoDiagnostic
 import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier
+import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde
 import org.mkuthan.streamprocessing.infrastructure.pubsub.JsonWriteConfiguration
 import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubDeadLetter
 import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubTopic
 import org.mkuthan.streamprocessing.shared.common.Message
-import org.mkuthan.streamprocessing.shared.json.JsonSerde
 
 private[syntax] trait PubsubSCollectionSyntax {
 
@@ -12,21 +12,23 @@ import com.spotify.scio.values.SCollection
 import com.spotify.scio.ScioContext
 
 import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier
+import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde
 import org.mkuthan.streamprocessing.infrastructure.pubsub.JsonReadConfiguration
 import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubDeadLetter
 import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubSubscription
 import org.mkuthan.streamprocessing.shared.common.Message
-import org.mkuthan.streamprocessing.shared.json.JsonSerde
 
 private[syntax] trait PubsubScioContextSyntax {
 
+  type PubsubResult[T] = Either[PubsubDeadLetter[T], Message[T]]
+
   implicit class PubsubScioContextOps(private val self: ScioContext) {
 
     def subscribeJsonFromPubsub[T <: AnyRef: Coder: ClassTag](
         id: IoIdentifier[T],
         subscription: PubsubSubscription[T],
         configuration: JsonReadConfiguration = JsonReadConfiguration()
-    ): SCollection[Either[PubsubDeadLetter[T], Message[T]]] = {
+    ): SCollection[PubsubResult[T]] = {
       val io = PubsubIO
         .readMessagesWithAttributes()
         .pipe(read => configuration.configure(read))
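The PubsubResult[T] alias shortens the Either[PubsubDeadLetter[T], Message[T]] return type of subscribeJsonFromPubsub. A sketch of how a caller might split the two cases, with placeholder names and assuming Scio's SCollection.collect; this is not part of the commit:

    val results: SCollection[PubsubResult[SamplePayload]] =
      sc.subscribeJsonFromPubsub(ioId, subscription)

    // dead letters go to a diagnostics sink, successfully decoded messages continue downstream
    val deadLetters = results.collect { case Left(deadLetter) => deadLetter }
    val messages    = results.collect { case Right(message)   => message }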
@@ -9,9 +9,9 @@ import com.spotify.scio.coders.Coder
 import com.spotify.scio.values.SCollection
 
 import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier
+import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde
 import org.mkuthan.streamprocessing.infrastructure.storage.StorageBucket
 import org.mkuthan.streamprocessing.infrastructure.storage.StorageConfiguration
-import org.mkuthan.streamprocessing.shared.json.JsonSerde
 
 private[syntax] trait StorageSCollectionSyntax {
   implicit class StorageSCollectionOps[T <: AnyRef: Coder](private val self: SCollection[T]) {
@@ -8,7 +8,7 @@ import org.joda.time.Instant
 import org.joda.time.LocalDate
 import org.scalacheck._
 
-import org.mkuthan.streamprocessing.shared.json.JsonSerde
+import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde
 import org.mkuthan.streamprocessing.test.common.JodaTimeArbitrary
 
 trait IntegrationTestFixtures extends JodaTimeArbitrary {
@@ -30,7 +30,7 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers
     with IntegrationTestScioContext
     with BigQueryContext {
 
-  val ioDiagnosticType = BigQueryType[IoDiagnostic.Raw]
+  val ioDiagnosticType = BigQueryType[IoDiagnostic.Record]
 
   val anyIoDiagnostic = IoDiagnostic(
     id = "any-id",
@@ -62,10 +62,10 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers
     sc
       .testUnbounded(ioDiagnostics)
      .sumByKeyInFixedWindow(windowDuration = tenMinutes, windowOptions = windowOptions)
-      .mapWithTimestamp(IoDiagnostic.toRaw)
+      .mapWithTimestamp(IoDiagnostic.toRecord)
       .writeUnboundedToBigQuery(
-        IoIdentifier[IoDiagnostic.Raw]("any-id"),
-        BigQueryTable[IoDiagnostic.Raw](s"$projectId:$datasetName.$tableName")
+        IoIdentifier[IoDiagnostic.Record]("any-id"),
+        BigQueryTable[IoDiagnostic.Record](s"$projectId:$datasetName.$tableName")
       )
 
     val run = sc.run()
@@ -74,8 +74,8 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers
     val results = readTable(datasetName, tableName).map(ioDiagnosticType.fromAvro)
 
     results should contain.only(
-      IoDiagnostic.toRaw(ioDiagnostic1.copy(count = 3L), instant),
-      IoDiagnostic.toRaw(ioDiagnostic2.copy(count = 2L), instant)
+      IoDiagnostic.toRecord(ioDiagnostic1.copy(count = 3L), instant),
+      IoDiagnostic.toRecord(ioDiagnostic2.copy(count = 2L), instant)
     )
   }
 
@@ -100,10 +100,10 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers
     sc
       .testBounded(ioDiagnostics)
       .sumByKeyInFixedWindow(windowDuration = tenMinutes, windowOptions = windowOptions)
-      .mapWithTimestamp(IoDiagnostic.toRaw)
+      .mapWithTimestamp(IoDiagnostic.toRecord)
       .writeBoundedToBigQuery(
-        IoIdentifier[IoDiagnostic.Raw]("any-id"),
-        BigQueryPartition.hourly[IoDiagnostic.Raw](s"$projectId:$datasetName.$tableName", localDateTime)
+        IoIdentifier[IoDiagnostic.Record]("any-id"),
+        BigQueryPartition.hourly[IoDiagnostic.Record](s"$projectId:$datasetName.$tableName", localDateTime)
       )
 
     val run = sc.run()
@@ -112,8 +112,8 @@ class IoDiagnosticTest extends AnyFlatSpec with Matchers
     val results = readTable(datasetName, tableName).map(ioDiagnosticType.fromAvro)
 
     results should contain.only(
-      IoDiagnostic.toRaw(ioDiagnostic1.copy(count = 3L), instant),
-      IoDiagnostic.toRaw(ioDiagnostic2.copy(count = 2L), instant)
+      IoDiagnostic.toRecord(ioDiagnostic1.copy(count = 3L), instant),
+      IoDiagnostic.toRecord(ioDiagnostic2.copy(count = 2L), instant)
     )
   }
 
@@ -1,4 +1,4 @@
-package org.mkuthan.streamprocessing.shared.json
+package org.mkuthan.streamprocessing.infrastructure.json
 
 import com.softwaremill.diffx.generic.auto._
 import com.softwaremill.diffx.scalatest.DiffShouldMatcher._
@@ -9,9 +9,9 @@ import org.scalatest.tags.Slow
 
 import org.mkuthan.streamprocessing.infrastructure._
 import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier
+import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde
 import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubTopic
 import org.mkuthan.streamprocessing.shared.common.Message
-import org.mkuthan.streamprocessing.shared.json.JsonSerde
 import org.mkuthan.streamprocessing.test.gcp.GcpTestPatience
 import org.mkuthan.streamprocessing.test.gcp.PubSubClient._
 import org.mkuthan.streamprocessing.test.gcp.PubsubContext
@@ -16,12 +16,12 @@ import org.scalatest.matchers.should.Matchers
 import org.scalatest.tags.Slow
 
 import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier
+import org.mkuthan.streamprocessing.infrastructure.json.JsonSerde
 import org.mkuthan.streamprocessing.infrastructure.storage.NumShards
 import org.mkuthan.streamprocessing.infrastructure.storage.StorageBucket
 import org.mkuthan.streamprocessing.infrastructure.storage.StorageConfiguration
 import org.mkuthan.streamprocessing.infrastructure.IntegrationTestFixtures
 import org.mkuthan.streamprocessing.infrastructure.IntegrationTestFixtures.SampleClass
-import org.mkuthan.streamprocessing.shared.json.JsonSerde
 import org.mkuthan.streamprocessing.test.gcp.GcpTestPatience
 import org.mkuthan.streamprocessing.test.gcp.StorageClient._
 import org.mkuthan.streamprocessing.test.gcp.StorageContext
@@ -55,7 +55,17 @@ trait TimestampedMatchers extends InstantSyntax {
       time: String,
       value: T
   ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] =
-    containValue((value, time.toInstant))
+    containValueAtTime(time.toInstant, value)
+
+  /**
+   * Assert that the SCollection contains the provided element at given time without making assumptions about other
+   * elements in the collection.
+   */
+  def containValueAtTime[T: Coder: Eq](
+      time: Instant,
+      value: T
+  ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] =
+    containValue((value, time))
 
   /**
    * Assert that the SCollection contains a single provided element at given time.
@@ -81,6 +91,15 @@ trait TimestampedMatchers extends InstantSyntax {
   def containInAnyOrderAtTime[T: Coder: Eq](
       time: String,
       value: Iterable[T]
   ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] =
+    containInAnyOrderAtTime(time.toInstant, value)
+
+  /**
+   * Assert that the SCollection contains the provided elements at given time.
+   */
+  def containInAnyOrderAtTime[T: Coder: Eq](
+      time: Instant,
+      value: Iterable[T]
+  ): IterableMatcher[SCollection[(T, Instant)], (T, Instant)] =
     containInAnyOrder(value.map(v => (v, time.toInstant)))
 
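The new Instant-based overloads let a test assert on a concrete org.joda.time.Instant instead of a timestamp string, while the String variants now simply delegate. A hypothetical assertion, not taken from this commit:

    import org.joda.time.Instant

    val eventTime = Instant.parse("2023-09-23T10:00:00Z")
    results should containValueAtTime(eventTime, expectedValue)               // new Instant overload
    results should containValueAtTime("2023-09-23T10:00:00Z", expectedValue)  // String overload, delegates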
@@ -1,12 +1,12 @@
 package org.mkuthan.streamprocessing.toll.application.batch
 
+import com.spotify.scio.values.WindowOptions
 import com.spotify.scio.ContextAndArgs
 
 import org.joda.time.Duration
 
 import org.mkuthan.streamprocessing.infrastructure._
 import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction
-import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction.PartitionDateRestriction
 import org.mkuthan.streamprocessing.infrastructure.bigquery.StorageReadConfiguration
 import org.mkuthan.streamprocessing.shared._
 import org.mkuthan.streamprocessing.toll.domain.booth.TollBoothEntry
@@ -24,6 +24,10 @@ object TollBatchJob extends TollBatchJobIo {
 
   private val OneDay = Duration.standardDays(1)
 
+  private val TwoDays = Duration.standardDays(2)
+
+  private val DefaultWindowOptions = WindowOptions()
+
   def main(mainArgs: Array[String]): Unit = {
     val (sc, args) = ContextAndArgs(mainArgs)
 
@@ -34,7 +38,7 @@ object TollBatchJob extends TollBatchJobIo {
       EntryTableIoId,
       config.entryTable,
       StorageReadConfiguration().withRowRestriction(
-        RowRestriction.DateColumnRestriction(TollBoothEntry.PartitioningColumnName, config.effectiveDate)
+        RowRestriction.TimestampColumnRestriction(TollBoothEntry.PartitioningColumnName, config.effectiveDate)
       )
     )
     val boothEntries = TollBoothEntry.decodeRecord(boothEntryRecords)
@@ -43,7 +47,7 @@ object TollBatchJob extends TollBatchJobIo {
       ExitTableIoId,
       config.exitTable,
       StorageReadConfiguration().withRowRestriction(
-        RowRestriction.DateColumnRestriction(TollBoothExit.PartitioningColumnName, config.effectiveDate)
+        RowRestriction.TimestampColumnRestriction(TollBoothExit.PartitioningColumnName, config.effectiveDate)
      )
     )
     val boothExits = TollBoothExit.decodeRecord(boothExitRecords)
@@ -54,19 +58,19 @@
       VehicleRegistrationTableIoId,
       config.vehicleRegistrationTable,
       StorageReadConfiguration().withRowRestriction(
-        PartitionDateRestriction(config.effectiveDate)
+        RowRestriction.PartitionDateRestriction(config.effectiveDate)
       )
     )
 
     val vehicleRegistrations = VehicleRegistration.decodeRecord(vehicleRegistrationRecords, config.effectiveDate)
 
     // calculate toll booth stats
-    val boothStatsHourly = TollBoothStats.calculateInFixedWindow(boothEntries, OneHour)
+    val boothStatsHourly = TollBoothStats.calculateInFixedWindow(boothEntries, OneHour, DefaultWindowOptions)
     TollBoothStats
       .encode(boothStatsHourly)
       .writeBoundedToBigQuery(EntryStatsHourlyTableIoId, config.entryStatsHourlyPartition)
 
-    val boothStatsDaily = TollBoothStats.calculateInFixedWindow(boothEntries, OneDay)
+    val boothStatsDaily = TollBoothStats.calculateInFixedWindow(boothEntries, OneDay, DefaultWindowOptions)
     TollBoothStats
       .encode(boothStatsDaily)
       .writeBoundedToBigQuery(EntryStatsDailyTableIoId, config.entryStatsDailyPartition)
@@ -88,7 +92,13 @@
 
     // calculate vehicles with expired registrations
     val (vehiclesWithExpiredRegistration, vehiclesWithExpiredRegistrationDiagnostic) =
-      VehiclesWithExpiredRegistration.calculateInFixedWindow(boothEntries, vehicleRegistrations, OneDay)
+      VehiclesWithExpiredRegistration.calculateWithTemporalJoin(
+        boothEntries,
+        vehicleRegistrations,
+        leftWindowDuration = OneDay,
+        rightWindowDuration = TwoDays,
+        windowOptions = DefaultWindowOptions
+      )
     VehiclesWithExpiredRegistration
       .encodeRecord(vehiclesWithExpiredRegistration)
       .writeBoundedToBigQuery(
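DefaultWindowOptions = WindowOptions() keeps Scio's defaults for the batch aggregations. If the job ever needed explicit triggering, the options could be spelled out; the sketch below assumes Scio's WindowOptions exposes trigger, accumulationMode, and allowedLateness parameters (verify against the Scio version in use; not part of this commit):

    import com.spotify.scio.values.WindowOptions
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark
    import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
    import org.joda.time.Duration

    // Fire once at the end of each window, drop late data, keep only the on-time pane.
    val explicitWindowOptions = WindowOptions(
      trigger = AfterWatermark.pastEndOfWindow(),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO
    )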