Skip to content

Commit

Permalink
Merge branch 'flow-benchmark' into optimize-flow-interop
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Mar 29, 2024
2 parents 8234158 + becbd2e commit 05418a2
Show file tree
Hide file tree
Showing 24 changed files with 824 additions and 123 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "3.7.17"
version = "3.8.0"

style = default

Expand Down
14 changes: 0 additions & 14 deletions CODE_OF_CONDUCT.md

This file was deleted.

95 changes: 95 additions & 0 deletions benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package benchmark

import cats.effect.IO
import cats.effect.unsafe.implicits.global

import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Mode,
OutputTimeUnit,
Param,
Scope,
State
}

import java.util.concurrent.TimeUnit
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}

import scala.concurrent.Future

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class FlowInteropBenchmark {
@Param(Array("1024", "5120", "10240"))
var totalElements: Int = _

@Param(Array("1000"))
var iterations: Int = _

@Benchmark
def fastPublisher(): Unit = {
def publisher =
new Publisher[Int] {
override final def subscribe(subscriber: Subscriber[? >: Int]): Unit =
subscriber.onSubscribe(
new Subscription {
@volatile var i: Int = 0
@volatile var canceled: Boolean = false

override final def request(n: Long): Unit = {
Future {
var j = 0
while (j < n && i < totalElements && !canceled) {
subscriber.onNext(i)
i += 1
j += 1
}

if (i == totalElements || canceled) {
subscriber.onComplete()
}
}(global.compute)

// Discarding the Future so it runs in the background.
()
}

override final def cancel(): Unit =
canceled = true
}
)
}

val stream =
interop.flow.fromPublisher[IO](publisher, chunkSize = 512)

val program =
stream.compile.toVector

program.replicateA_(iterations).unsafeRunSync()
}
}
12 changes: 6 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._

Global / onChangedBuildSource := ReloadOnSourceChanges

ThisBuild / tlBaseVersion := "3.9"
ThisBuild / tlBaseVersion := "3.10"

ThisBuild / organization := "co.fs2"
ThisBuild / organizationName := "Functional Streams for Scala"
Expand All @@ -11,7 +11,7 @@ ThisBuild / startYear := Some(2013)
val Scala213 = "2.13.12"

ThisBuild / scalaVersion := Scala213
ThisBuild / crossScalaVersions := Seq("2.12.18", Scala213, "3.3.1")
ThisBuild / crossScalaVersions := Seq("2.12.19", Scala213, "3.3.3")
ThisBuild / tlVersionIntroduced := Map("3" -> "3.0.3")

ThisBuild / githubWorkflowOSes := Seq("ubuntu-latest")
Expand Down Expand Up @@ -294,9 +294,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.scodec" %%% "scodec-bits" % "1.1.38",
"org.typelevel" %%% "cats-core" % "2.10.0",
"org.typelevel" %%% "cats-effect" % "3.5.3",
"org.typelevel" %%% "cats-effect-laws" % "3.5.3" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5.3" % Test,
"org.typelevel" %%% "cats-effect" % "3.5.4",
"org.typelevel" %%% "cats-effect-laws" % "3.5.4" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5.4" % Test,
"org.typelevel" %%% "cats-laws" % "2.10.0" % Test,
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M4" % Test,
Expand Down Expand Up @@ -354,7 +354,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform)
.jvmSettings(
Test / fork := true,
libraryDependencies ++= Seq(
"com.github.jnr" % "jnr-unixsocket" % "0.38.21" % Optional,
"com.github.jnr" % "jnr-unixsocket" % "0.38.22" % Optional,
"com.google.jimfs" % "jimfs" % "1.3.0" % Test
)
)
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala-2.12/fs2/ChunkPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.reflect.ClassTag
private[fs2] trait ChunkPlatform[+O] {
self: Chunk[O] =>

def asSeqPlatform: Option[IndexedSeq[O]] =
private[fs2] def asSeqPlatform: Option[IndexedSeq[O]] =
None
}

Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala-2.13/fs2/ChunkPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.immutable.ArraySeq
private[fs2] trait ChunkPlatform[+O] extends Chunk213And3Compat[O] {
self: Chunk[O] =>

def asSeqPlatform: Option[IndexedSeq[O]] =
private[fs2] def asSeqPlatform: Option[IndexedSeq[O]] =
this match {
case arraySlice: Chunk.ArraySlice[?] =>
Some(
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala-3/fs2/ChunkPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[fs2] trait ChunkPlatform[+O] extends Chunk213And3Compat[O] {
case _ => new Chunk.IArraySlice(IArray.unsafeFromArray(toArray(ct)), 0, size)
}

def asSeqPlatform: Option[IndexedSeq[O]] =
private[fs2] def asSeqPlatform: Option[IndexedSeq[O]] =
this match {
case arraySlice: Chunk.ArraySlice[_] =>
Some(
Expand Down
53 changes: 49 additions & 4 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,48 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Stream.eval(fstream)
}

/** Pulls up to the specified number of chunks from the source stream while concurrently allowing
* downstream to process emitted chunks. Unlike `prefetchN`, all accumulated chunks are emitted
* as a single chunk upon downstream pulling.
*
* The `chunkLimit` parameter controls backpressure on the source stream.
*/
def conflateChunks[F2[x] >: F[x]: Concurrent](chunkLimit: Int): Stream[F2, Chunk[O]] =
Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit)).flatMap { chan =>
val producer = chunks.through(chan.sendAll)
val consumer = chan.stream.chunks.map(_.combineAll)
consumer.concurrently(producer)
}

/** Like `conflateChunks` but uses the supplied `zero` and `f` values to combine the elements of
* each output chunk in to a single value.
*/
def conflate[F2[x] >: F[x]: Concurrent, O2](chunkLimit: Int, zero: O2)(
f: (O2, O) => O2
): Stream[F2, O2] =
conflateChunks[F2](chunkLimit).map(_.foldLeft(zero)(f))

/** Like `conflate` but combines elements of the output chunk with the supplied function.
*/
def conflate1[F2[x] >: F[x]: Concurrent, O2 >: O](chunkLimit: Int)(
f: (O2, O2) => O2
): Stream[F2, O2] =
conflateChunks[F2](chunkLimit).map(_.iterator.reduce(f))

/** Like `conflate1` but combines elements using the semigroup of the output type.
*/
def conflateSemigroup[F2[x] >: F[x]: Concurrent, O2 >: O: Semigroup](
chunkLimit: Int
): Stream[F2, O2] =
conflate1[F2, O2](chunkLimit)(Semigroup[O2].combine)

/** Conflates elements and then maps the supplied function over the output chunk and combines the results using a semigroup.
*/
def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup](
chunkLimit: Int
)(f: O => O2): Stream[F2, O2] =
conflateChunks[F2](chunkLimit).map(_.iterator.map(f).reduce(Semigroup[O2].combine))

/** Prepends a chunk onto the front of this stream.
*
* @example {{{
Expand Down Expand Up @@ -2398,7 +2440,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
chan.stream.unchunks.concurrently {
chunks.through(chan.sendAll)

}
}

Expand Down Expand Up @@ -3605,9 +3646,13 @@ object Stream extends StreamLowPriority {

def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] =
F.suspend(hint) {
for (_ <- 1 to chunkSize if i.hasNext) yield i.next()
}.map { s =>
if (s.isEmpty) None else Some((Chunk.from(s), i))
val bldr = Vector.newBuilder[A]
var cnt = 0
while (cnt < chunkSize && i.hasNext) {
bldr += i.next()
cnt += 1
}
if (cnt == 0) None else Some((Chunk.from(bldr.result()), i))
}

Stream.unfoldChunkEval(iterator)(getNextChunk)
Expand Down
10 changes: 6 additions & 4 deletions core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -704,16 +704,18 @@ class StreamCombinatorsSuite extends Fs2Suite {
}

test("fromIterator") {
forAllF { (x: List[Int], cs: Int) =>
// Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415
forAllF { (x: Vector[Int], cs: Int) =>
val chunkSize = (cs % 4096).abs + 1
Stream.fromIterator[IO](x.iterator, chunkSize).assertEmits(x)
Stream.fromIterator[IO](x.iterator, chunkSize).assertEmits(x.toList)
}
}

test("fromBlockingIterator") {
forAllF { (x: List[Int], cs: Int) =>
// Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415
forAllF { (x: Vector[Int], cs: Int) =>
val chunkSize = (cs % 4096).abs + 1
Stream.fromBlockingIterator[IO](x.iterator, chunkSize).assertEmits(x)
Stream.fromBlockingIterator[IO](x.iterator, chunkSize).assertEmits(x.toList)
}
}

Expand Down
47 changes: 47 additions & 0 deletions core/shared/src/test/scala/fs2/StreamConflateSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2

import cats.effect.IO
import cats.effect.testkit.TestControl

import scala.concurrent.duration._

class StreamConflateSuite extends Fs2Suite {

test("conflateMap") {
TestControl.executeEmbed(
Stream
.iterate(0)(_ + 1)
.covary[IO]
.metered(10.millis)
.conflateMap(100)(List(_))
.metered(101.millis)
.take(5)
.compile
.toList
.assertEquals(
List(0) :: (1 until 10).toList :: 10.until(40).toList.grouped(10).toList
)
)
}
}
Loading

0 comments on commit 05418a2

Please sign in to comment.