diff --git a/.scalafmt.conf b/.scalafmt.conf index d69db93533..75fe45e540 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = "3.7.17" +version = "3.8.0" style = default diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md deleted file mode 100644 index ea4a9b6ec2..0000000000 --- a/CODE_OF_CONDUCT.md +++ /dev/null @@ -1,14 +0,0 @@ -# Code of Conduct - -We are committed to providing a friendly, safe and welcoming environment for all, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, nationality, or other such characteristics. - -Everyone is expected to follow the [Scala Code of Conduct](https://www.scala-lang.org/conduct/) when discussing the project on the available communication channels. If you are being harassed, please contact us immediately so that we can support you. - -## Moderation - -Any questions, concerns, or moderation requests please contact a member of the project. - -- Michael Pilquist [Email](mailto:mpilquist@gmail.com) [Twitter](https://twitter.com/mpilquist) [GitHub](https://github.com/mpilquist) -- Pavel Chlupacek [GitHub](https://github.com/pchlupacek) -- Fabio Labella [GitHub](https://github.com/systemfw) - diff --git a/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala new file mode 100644 index 0000000000..335dda7d6f --- /dev/null +++ b/benchmark/src/main/scala/fs2/benchmark/FlowInteropBenchmark.scala @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package benchmark + +import cats.effect.IO +import cats.effect.unsafe.implicits.global + +import org.openjdk.jmh.annotations.{ + Benchmark, + BenchmarkMode, + Mode, + OutputTimeUnit, + Param, + Scope, + State +} + +import java.util.concurrent.TimeUnit +import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} + +import scala.concurrent.Future + +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.SECONDS) +class FlowInteropBenchmark { + @Param(Array("1024", "5120", "10240")) + var totalElements: Int = _ + + @Param(Array("1000")) + var iterations: Int = _ + + @Benchmark + def fastPublisher(): Unit = { + def publisher = + new Publisher[Int] { + override final def subscribe(subscriber: Subscriber[? >: Int]): Unit = + subscriber.onSubscribe( + new Subscription { + @volatile var i: Int = 0 + @volatile var canceled: Boolean = false + + override final def request(n: Long): Unit = { + Future { + var j = 0 + while (j < n && i < totalElements && !canceled) { + subscriber.onNext(i) + i += 1 + j += 1 + } + + if (i == totalElements || canceled) { + subscriber.onComplete() + } + }(global.compute) + + // Discarding the Future so it runs in the background. + () + } + + override final def cancel(): Unit = + canceled = true + } + ) + } + + val stream = + interop.flow.fromPublisher[IO](publisher, chunkSize = 512) + + val program = + stream.compile.toVector + + program.replicateA_(iterations).unsafeRunSync() + } +} diff --git a/build.sbt b/build.sbt index c15269d0a0..4d3158d255 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._ Global / onChangedBuildSource := ReloadOnSourceChanges -ThisBuild / tlBaseVersion := "3.9" +ThisBuild / tlBaseVersion := "3.10" ThisBuild / organization := "co.fs2" ThisBuild / organizationName := "Functional Streams for Scala" @@ -11,7 +11,7 @@ ThisBuild / startYear := Some(2013) val Scala213 = "2.13.12" ThisBuild / scalaVersion := Scala213 -ThisBuild / crossScalaVersions := Seq("2.12.18", Scala213, "3.3.1") +ThisBuild / crossScalaVersions := Seq("2.12.19", Scala213, "3.3.3") ThisBuild / tlVersionIntroduced := Map("3" -> "3.0.3") ThisBuild / githubWorkflowOSes := Seq("ubuntu-latest") @@ -294,9 +294,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.scodec" %%% "scodec-bits" % "1.1.38", "org.typelevel" %%% "cats-core" % "2.10.0", - "org.typelevel" %%% "cats-effect" % "3.5.3", - "org.typelevel" %%% "cats-effect-laws" % "3.5.3" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.5.3" % Test, + "org.typelevel" %%% "cats-effect" % "3.5.4", + "org.typelevel" %%% "cats-effect-laws" % "3.5.4" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.5.4" % Test, "org.typelevel" %%% "cats-laws" % "2.10.0" % Test, "org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M4" % Test, @@ -354,7 +354,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform) .jvmSettings( Test / fork := true, libraryDependencies ++= Seq( - "com.github.jnr" % "jnr-unixsocket" % "0.38.21" % Optional, + "com.github.jnr" % "jnr-unixsocket" % "0.38.22" % Optional, "com.google.jimfs" % "jimfs" % "1.3.0" % Test ) ) diff --git a/core/shared/src/main/scala-2.12/fs2/ChunkPlatform.scala b/core/shared/src/main/scala-2.12/fs2/ChunkPlatform.scala index 041bffd2c0..a0d4461198 100644 --- a/core/shared/src/main/scala-2.12/fs2/ChunkPlatform.scala +++ b/core/shared/src/main/scala-2.12/fs2/ChunkPlatform.scala @@ -31,7 +31,7 @@ import scala.reflect.ClassTag private[fs2] trait ChunkPlatform[+O] { self: Chunk[O] => - def asSeqPlatform: Option[IndexedSeq[O]] = + private[fs2] def asSeqPlatform: Option[IndexedSeq[O]] = None } diff --git a/core/shared/src/main/scala-2.13/fs2/ChunkPlatform.scala b/core/shared/src/main/scala-2.13/fs2/ChunkPlatform.scala index e7f2353c98..44e5ce4e76 100644 --- a/core/shared/src/main/scala-2.13/fs2/ChunkPlatform.scala +++ b/core/shared/src/main/scala-2.13/fs2/ChunkPlatform.scala @@ -26,7 +26,7 @@ import scala.collection.immutable.ArraySeq private[fs2] trait ChunkPlatform[+O] extends Chunk213And3Compat[O] { self: Chunk[O] => - def asSeqPlatform: Option[IndexedSeq[O]] = + private[fs2] def asSeqPlatform: Option[IndexedSeq[O]] = this match { case arraySlice: Chunk.ArraySlice[?] => Some( diff --git a/core/shared/src/main/scala-3/fs2/ChunkPlatform.scala b/core/shared/src/main/scala-3/fs2/ChunkPlatform.scala index 3ab49ed656..1fb816e0e4 100644 --- a/core/shared/src/main/scala-3/fs2/ChunkPlatform.scala +++ b/core/shared/src/main/scala-3/fs2/ChunkPlatform.scala @@ -39,7 +39,7 @@ private[fs2] trait ChunkPlatform[+O] extends Chunk213And3Compat[O] { case _ => new Chunk.IArraySlice(IArray.unsafeFromArray(toArray(ct)), 0, size) } - def asSeqPlatform: Option[IndexedSeq[O]] = + private[fs2] def asSeqPlatform: Option[IndexedSeq[O]] = this match { case arraySlice: Chunk.ArraySlice[_] => Some( diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b4c69671fa..d1aea979a7 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -568,6 +568,48 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.eval(fstream) } + /** Pulls up to the specified number of chunks from the source stream while concurrently allowing + * downstream to process emitted chunks. Unlike `prefetchN`, all accumulated chunks are emitted + * as a single chunk upon downstream pulling. + * + * The `chunkLimit` parameter controls backpressure on the source stream. + */ + def conflateChunks[F2[x] >: F[x]: Concurrent](chunkLimit: Int): Stream[F2, Chunk[O]] = + Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit)).flatMap { chan => + val producer = chunks.through(chan.sendAll) + val consumer = chan.stream.chunks.map(_.combineAll) + consumer.concurrently(producer) + } + + /** Like `conflateChunks` but uses the supplied `zero` and `f` values to combine the elements of + * each output chunk in to a single value. + */ + def conflate[F2[x] >: F[x]: Concurrent, O2](chunkLimit: Int, zero: O2)( + f: (O2, O) => O2 + ): Stream[F2, O2] = + conflateChunks[F2](chunkLimit).map(_.foldLeft(zero)(f)) + + /** Like `conflate` but combines elements of the output chunk with the supplied function. + */ + def conflate1[F2[x] >: F[x]: Concurrent, O2 >: O](chunkLimit: Int)( + f: (O2, O2) => O2 + ): Stream[F2, O2] = + conflateChunks[F2](chunkLimit).map(_.iterator.reduce(f)) + + /** Like `conflate1` but combines elements using the semigroup of the output type. + */ + def conflateSemigroup[F2[x] >: F[x]: Concurrent, O2 >: O: Semigroup]( + chunkLimit: Int + ): Stream[F2, O2] = + conflate1[F2, O2](chunkLimit)(Semigroup[O2].combine) + + /** Conflates elements and then maps the supplied function over the output chunk and combines the results using a semigroup. + */ + def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup]( + chunkLimit: Int + )(f: O => O2): Stream[F2, O2] = + conflateChunks[F2](chunkLimit).map(_.iterator.map(f).reduce(Semigroup[O2].combine)) + /** Prepends a chunk onto the front of this stream. * * @example {{{ @@ -2398,7 +2440,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan => chan.stream.unchunks.concurrently { chunks.through(chan.sendAll) - } } @@ -3605,9 +3646,13 @@ object Stream extends StreamLowPriority { def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = F.suspend(hint) { - for (_ <- 1 to chunkSize if i.hasNext) yield i.next() - }.map { s => - if (s.isEmpty) None else Some((Chunk.from(s), i)) + val bldr = Vector.newBuilder[A] + var cnt = 0 + while (cnt < chunkSize && i.hasNext) { + bldr += i.next() + cnt += 1 + } + if (cnt == 0) None else Some((Chunk.from(bldr.result()), i)) } Stream.unfoldChunkEval(iterator)(getNextChunk) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 0a9de56988..9feb2d65d9 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -704,16 +704,18 @@ class StreamCombinatorsSuite extends Fs2Suite { } test("fromIterator") { - forAllF { (x: List[Int], cs: Int) => + // Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415 + forAllF { (x: Vector[Int], cs: Int) => val chunkSize = (cs % 4096).abs + 1 - Stream.fromIterator[IO](x.iterator, chunkSize).assertEmits(x) + Stream.fromIterator[IO](x.iterator, chunkSize).assertEmits(x.toList) } } test("fromBlockingIterator") { - forAllF { (x: List[Int], cs: Int) => + // Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415 + forAllF { (x: Vector[Int], cs: Int) => val chunkSize = (cs % 4096).abs + 1 - Stream.fromBlockingIterator[IO](x.iterator, chunkSize).assertEmits(x) + Stream.fromBlockingIterator[IO](x.iterator, chunkSize).assertEmits(x.toList) } } diff --git a/core/shared/src/test/scala/fs2/StreamConflateSuite.scala b/core/shared/src/test/scala/fs2/StreamConflateSuite.scala new file mode 100644 index 0000000000..c50b7c4451 --- /dev/null +++ b/core/shared/src/test/scala/fs2/StreamConflateSuite.scala @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 + +import cats.effect.IO +import cats.effect.testkit.TestControl + +import scala.concurrent.duration._ + +class StreamConflateSuite extends Fs2Suite { + + test("conflateMap") { + TestControl.executeEmbed( + Stream + .iterate(0)(_ + 1) + .covary[IO] + .metered(10.millis) + .conflateMap(100)(List(_)) + .metered(101.millis) + .take(5) + .compile + .toList + .assertEquals( + List(0) :: (1 until 10).toList :: 10.until(40).toList.grouped(10).toList + ) + ) + } +} diff --git a/flake.lock b/flake.lock index f60b5e27fc..80c450b76b 100644 --- a/flake.lock +++ b/flake.lock @@ -2,15 +2,15 @@ "nodes": { "devshell": { "inputs": { - "nixpkgs": "nixpkgs", - "systems": "systems" + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" }, "locked": { - "lastModified": 1698410321, - "narHash": "sha256-MphuSlgpmKwtJncGMohryHiK55J1n6WzVQ/OAfmfoMc=", + "lastModified": 1708939976, + "narHash": "sha256-O5+nFozxz2Vubpdl1YZtPrilcIXPcRAjqNdNE8oCRoA=", "owner": "numtide", "repo": "devshell", - "rev": "1aed986e3c81a4f6698e85a7452cbfcc4b31a36e", + "rev": "5ddecd67edbd568ebe0a55905273e56cc82aabe3", "type": "github" }, "original": { @@ -20,15 +20,33 @@ } }, "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1701680307, + "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "4022d587cbbfd70fe950c1e2083a02621806a725", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "flake-utils_2": { "inputs": { "systems": "systems_2" }, "locked": { - "lastModified": 1694529238, - "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=", + "lastModified": 1709126324, + "narHash": "sha256-q6EQdSeUZOG26WelxqkmR7kArjgWCdw5sfJVHPH/7j8=", "owner": "numtide", "repo": "flake-utils", - "rev": "ff7b65b44d01cf9ba6a71320833626af21126384", + "rev": "d465f4819400de7c8d874d50b982301f28a84605", "type": "github" }, "original": { @@ -39,11 +57,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1677383253, - "narHash": "sha256-UfpzWfSxkfXHnb4boXZNaKsAcUrZT9Hw+tao1oZxd08=", + "lastModified": 1704161960, + "narHash": "sha256-QGua89Pmq+FBAro8NriTuoO/wNaUtugt29/qqA8zeeM=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "9952d6bc395f5841262b006fbace8dd7e143b634", + "rev": "63143ac2c9186be6d9da6035fa22620018c85932", "type": "github" }, "original": { @@ -55,11 +73,11 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1698553279, - "narHash": "sha256-T/9P8yBSLcqo/v+FTOBK+0rjzjPMctVymZydbvR/Fak=", + "lastModified": 1710097495, + "narHash": "sha256-B7Ea7q7hU7SE8wOPJ9oXEBjvB89yl2csaLjf5v/7jr8=", "owner": "nixos", "repo": "nixpkgs", - "rev": "90e85bc7c1a6fc0760a94ace129d3a1c61c3d035", + "rev": "d40e866b1f98698d454dad8f592fe7616ff705a4", "type": "github" }, "original": { @@ -115,15 +133,15 @@ "typelevel-nix": { "inputs": { "devshell": "devshell", - "flake-utils": "flake-utils", + "flake-utils": "flake-utils_2", "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1698638641, - "narHash": "sha256-piIhN6bnCGd3yVB4M17SOX/Ej9zq4Q4+KzzWZRRPaVs=", + "lastModified": 1710188850, + "narHash": "sha256-KbNmyxEvcnq5h/wfeL1ZxO9RwoNRjJ0IgYlUZpdSlLo=", "owner": "typelevel", "repo": "typelevel-nix", - "rev": "5c281f550dcb73eafe7341767d384bcdc3ec14fd", + "rev": "60c3868688cb8f5f7ebc781f6e122c061ae35d4d", "type": "github" }, "original": { diff --git a/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala b/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala index 50f8934d6a..19fd4effc2 100644 --- a/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala +++ b/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala @@ -29,7 +29,13 @@ import cats.syntax.all._ import com.comcast.ip4s._ +import scala.concurrent.duration._ + class UdpSuite extends Fs2Suite with UdpSuitePlatform { + def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] = + socket + .write(toSend) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceive(socket, toSend))) + group("udp") { test("echo one") { val msg = Chunk.array("Hello, world!".getBytes) @@ -38,15 +44,11 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform { .flatMap { serverSocket => Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort => val serverAddress = SocketAddress(ip"127.0.0.1", serverPort) - val server = serverSocket.reads - .evalMap(packet => serverSocket.write(packet)) - .drain - val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket => - Stream(Datagram(serverAddress, msg)) - .through(clientSocket.writes) - .drain ++ Stream.eval(clientSocket.read) + val server = serverSocket.reads.foreach(packet => serverSocket.write(packet)) + val client = Stream.resource(Network[IO].openDatagramSocket()).evalMap { clientSocket => + sendAndReceive(clientSocket, Datagram(serverAddress, msg)) } - server.mergeHaltBoth(client) + client.concurrently(server) } } .compile @@ -69,21 +71,17 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform { .flatMap { serverSocket => Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort => val serverAddress = SocketAddress(ip"127.0.0.1", serverPort) - val server = serverSocket.reads - .evalMap(packet => serverSocket.write(packet)) - .drain + val server = serverSocket.reads.foreach(packet => serverSocket.write(packet)) val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket => Stream .emits(msgs.map(msg => Datagram(serverAddress, msg))) - .flatMap { msg => - Stream.exec(clientSocket.write(msg)) ++ Stream.eval(clientSocket.read) - } + .evalMap(msg => sendAndReceive(clientSocket, msg)) } val clients = Stream .constant(client) .take(numClients.toLong) .parJoin(numParallelClients) - server.mergeHaltBoth(clients) + clients.concurrently(server) } } .compile @@ -110,15 +108,13 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform { .exec( v4Interfaces.traverse_(interface => serverSocket.join(groupJoin, interface)) ) ++ - serverSocket.reads - .evalMap(packet => serverSocket.write(packet)) - .drain + serverSocket.reads.foreach(packet => serverSocket.write(packet)) val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket => Stream(Datagram(SocketAddress(group.address, serverPort), msg)) .through(clientSocket.writes) .drain ++ Stream.eval(clientSocket.read) } - server.mergeHaltBoth(client) + client.concurrently(server) } } .compile diff --git a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala index 11938f6687..6fee6275c1 100644 --- a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala +++ b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala @@ -23,8 +23,8 @@ package fs2 package io package file -import cats.effect.kernel.Async -import cats.effect.kernel.Resource +import cats.Traverse +import cats.effect.kernel.{Async, Resource} import cats.syntax.all._ import fs2.io.file.Files.UnsealedFiles import fs2.io.internal.facade @@ -175,7 +175,7 @@ private[fs2] trait FilesCompanionPlatform { ) ).adaptError { case IOException(ex) => ex } else - walk(path, Int.MaxValue, true).evalTap(deleteIfExists).compile.drain + walk(path, WalkOptions.Default.withFollowLinks(true)).evalTap(deleteIfExists).compile.drain override def exists(path: Path, followLinks: Boolean): F[Boolean] = (if (followLinks) @@ -369,6 +369,54 @@ private[fs2] trait FilesCompanionPlatform { override def size(path: Path): F[Long] = stat(path).map(_.size.toString.toLong) + override def walkWithAttributes(start: Path, options: WalkOptions): Stream[F, PathInfo] = { + + def go( + start: Path, + maxDepth: Int, + ancestry: List[Either[Path, FileKey]] + ): Stream[F, PathInfo] = + Stream.eval(getBasicFileAttributes(start, followLinks = false)).mask.flatMap { attr => + Stream.emit(PathInfo(start, attr)) ++ { + if (maxDepth == 0) Stream.empty + else if (attr.isDirectory) + list(start).mask.flatMap { path => + go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry) + } + else if (attr.isSymbolicLink && options.followLinks) + Stream.eval(getBasicFileAttributes(start, followLinks = true)).mask.flatMap { attr => + val fileKey = attr.fileKey + val isCycle = Traverse[List].existsM(ancestry) { + case Right(ancestorKey) => F.pure(fileKey.contains(ancestorKey)) + case Left(ancestorPath) => isSameFile(start, ancestorPath) + } + + Stream.eval(isCycle).flatMap { isCycle => + if (!isCycle) + list(start).mask.flatMap { path => + go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry) + } + else if (options.allowCycles) + Stream.empty + else + Stream.raiseError(new FileSystemLoopException(start.toString)) + } + + } + else + Stream.empty + } + } + + go( + start, + options.maxDepth, + Nil + ) + .chunkN(options.chunkSize) + .flatMap(Stream.chunk) + } + override def writeAll(path: Path, _flags: Flags): Pipe[F, Byte, Nothing] = in => in.through { diff --git a/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala index 590da203af..2b0af08b81 100644 --- a/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala @@ -27,20 +27,21 @@ import cats.effect.kernel.{Async, Resource, Sync} import cats.syntax.all._ import java.nio.channels.{FileChannel, SeekableByteChannel} -import java.nio.file.{Files => JFiles, Path => JPath, _} +import java.nio.file.{Files => JFiles, Path => JPath, FileSystemLoopException => _, _} import java.nio.file.attribute.{ BasicFileAttributeView, BasicFileAttributes => JBasicFileAttributes, PosixFileAttributes => JPosixFileAttributes, - PosixFilePermissions + PosixFilePermissions, + FileTime } import java.security.Principal import java.util.stream.{Stream => JStream} import scala.concurrent.duration._ +import scala.util.control.NonFatal import fs2.io.CollectionCompat._ -import java.nio.file.attribute.FileTime private[file] trait FilesPlatform[F[_]] extends DeprecatedFilesApi[F] { self: Files[F] => @@ -389,6 +390,102 @@ private[file] trait FilesCompanionPlatform { .resource(Resource.fromAutoCloseable(javaCollection)) .flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize)) + private case class WalkEntry( + path: Path, + attr: JBasicFileAttributes, + depth: Int, + ancestry: List[Either[Path, NioFileKey]] + ) + + override def walkWithAttributes( + start: Path, + options: WalkOptions + ): Stream[F, PathInfo] = { + import scala.collection.immutable.Queue + + def loop(toWalk0: Queue[WalkEntry]): Stream[F, PathInfo] = { + val partialWalk = Sync[F].interruptible { + var acc = Vector.empty[PathInfo] + var toWalk = toWalk0 + + while (acc.size < options.chunkSize && toWalk.nonEmpty && !Thread.interrupted()) { + val entry = toWalk.head + toWalk = toWalk.drop(1) + acc = acc :+ PathInfo(entry.path, new DelegatingBasicFileAttributes(entry.attr)) + if (entry.depth < options.maxDepth) { + val dir = + if (entry.attr.isDirectory) entry.path + else if (options.followLinks && entry.attr.isSymbolicLink) { + try { + val targetAttr = + JFiles.readAttributes(entry.path.toNioPath, classOf[JBasicFileAttributes]) + val fileKey = Option(targetAttr.fileKey).map(NioFileKey(_)) + val isCycle = entry.ancestry.exists { + case Right(ancestorKey) => + fileKey.contains(ancestorKey) + case Left(ancestorPath) => + JFiles.isSameFile(entry.path.toNioPath, ancestorPath.toNioPath) + } + if (isCycle) + if (options.allowCycles) null + else throw new FileSystemLoopException(entry.path.toString) + else entry.path + } catch { + case t: FileSystemLoopException => throw t + case NonFatal(_) => null + } + } else null + if (dir ne null) { + try { + val listing = JFiles.list(dir.toNioPath) + try { + val descendants = listing.iterator.asScala.flatMap { p => + try + Some( + WalkEntry( + Path.fromNioPath(p), + JFiles.readAttributes( + p, + classOf[JBasicFileAttributes], + LinkOption.NOFOLLOW_LINKS + ), + entry.depth + 1, + Option(entry.attr.fileKey) + .map(NioFileKey(_)) + .toRight(entry.path) :: entry.ancestry + ) + ) + catch { + case NonFatal(_) => None + } + } + toWalk = Queue.empty ++ descendants ++ toWalk + } finally listing.close() + } catch { + case NonFatal(_) => () + } + } + } + } + + Stream.chunk(Chunk.from(acc)) ++ (if (toWalk.isEmpty) Stream.empty else loop(toWalk)) + } + Stream.eval(partialWalk).flatten + } + + Stream + .eval(Sync[F].interruptible { + WalkEntry( + start, + JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes]), + 0, + Nil + ) + }) + .mask + .flatMap(w => loop(Queue(w))) + } + def createWatcher: Resource[F, Watcher[F]] = Watcher.default(this, F) def watch( diff --git a/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala b/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala index d67248ed66..559328084d 100644 --- a/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala +++ b/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala @@ -30,6 +30,7 @@ import java.nio.file.{Files => JFiles, Path => JPath, _} import java.nio.file.attribute.{BasicFileAttributes => JBasicFileAttributes} import scala.concurrent.duration._ +import scala.util.control.NonFatal trait BaseFileSuite extends Fs2Suite { @@ -77,11 +78,15 @@ trait BaseFileSuite extends Fs2Suite { dir.toNioPath, new SimpleFileVisitor[JPath] { override def visitFile(path: JPath, attrs: JBasicFileAttributes) = { - JFiles.delete(path) + try JFiles.deleteIfExists(path) + catch { case NonFatal(_) => () } FileVisitResult.CONTINUE } + override def visitFileFailed(path: JPath, e: IOException) = + FileVisitResult.CONTINUE override def postVisitDirectory(path: JPath, e: IOException) = { - JFiles.delete(path) + try JFiles.deleteIfExists(path) + catch { case NonFatal(_) => () } FileVisitResult.CONTINUE } } diff --git a/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala b/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala new file mode 100644 index 0000000000..d6a8bb71ec --- /dev/null +++ b/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package file + +import cats.effect.IO +import java.io.File +import scala.concurrent.duration.* + +class WalkBenchmark extends Fs2IoSuite { + + override def munitIOTimeout = 5.minutes + + private var target: Path = _ + + override def beforeAll() = { + super.beforeAll() + val file = File.createTempFile("fs2-benchmarks-", "-walk") + file.delete() + file.mkdir() + target = Path(file.toString) + + val MaxDepth = 7 + val Names = 'A'.to('E').toList.map(_.toString) + + def loop(cwd: File, depth: Int): Unit = + if (depth < MaxDepth) { + Names.foreach { name => + val sub = new File(cwd, name) + sub.mkdir() + loop(sub, depth + 1) + } + } else if (depth == MaxDepth) { + Names.foreach { name => + val sub = new File(cwd, name) + sub.createNewFile() + loop(sub, depth + 1) + } + } + + loop(file, 0) + } + + def time[A](f: => A): FiniteDuration = { + val start = System.nanoTime() + val _ = f + (System.nanoTime() - start).nanos + } + + test("Files.walk has similar performance to java.nio.file.Files.walk") { + val fs2Time = time( + Files[IO] + .walk(target) + .compile + .count + .unsafeRunSync() + ) + val fs2EagerTime = time( + Files[IO] + .walk(target, WalkOptions.Eager) + .compile + .count + .unsafeRunSync() + ) + val nioTime = time(java.nio.file.Files.walk(target.toNioPath).count()) + val epsilon = nioTime.toNanos * 1.5 + println(s"fs2 took: ${fs2Time.toMillis} ms") + println(s"fs2 eager took: ${fs2EagerTime.toMillis} ms") + println(s"nio took: ${nioTime.toMillis} ms") + assert( + (fs2Time - nioTime).toNanos.abs < epsilon, + s"fs2 time: $fs2Time, nio time: $nioTime, diff: ${fs2Time - nioTime}" + ) + } + + test("walk is interruptible") { + val elapsed = time( + Files[IO] + .walk(target) + .interruptAfter(1.second) + .compile + .count + .unsafeRunSync() + ) + assert(elapsed < 1250.milliseconds) + } + + test("walk eager is interruptible") { + val elapsed = time( + Files[IO] + .walk(target, WalkOptions.Eager) + .interruptAfter(1.second) + .compile + .count + .unsafeRunSync() + ) + assert(elapsed < 1250.milliseconds) + } +} diff --git a/io/shared/src/main/scala/fs2/io/file/Files.scala b/io/shared/src/main/scala/fs2/io/file/Files.scala index 61f3bb1c36..ae618aba1d 100644 --- a/io/shared/src/main/scala/fs2/io/file/Files.scala +++ b/io/shared/src/main/scala/fs2/io/file/Files.scala @@ -31,7 +31,6 @@ import cats.effect.std.Hotswap import cats.syntax.all._ import scala.concurrent.duration._ -import cats.Traverse /** Provides operations related to working with files in the effect `F`. * @@ -375,11 +374,31 @@ sealed trait Files[F[_]] extends FilesPlatform[F] { /** Creates a stream of paths contained in a given file tree. Depth is unlimited. */ def walk(start: Path): Stream[F, Path] = - walk(start, Int.MaxValue, false) + walk(start, WalkOptions.Default) + + /** Creates a stream of paths contained in a given file tree. + * + * The `options` parameter allows for customizing the walk behavior. The `WalkOptions` + * type provides both `WalkOptions.Default` and `WalkOptions.Eager` as starting points, + * and further customizations can be specified via methods on the returned options value. + * For example, to eagerly walk a directory while following symbolic links, emitting all + * paths as a single chunk, use `walk(start, WalkOptions.Eager.withFollowLinks(true))`. + */ + def walk(start: Path, options: WalkOptions): Stream[F, Path] = + walkWithAttributes(start, options).map(_.path) /** Creates a stream of paths contained in a given file tree down to a given depth. */ - def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] + @deprecated("Use walk(start, WalkOptions.Default.withMaxDepth(..).withFollowLinks(..))", "3.10") + def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] = + walk(start, WalkOptions.Default.withMaxDepth(maxDepth).withFollowLinks(followLinks)) + + /** Like `walk` but returns a `PathInfo`, which provides both the `Path` and `BasicFileAttributes`. */ + def walkWithAttributes(start: Path): Stream[F, PathInfo] = + walkWithAttributes(start, WalkOptions.Default) + + /** Like `walk` but returns a `PathInfo`, which provides both the `Path` and `BasicFileAttributes`. */ + def walkWithAttributes(start: Path, options: WalkOptions): Stream[F, PathInfo] /** Writes all data to the file at the specified path. * @@ -505,44 +524,6 @@ object Files extends FilesCompanionPlatform with FilesLowPriority { case _: NoSuchFileException => () }) - def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] = { - - def go(start: Path, maxDepth: Int, ancestry: List[Either[Path, FileKey]]): Stream[F, Path] = - Stream.emit(start) ++ { - if (maxDepth == 0) Stream.empty - else - Stream.eval(getBasicFileAttributes(start, followLinks = false)).mask.flatMap { attr => - if (attr.isDirectory) - list(start).mask.flatMap { path => - go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry) - } - else if (attr.isSymbolicLink && followLinks) - Stream.eval(getBasicFileAttributes(start, followLinks = true)).mask.flatMap { - attr => - val fileKey = attr.fileKey - val isCycle = Traverse[List].existsM(ancestry) { - case Right(ancestorKey) => F.pure(fileKey.contains(ancestorKey)) - case Left(ancestorPath) => isSameFile(start, ancestorPath) - } - - Stream.eval(isCycle).flatMap { isCycle => - if (!isCycle) - list(start).mask.flatMap { path => - go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry) - } - else - Stream.raiseError(new FileSystemLoopException(start.toString)) - } - - } - else - Stream.empty - } - } - - Stream.eval(getBasicFileAttributes(start, followLinks)) >> go(start, maxDepth, Nil) - } - def writeAll( path: Path, flags: Flags diff --git a/io/shared/src/main/scala/fs2/io/file/PathInfo.scala b/io/shared/src/main/scala/fs2/io/file/PathInfo.scala new file mode 100644 index 0000000000..fe42791aa0 --- /dev/null +++ b/io/shared/src/main/scala/fs2/io/file/PathInfo.scala @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.file + +/** Provides a `Path` and its associated `BasicFileAttributes`. */ +case class PathInfo(path: Path, attributes: BasicFileAttributes) diff --git a/io/shared/src/main/scala/fs2/io/file/Permissions.scala b/io/shared/src/main/scala/fs2/io/file/Permissions.scala index f8f7772601..abff52ce75 100644 --- a/io/shared/src/main/scala/fs2/io/file/Permissions.scala +++ b/io/shared/src/main/scala/fs2/io/file/Permissions.scala @@ -55,6 +55,12 @@ import PosixPermission._ */ final class PosixPermissions private (val value: Int) extends Permissions { + def add(p: PosixPermission): PosixPermissions = + new PosixPermissions(value | p.value) + + def remove(p: PosixPermission): PosixPermissions = + new PosixPermissions(value ^ p.value) + override def equals(that: Any): Boolean = that match { case other: PosixPermissions => value == other.value case _ => false diff --git a/io/shared/src/main/scala/fs2/io/file/WalkOptions.scala b/io/shared/src/main/scala/fs2/io/file/WalkOptions.scala new file mode 100644 index 0000000000..1a05c28709 --- /dev/null +++ b/io/shared/src/main/scala/fs2/io/file/WalkOptions.scala @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package file + +/** Options that customize a filesystem walk via `Files[F].walk`. */ +sealed trait WalkOptions { + + /** Size of chunks emitted from the walk. + * + * Implementations *may* use this for optimization, batching file system operations. + * + * A chunk size of 1 hints to the implementation to use the maximally laziness in + * file system access, emitting a single path at a time. + * + * A chunk size of `Int.MaxValue` hints to the implementation to perform all file system + * operations at once, emitting a single chunk with all paths. + */ + def chunkSize: Int + + /** Maximum depth to walk. A value of 0 results in emitting just the starting path. + * A value of 1 results in emitting the starting path and all direct descendants. + */ + def maxDepth: Int + + /** Indicates whether links are followed during the walk. If false, the path of + * each link is emitted. If true, links are followed and their contents are emitted. + */ + def followLinks: Boolean + + /** Indicates whether to allow cycles when following links. If true, any link causing a + * cycle is emitted as the link path. If false, a cycle results in walk failing with a `FileSystemLoopException`. + */ + def allowCycles: Boolean + + /** Returns a new `WalkOptions` with the specified chunk size. */ + def withChunkSize(chunkSize: Int): WalkOptions + + /** Returns a new `WalkOptions` with the specified max depth. */ + def withMaxDepth(maxDepth: Int): WalkOptions + + /** Returns a new `WalkOptions` with the specified value for `followLinks`. */ + def withFollowLinks(value: Boolean): WalkOptions + + /** Returns a new `WalkOptions` with the specified value for `allowCycles`. */ + def withAllowCycles(value: Boolean): WalkOptions +} + +object WalkOptions { + private case class DefaultWalkOptions( + chunkSize: Int, + maxDepth: Int, + followLinks: Boolean, + allowCycles: Boolean + ) extends WalkOptions { + def withChunkSize(chunkSize: Int): WalkOptions = copy(chunkSize = chunkSize) + def withMaxDepth(maxDepth: Int): WalkOptions = copy(maxDepth = maxDepth) + def withFollowLinks(value: Boolean): WalkOptions = copy(followLinks = value) + def withAllowCycles(value: Boolean): WalkOptions = copy(allowCycles = value) + override def toString = + s"WalkOptions(chunkSize = $chunkSize, maxDepth = $maxDepth, followLinks = $followLinks, allowCycles = $allowCycles)" + } + + /** Default walk options, using a large chunk size, unlimited depth, and no link following. */ + val Default: WalkOptions = DefaultWalkOptions(4096, Int.MaxValue, false, false) + + /** Like `Default` but uses the maximum chunk size, hinting the implementation should perform all file system operations before emitting any paths. */ + val Eager: WalkOptions = Default.withChunkSize(Int.MaxValue) + + /** Like `Default` but uses the minimum chunk size, hinting the implementation should perform minumum number of file system operations before emitting each path. */ + val Lazy: WalkOptions = Default.withChunkSize(1) +} diff --git a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala index bc0048b22b..bc1359373b 100644 --- a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala @@ -568,10 +568,9 @@ class FilesSuite extends Fs2IoSuite with BaseFileSuite { Stream .resource(tempFilesHierarchy) .flatMap(topDir => Files[IO].walk(topDir)) - .map(_ => 1) .compile - .foldMonoid - .assertEquals(31) // the root + 5 children + 5 files per child directory + .count + .assertEquals(31L) // the root + 5 children + 5 files per child directory } test("can delete files in a nested tree") { @@ -591,6 +590,122 @@ class FilesSuite extends Fs2IoSuite with BaseFileSuite { .foldMonoid .assertEquals(25) } + + test("maxDepth = 0") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withMaxDepth(0))) + .compile + .count + .assertEquals(1L) // the root + } + + test("maxDepth = 1") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withMaxDepth(1))) + .compile + .count + .assertEquals(6L) // the root + 5 children + } + + test("maxDepth = 1 / eager") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Eager.withMaxDepth(1))) + .compile + .count + .assertEquals(6L) // the root + 5 children + } + + test("maxDepth = 2") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withMaxDepth(2))) + .compile + .count + .assertEquals(31L) // the root + 5 children + 5 files per child directory + } + + test("followLinks = true") { + Stream + .resource((tempFilesHierarchy, tempFilesHierarchy).tupled) + .evalMap { case (topDir, secondDir) => + Files[IO].createSymbolicLink(topDir / "link", secondDir).as(topDir) + } + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withFollowLinks(true))) + .compile + .count + .assertEquals(31L * 2) + } + + test("followLinks = false") { + Stream + .resource((tempFilesHierarchy, tempFilesHierarchy).tupled) + .evalMap { case (topDir, secondDir) => + Files[IO].createSymbolicLink(topDir / "link", secondDir).as(topDir) + } + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default)) + .compile + .count + .assertEquals(32L) + } + + test("followLinks with cycle") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withFollowLinks(true))) + .compile + .count + .intercept[FileSystemLoopException] + } + + test("followLinks with cycle / eager") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => + Files[IO] + .walk(topDir, WalkOptions.Eager.withFollowLinks(true)) + ) + .compile + .count + .intercept[FileSystemLoopException] + } + + test("followLinks with cycle / cycles allowed") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => + Files[IO].walk(topDir, WalkOptions.Default.withFollowLinks(true).withAllowCycles(true)) + ) + .compile + .count + .assertEquals(32L) + } + + test("followLinks with cycle / eager / cycles allowed") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => + Files[IO] + .walk(topDir, WalkOptions.Eager.withFollowLinks(true).withAllowCycles(true)) + ) + .compile + .count + .assertEquals(32L) + } } test("writeRotate") { diff --git a/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala b/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala index 95692655a5..a91829ce20 100644 --- a/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala @@ -59,4 +59,28 @@ class PosixPermissionsSuite extends Fs2IoSuite { assertEquals(PosixPermissions.fromString("rwx"), None) assertEquals(PosixPermissions.fromString("rwxrwxrw?"), None) } + + test("add/remove all") { + case class TC(p: PosixPermission, adding: String, removing: String) + val all = Seq( + TC(PosixPermission.OwnerRead, "400", "377"), + TC(PosixPermission.OwnerWrite, "600", "177"), + TC(PosixPermission.OwnerExecute, "700", "077"), + TC(PosixPermission.GroupRead, "740", "037"), + TC(PosixPermission.GroupWrite, "760", "017"), + TC(PosixPermission.GroupExecute, "770", "007"), + TC(PosixPermission.OthersRead, "774", "003"), + TC(PosixPermission.OthersWrite, "776", "001"), + TC(PosixPermission.OthersExecute, "777", "000") + ) + var goingUp = PosixPermissions.fromInt(0).get + var goingDown = PosixPermissions.fromInt(511).get + all.foreach { case TC(p, adding, removing) => + goingUp = goingUp.add(p) + assertEquals(goingUp, PosixPermissions.fromOctal(adding).get) + + goingDown = goingDown.remove(p) + assertEquals(goingDown, PosixPermissions.fromOctal(removing).get) + } + } } diff --git a/project/build.properties b/project/build.properties index abbbce5da4..04267b14af 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.8 +sbt.version=1.9.9 diff --git a/project/plugins.sbt b/project/plugins.sbt index 5ecb9404d4..2fc78fc253 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -val sbtTypelevelVersion = "0.6.5" +val sbtTypelevelVersion = "0.6.7" addSbtPlugin("org.typelevel" % "sbt-typelevel" % sbtTypelevelVersion) addSbtPlugin("org.typelevel" % "sbt-typelevel-site" % sbtTypelevelVersion) addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.15.0")