From ae7ced1c51491df4cabee16b8bc42ab56aeca13e Mon Sep 17 00:00:00 2001 From: Alexey Shuksto Date: Fri, 20 Oct 2023 16:44:36 +0300 Subject: [PATCH] add test for buffer reuse --- io/shared/src/main/scala/fs2/io/io.scala | 7 ++-- io/shared/src/test/scala/fs2/io/IoSuite.scala | 34 +++++++++++++++++++ 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index ca9aad5dfb..6308cf0f05 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -85,11 +85,8 @@ package object io extends ioplatform { ): F[Option[(Chunk[Byte], Option[(Array[Byte], Int)])]] = read(is, buf, offset).map { numBytes => if (numBytes < 0) None - else if (numBytes == 0) Some(Chunk.empty -> Some(buf -> offset)) - else { - if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None) - else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes))) - } + else if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None) + else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes))) } private[fs2] def readInputStreamGeneric[F[_]]( diff --git a/io/shared/src/test/scala/fs2/io/IoSuite.scala b/io/shared/src/test/scala/fs2/io/IoSuite.scala index 387d32adad..b7aa03ccae 100644 --- a/io/shared/src/test/scala/fs2/io/IoSuite.scala +++ b/io/shared/src/test/scala/fs2/io/IoSuite.scala @@ -51,6 +51,40 @@ class IoSuite extends io.Fs2IoSuite { .assertEquals(bytes.toVector) } } + + test("reuse") { + forAllF { (bytes: Array[Byte], chunkSize0: Int) => + val chunkSize = (chunkSize0 % 20).abs + 1 + fs2.Stream + .chunk(Chunk.array(bytes)) + .chunkN(chunkSize / 3 + 1) + .unchunks + .covary[IO] + // we know that '.toInputStream' reads by chunk + .through(fs2.io.toInputStream) + .flatMap(is => io.readInputStream(IO(is), chunkSize)) + .chunks + .zipWithPrevious + .assertForall { + case (None, _) => true // skip first element + case (_, _: Chunk.Singleton[_]) => true // skip singleton bytes + case (Some(_: Chunk.Singleton[_]), _) => true // skip singleton bytes + case (Some(Chunk.ArraySlice(bs1, o1, l1)), Chunk.ArraySlice(bs2, o2, _)) => + { + // if first slice buffer is not 'full' + (bs1.length != (o1 + l1)) && + // we expect that next slice will wrap same buffer + ((bs2 eq bs1) && (o2 == o1 + l1)) + } || { + // if first slice buffer is 'full' + (bs2.length == (o1 + l1)) && + // we expect new buffer allocated for next slice + ((bs2 ne bs1) && (o2 == 0)) + } + case _ => false // unexpected chunk subtype + } + } + } } group("unsafeReadInputStream") {