From df5ede1972abb82934702e6108eb3ec1ddd057df Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Wed, 11 Oct 2023 18:00:47 +0300 Subject: [PATCH 1/4] fix 'fs.io.readInputStreamGeneric' overallocation of underlying buffers --- io/jvm/src/main/scala/fs2/io/ioplatform.scala | 4 +-- io/shared/src/main/scala/fs2/io/io.scala | 27 ++++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/io/jvm/src/main/scala/fs2/io/ioplatform.scala b/io/jvm/src/main/scala/fs2/io/ioplatform.scala index 2ab6254887..229549d0f4 100644 --- a/io/jvm/src/main/scala/fs2/io/ioplatform.scala +++ b/io/jvm/src/main/scala/fs2/io/ioplatform.scala @@ -43,10 +43,10 @@ private[fs2] trait ioplatform extends iojvmnative { F.pure(System.in), F.delay(new Array[Byte](bufSize)), false - ) { (is, buf) => + ) { (is, buf, off) => F.async[Int] { cb => F.delay { - val task: Runnable = () => cb(Right(is.read(buf))) + val task: Runnable = () => cb(Right(is.read(buf, off, buf.length - off))) stdinExecutor.submit(task) }.map { fut => Some(F.delay { diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index 600f4f28cd..c209088153 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -45,7 +45,7 @@ package object io extends ioplatform { fis, F.delay(new Array[Byte](chunkSize)), closeAfterUse - )((is, buf) => F.blocking(is.read(buf))) + )((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off))) private[io] def readInputStreamCancelable[F[_]]( fis: F[InputStream], @@ -57,7 +57,7 @@ package object io extends ioplatform { fis, F.delay(new Array[Byte](chunkSize)), closeAfterUse - )((is, buf) => F.blocking(is.read(buf)).cancelable(cancel)) + )((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)).cancelable(cancel)) /** Reads all bytes from the specified `InputStream` with a buffer size of `chunkSize`. * Set `closeAfterUse` to false if the `InputStream` should not be closed after use. @@ -76,28 +76,31 @@ package object io extends ioplatform { fis, F.pure(new Array[Byte](chunkSize)), closeAfterUse - )((is, buf) => F.blocking(is.read(buf))) + )((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off))) - private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte])( - read: (InputStream, Array[Byte]) => F[Int] + private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte], offset: Int)( + read: (InputStream, Array[Byte], Int) => F[Int] )(implicit F: Sync[F] ): F[Option[Chunk[Byte]]] = - read(is, buf).map { numBytes => - if (numBytes < 0) None - else if (numBytes == 0) Some(Chunk.empty) - else if (numBytes < buf.size) Some(Chunk.array(buf, 0, numBytes)) - else Some(Chunk.array(buf)) + read(is, buf, offset).flatMap { numBytes => + if (numBytes < 0) { + if (offset == 0) F.pure(None) + else F.pure(Some(Chunk.array(buf, 0, offset))) + } else { + if (offset + numBytes == buf.size) F.pure(Some(Chunk.array(buf))) + else readBytesFromInputStream(is, buf, offset + numBytes)(read) + } } private[fs2] def readInputStreamGeneric[F[_]]( fis: F[InputStream], buf: F[Array[Byte]], closeAfterUse: Boolean - )(read: (InputStream, Array[Byte]) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = { + )(read: (InputStream, Array[Byte], Int) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = { def useIs(is: InputStream) = Stream - .eval(buf.flatMap(b => readBytesFromInputStream(is, b)(read))) + .eval(buf.flatMap(b => readBytesFromInputStream(is, b, 0)(read))) .repeat .unNoneTerminate .flatMap(c => Stream.chunk(c)) From cb4834d2b588b1eb3bc3d33ff4854f23cf42612d Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Wed, 11 Oct 2023 19:20:28 +0300 Subject: [PATCH 2/4] fix 'Process' stdout interop as this stream would not exhaust before process completion --- io/shared/src/main/scala/fs2/io/io.scala | 25 +++++++++++------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index c209088153..ca9aad5dfb 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -82,14 +82,13 @@ package object io extends ioplatform { read: (InputStream, Array[Byte], Int) => F[Int] )(implicit F: Sync[F] - ): F[Option[Chunk[Byte]]] = - read(is, buf, offset).flatMap { numBytes => - if (numBytes < 0) { - if (offset == 0) F.pure(None) - else F.pure(Some(Chunk.array(buf, 0, offset))) - } else { - if (offset + numBytes == buf.size) F.pure(Some(Chunk.array(buf))) - else readBytesFromInputStream(is, buf, offset + numBytes)(read) + ): F[Option[(Chunk[Byte], Option[(Array[Byte], Int)])]] = + read(is, buf, offset).map { numBytes => + if (numBytes < 0) None + else if (numBytes == 0) Some(Chunk.empty -> Some(buf -> offset)) + else { + if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None) + else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes))) } } @@ -98,12 +97,10 @@ package object io extends ioplatform { buf: F[Array[Byte]], closeAfterUse: Boolean )(read: (InputStream, Array[Byte], Int) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = { - def useIs(is: InputStream) = - Stream - .eval(buf.flatMap(b => readBytesFromInputStream(is, b, 0)(read))) - .repeat - .unNoneTerminate - .flatMap(c => Stream.chunk(c)) + def useIs(is: InputStream) = Stream.unfoldChunkEval(Option.empty[(Array[Byte], Int)]) { + case None => buf.flatMap(b => readBytesFromInputStream(is, b, 0)(read)) + case Some((b, offset)) => readBytesFromInputStream(is, b, offset)(read) + } if (closeAfterUse) Stream.bracket(fis)(is => Sync[F].blocking(is.close())).flatMap(useIs) From 3747c5ceacf7b2c52d8b3c8f93bea08a523130c7 Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Thu, 12 Oct 2023 12:31:13 +0300 Subject: [PATCH 3/4] fix bincompat --- build.sbt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/build.sbt b/build.sbt index 9e0e3d91fc..f500e48b38 100644 --- a/build.sbt +++ b/build.sbt @@ -236,6 +236,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( ), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]( "fs2.io.file.Files.openSeekableByteChannel" + ), + // package-private method: #3318 + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "fs2.io.package.readInputStreamGeneric" ) ) From 4e108168db0b337f893cadd45be69dba118e9398 Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Fri, 20 Oct 2023 16:44:36 +0300 Subject: [PATCH 4/4] add test for buffer reuse --- .../test/scala/fs2/io/IoPlatformSuite.scala | 36 +++++++++++++++++++ io/shared/src/main/scala/fs2/io/io.scala | 7 ++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala index 38fb5cafee..0ff9a89646 100644 --- a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -40,6 +40,42 @@ class IoPlatformSuite extends Fs2Suite { // This suite runs for a long time, this avoids timeouts in CI. override def munitIOTimeout: Duration = 2.minutes + group("readInputStream") { + test("reuses internal buffer on smaller chunks") { + forAllF { (bytes: Array[Byte], chunkSize0: Int) => + val chunkSize = (chunkSize0 % 20).abs + 1 + fs2.Stream + .chunk(Chunk.array(bytes)) + .chunkN(chunkSize / 3 + 1) + .unchunks + .covary[IO] + // we know that '.toInputStream' reads by chunk + .through(fs2.io.toInputStream) + .flatMap(is => io.readInputStream(IO(is), chunkSize)) + .chunks + .zipWithPrevious + .assertForall { + case (None, _) => true // skip first element + case (_, _: Chunk.Singleton[_]) => true // skip singleton bytes + case (Some(_: Chunk.Singleton[_]), _) => true // skip singleton bytes + case (Some(Chunk.ArraySlice(bs1, o1, l1)), Chunk.ArraySlice(bs2, o2, _)) => + { + // if first slice buffer is not 'full' + (bs1.length != (o1 + l1)) && + // we expect that next slice will wrap same buffer + ((bs2 eq bs1) && (o2 == o1 + l1)) + } || { + // if first slice buffer is 'full' + (bs2.length == (o1 + l1)) && + // we expect new buffer allocated for next slice + ((bs2 ne bs1) && (o2 == 0)) + } + case _ => false // unexpected chunk subtype + } + } + } + } + group("readOutputStream") { test("writes data and terminates when `f` returns") { forAllF { (bytes: Array[Byte], chunkSize0: Int) => diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index ca9aad5dfb..6308cf0f05 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -85,11 +85,8 @@ package object io extends ioplatform { ): F[Option[(Chunk[Byte], Option[(Array[Byte], Int)])]] = read(is, buf, offset).map { numBytes => if (numBytes < 0) None - else if (numBytes == 0) Some(Chunk.empty -> Some(buf -> offset)) - else { - if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None) - else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes))) - } + else if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None) + else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes))) } private[fs2] def readInputStreamGeneric[F[_]](