Skip to content

Commit

Permalink
fix: Java ProducerPushDestination transformation and interceptor did …
Browse files Browse the repository at this point in the history
…not work as expected
  • Loading branch information
johanandren committed Aug 30, 2023
1 parent 98ef7e7 commit 49ce934
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.projection.grpc.consumer.javadsl

import akka.Done
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.grpc.scaladsl.MetadataBuilder
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.Collections
import java.util.concurrent.CompletableFuture
import scala.annotation.nowarn

class EventProducerPushDestinationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {

"The Java DSL EventProducerPushDestination" should {

"correctly map the transformationForOrigin to accept Scala GRPC metadata" in {
@nowarn("msg=never used")
val epsd = EventProducerPushDestination
.create("origin-id", Collections.emptyList(), system)
.withTransformationForOrigin((originId, metadata) => Transformation.identity)
val scalaEpsd = epsd.asScala

val transformation = scalaEpsd.transformationForOrigin("someOrigin", MetadataBuilder.empty)

transformation should be(Transformation.identity.delegate)
}

"correctly map the interceptor to accept Scala GRPC metadata" in {
@nowarn("msg=never used")
val epsd = EventProducerPushDestination
.create("origin-id", Collections.emptyList(), system)
.withInterceptor((originId, metadata) => CompletableFuture.completedFuture(Done))
val scalaEpsd = epsd.asScala

val interceptResult = scalaEpsd.interceptor.get.intercept("someOrigin", MetadataBuilder.empty)

interceptResult.futureValue should be(Done)
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import akka.actor.typed.ActorSystem
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.grpc.internal.JavaMetadataImpl
import akka.grpc.javadsl.Metadata
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
Expand Down Expand Up @@ -155,9 +156,9 @@ final class EventProducerPushDestination private (
new scaladsl.EventProducerPushDestination(
journalPluginId.asScala,
acceptedStreamId,
(origin, meta) => transformationForOrigin.apply(origin, meta.asInstanceOf[akka.grpc.javadsl.Metadata]).delegate,
(origin, meta) => transformationForOrigin.apply(origin, new JavaMetadataImpl(meta)).delegate,
interceptor.asScala.map(javaInterceptor =>
(streamId, meta) => javaInterceptor.intercept(streamId, meta.asInstanceOf[akka.grpc.javadsl.Metadata]).toScala),
(streamId, meta) => javaInterceptor.intercept(streamId, new JavaMetadataImpl(meta)).toScala),
filters.asScala.toVector,
protobufDescriptors.asScala.toVector,
settings)
Expand Down

0 comments on commit 49ce934

Please sign in to comment.