Skip to content

Commit

Permalink
Merge pull request #539 from permutive-engineering/update/all
Browse files Browse the repository at this point in the history
Dependency Updates
  • Loading branch information
github-actions[bot] authored Jul 23, 2024
2 parents ad89d29 + 167f60f commit c57ace3
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 74 deletions.
124 changes: 56 additions & 68 deletions modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import scala.concurrent.duration._
import scala.util.Properties

import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.kernel.Resource
import cats.syntax.all._

import com.dimafeng.testcontainers.GenericContainer
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
import com.permutive.common.types.gcp.http4s._
import fs2.Chunk
import fs2.pubsub.dsl.client.PubSubClientStep
Expand All @@ -39,7 +38,7 @@ import org.http4s.client.dsl.io._
import org.http4s.ember.client.EmberClientBuilder
import org.testcontainers.containers.wait.strategy.Wait

class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {
class PubSubSuite extends CatsEffectSuite {

@nowarn("msg=deprecated")
val options =
Expand Down Expand Up @@ -109,81 +108,70 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {
// Fixtures //
//////////////

val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
s"test-project-${index + 1},$topics"
}

val projectsFixture = ResourceSuiteLocalFixture(
"Projects",
Queue
.unbounded[IO, ProjectId]
.flatTap(queue => projects.map(_.split(",").head).traverse(ProjectId.fromStringF[IO](_).flatMap(queue.offer)))
.toResource
)

def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) =
ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].withHttp2.build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
val projectId = ProjectId("test-project")

Resource.fromAutoCloseable(IO(container).flatTap(container => IO(container.start()))) >>
EmberClientBuilder
.default[IO]
.withHttp2
.build
.evalTap { client =>
val body = Json.obj(
"topic" := "projects/test-project/topics/example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = constructor
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
)

val requests = List(
PUT(container.uri / "v1" / "projects" / projectId / "topics" / "example-topic"),
PUT(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")
)

requests.traverse_(client.expect[Unit])
}
.map { client =>
val pubSubClient = constructor
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}

case object container
extends GenericContainer(
"thekevjames/gcloud-pubsub-emulator:450.0.0",
exposedPorts = Seq(8681, 8682),
waitStrategy = Wait.forListeningPort().some,
env = projects.zipWithIndex.map { case (project, index) => s"PUBSUB_PROJECT${index + 1}" -> project }.toMap
"google/cloud-sdk:emulators",
command = "gcloud" :: "beta" :: "emulators" :: "pubsub" :: "start" :: "--project=test-project"
:: "--host-port=0.0.0.0:8085" :: Nil,
exposedPorts = Seq(8085),
waitStrategy = Wait.forLogMessage(".*Server started, listening on 8085.*", 1).some
) {

def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8681)}")
def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8085)}")

}

val containerFixture = new ForAllContainerFixture[GenericContainer](container)

override def munitFixtures = super.munitFixtures :+ projectsFixture :+ containerFixture

}
8 changes: 4 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ object Dependencies {
lazy val `http4s-grpc` = "io.chrisdavenport" %% "http4s-grpc" % "0.0.4"

lazy val grpc = Seq(
"com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.108.6",
"com.google.api.grpc" % "proto-google-common-protos" % "2.39.1",
"com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.113.0",
"com.google.api.grpc" % "proto-google-common-protos" % "2.41.0",
"com.google.protobuf" % "protobuf-java" % "3.25.3"
).map(_ % "protobuf-src" intransitive ()) ++ Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
Expand All @@ -25,7 +25,7 @@ object Dependencies {
lazy val `fs2-pubsub` = Seq(
"co.fs2" %% "fs2-core" % "3.10.2",
"com.permutive" %% "common-types-gcp-http4s" % "1.0.0",
"io.circe" %% "circe-parser" % "0.14.7",
"io.circe" %% "circe-parser" % "0.14.9",
"org.http4s" %% "http4s-circe" % "0.23.27",
"org.http4s" %% "http4s-client" % "0.23.27",
"org.http4s" %% "http4s-dsl" % "0.23.27"
Expand All @@ -38,7 +38,7 @@ object Dependencies {
).map(_ % Test)

lazy val `fs2-pubsub-pureconfig` = Seq(
"com.github.pureconfig" %% "pureconfig-http4s" % "0.17.6",
"com.github.pureconfig" %% "pureconfig-http4s" % "0.17.7",
"com.permutive" %% "common-types-gcp-pureconfig" % "1.0.0"
)

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.9.9
sbt.version=1.10.1
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.2")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.4")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("io.chrisdavenport" % "sbt-http4s-grpc" % "0.0.4")

0 comments on commit c57ace3

Please sign in to comment.