Skip to content

Commit

Permalink
Cleanup evening
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuthan committed Oct 19, 2023
1 parent 4f30f7a commit 2f1fb85
Show file tree
Hide file tree
Showing 32 changed files with 212 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ private[syntax] trait BigQuerySCollectionSyntax {
.pipe(write => configuration.configure(write))
.to(table.id)

var deadLetters = self.context.empty[BigQueryDeadLetter[T]]()
var deadLetters = self.context
.withName(s"$id/Empty Dead Letters")
.empty[BigQueryDeadLetter[T]]()

val _ = self.betterSaveAsCustomOutput(id.id) { in =>
val writeResult = in
Expand All @@ -50,7 +52,7 @@ private[syntax] trait BigQuerySCollectionSyntax {
.internal.apply("Write", io)

val errors = in.context.wrap(writeResult.getFailedStorageApiInserts)
deadLetters = errors.applyTransform("Errors", ParDo.of(new BigQueryDeadLetterEncoderDoFn[T]))
deadLetters = errors.applyTransform("Dead Letters", ParDo.of(new BigQueryDeadLetterEncoderDoFn[T]))

writeResult
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import scala.util.Try

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.joda.JodaModule
import com.fasterxml.jackson.module.scala.ClassTagExtensions
import com.fasterxml.jackson.module.scala.DefaultScalaModule
Expand All @@ -12,6 +13,7 @@ import com.fasterxml.jackson.module.scala.JavaTypeable
object JsonSerde {

private lazy val ObjectMapper = new ObjectMapper()
.enable(SerializationFeature.WRITE_DATES_WITH_ZONE_ID)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.FAIL_ON_NULL_CREATOR_PROPERTIES)
.registerModule(new JodaModule())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder
import com.spotify.scio.coders.Coder

private[syntax] trait PubsubCoders {
implicit def pubsubMessageCoder: Coder[PubsubMessage] =
implicit def pubsubMessageWithAttributesCoder: Coder[PubsubMessage] =
Coder.beam(PubsubMessageWithAttributesCoder.of())
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import org.mkuthan.streamprocessing.shared.common.Message

private[syntax] trait PubsubScioContextSyntax {

type PubsubResult[T] = Either[PubsubDeadLetter[T], Message[T]]

implicit class PubsubScioContextOps(private val self: ScioContext) {

def subscribeJsonFromPubsub[T <: AnyRef: Coder: ClassTag](
id: IoIdentifier[T],
subscription: PubsubSubscription[T],
Expand All @@ -38,7 +35,7 @@ private[syntax] trait PubsubScioContextSyntax {

self.betterCustomInput(id.id) { in =>
self.wrap(in.apply("Subscribe", io))
.withName("Decode")
.withName("Deserialize")
.map { msg =>
val payload = msg.getPayload
val attributes = Utils.readAttributes(msg.getAttributeMap)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package org.mkuthan.streamprocessing.infrastructure.pubsub.syntax

trait PubsubSyntax extends PubsubScioContextSyntax with PubsubSCollectionSyntax with PubsubCoders
trait PubsubSyntax extends PubsubScioContextSyntax with PubsubSCollectionSyntax with PubsubCoders with PubsubTypes
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.mkuthan.streamprocessing.infrastructure.pubsub.syntax

import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubDeadLetter
import org.mkuthan.streamprocessing.shared.common.Message

/** Type aliases used by the Pubsub syntax package. */
private[syntax] trait PubsubTypes {
/**
 * Outcome for a single Pubsub payload of type `T`: the left side carries a
 * [[PubsubDeadLetter]], the right side carries a [[Message]].
 */
type PubsubResult[T] = Either[PubsubDeadLetter[T], Message[T]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ class BigQuerySCollectionDeadLetterOpsTest extends AnyFlatSpec

val results = sc.testUnbounded(deadLetters).toDiagnostic(id)

results should containInAnyOrder(Seq(
results should containElements(
Diagnostic(id.id, "error 1"),
Diagnostic(id.id, "error 2")
))
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class PubsubSCollectionDeadLetterOpsTest extends AnyFlatSpec

val results = sc.testUnbounded(deadLetters).toDiagnostic(id)

results should containInAnyOrder(Seq(
results should containElements(
Diagnostic(id.id, "error 1"),
Diagnostic(id.id, "error 2")
))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.mkuthan.streamprocessing.shared.common.Message
import org.mkuthan.streamprocessing.test.gcp.GcpTestPatience
import org.mkuthan.streamprocessing.test.gcp.PubsubClient._
import org.mkuthan.streamprocessing.test.gcp.PubsubContext
import org.mkuthan.streamprocessing.test.scio.syntax._
import org.mkuthan.streamprocessing.test.scio.IntegrationTestScioContext

@Slow
Expand All @@ -28,15 +29,53 @@ class PubsubSCollectionOpsTest extends AnyFlatSpec with Matchers

behavior of "Pubsub SCollection syntax"

it should "publish JSON" in withScioContext { sc =>
it should "publish bounded JSON" in withScioContext { sc =>
withTopic { topic =>
withSubscription(topic) { subscription =>
sc
.parallelize[Message[SampleClass]](Seq(
Message(SampleObject1, SampleMap1),
Message(SampleObject2, SampleMap2)
))
.publishJsonToPubsub(IoIdentifier[SampleClass]("any-id"), PubsubTopic[SampleClass](topic))
val message1 = Message(SampleObject1, SampleMap1)
val message2 = Message(SampleObject2, SampleMap2)

val input = boundedTestCollectionOf[Message[SampleClass]]
.addElementsAtMinimumTime(message1, message2)
.advanceWatermarkToInfinity()

sc.testBounded(input).publishJsonToPubsub(
IoIdentifier[SampleClass]("any-id"),
PubsubTopic[SampleClass](topic)
)

sc.run().waitUntilDone()

val results = mutable.ArrayBuffer.empty[(SampleClass, Map[String, String])]
eventually {
results ++= pullMessages(subscription)
.map { case (payload, attributes) =>
(JsonSerde.readJsonFromBytes[SampleClass](payload).get, attributes)
}

results should contain.only(
(SampleObject1, SampleMap1),
(SampleObject2, SampleMap2)
)
}
}
}
}

it should "publish unbounded JSON" in withScioContext { sc =>
withTopic { topic =>
withSubscription(topic) { subscription =>
val message1 = Message(SampleObject1, SampleMap1)
val message2 = Message(SampleObject2, SampleMap2)

val input = unboundedTestCollectionOf[Message[SampleClass]]
.addElementsAtWatermarkTime(message1, message2)
.advanceWatermarkToInfinity()

sc.testUnbounded(input).publishJsonToPubsub(
IoIdentifier[SampleClass]("any-id"),
PubsubTopic[SampleClass](topic)
)

sc.run().waitUntilDone()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package org.mkuthan.streamprocessing.infrastructure.pubsub.syntax

import org.joda.time.Instant
import org.scalactic.Equality
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.tags.Slow
import org.scalatest.LoneElement._
import org.scalatest.EitherValues._

import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier
import org.mkuthan.streamprocessing.infrastructure.pubsub.JsonReadConfiguration
import org.mkuthan.streamprocessing.infrastructure.pubsub.NamedIdAttribute
import org.mkuthan.streamprocessing.infrastructure.pubsub.NamedTimestampAttribute
import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubDeadLetter
import org.mkuthan.streamprocessing.infrastructure.pubsub.PubsubSubscription
import org.mkuthan.streamprocessing.infrastructure.IntegrationTestFixtures
import org.mkuthan.streamprocessing.infrastructure.IntegrationTestFixtures.SampleClass
Expand All @@ -29,6 +31,15 @@ class PubsubScioContextOpsTest extends AnyFlatSpec with Matchers
with IntegrationTestFixtures
with PubsubContext {

/**
 * Relaxed equality for [[PubsubDeadLetter]] used by the matchers in this spec.
 *
 * Payloads are compared element by element, attributes with `==`, and the error
 * of the left-hand value only has to start with the error of the other value.
 * The comparison is therefore intentionally asymmetric. Anything that is not a
 * [[PubsubDeadLetter]] is never equal.
 */
implicit def pubsubDeadLetterEquality[T]: Equality[PubsubDeadLetter[T]] =
  new Equality[PubsubDeadLetter[T]] {
    override def areEqual(actual: PubsubDeadLetter[T], expected: Any): Boolean =
      expected match {
        case candidate: PubsubDeadLetter[_] =>
          // Same evaluation order as before: payload, then attributes, then error prefix.
          actual.payload.sameElements(candidate.payload) &&
            actual.attributes == candidate.attributes &&
            actual.error.startsWith(candidate.error)
        case _ =>
          false
      }
  }

behavior of "Pubsub ScioContext syntax"

it should "subscribe JSON" in withScioContext { implicit sc =>
Expand All @@ -42,18 +53,17 @@ class PubsubScioContextOpsTest extends AnyFlatSpec with Matchers
(SampleJson2, SampleMap2)
)

val (messages, _) =
sc.subscribeJsonFromPubsub(IoIdentifier[SampleClass]("any-id"), PubsubSubscription[SampleClass](subscription))
.unzip

val sink = InMemorySink(messages)

val results = sc.subscribeJsonFromPubsub(
IoIdentifier[SampleClass]("any-id"),
PubsubSubscription[SampleClass](subscription)
)
val sink = InMemorySink(results)
val run = sc.run()

eventually {
sink.toSeq should contain.only(
Message(SampleObject1, SampleMap1),
Message(SampleObject2, SampleMap2)
Right(Message(SampleObject1, SampleMap1)),
Right(Message(SampleObject2, SampleMap2))
)
}

Expand All @@ -69,19 +79,21 @@ class PubsubScioContextOpsTest extends AnyFlatSpec with Matchers
withSubscription(topic) { subscription =>
publishMessages(topic, (InvalidJson, SampleMap1))

val (_, dlq) =
sc.subscribeJsonFromPubsub(IoIdentifier[SampleClass]("any-id"), PubsubSubscription[SampleClass](subscription))
.unzip

val sink = InMemorySink(dlq)

val results = sc.subscribeJsonFromPubsub(
IoIdentifier[SampleClass]("any-id"),
PubsubSubscription[SampleClass](subscription)
)
val sink = InMemorySink(results)
val run = sc.run()

val expectedDeadLetter = PubsubDeadLetter[SampleClass](
payload = InvalidJson,
attributes = SampleMap1,
error = "Unrecognized token 'invalid'"
)

eventually {
val error = sink.toElement
error.payload should be(InvalidJson)
error.attributes should be(SampleMap1)
error.error should startWith("Unrecognized token 'invalid'")
sink.toElement.left.value should equal(expectedDeadLetter)
}

run.pipelineResult.cancel()
Expand All @@ -99,18 +111,16 @@ class PubsubScioContextOpsTest extends AnyFlatSpec with Matchers

publishMessages(topic, Seq.fill(10)(messagePrototype): _*)

val (messages, _) = sc.subscribeJsonFromPubsub(
val results = sc.subscribeJsonFromPubsub(
IoIdentifier[SampleClass]("any-id"),
subscription = PubsubSubscription[SampleClass](subscription),
configuration = JsonReadConfiguration().withIdAttribute(NamedIdAttribute.Default)
).unzip

val messagesSink = InMemorySink(messages)

)
val messagesSink = InMemorySink(results)
val run = sc.run()

eventually {
messagesSink.toSeq.loneElement should be(Message(SampleObject1, attributes))
messagesSink.toElement should be(Right(Message(SampleObject1, attributes)))
}

run.pipelineResult.cancel()
Expand All @@ -128,19 +138,19 @@ class PubsubScioContextOpsTest extends AnyFlatSpec with Matchers

publishMessages(topic, (SampleJson1, attributes))

val (messages, _) = sc.subscribeJsonFromPubsub(
val results = sc.subscribeJsonFromPubsub(
IoIdentifier[SampleClass]("any-id"),
subscription = PubsubSubscription[SampleClass](subscription),
configuration = JsonReadConfiguration().withTimestampAttribute(NamedTimestampAttribute.Default)
).unzip
)

val messagesSink = InMemorySink(messages.withTimestamp)
val messagesSink = InMemorySink(results.withTimestamp)

val run = sc.run()

eventually {
val (msg, ts) = messagesSink.toElement
msg should be(Message(SampleObject1, attributes))
msg should be(Right(Message(SampleObject1, attributes)))
ts should be(timestamp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import org.joda.time.Instant
import org.mkuthan.streamprocessing.shared.scio.syntax._
import org.mkuthan.streamprocessing.shared.scio.SumByKey

// TODO: move to shared/diagnostic
final case class Diagnostic(
id: String,
reason: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ private[syntax] trait SCollectionSyntax {

def unionInGlobalWindow(others: SCollection[T]*): SCollection[T] =
self.transform { in =>
val inputWithGlobalWindow = in.withGlobalWindow(GlobalWindowOptions)
val othersWithGlobalWindow = others.map(_.withGlobalWindow(GlobalWindowOptions))
val collectionsWithGlobalWindow = (in +: others).map(_.withGlobalWindow(GlobalWindowOptions))

SCollection.unionAll(inputWithGlobalWindow +: othersWithGlobalWindow)
SCollection.unionAll(collectionsWithGlobalWindow)
}

def mapWithTimestamp[U: Coder](mapFn: (T, Instant) => U): SCollection[U] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import com.spotify.scio.ScioContext
private[syntax] trait ScioContextSyntax {

implicit class ScioContextOps(private val self: ScioContext) extends SCollectionSyntax {
def unionInGlobalWindow[T: Coder](
def unionInGlobalWindow[T: Coder](name: String)(
first: SCollection[T],
others: SCollection[T]*
): SCollection[T] =
first.unionInGlobalWindow(others: _*)
first
.withName(name)
.unionInGlobalWindow(others: _*)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class SCollectionEitherOpsTest extends AnyFlatSpec
.testBounded(collection)
.unzip

right should containInAnyOrder(Seq("r1", "r2", "r3"))
left should containInAnyOrder(Seq("l1", "l2"))
right should containElements("r1", "r2", "r3")
left should containElements("l1", "l2")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SCollectionOpsTest extends AnyFlatSpec
sc.testBounded(collection3).windowByMonths(1)
)

results should containInAnyOrder(Seq("one", "two", "three"))
results should containElements("one", "two", "three")
}

it should "map with timestamp" in runWithScioContext { sc =>
Expand All @@ -43,6 +43,6 @@ class SCollectionOpsTest extends AnyFlatSpec
val results = sc.testBounded(collection)
.mapWithTimestamp { case (e, i) => e + i.toString }

results should containSingleValue(s"$element$instant")
results should containElements(s"$element$instant")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class SCollectionSumByKeyOpsTest extends AnyFlatSpec
val results = sc.testBounded(collection).sumByKeyInFixedWindow(Duration.standardMinutes(10))

results should inOnTimePane("12:00:00", "12:10:00") {
containInAnyOrder(Seq(sample1.copy(count = 2), sample2.copy(count = 1)))
containElements(sample1.copy(count = 2), sample2.copy(count = 1))
}

results should inOnTimePane("12:10:00", "12:20:00") {
containInAnyOrder(Seq(sample2.copy(count = 1)))
containElements(sample2.copy(count = 1))
}
}
}
Loading

0 comments on commit 2f1fb85

Please sign in to comment.