From aa3a9923fb7d39c6e868d7f1c194e99994531b6d Mon Sep 17 00:00:00 2001 From: "permutive-steward[bot]" <155995252+permutive-steward[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 05:02:12 +0000 Subject: [PATCH 01/10] Update pureconfig-http4s to 0.17.7 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 45b5ec08..5156ad0a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -38,7 +38,7 @@ object Dependencies { ).map(_ % Test) lazy val `fs2-pubsub-pureconfig` = Seq( - "com.github.pureconfig" %% "pureconfig-http4s" % "0.17.6", + "com.github.pureconfig" %% "pureconfig-http4s" % "0.17.7", "com.permutive" %% "common-types-gcp-pureconfig" % "1.0.0" ) From 648d03c97b8541f0d3298837062c1cc0f77ba997 Mon Sep 17 00:00:00 2001 From: "permutive-steward[bot]" <155995252+permutive-steward[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 05:02:12 +0000 Subject: [PATCH 02/10] Update proto-google-cloud-pubsub-v1 to 1.113.0 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5156ad0a..43b74ded 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,7 +15,7 @@ object Dependencies { lazy val `http4s-grpc` = "io.chrisdavenport" %% "http4s-grpc" % "0.0.4" lazy val grpc = Seq( - "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.108.6", + "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.113.0", "com.google.api.grpc" % "proto-google-common-protos" % "2.39.1", "com.google.protobuf" % "protobuf-java" % "3.25.3" ).map(_ % "protobuf-src" intransitive ()) ++ Seq( From 85b9abad7a57af179adfdcb4cd11cd3daa4e2794 Mon Sep 17 00:00:00 2001 From: "permutive-steward[bot]" <155995252+permutive-steward[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 05:02:12 +0000 Subject: [PATCH 03/10] Update proto-google-common-protos to 2.41.0 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 43b74ded..54cfe2db 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,7 +16,7 @@ object Dependencies { lazy val grpc = Seq( "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.113.0", - "com.google.api.grpc" % "proto-google-common-protos" % "2.39.1", + "com.google.api.grpc" % "proto-google-common-protos" % "2.41.0", "com.google.protobuf" % "protobuf-java" % "3.25.3" ).map(_ % "protobuf-src" intransitive ()) ++ Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf" From 0ebc893e6f6ffcae03883c2c034acf04b55101d3 Mon Sep 17 00:00:00 2001 From: "permutive-steward[bot]" <155995252+permutive-steward[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 05:02:12 +0000 Subject: [PATCH 04/10] Update protobuf-java to 4.27.2 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 54cfe2db..9c42be17 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { lazy val grpc = Seq( "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.113.0", "com.google.api.grpc" % "proto-google-common-protos" % "2.41.0", - "com.google.protobuf" % "protobuf-java" % "3.25.3" + "com.google.protobuf" % "protobuf-java" % "4.27.2" ).map(_ % "protobuf-src" intransitive ()) ++ Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf" ) From 62434d0c8893d5b18b2878aa8f6c57c87a38a884 Mon Sep 17 00:00:00 2001 From: "permutive-steward[bot]" <155995252+permutive-steward[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 05:02:12 +0000 Subject: [PATCH 05/10] Update circe-parser to 0.14.9 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9c42be17..4a137310 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,7 +25,7 @@ object Dependencies { lazy val `fs2-pubsub` = Seq( "co.fs2" %% "fs2-core" % "3.10.2", "com.permutive" %% "common-types-gcp-http4s" % "1.0.0", - "io.circe" %% "circe-parser" % "0.14.7", + "io.circe" %% "circe-parser" % "0.14.9", "org.http4s" %% "http4s-circe" % "0.23.27", "org.http4s" %% "http4s-client" % "0.23.27", "org.http4s" %% "http4s-dsl" % "0.23.27" From 33b860f1d8ec30043f00e3145bb25ae2d222cc95 Mon Sep 17 00:00:00 2001 From: "permutive-steward[bot]" <155995252+permutive-steward[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 05:02:12 +0000 Subject: [PATCH 06/10] Update sbt to 1.10.1 --- project/build.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build.properties b/project/build.properties index 04267b14..ee4c672c 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.9 +sbt.version=1.10.1 From c61a5682b71219abb761c9d8b1549666b9ce4a8c Mon Sep 17 00:00:00 2001 From: "permutive-steward[bot]" <155995252+permutive-steward[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 05:02:13 +0000 Subject: [PATCH 07/10] Update sbt-mdoc to 2.5.4 --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 8f04a4e0..6ed83423 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -12,6 +12,6 @@ addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1. addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0") addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1") addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1") -addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.2") +addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.4") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") addSbtPlugin("io.chrisdavenport" % "sbt-http4s-grpc" % "0.0.4") From 13fbeb06445e83745b6967fd09ac978d46b909c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Herna=CC=81ndez?= Date: Tue, 23 Jul 2024 12:22:22 +0200 Subject: [PATCH 08/10] Downgrade `protobuf-java` --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4a137310..76a926af 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { lazy val grpc = Seq( "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.113.0", "com.google.api.grpc" % "proto-google-common-protos" % "2.41.0", - "com.google.protobuf" % "protobuf-java" % "4.27.2" + "com.google.protobuf" % "protobuf-java" % "3.25.3" ).map(_ % "protobuf-src" intransitive ()) ++ Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf" ) From fa65c05a9865f56a2a6536caf2439042d945de3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Herna=CC=81ndez?= Date: Tue, 23 Jul 2024 13:24:51 +0200 Subject: [PATCH 09/10] Start a new container for each test --- .../test/scala/fs2/pubsub/PubSubSuite.scala | 120 ++++++++---------- 1 file changed, 54 insertions(+), 66 deletions(-) diff --git a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala index fc8f59c4..de4b8501 100644 --- a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala +++ b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala @@ -21,11 +21,10 @@ import scala.concurrent.duration._ import scala.util.Properties import cats.effect.IO -import cats.effect.std.Queue +import cats.effect.kernel.Resource import cats.syntax.all._ import com.dimafeng.testcontainers.GenericContainer -import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures import com.permutive.common.types.gcp.http4s._ import fs2.Chunk import fs2.pubsub.dsl.client.PubSubClientStep @@ -39,7 +38,7 @@ import org.http4s.client.dsl.io._ import org.http4s.ember.client.EmberClientBuilder import org.testcontainers.containers.wait.strategy.Wait -class PubSubSuite extends CatsEffectSuite with TestContainersFixtures { +class PubSubSuite extends CatsEffectSuite { @nowarn("msg=deprecated") val options = @@ -109,81 +108,70 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures { // Fixtures // ////////////// - val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) => - s"test-project-${index + 1},$topics" - } - - val projectsFixture = ResourceSuiteLocalFixture( - "Projects", - Queue - .unbounded[IO, ProjectId] - .flatTap(queue => projects.map(_.split(",").head).traverse(ProjectId.fromStringF[IO](_).flatMap(queue.offer))) - .toResource - ) - def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) = ResourceFunFixture { - IO(projectsFixture().take).flatten.toResource - .product(EmberClientBuilder.default[IO].withHttp2.build) - .evalTap { case (projectId, client) => - val body = Json.obj( - "subscription" := Json.obj( - "topic" := "example-topic", - "ackDeadlineSeconds" := withAckDeadlineSeconds - ), - "updateMask" := "ackDeadlineSeconds" - ) - - val request = - PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") - - client.expect[Unit](request) - } - .map { case (projectId, client) => - val pubSubClient = constructor - .projectId(projectId) - .uri(container.uri) - .httpClient(client) - .noRetry - - val publisher = pubSubClient - .publisher[String] - .topic(Topic("example-topic")) - - val subscriber = pubSubClient.subscriber - .subscription(Subscription("example-subscription")) - .errorHandler { - case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t) - case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t) - case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack - } - .withDefaults - .decodeTo[String] - .subscribe - - (publisher, subscriber) - } - .evalTap { - case (publisher, _) if records === 1 => publisher.publishOne("ping") - case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping"))) - } - ._2F + val projectId = ProjectId("test-project") + + Resource.fromAutoCloseable(IO(container).flatTap(container => IO(container.start()))) >> + EmberClientBuilder + .default[IO] + .withHttp2 + .build + .evalTap { client => + val body = Json.obj( + "subscription" := Json.obj( + "topic" := "example-topic", + "ackDeadlineSeconds" := withAckDeadlineSeconds + ), + "updateMask" := "ackDeadlineSeconds" + ) + + val request = + PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") + + client.expect[Unit](request) + } + .map { client => + val pubSubClient = constructor + .projectId(projectId) + .uri(container.uri) + .httpClient(client) + .noRetry + + val publisher = pubSubClient + .publisher[String] + .topic(Topic("example-topic")) + + val subscriber = pubSubClient.subscriber + .subscription(Subscription("example-subscription")) + .errorHandler { + case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t) + case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t) + case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack + } + .withDefaults + .decodeTo[String] + .subscribe + + (publisher, subscriber) + } + .evalTap { + case (publisher, _) if records === 1 => publisher.publishOne("ping") + case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping"))) + } + ._2F } case object container extends GenericContainer( - "thekevjames/gcloud-pubsub-emulator:450.0.0", + "thekevjames/gcloud-pubsub-emulator:484.0.0", exposedPorts = Seq(8681, 8682), waitStrategy = Wait.forListeningPort().some, - env = projects.zipWithIndex.map { case (project, index) => s"PUBSUB_PROJECT${index + 1}" -> project }.toMap + env = Map("PUBSUB_PROJECT1" -> "test-project,example-topic:example-subscription") ) { def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8681)}") } - val containerFixture = new ForAllContainerFixture[GenericContainer](container) - - override def munitFixtures = super.munitFixtures :+ projectsFixture :+ containerFixture - } From 167f60f86a79392d70ffdcea992ac07be87200ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Herna=CC=81ndez?= Date: Tue, 23 Jul 2024 17:20:58 +0200 Subject: [PATCH 10/10] Use official Docker Image --- .../test/scala/fs2/pubsub/PubSubSuite.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala index de4b8501..1c3eb9cc 100644 --- a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala +++ b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala @@ -119,17 +119,16 @@ class PubSubSuite extends CatsEffectSuite { .build .evalTap { client => val body = Json.obj( - "subscription" := Json.obj( - "topic" := "example-topic", - "ackDeadlineSeconds" := withAckDeadlineSeconds - ), - "updateMask" := "ackDeadlineSeconds" + "topic" := "projects/test-project/topics/example-topic", + "ackDeadlineSeconds" := withAckDeadlineSeconds ) - val request = - PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") + val requests = List( + PUT(container.uri / "v1" / "projects" / projectId / "topics" / "example-topic"), + PUT(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") + ) - client.expect[Unit](request) + requests.traverse_(client.expect[Unit]) } .map { client => val pubSubClient = constructor @@ -164,13 +163,14 @@ class PubSubSuite extends CatsEffectSuite { case object container extends GenericContainer( - "thekevjames/gcloud-pubsub-emulator:484.0.0", - exposedPorts = Seq(8681, 8682), - waitStrategy = Wait.forListeningPort().some, - env = Map("PUBSUB_PROJECT1" -> "test-project,example-topic:example-subscription") + "google/cloud-sdk:emulators", + command = "gcloud" :: "beta" :: "emulators" :: "pubsub" :: "start" :: "--project=test-project" + :: "--host-port=0.0.0.0:8085" :: Nil, + exposedPorts = Seq(8085), + waitStrategy = Wait.forLogMessage(".*Server started, listening on 8085.*", 1).some ) { - def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8681)}") + def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8085)}") }