Skip to content

Commit

Permalink
Add unsafeToProcessor syntax to Pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Nov 24, 2024
1 parent 4d1690f commit d59c5b8
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5564,6 +5564,26 @@ object Stream extends StreamLowPriority {
interop.flow.pipeToProcessor(pipe = self, chunkSize)
}

/** Provides operations on IO pipes for syntactic convenience. */
implicit final class IOPipeOps[I, O](private val self: Pipe[IO, I, O]) extends AnyVal {

/** Creates a [[Processor]] from this [[Pipe]].
*
* You are required to manually subscribe this [[Processor]] to an upstream [[Publisher]], and have at least one downstream [[Subscriber]] subscribe to the [[Consumer]].
*
* @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
*/
def unsafeToProcessor(
chunkSize: Int
)(implicit
runtime: IORuntime
): Processor[I, O] =
interop.flow.unsafePipeToProcessor(pipe = self, chunkSize)
}

/** Provides operations on pure pipes for syntactic convenience. */
implicit final class PurePipeOps[I, O](private val self: Pipe[Pure, I, O]) extends AnyVal {

Expand Down

0 comments on commit d59c5b8

Please sign in to comment.