From d59c5b8047cd13c3645fa16028521f468b3537f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Sun, 24 Nov 2024 12:31:40 -0500 Subject: [PATCH] Add unsafeToProcessor syntax to Pipe --- core/shared/src/main/scala/fs2/Stream.scala | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b12b94c7f1..9b5ef51f8d 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -5564,6 +5564,26 @@ object Stream extends StreamLowPriority { interop.flow.pipeToProcessor(pipe = self, chunkSize) } + /** Provides operations on IO pipes for syntactic convenience. */ + implicit final class IOPipeOps[I, O](private val self: Pipe[IO, I, O]) extends AnyVal { + + /** Creates a [[Processor]] from this [[Pipe]]. + * + * You are required to manually subscribe this [[Processor]] to an upstream [[Publisher]], and have at least one downstream [[Subscriber]] subscribe to the [[Consumer]]. + * + * @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]]. + * A high number may be useful if the publisher is triggering from IO, + * like requesting elements from a database. + * A high number will also lead to more elements in memory. + */ + def unsafeToProcessor( + chunkSize: Int + )(implicit + runtime: IORuntime + ): Processor[I, O] = + interop.flow.unsafePipeToProcessor(pipe = self, chunkSize) + } + /** Provides operations on pure pipes for syntactic convenience. */ implicit final class PurePipeOps[I, O](private val self: Pipe[Pure, I, O]) extends AnyVal {