From 71bd06795cba606f8457dce020a62a7e696c5f27 Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Fri, 20 Oct 2023 16:44:36 +0300 Subject: [PATCH] add test for buffer reuse --- .../test/scala/fs2/io/IoPlatformSuite.scala | 36 +++++++++++++++++++ io/shared/src/main/scala/fs2/io/io.scala | 7 ++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala index 38fb5cafee..0ff9a89646 100644 --- a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -40,6 +40,42 @@ class IoPlatformSuite extends Fs2Suite { // This suite runs for a long time, this avoids timeouts in CI. override def munitIOTimeout: Duration = 2.minutes + group("readInputStream") { + test("reuses internal buffer on smaller chunks") { + forAllF { (bytes: Array[Byte], chunkSize0: Int) => + val chunkSize = (chunkSize0 % 20).abs + 1 + fs2.Stream + .chunk(Chunk.array(bytes)) + .chunkN(chunkSize / 3 + 1) + .unchunks + .covary[IO] + // we know that '.toInputStream' reads by chunk + .through(fs2.io.toInputStream) + .flatMap(is => io.readInputStream(IO(is), chunkSize)) + .chunks + .zipWithPrevious + .assertForall { + case (None, _) => true // skip first element + case (_, _: Chunk.Singleton[_]) => true // skip singleton bytes + case (Some(_: Chunk.Singleton[_]), _) => true // skip singleton bytes + case (Some(Chunk.ArraySlice(bs1, o1, l1)), Chunk.ArraySlice(bs2, o2, _)) => + { + // if first slice buffer is not 'full' + (bs1.length != (o1 + l1)) && + // we expect that next slice will wrap same buffer + ((bs2 eq bs1) && (o2 == o1 + l1)) + } || { + // if first slice buffer is 'full' + (bs2.length == (o1 + l1)) && + // we expect new buffer allocated for next slice + ((bs2 ne bs1) && (o2 == 0)) + } + case _ => false // unexpected chunk subtype + } + } + } + } + group("readOutputStream") { test("writes data and terminates when `f` returns") { forAllF { (bytes: Array[Byte], chunkSize0: Int) => diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index ca9aad5dfb..6308cf0f05 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -85,11 +85,8 @@ package object io extends ioplatform { ): F[Option[(Chunk[Byte], Option[(Array[Byte], Int)])]] = read(is, buf, offset).map { numBytes => if (numBytes < 0) None - else if (numBytes == 0) Some(Chunk.empty -> Some(buf -> offset)) - else { - if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None) - else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes))) - } + else if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None) + else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes))) } private[fs2] def readInputStreamGeneric[F[_]](