Skip to content

Commit

Permalink
Add toProcessor syntax to Pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Nov 24, 2024
1 parent ddd6516 commit 4d1690f
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import fs2.internal._
import org.typelevel.scalaccompat.annotation._
import Pull.StreamPullOps

import java.util.concurrent.Flow.{Publisher, Subscriber}
import java.util.concurrent.Flow.{Publisher, Processor, Subscriber}

/** A stream producing output of type `O` and which may evaluate `F` effects.
*
Expand Down Expand Up @@ -5541,6 +5541,27 @@ object Stream extends StreamLowPriority {
/** Transforms the right input of the given `Pipe2` using a `Pipe`. */
def attachR[I0, O2](p: Pipe2[F, I0, O, O2]): Pipe2[F, I0, I, O2] =
(l, r) => p(l, self(r))

/** Creates a flow [[Processor]] from this [[Pipe]].
*
* You are required to manually subscribe this [[Processor]] to an upstream [[Publisher]], and have at least one downstream [[Subscriber]] subscribe to the [[Consumer]].
*
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
*/
def toProcessor(
chunkSize: Int
)(implicit
F: Async[F]
): Resource[F, Processor[I, O]] =
interop.flow.pipeToProcessor(pipe = self, chunkSize)
}

/** Provides operations on pure pipes for syntactic convenience. */
Expand Down

0 comments on commit 4d1690f

Please sign in to comment.