diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b70a1372ac..b12b94c7f1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -40,7 +40,7 @@ import fs2.internal._ import org.typelevel.scalaccompat.annotation._ import Pull.StreamPullOps -import java.util.concurrent.Flow.{Publisher, Subscriber} +import java.util.concurrent.Flow.{Publisher, Processor, Subscriber} /** A stream producing output of type `O` and which may evaluate `F` effects. * @@ -5541,6 +5541,27 @@ object Stream extends StreamLowPriority { /** Transforms the right input of the given `Pipe2` using a `Pipe`. */ def attachR[I0, O2](p: Pipe2[F, I0, O, O2]): Pipe2[F, I0, I, O2] = (l, r) => p(l, self(r)) + + /** Creates a flow [[Processor]] from this [[Pipe]]. + * + * You are required to manually subscribe this [[Processor]] to an upstream [[Publisher]], and have at least one downstream [[Subscriber]] subscribe to the [[Consumer]]. + * + * Closing the [[Resource]] means not accepting new subscriptions, + * but waiting for all active ones to finish consuming. + * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. + * Thus, no more elements will be published. + * + * @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]]. + * A high number may be useful if the publisher is triggering from IO, + * like requesting elements from a database. + * A high number will also lead to more elements in memory. + */ + def toProcessor( + chunkSize: Int + )(implicit + F: Async[F] + ): Resource[F, Processor[I, O]] = + interop.flow.pipeToProcessor(pipe = self, chunkSize) } /** Provides operations on pure pipes for syntactic convenience. */