Skip to content

Commit

Permalink
Merge pull request #3318 from seigert/fix/io-read_input_stream-overallocation
Browse files Browse the repository at this point in the history

Fix 'fs2.io.readInputStreamGeneric' overallocation of underlying buffers
  • Loading branch information
mpilquist authored Oct 28, 2023
2 parents 62d25fa + ca5b901 commit 93c87e9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 19 deletions.
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.file.Files.openSeekableByteChannel"
),
// package-private method: #3318
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"fs2.io.package.readInputStreamGeneric"
)
)

Expand Down
4 changes: 2 additions & 2 deletions io/jvm/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ private[fs2] trait ioplatform extends iojvmnative {
F.pure(System.in),
F.delay(new Array[Byte](bufSize)),
false
) { (is, buf) =>
) { (is, buf, off) =>
F.async[Int] { cb =>
F.delay {
val task: Runnable = () => cb(Right(is.read(buf)))
val task: Runnable = () => cb(Right(is.read(buf, off, buf.length - off)))
stdinExecutor.submit(task)
}.map { fut =>
Some(F.delay {
Expand Down
36 changes: 36 additions & 0 deletions io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,42 @@ class IoPlatformSuite extends Fs2Suite {
// This suite runs for a long time, this avoids timeouts in CI.
override def munitIOTimeout: Duration = 2.minutes

group("readInputStream") {
  // Property: consecutive chunks emitted by `readInputStream` share one backing array
  // until that array has been filled to its end; only then is a new array started.
  test("reuses internal buffer on smaller chunks") {
    forAllF { (bytes: Array[Byte], chunkSize0: Int) =>
      // Map the arbitrary Int into 1..20 so the read buffer is small and fills up often.
      val chunkSize = (chunkSize0 % 20).abs + 1
      fs2.Stream
        .chunk(Chunk.array(bytes))
        // Re-chunk the input into pieces of `chunkSize / 3 + 1` bytes, which is never
        // larger than the read buffer.
        .chunkN(chunkSize / 3 + 1)
        .unchunks
        .covary[IO]
        // we know that '.toInputStream' reads by chunk
        .through(fs2.io.toInputStream)
        .flatMap(is => io.readInputStream(IO(is), chunkSize))
        .chunks
        .zipWithPrevious
        .assertForall {
          case (None, _)                        => true // skip first element
          case (_, _: Chunk.Singleton[_])       => true // skip singleton bytes
          case (Some(_: Chunk.Singleton[_]), _) => true // skip singleton bytes
          case (Some(Chunk.ArraySlice(bs1, o1, l1)), Chunk.ArraySlice(bs2, o2, _)) =>
            // The previous slice decides what is expected of the next one, so the
            // "is it full?" check must look at the previous slice's array (`bs1`).
            val previousEnd = o1 + l1
            if (bs1.length != previousEnd)
              // previous buffer is not 'full': the next slice must wrap the same array
              // and start exactly where the previous slice ended
              (bs2 eq bs1) && (o2 == previousEnd)
            else
              // previous buffer is 'full': the next slice must wrap a different array
              // and start at its beginning
              (bs2 ne bs1) && (o2 == 0)
          case _ => false // unexpected chunk subtype
        }
    }
  }
}

group("readOutputStream") {
test("writes data and terminates when `f` returns") {
forAllF { (bytes: Array[Byte], chunkSize0: Int) =>
Expand Down
31 changes: 14 additions & 17 deletions io/shared/src/main/scala/fs2/io/io.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ package object io extends ioplatform {
fis,
F.delay(new Array[Byte](chunkSize)),
closeAfterUse
)((is, buf) => F.blocking(is.read(buf)))
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)))

private[io] def readInputStreamCancelable[F[_]](
fis: F[InputStream],
Expand All @@ -57,7 +57,7 @@ package object io extends ioplatform {
fis,
F.delay(new Array[Byte](chunkSize)),
closeAfterUse
)((is, buf) => F.blocking(is.read(buf)).cancelable(cancel))
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)).cancelable(cancel))

/** Reads all bytes from the specified `InputStream` with a buffer size of `chunkSize`.
* Set `closeAfterUse` to false if the `InputStream` should not be closed after use.
Expand All @@ -76,31 +76,28 @@ package object io extends ioplatform {
fis,
F.pure(new Array[Byte](chunkSize)),
closeAfterUse
)((is, buf) => F.blocking(is.read(buf)))
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)))

private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte])(
read: (InputStream, Array[Byte]) => F[Int]
private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte], offset: Int)(
read: (InputStream, Array[Byte], Int) => F[Int]
)(implicit
F: Sync[F]
): F[Option[Chunk[Byte]]] =
read(is, buf).map { numBytes =>
): F[Option[(Chunk[Byte], Option[(Array[Byte], Int)])]] =
read(is, buf, offset).map { numBytes =>
if (numBytes < 0) None
else if (numBytes == 0) Some(Chunk.empty)
else if (numBytes < buf.size) Some(Chunk.array(buf, 0, numBytes))
else Some(Chunk.array(buf))
else if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None)
else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes)))
}

private[fs2] def readInputStreamGeneric[F[_]](
fis: F[InputStream],
buf: F[Array[Byte]],
closeAfterUse: Boolean
)(read: (InputStream, Array[Byte]) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = {
def useIs(is: InputStream) =
Stream
.eval(buf.flatMap(b => readBytesFromInputStream(is, b)(read)))
.repeat
.unNoneTerminate
.flatMap(c => Stream.chunk(c))
)(read: (InputStream, Array[Byte], Int) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = {
def useIs(is: InputStream) = Stream.unfoldChunkEval(Option.empty[(Array[Byte], Int)]) {
case None => buf.flatMap(b => readBytesFromInputStream(is, b, 0)(read))
case Some((b, offset)) => readBytesFromInputStream(is, b, offset)(read)
}

if (closeAfterUse)
Stream.bracket(fis)(is => Sync[F].blocking(is.close())).flatMap(useIs)
Expand Down

0 comments on commit 93c87e9

Please sign in to comment.