From 3452b3ea488edb977f64005a0357ff6d54684203 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Fri, 29 Mar 2024 10:24:35 -0500 Subject: [PATCH] Add benchmark FlowInterop.fastPublisher --- .../fs2/benchmark/FlowInteropBenchmark.scala | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala diff --git a/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala new file mode 100644 index 0000000000..506f2913d4 --- /dev/null +++ b/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package benchmark + +import cats.effect.IO +import cats.effect.unsafe.implicits.global + +import org.openjdk.jmh.annotations.{ + Benchmark, + BenchmarkMode, + Mode, + OutputTimeUnit, + Param, + Scope, + State +} + +import java.util.concurrent.TimeUnit +import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} + +import scala.concurrent.Future + +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.SECONDS) +class FlowInteropBenchmark { + @Param(Array("1024", "5120", "10240")) + var totalElements: Int = _ + + @Param(Array("1000")) + var iterations: Int = _ + + @Benchmark + def fastPublisher(): Unit = { + def publisher = + new Publisher[Unit] { + override final def subscribe(subscriber: Subscriber[? >: Unit]): Unit = + subscriber.onSubscribe( + new Subscription { + @volatile var i: Int = 0 + @volatile var canceled: Boolean = false + + override final def request(n: Long): Unit = { + Future { + var j = 0 + while ((j < n) && (i < totalElements) && !canceled) { + subscriber.onNext(()) + i += 1 + j += 1 + } + + if (i == totalElements || canceled) { + subscriber.onComplete() + } + }(global.compute) + + // Discarding the Future so it runs in the background. + () + } + + override final def cancel(): Unit = + canceled = true + } + ) + } + + val stream = + interop.flow.fromPublisher[IO](publisher, chunkSize = 512) + + val program = + stream.compile.drain + + program.replicateA_(iterations).unsafeRunSync() + } +}