diff --git a/build.sbt b/build.sbt index 2e5473ee46..9f907b26f7 100644 --- a/build.sbt +++ b/build.sbt @@ -299,9 +299,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.scodec" %%% "scodec-bits" % "1.1.38", "org.typelevel" %%% "cats-core" % "2.11.0", - "org.typelevel" %%% "cats-effect" % "3.5.7", - "org.typelevel" %%% "cats-effect-laws" % "3.5.7" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.5.7" % Test, + "org.typelevel" %%% "cats-effect" % "3.6.0-RC1", + "org.typelevel" %%% "cats-effect-laws" % "3.6.0-RC1" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.6.0-RC1" % Test, "org.typelevel" %%% "cats-laws" % "2.11.0" % Test, "org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0" % Test, @@ -370,9 +370,6 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform) .nativeEnablePlugins(ScalaNativeBrewedConfigPlugin) .nativeSettings(commonNativeSettings) .nativeSettings( - libraryDependencies ++= Seq( - "com.armanbilge" %%% "epollcat" % "0.1.6" % Test - ), Test / nativeBrewFormulas += "s2n", Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1") ) diff --git a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala index f9d4c43b3e..f6ac066201 100644 --- a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala +++ b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala @@ -22,37 +22,15 @@ package fs2 package io -import cats._ import cats.effect.kernel.Sync import cats.syntax.all._ -import java.nio.charset.Charset -import java.nio.charset.StandardCharsets import scala.reflect.ClassTag private[fs2] trait iojvmnative { type InterruptedIOException = java.io.InterruptedIOException type ClosedChannelException = java.nio.channels.ClosedChannelException - // - // STDIN/STDOUT Helpers - - /** Pipe of bytes that writes emitted values to standard output asynchronously. */ - def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] = - writeOutputStream(Sync[F].blocking(System.out), false) - - /** Pipe of bytes that writes emitted values to standard error asynchronously. */ - def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] = - writeOutputStream(Sync[F].blocking(System.err), false) - - /** Writes this stream to standard output asynchronously, converting each element to - * a sequence of bytes via `Show` and the given `Charset`. - */ - def stdoutLines[F[_]: Sync, O: Show]( - charset: Charset = StandardCharsets.UTF_8 - ): Pipe[F, O, Nothing] = - _.map(_.show).through(text.encode(charset)).through(stdout) - /** Stream of bytes read asynchronously from the specified resource relative to the class `C`. * @see [[readClassLoaderResource]] for a resource relative to a classloader. */ diff --git a/io/jvm/src/main/scala/fs2/io/ioplatform.scala b/io/jvm/src/main/scala/fs2/io/ioplatform.scala index 229549d0f4..42e32069f1 100644 --- a/io/jvm/src/main/scala/fs2/io/ioplatform.scala +++ b/io/jvm/src/main/scala/fs2/io/ioplatform.scala @@ -22,6 +22,7 @@ package fs2 package io +import cats.Show import cats.effect.kernel.{Async, Outcome, Resource, Sync} import cats.effect.kernel.implicits._ import cats.effect.kernel.Deferred @@ -30,6 +31,8 @@ import fs2.internal.ThreadFactories import fs2.io.internal.PipedStreamBuffer import java.io.{InputStream, OutputStream} +import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import java.util.concurrent.Executors private[fs2] trait ioplatform extends iojvmnative { @@ -71,6 +74,22 @@ private[fs2] trait ioplatform extends iojvmnative { def stdinUtf8[F[_]](bufSize: Int, F: Sync[F]): Stream[F, String] = stdin(bufSize, F).through(text.utf8.decode) + /** Pipe of bytes that writes emitted values to standard output asynchronously. */ + def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] = + writeOutputStream(Sync[F].blocking(System.out), false) + + /** Pipe of bytes that writes emitted values to standard error asynchronously. */ + def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] = + writeOutputStream(Sync[F].blocking(System.err), false) + + /** Writes this stream to standard output asynchronously, converting each element to + * a sequence of bytes via `Show` and the given `Charset`. + */ + def stdoutLines[F[_]: Sync, O: Show]( + charset: Charset = StandardCharsets.UTF_8 + ): Pipe[F, O, Nothing] = + _.map(_.show).through(text.encode(charset)).through(stdout) + /** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`, * that is closed whenever the resulting stream terminates. * diff --git a/io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala b/io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala index 088f9646ac..4c258c4021 100644 --- a/io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala +++ b/io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala @@ -25,6 +25,7 @@ package net import cats.effect.IO import cats.effect.LiftIO +import cats.effect.Selector import cats.effect.kernel.{Async, Resource} import com.comcast.ip4s.{Dns, Host, IpAddress, Port, SocketAddress} @@ -78,10 +79,62 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N def forIO: Network[IO] = forLiftIO - implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] = { - val _ = LiftIO[F] - forAsync - } + implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] = + new UnsealedNetwork[F] { + private lazy val fallback = forAsync[F] + + private def tryGetSelector = + IO.pollers.map(_.collectFirst { case selector: Selector => selector }).to[F] + + private implicit def dns: Dns[F] = Dns.forAsync[F] + + def socketGroup(threadCount: Int, threadFactory: ThreadFactory): Resource[F, SocketGroup[F]] = + Resource.eval(tryGetSelector).flatMap { + case Some(selector) => Resource.pure(new SelectingSocketGroup[F](selector)) + case None => fallback.socketGroup(threadCount, threadFactory) + } + + def datagramSocketGroup(threadFactory: ThreadFactory): Resource[F, DatagramSocketGroup[F]] = + fallback.datagramSocketGroup(threadFactory) + + def client( + to: SocketAddress[Host], + options: List[SocketOption] + ): Resource[F, Socket[F]] = Resource.eval(tryGetSelector).flatMap { + case Some(selector) => new SelectingSocketGroup(selector).client(to, options) + case None => fallback.client(to, options) + } + + def server( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Stream[F, Socket[F]] = Stream.eval(tryGetSelector).flatMap { + case Some(selector) => new SelectingSocketGroup(selector).server(address, port, options) + case None => fallback.server(address, port, options) + } + + def serverResource( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = + Resource.eval(tryGetSelector).flatMap { + case Some(selector) => + new SelectingSocketGroup(selector).serverResource(address, port, options) + case None => fallback.serverResource(address, port, options) + } + + def openDatagramSocket( + address: Option[Host], + port: Option[Port], + options: List[SocketOption], + protocolFamily: Option[ProtocolFamily] + ): Resource[F, DatagramSocket[F]] = + fallback.openDatagramSocket(address, port, options, protocolFamily) + + def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync[F] + } def forAsync[F[_]](implicit F: Async[F]): Network[F] = forAsyncAndDns(F, Dns.forAsync(F)) diff --git a/io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala b/io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala new file mode 100644 index 0000000000..d589669912 --- /dev/null +++ b/io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io.net + +import cats.effect.LiftIO +import cats.effect.Selector +import cats.effect.kernel.Async +import cats.effect.std.Mutex +import cats.syntax.all._ +import com.comcast.ip4s.IpAddress +import com.comcast.ip4s.SocketAddress + +import java.nio.ByteBuffer +import java.nio.channels.SelectionKey.OP_READ +import java.nio.channels.SelectionKey.OP_WRITE +import java.nio.channels.SocketChannel + +private final class SelectingSocket[F[_]: LiftIO] private ( + selector: Selector, + ch: SocketChannel, + readMutex: Mutex[F], + writeMutex: Mutex[F], + val localAddress: F[SocketAddress[IpAddress]], + val remoteAddress: F[SocketAddress[IpAddress]] +)(implicit F: Async[F]) + extends Socket.BufferedReads(readMutex) { + + protected def readChunk(buf: ByteBuffer): F[Int] = + F.delay(ch.read(buf)).flatMap { readed => + if (readed == 0) selector.select(ch, OP_READ).to *> readChunk(buf) + else F.pure(readed) + } + + def write(bytes: Chunk[Byte]): F[Unit] = { + def go(buf: ByteBuffer): F[Unit] = + F.delay { + ch.write(buf) + buf.remaining() + }.flatMap { remaining => + if (remaining > 0) { + selector.select(ch, OP_WRITE).to *> go(buf) + } else F.unit + } + writeMutex.lock.surround { + F.delay(bytes.toByteBuffer).flatMap(go) + } + } + + def isOpen: F[Boolean] = F.delay(ch.isOpen) + + def endOfOutput: F[Unit] = + F.delay { + ch.shutdownOutput(); () + } + + def endOfInput: F[Unit] = + F.delay { + ch.shutdownInput(); () + } + +} + +private object SelectingSocket { + def apply[F[_]: LiftIO]( + selector: Selector, + ch: SocketChannel, + localAddress: F[SocketAddress[IpAddress]], + remoteAddress: F[SocketAddress[IpAddress]] + )(implicit F: Async[F]): F[Socket[F]] = + (Mutex[F], Mutex[F]).flatMapN { (readMutex, writeMutex) => + F.delay { + new SelectingSocket[F]( + selector, + ch, + readMutex, + writeMutex, + localAddress, + remoteAddress + ) + } + } +} diff --git a/io/jvm/src/main/scala/fs2/io/net/SelectingSocketGroup.scala b/io/jvm/src/main/scala/fs2/io/net/SelectingSocketGroup.scala new file mode 100644 index 0000000000..2bcb1ac1fe --- /dev/null +++ b/io/jvm/src/main/scala/fs2/io/net/SelectingSocketGroup.scala @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io.net + +import cats.effect.LiftIO +import cats.effect.Selector +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all._ +import com.comcast.ip4s.Dns +import com.comcast.ip4s.Host +import com.comcast.ip4s.IpAddress +import com.comcast.ip4s.Port +import com.comcast.ip4s.SocketAddress + +import java.net.InetSocketAddress +import java.nio.channels.AsynchronousCloseException +import java.nio.channels.ClosedChannelException +import java.nio.channels.SelectionKey.OP_ACCEPT +import java.nio.channels.SelectionKey.OP_CONNECT +import java.nio.channels.SocketChannel + +private final class SelectingSocketGroup[F[_]: LiftIO: Dns](selector: Selector)(implicit + F: Async[F] +) extends SocketGroup[F] { + + def client( + to: SocketAddress[Host], + options: List[SocketOption] + ): Resource[F, Socket[F]] = + Resource + .make(F.delay(selector.provider.openSocketChannel())) { ch => + F.delay(ch.close()) + } + .evalMap { ch => + val configure = F.delay { + ch.configureBlocking(false) + options.foreach(opt => ch.setOption(opt.key, opt.value)) + } + + val connect = to.resolve.flatMap { ip => + F.delay(ch.connect(ip.toInetSocketAddress)).flatMap { connected => + selector + .select(ch, OP_CONNECT) + .to + .untilM_(F.delay(ch.finishConnect())) + .unlessA(connected) + } + } + + val make = SelectingSocket[F]( + selector, + ch, + localAddress(ch), + remoteAddress(ch) + ) + + configure *> connect *> make + } + + def server( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Stream[F, Socket[F]] = + Stream + .resource( + serverResource( + address, + port, + options + ) + ) + .flatMap { case (_, clients) => clients } + + def serverResource( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = + Resource + .make(F.delay(selector.provider.openServerSocketChannel())) { ch => + F.delay(ch.close()) + } + .evalMap { serverCh => + val configure = address.traverse(_.resolve).flatMap { ip => + F.delay { + serverCh.configureBlocking(false) + serverCh.bind( + new InetSocketAddress( + ip.map(_.toInetAddress).orNull, + port.map(_.value).getOrElse(0) + ) + ) + } + } + + def acceptLoop: Stream[F, SocketChannel] = Stream + .bracketFull[F, SocketChannel] { poll => + def go: F[SocketChannel] = + F.delay(serverCh.accept()).flatMap { + case null => poll(selector.select(serverCh, OP_ACCEPT).to) *> go + case ch => F.pure(ch) + } + go + }((ch, _) => F.delay(ch.close())) + .repeat + .handleErrorWith { + case _: AsynchronousCloseException | _: ClosedChannelException => acceptLoop + case ex => Stream.raiseError(ex) + } + + val clients = acceptLoop.evalMap { ch => + F.delay { + ch.configureBlocking(false) + options.foreach(opt => ch.setOption(opt.key, opt.value)) + } *> SelectingSocket[F]( + selector, + ch, + localAddress(ch), + remoteAddress(ch) + ) + } + + val socketAddress = F.delay { + SocketAddress.fromInetSocketAddress( + serverCh.getLocalAddress.asInstanceOf[InetSocketAddress] + ) + } + + configure *> socketAddress.tupleRight(clients) + } + + private def localAddress(ch: SocketChannel) = + F.delay { + SocketAddress.fromInetSocketAddress( + ch.getLocalAddress.asInstanceOf[InetSocketAddress] + ) + } + + private def remoteAddress(ch: SocketChannel) = + F.delay { + SocketAddress.fromInetSocketAddress( + ch.getRemoteAddress.asInstanceOf[InetSocketAddress] + ) + } + +} diff --git a/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala b/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala index 28b7132636..155231c7c6 100644 --- a/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala +++ b/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala @@ -27,7 +27,7 @@ import cats.effect.IO import java.io.File import scala.concurrent.duration.* -class WalkBenchmark extends Fs2IoSuite { +class WalkBenchmark extends Fs2Suite { override def munitIOTimeout = 5.minutes diff --git a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala new file mode 100644 index 0000000000..99064c4985 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import cats.effect.Sync + +import java.io.IOException +import java.net.BindException +import java.net.ConnectException +import scala.scalanative.annotation.alwaysinline +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.fcntl._ +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.unsafe._ + +private[io] object NativeUtil { + + @alwaysinline def guard_(thunk: => CInt): Unit = { + guard(thunk) + () + } + + @alwaysinline def guard(thunk: => CInt): CInt = + guardMask(thunk)(e => e == EAGAIN || e == EWOULDBLOCK) + + @alwaysinline def guardSSize(thunk: => CSSize): CSSize = { + val rtn = thunk + if (rtn < 0) { + val e = errno + if (e == EAGAIN || e == EWOULDBLOCK) + rtn + else throw errnoToThrowable(e) + } else + rtn + } + + @alwaysinline def guardMask_(thunk: => CInt)(mask: Int => Boolean): Unit = { + guardMask(thunk)(mask) + () + } + + @alwaysinline def guardMask(thunk: => CInt)(mask: Int => Boolean): CInt = { + val rtn = thunk + if (rtn < 0) { + val e = errno + if (mask(e)) rtn + else throw errnoToThrowable(e) + } else + rtn + } + + @alwaysinline def errnoToThrowable(e: CInt): Throwable = { + val msg = fromCString(strerror(e)) + if (e == EADDRINUSE /* || e == EADDRNOTAVAIL */ ) + new BindException(msg) + else if (e == ECONNREFUSED) + new ConnectException(msg) + else + new IOException(msg) + } + + def setNonBlocking[F[_]](fd: CInt)(implicit F: Sync[F]): F[Unit] = F.delay { + guard_(fcntl(fd, F_SETFL, O_NONBLOCK)) + } + +} diff --git a/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala b/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala index 51f4808cf6..f09e74cca7 100644 --- a/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala +++ b/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala @@ -21,43 +21,50 @@ package fs2.io.internal +import cats.effect.kernel.Async import cats.effect.kernel.Resource -import cats.effect.kernel.Sync +import cats.effect.std.Mutex import cats.syntax.all._ import scala.scalanative.libc.errno._ import scala.scalanative.libc.stdlib._ +import scala.scalanative.posix.string._ import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ private[io] final class ResizableBuffer[F[_]] private ( + mutex: Mutex[F], private var ptr: Ptr[Byte], private[this] var size: Int -)(implicit F: Sync[F]) { +)(implicit F: Async[F]) { - def get(size: Int): F[Ptr[Byte]] = F.delay { - if (size <= this.size) - F.pure(ptr) - else { - ptr = realloc(ptr, size.toUInt) - this.size = size - if (ptr == null) - F.raiseError[Ptr[Byte]](new RuntimeException(s"realloc: ${errno}")) - else F.pure(ptr) + def get(size: Int): Resource[F, Ptr[Byte]] = mutex.lock.evalMap { _ => + F.delay { + if (size <= this.size) + ptr + else { + ptr = realloc(ptr, size.toUInt) + this.size = size + if (ptr == null) + throw new RuntimeException(fromCString(strerror(errno))) + else ptr + } } - }.flatten + } } private[io] object ResizableBuffer { - def apply[F[_]](size: Int)(implicit F: Sync[F]): Resource[F, ResizableBuffer[F]] = + def apply[F[_]](size: Int)(implicit F: Async[F]): Resource[F, ResizableBuffer[F]] = Resource.make { - F.delay { - val ptr = malloc(size.toUInt) - if (ptr == null) - throw new RuntimeException(s"malloc: ${errno}") - else new ResizableBuffer(ptr, size) + Mutex[F].flatMap { mutex => + F.delay { + val ptr = malloc(size.toUInt) + if (ptr == null) + throw new RuntimeException(fromCString(strerror(errno))) + else new ResizableBuffer(mutex, ptr, size) + } } }(buf => F.delay(free(buf.ptr))) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala new file mode 100644 index 0000000000..a1d8535168 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import cats.effect.kernel.Resource +import cats.effect.kernel.Sync +import cats.syntax.all._ +import com.comcast.ip4s.IpAddress +import com.comcast.ip4s.Ipv4Address +import com.comcast.ip4s.Ipv6Address +import com.comcast.ip4s.Port +import com.comcast.ip4s.SocketAddress + +import java.net.SocketOption +import java.net.StandardSocketOptions +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.arpa.inet._ +import scala.scalanative.posix.netinet.in.IPPROTO_TCP +import scala.scalanative.posix.netinet.tcp._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.posix.unistd._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import NativeUtil._ +import netinetin._ +import netinetinOps._ +import syssocket._ + +private[io] object SocketHelpers { + + def openNonBlocking[F[_]](domain: CInt, `type`: CInt)(implicit F: Sync[F]): Resource[F, CInt] = + Resource + .make { + F.delay { + val SOCK_NONBLOCK = + if (LinktimeInfo.isLinux) + syssocket.SOCK_NONBLOCK + else 0 + + guard(socket(domain, `type` | SOCK_NONBLOCK, 0)) + } + }(fd => F.delay(guard_(close(fd)))) + .evalTap { fd => + (if (!LinktimeInfo.isLinux) setNonBlocking(fd) else F.unit) *> + (if (LinktimeInfo.isMac) setNoSigPipe(fd) else F.unit) + } + + // macOS-only + def setNoSigPipe[F[_]: Sync](fd: CInt): F[Unit] = + setOption(fd, SO_NOSIGPIPE, true) + + def setOption[F[_]: Sync, T](fd: CInt, name: SocketOption[T], value: T): F[Unit] = name match { + case StandardSocketOptions.SO_SNDBUF => + setOption( + fd, + SO_SNDBUF, + value.asInstanceOf[java.lang.Integer] + ) + case StandardSocketOptions.SO_RCVBUF => + setOption( + fd, + SO_RCVBUF, + value.asInstanceOf[java.lang.Integer] + ) + case StandardSocketOptions.SO_REUSEADDR => + setOption( + fd, + SO_REUSEADDR, + value.asInstanceOf[java.lang.Boolean] + ) + case StandardSocketOptions.SO_REUSEPORT => + SocketHelpers.setOption( + fd, + SO_REUSEPORT, + value.asInstanceOf[java.lang.Boolean] + ) + case StandardSocketOptions.SO_KEEPALIVE => + SocketHelpers.setOption( + fd, + SO_KEEPALIVE, + value.asInstanceOf[java.lang.Boolean] + ) + case StandardSocketOptions.TCP_NODELAY => + setTcpOption( + fd, + TCP_NODELAY, + value.asInstanceOf[java.lang.Boolean] + ) + case _ => throw new IllegalArgumentException + } + + def setOption[F[_]](fd: CInt, option: CInt, value: Boolean)(implicit F: Sync[F]): F[Unit] = + setOptionImpl(fd, SOL_SOCKET, option, if (value) 1 else 0) + + def setOption[F[_]](fd: CInt, option: CInt, value: CInt)(implicit F: Sync[F]): F[Unit] = + setOptionImpl(fd, SOL_SOCKET, option, value) + + def setTcpOption[F[_]](fd: CInt, option: CInt, value: Boolean)(implicit F: Sync[F]): F[Unit] = + setOptionImpl( + fd, + IPPROTO_TCP, // aka SOL_TCP + option, + if (value) 1 else 0 + ) + + def setOptionImpl[F[_]](fd: CInt, level: CInt, option: CInt, value: CInt)(implicit + F: Sync[F] + ): F[Unit] = + F.delay { + val ptr = stackalloc[CInt]() + !ptr = value + guard_( + setsockopt( + fd, + level, + option, + ptr.asInstanceOf[Ptr[Byte]], + sizeof[CInt].toUInt + ) + ) + } + + def checkSocketError[F[_]](fd: Int)(implicit F: Sync[F]): F[Unit] = F.delay { + val optval = stackalloc[CInt]() + val optlen = stackalloc[socklen_t]() + !optlen = sizeof[CInt].toUInt + guard_ { + getsockopt( + fd, + SOL_SOCKET, + SO_ERROR, + optval.asInstanceOf[Ptr[Byte]], + optlen + ) + } + if (!optval != 0) + throw errnoToThrowable(!optval) + } + + def getLocalAddress[F[_]](fd: Int, ipv4: Boolean)(implicit + F: Sync[F] + ): F[SocketAddress[IpAddress]] = + F.delay { + SocketHelpers.toSocketAddress(ipv4) { (addr, len) => + guard_(getsockname(fd, addr, len)) + } + } + + def toSockaddr[A]( + address: SocketAddress[IpAddress] + )(f: (Ptr[sockaddr], socklen_t) => A): A = + address.host.fold( + _ => + toSockaddrIn(address.asInstanceOf[SocketAddress[Ipv4Address]])( + f.asInstanceOf[(Ptr[sockaddr_in], socklen_t) => A] + ), + _ => + toSockaddrIn6(address.asInstanceOf[SocketAddress[Ipv6Address]])( + f.asInstanceOf[(Ptr[sockaddr_in6], socklen_t) => A] + ) + ) + + private[this] def toSockaddrIn[A]( + address: SocketAddress[Ipv4Address] + )(f: (Ptr[sockaddr_in], socklen_t) => A): A = { + val addr = stackalloc[sockaddr_in]() + val len = stackalloc[socklen_t]() + + toSockaddrIn(address, addr, len) + + f(addr, !len) + } + + private[this] def toSockaddrIn6[A]( + address: SocketAddress[Ipv6Address] + )(f: (Ptr[sockaddr_in6], socklen_t) => A): A = { + val addr = stackalloc[sockaddr_in6]() + val len = stackalloc[socklen_t]() + + toSockaddrIn6(address, addr, len) + + f(addr, !len) + } + + def toSockaddr( + address: SocketAddress[IpAddress], + addr: Ptr[sockaddr], + len: Ptr[socklen_t] + ): Unit = + address.host.fold( + _ => + toSockaddrIn( + address.asInstanceOf[SocketAddress[Ipv4Address]], + addr.asInstanceOf[Ptr[sockaddr_in]], + len + ), + _ => + toSockaddrIn6( + address.asInstanceOf[SocketAddress[Ipv6Address]], + addr.asInstanceOf[Ptr[sockaddr_in6]], + len + ) + ) + + private[this] def toSockaddrIn( + address: SocketAddress[Ipv4Address], + addr: Ptr[sockaddr_in], + len: Ptr[socklen_t] + ): Unit = { + !len = sizeof[sockaddr_in].toUInt + addr.sin_family = AF_INET.toUShort + addr.sin_port = htons(address.port.value.toUShort) + addr.sin_addr.s_addr = htonl(address.host.toLong.toUInt) + } + + private[this] def toSockaddrIn6[A]( + address: SocketAddress[Ipv6Address], + addr: Ptr[sockaddr_in6], + len: Ptr[socklen_t] + ): Unit = { + !len = sizeof[sockaddr_in6].toUInt + + addr.sin6_family = AF_INET6.toUShort + addr.sin6_port = htons(address.port.value.toUShort) + + val bytes = address.host.toBytes + var i = 0 + while (i < 0) { + addr.sin6_addr.s6_addr(i) = bytes(i).toUByte + i += 1 + } + } + + def allocateSockaddr[A]( + f: (Ptr[sockaddr], Ptr[socklen_t]) => A + ): A = { + val addr = // allocate enough for an IPv6 + stackalloc[sockaddr_in6]().asInstanceOf[Ptr[sockaddr]] + val len = stackalloc[socklen_t]() + !len = sizeof[sockaddr_in6].toUInt + + f(addr, len) + } + + def toSocketAddress[A](ipv4: Boolean)( + f: (Ptr[sockaddr], Ptr[socklen_t]) => Unit + ): SocketAddress[IpAddress] = allocateSockaddr { (addr, len) => + f(addr, len) + toSocketAddress(addr, ipv4) + } + + def toSocketAddress(addr: Ptr[sockaddr], ipv4: Boolean): SocketAddress[IpAddress] = + if (ipv4) + toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]]) + else + toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]]) + + private[this] def toIpv4SocketAddress(addr: Ptr[sockaddr_in]): SocketAddress[Ipv4Address] = { + val port = Port.fromInt(ntohs(addr.sin_port).toInt).get + val addrBytes = addr.sin_addr.at1.asInstanceOf[Ptr[Byte]] + val host = Ipv4Address.fromBytes( + addrBytes(0).toInt, + addrBytes(1).toInt, + addrBytes(2).toInt, + addrBytes(3).toInt + ) + SocketAddress(host, port) + } + + private[this] def toIpv6SocketAddress(addr: Ptr[sockaddr_in6]): SocketAddress[Ipv6Address] = { + val port = Port.fromInt(ntohs(addr.sin6_port).toInt).get + val addrBytes = addr.sin6_addr.at1.asInstanceOf[Ptr[Byte]] + val host = Ipv6Address.fromBytes { + val addr = new Array[Byte](16) + memcpy(addr.atUnsafe(0), addrBytes, 16.toULong) + addr + }.get + SocketAddress(host, port) + } +} diff --git a/io/native/src/main/scala/fs2/io/internal/netinet.scala b/io/native/src/main/scala/fs2/io/internal/netinet.scala new file mode 100644 index 0000000000..fdb1dc4afc --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/netinet.scala @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import scalanative.unsafe._ +import scalanative.posix.inttypes._ +import scalanative.posix.sys.socket._ + +private[io] object netinetin { + import Nat._ + type _16 = Digit2[_1, _6] + + type in_port_t = uint16_t + + type in_addr = CStruct1[uint32_t] + + type sockaddr_in = CStruct4[ + sa_family_t, + in_port_t, + in_addr, + CArray[Byte, _8] + ] + + type in6_addr = CStruct1[CArray[CUnsignedChar, _16]] + + type sockaddr_in6 = CStruct5[ + sa_family_t, + in_port_t, + uint32_t, + in6_addr, + uint32_t + ] + +} + +private[io] object netinetinOps { + import netinetin._ + + implicit final class in_addrOps(val in_addr: in_addr) extends AnyVal { + def s_addr: uint32_t = in_addr._1 + def s_addr_=(s_addr: uint32_t): Unit = in_addr._1 = s_addr + } + + implicit final class sockaddr_inOps(val sockaddr_in: Ptr[sockaddr_in]) extends AnyVal { + def sin_family: sa_family_t = sockaddr_in._1 + def sin_family_=(sin_family: sa_family_t): Unit = sockaddr_in._1 = sin_family + def sin_port: in_port_t = sockaddr_in._2 + def sin_port_=(sin_port: in_port_t): Unit = sockaddr_in._2 = sin_port + def sin_addr: in_addr = sockaddr_in._3 + def sin_addr_=(sin_addr: in_addr) = sockaddr_in._3 = sin_addr + } + + implicit final class in6_addrOps(val in6_addr: in6_addr) extends AnyVal { + def s6_addr: CArray[uint8_t, _16] = in6_addr._1 + def s6_addr_=(s6_addr: CArray[uint8_t, _16]): Unit = in6_addr._1 = s6_addr + } + + implicit final class sockaddr_in6Ops(val sockaddr_in6: Ptr[sockaddr_in6]) extends AnyVal { + def sin6_family: sa_family_t = sockaddr_in6._1 + def sin6_family_=(sin6_family: sa_family_t): Unit = sockaddr_in6._1 = sin6_family + def sin6_port: in_port_t = sockaddr_in6._2 + def sin6_port_=(sin6_port: in_port_t): Unit = sockaddr_in6._2 = sin6_port + def sin6_flowinfo: uint32_t = sockaddr_in6._3 + def sin6_flowinfo_=(sin6_flowinfo: uint32_t): Unit = sockaddr_in6._3 = sin6_flowinfo + def sin6_addr: in6_addr = sockaddr_in6._4 + def sin6_addr_=(sin6_addr: in6_addr) = sockaddr_in6._4 = sin6_addr + def sin6_scope_id: uint32_t = sockaddr_in6._5 + def sin6_scope_id_=(sin6_scope_id: uint32_t): Unit = sockaddr_in6._5 = sin6_scope_id + } + +} diff --git a/io/native/src/main/scala/fs2/io/internal/syssocket.scala b/io/native/src/main/scala/fs2/io/internal/syssocket.scala new file mode 100644 index 0000000000..3bf9266b9d --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/syssocket.scala @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import org.typelevel.scalaccompat.annotation._ + +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ + +@nowarn212("cat=unused") +@extern +private[io] object syssocket { + // only in Linux and FreeBSD, but not macOS + final val SOCK_NONBLOCK = 2048 + + // only on macOS and some BSDs (?) + final val SO_NOSIGPIPE = 0x1022 /* APPLE: No SIGPIPE on EPIPE */ + + def bind(sockfd: CInt, addr: Ptr[sockaddr], addrlen: socklen_t): CInt = + extern + + def connect(sockfd: CInt, addr: Ptr[sockaddr], addrlen: socklen_t): CInt = + extern + + def accept(sockfd: CInt, addr: Ptr[sockaddr], addrlen: Ptr[socklen_t]): CInt = + extern + + // only supported on Linux and FreeBSD, but not macOS + def accept4(sockfd: CInt, addr: Ptr[sockaddr], addrlen: Ptr[socklen_t], flags: CInt): CInt = + extern +} diff --git a/io/js-jvm/src/test/scala/fs2/io/Fs2IoSuite.scala b/io/native/src/main/scala/fs2/io/internal/sysun.scala similarity index 62% rename from io/js-jvm/src/test/scala/fs2/io/Fs2IoSuite.scala rename to io/native/src/main/scala/fs2/io/internal/sysun.scala index 76c8042f20..951a1ca346 100644 --- a/io/js-jvm/src/test/scala/fs2/io/Fs2IoSuite.scala +++ b/io/native/src/main/scala/fs2/io/internal/sysun.scala @@ -19,7 +19,30 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -package fs2 -package io +package fs2.io.internal -abstract class Fs2IoSuite extends Fs2Suite +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ + +private[io] object sysun { + import Nat._ + type _108 = Digit3[_1, _0, _8] + + type sockaddr_un = CStruct2[ + sa_family_t, + CArray[CChar, _108] + ] + +} + +private[io] object sysunOps { + import sysun._ + + implicit final class sockaddr_unOps(val sockaddr_un: Ptr[sockaddr_un]) extends AnyVal { + def sun_family: sa_family_t = sockaddr_un._1 + def sun_family_=(sun_family: sa_family_t): Unit = sockaddr_un._1 = sun_family + def sun_path: CArray[CChar, _108] = sockaddr_un._2 + def sun_path_=(sun_path: CArray[CChar, _108]): Unit = sockaddr_un._2 = sun_path + } + +} diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 143b1b6fc5..ffa0006fc4 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -22,14 +22,153 @@ package fs2 package io +import cats.Show +import cats.effect.FileDescriptorPoller +import cats.effect.IO +import cats.effect.LiftIO +import cats.effect.kernel.Async +import cats.effect.kernel.Resource import cats.effect.kernel.Sync +import cats.syntax.all._ +import fs2.io.internal.NativeUtil._ + +import java.io.OutputStream +import java.nio.charset.Charset +import java.nio.charset.StandardCharsets +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.unistd._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ private[fs2] trait ioplatform extends iojvmnative { - def stdin[F[_]](bufSize: Int)(implicit F: Sync[F]): Stream[F, Byte] = - readInputStream(F.blocking(System.in), bufSize, false)(F) + private[fs2] def fileDescriptorPoller[F[_]: LiftIO]: F[FileDescriptorPoller] = + IO.pollers + .flatMap( + _.collectFirst { case poller: FileDescriptorPoller => poller }.liftTo[IO]( + new RuntimeException("Installed PollingSystem does not provide a FileDescriptorPoller") + ) + ) + .to + + // + // STDIN/STDOUT Helpers + + /** Stream of bytes read asynchronously from standard input. */ + def stdin[F[_]: Async: LiftIO](bufSize: Int): Stream[F, Byte] = + if (LinktimeInfo.isLinux || LinktimeInfo.isMac) + Stream + .resource { + Resource + .eval { + setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F] + } + .flatMap { poller => + poller.registerFileDescriptor(STDIN_FILENO, true, false).mapK(LiftIO.liftK) + } + } + .flatMap { handle => + Stream.repeatEval { + handle + .pollReadRec(()) { _ => + IO { + val buf = new Array[Byte](bufSize) + val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toULong)) + if (readed > 0) + Right(Some(Chunk.array(buf, 0, readed))) + else if (readed == 0) + Right(None) + else + Left(()) + } + } + .to + } + } + .unNoneTerminate + .unchunks + else + readInputStream(Sync[F].blocking(System.in), bufSize, false) + + /** Pipe of bytes that writes emitted values to standard output asynchronously. */ + def stdout[F[_]: Async: LiftIO]: Pipe[F, Byte, Nothing] = + if (LinktimeInfo.isLinux || LinktimeInfo.isMac) + writeFd(STDOUT_FILENO) + else + writeOutputStream(Sync[F].blocking(System.out), false) + + /** Pipe of bytes that writes emitted values to standard error asynchronously. */ + def stderr[F[_]: Async: LiftIO]: Pipe[F, Byte, Nothing] = + if (LinktimeInfo.isLinux || LinktimeInfo.isMac) + writeFd(STDERR_FILENO) + else + writeOutputStream(Sync[F].blocking(System.err), false) + + private[this] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in => + Stream + .resource { + Resource + .eval { + setNonBlocking(fd) *> fileDescriptorPoller[F] + } + .flatMap { poller => + poller.registerFileDescriptor(fd, false, true).mapK(LiftIO.liftK) + } + } + .flatMap { handle => + in.chunks.foreach { bytes => + val Chunk.ArraySlice(buf, offset, length) = bytes.toArraySlice + + def go(pos: Int): IO[Either[Int, Unit]] = + IO(write(fd, buf.atUnsafe(offset + pos), (length - pos).toULong)).flatMap { wrote => + if (wrote >= 0) { + val newPos = pos + wrote + if (newPos < length) + go(newPos) + else + IO.pure(Either.unit) + } else + IO.pure(Left(pos)) + } - def stdinUtf8[F[_]](bufSize: Int)(implicit F: Sync[F]): Stream[F, String] = + handle.pollWriteRec(0)(go(_)).to + } + } + + /** Writes this stream to standard output asynchronously, converting each element to + * a sequence of bytes via `Show` and the given `Charset`. + */ + def stdoutLines[F[_]: Async: LiftIO, O: Show]( + charset: Charset = StandardCharsets.UTF_8 + ): Pipe[F, O, Nothing] = + _.map(_.show).through(text.encode(charset)).through(stdout(implicitly, implicitly)) + + /** Stream of `String` read asynchronously from standard input decoded in UTF-8. */ + def stdinUtf8[F[_]: Async: LiftIO](bufSize: Int): Stream[F, String] = stdin(bufSize).through(text.utf8.decode) + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdin[F[_], SourceBreakingDummy](bufSize: Int, F: Sync[F]): Stream[F, Byte] = + readInputStream(F.blocking(System.in), bufSize, false)(F) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdout[F[_], SourceBreakingDummy](F: Sync[F]): Pipe[F, Byte, Nothing] = + writeOutputStream(F.blocking(System.out: OutputStream), false)(F) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stderr[F[_], SourceBreakingDummy](F: Sync[F]): Pipe[F, Byte, Nothing] = + writeOutputStream(F.blocking(System.err: OutputStream), false)(F) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdoutLines[F[_], O, SourceBreakingDummy]( + charset: Charset, + F: Sync[F], + O: Show[O] + ): Pipe[F, O, Nothing] = + _.map(O.show(_)).through(text.encode(charset)).through(stdout(F)) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdinUtf8[F[_], SourceBreakingDummy](bufSize: Int, F: Sync[F]): Stream[F, String] = + stdin(bufSize, F).through(text.utf8.decode) + } diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala new file mode 100644 index 0000000000..1392a8cdaf --- /dev/null +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io.net + +import cats.effect.FileDescriptorPollHandle +import cats.effect.IO +import cats.effect.LiftIO +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all._ +import com.comcast.ip4s.IpAddress +import com.comcast.ip4s.SocketAddress +import fs2.io.internal.NativeUtil._ +import fs2.io.internal.ResizableBuffer + +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import FdPollingSocket._ + +private final class FdPollingSocket[F[_]: LiftIO] private ( + fd: Int, + handle: FileDescriptorPollHandle, + readBuffer: ResizableBuffer[F], + val isOpen: F[Boolean], + val localAddress: F[SocketAddress[IpAddress]], + val remoteAddress: F[SocketAddress[IpAddress]] +)(implicit F: Async[F]) + extends Socket[F] { + + def endOfInput: F[Unit] = shutdownF(0) + def endOfOutput: F[Unit] = shutdownF(1) + private[this] def shutdownF(how: Int): F[Unit] = F.delay { + guardMask_(shutdown(fd, how))(_ == ENOTCONN) + } + + def read(maxBytes: Int): F[Option[Chunk[Byte]]] = readBuffer.get(maxBytes).use { buf => + handle + .pollReadRec(()) { _ => + IO(guard(unistd.read(fd, buf, maxBytes.toULong))).flatMap { readed => + if (readed > 0) + IO(Right(Some(Chunk.fromBytePtr(buf, readed)))) + else if (readed == 0) + IO.pure(Right(None)) + else + IO.pure(Left(())) + } + } + .to + } + + def readN(numBytes: Int): F[Chunk[Byte]] = + readBuffer.get(numBytes).use { buf => + def go(pos: Int): IO[Either[Int, Chunk[Byte]]] = + IO(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toULong))).flatMap { readed => + if (readed > 0) { + val newPos = pos + readed + if (newPos < numBytes) go(newPos) + else IO(Right(Chunk.fromBytePtr(buf, newPos))) + } else if (readed == 0) + IO(Right(Chunk.fromBytePtr(buf, pos))) + else + IO.pure(Left(pos)) + } + + handle.pollReadRec(0)(go(_)).to + } + + def reads: Stream[F, Byte] = Stream.repeatEval(read(DefaultReadSize)).unNoneTerminate.unchunks + + def write(bytes: Chunk[Byte]): F[Unit] = { + val Chunk.ArraySlice(buf, offset, length) = bytes.toArraySlice + + def go(pos: Int): IO[Either[Int, Unit]] = + IO { + if (LinktimeInfo.isLinux) + guardSSize( + send(fd, buf.atUnsafe(offset + pos), (length - pos).toULong, MSG_NOSIGNAL) + ).toInt + else + guard(unistd.write(fd, buf.atUnsafe(offset + pos), (length - pos).toULong)) + }.flatMap { wrote => + if (wrote >= 0) { + val newPos = pos + wrote + if (newPos < length) + go(newPos) + else + IO.pure(Either.unit) + } else + IO.pure(Left(pos)) + } + + handle.pollWriteRec(0)(go(_)).to + } + + def writes: Pipe[F, Byte, Nothing] = _.chunks.foreach(write(_)) + +} + +private object FdPollingSocket { + private final val DefaultReadSize = 8192 + + def apply[F[_]: LiftIO]( + fd: Int, + handle: FileDescriptorPollHandle, + localAddress: F[SocketAddress[IpAddress]], + remoteAddress: F[SocketAddress[IpAddress]] + )(implicit F: Async[F]): Resource[F, Socket[F]] = for { + buffer <- ResizableBuffer(DefaultReadSize) + isOpen <- Resource.make(F.ref(true))(_.set(false)) + } yield new FdPollingSocket(fd, handle, buffer, isOpen.get, localAddress, remoteAddress) +} diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala new file mode 100644 index 0000000000..875ae46f57 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package net + +import cats.effect.IO +import cats.effect.LiftIO +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all._ +import com.comcast.ip4s._ +import fs2.io.internal.NativeUtil._ +import fs2.io.internal.SocketHelpers +import fs2.io.internal.syssocket._ + +import scala.scalanative.libc.errno._ +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.sys.socket.{bind => _, connect => _, accept => _, _} +import scala.scalanative.posix.unistd._ + +private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F]) + extends SocketGroup[F] { + + def client(to: SocketAddress[Host], options: List[SocketOption]): Resource[F, Socket[F]] = for { + poller <- Resource.eval(fileDescriptorPoller[F]) + address <- Resource.eval(to.resolve) + ipv4 = address.host.isInstanceOf[Ipv4Address] + fd <- SocketHelpers.openNonBlocking(if (ipv4) AF_INET else AF_INET6, SOCK_STREAM) + _ <- Resource.eval(options.traverse(so => SocketHelpers.setOption(fd, so.key, so.value))) + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + _ <- Resource.eval { + handle + .pollWriteRec(false) { connected => + if (connected) SocketHelpers.checkSocketError[IO](fd).as(Either.unit) + else + IO { + SocketHelpers.toSockaddr(address) { (addr, len) => + if (connect(fd, addr, len) < 0) { + val e = errno + if (e == EINPROGRESS) + Left(true) // we will be connected when we unblock + else + throw errnoToThrowable(e) + } else + Either.unit + } + } + } + .to + } + socket <- FdPollingSocket[F]( + fd, + handle, + SocketHelpers.getLocalAddress(fd, ipv4), + F.pure(address) + ) + } yield socket + + def server( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Stream[F, Socket[F]] = + Stream.resource(serverResource(address, port, options)).flatMap(_._2) + + def serverResource( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = for { + poller <- Resource.eval(fileDescriptorPoller[F]) + address <- Resource.eval(address.fold(IpAddress.loopback)(_.resolve)) + ipv4 = address.isInstanceOf[Ipv4Address] + fd <- SocketHelpers.openNonBlocking(if (ipv4) AF_INET else AF_INET6, SOCK_STREAM) + handle <- poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK) + + _ <- Resource.eval { + val bindF = F.delay { + val socketAddress = SocketAddress(address, port.getOrElse(port"0")) + SocketHelpers.toSockaddr(socketAddress) { (addr, len) => + guard_(bind(fd, addr, len)) + } + } + + SocketHelpers.setOption(fd, SO_REUSEADDR, 1) *> bindF *> F.delay(guard_(listen(fd, 0))) + } + + sockets = Stream + .resource { + val accepted = for { + addrFd <- Resource.makeFull[F, (SocketAddress[IpAddress], Int)] { poll => + poll { + handle + .pollReadRec(()) { _ => + IO { + SocketHelpers.allocateSockaddr { (addr, len) => + val clientFd = + if (LinktimeInfo.isLinux) + guard(accept4(fd, addr, len, SOCK_NONBLOCK)) + else + guard(accept(fd, addr, len)) + + if (clientFd >= 0) { + val address = SocketHelpers.toSocketAddress(addr, ipv4) + Right((address, clientFd)) + } else + Left(()) + } + } + } + .to + } + }(addrFd => F.delay(guard_(close(addrFd._2)))) + (address, fd) = addrFd + _ <- Resource.eval { + val setNonBlock = if (!LinktimeInfo.isLinux) setNonBlocking(fd) else F.unit + setNonBlock *> options.traverse(so => SocketHelpers.setOption(fd, so.key, so.value)) + } + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + socket <- FdPollingSocket[F]( + fd, + handle, + SocketHelpers.getLocalAddress(fd, ipv4), + F.pure(address) + ) + } yield socket + + accepted.attempt.map(_.toOption) + } + .repeat + .unNone + + serverAddress <- Resource.eval(SocketHelpers.getLocalAddress(fd, ipv4)) + } yield (serverAddress, sockets) + +} diff --git a/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala b/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala index 65fe1c6a79..1d45570f75 100644 --- a/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala +++ b/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala @@ -37,10 +37,31 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N def forIO: Network[IO] = forLiftIO - implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] = { - val _ = LiftIO[F] - forAsync - } + implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] = + new UnsealedNetwork[F] { + private lazy val globalSocketGroup = + new FdPollingSocketGroup[F]()(Dns.forAsync, implicitly, implicitly) + + def client( + to: SocketAddress[Host], + options: List[SocketOption] + ): Resource[F, Socket[F]] = globalSocketGroup.client(to, options) + + def server( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Stream[F, Socket[F]] = globalSocketGroup.server(address, port, options) + + def serverResource( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = + globalSocketGroup.serverResource(address, port, options) + + def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync + } def forAsync[F[_]](implicit F: Async[F]): Network[F] = forAsyncAndDns(F, Dns.forAsync(F)) @@ -67,7 +88,7 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = globalSocketGroup.serverResource(address, port, options) - def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync + def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync(F) } } diff --git a/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala b/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala index fd65385b09..2e935f9b90 100644 --- a/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala +++ b/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala @@ -136,7 +136,7 @@ private[tls] object S2nConnection { }.iterateUntil(_.toInt == S2N_NOT_BLOCKED) *> F.delay(guard_(s2n_connection_free_handshake(conn))) - def read(n: Int) = readBuffer.get(n).flatMap { buf => + def read(n: Int) = readBuffer.get(n).use { buf => def go(i: Int): F[Option[Chunk[Byte]]] = F.delay { readTasks.set(F.unit) diff --git a/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala b/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala index 66590f5d3b..c52eeca612 100644 --- a/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala +++ b/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala @@ -44,7 +44,7 @@ private[tls] object s2nutil { throw new S2nException(error) } - @alwaysinline def guard[A](thunk: => CInt): CInt = { + @alwaysinline def guard(thunk: => CInt): CInt = { val rtn = thunk if (rtn < 0) { val error = !s2n_errno_location() diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala new file mode 100644 index 0000000000..98ef205543 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package net +package unixsocket + +import cats.effect.IO +import cats.effect.LiftIO +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all._ +import fs2.io.file.Files +import fs2.io.file.Path +import fs2.io.internal.NativeUtil._ +import fs2.io.internal.SocketHelpers +import fs2.io.internal.syssocket._ +import fs2.io.internal.sysun._ +import fs2.io.internal.sysunOps._ + +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.string._ +import scala.scalanative.posix.sys.socket.{bind => _, connect => _, accept => _, _} +import scala.scalanative.posix.unistd._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[F]) + extends UnixSockets[F] { + + def client(address: UnixSocketAddress): Resource[F, Socket[F]] = for { + poller <- Resource.eval(fileDescriptorPoller[F]) + fd <- SocketHelpers.openNonBlocking(AF_UNIX, SOCK_STREAM) + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + _ <- Resource.eval { + handle + .pollWriteRec(false) { connected => + if (connected) SocketHelpers.checkSocketError[IO](fd).as(Either.unit) + else + IO { + toSockaddrUn(address.path) { addr => + if (guard(connect(fd, addr, sizeof[sockaddr_un].toUInt)) < 0) + Left(true) // we will be connected when unblocked + else + Either.unit[Boolean] + } + } + } + .to + } + socket <- FdPollingSocket[F](fd, handle, raiseIpAddressError, raiseIpAddressError) + } yield socket + + def server( + address: UnixSocketAddress, + deleteIfExists: Boolean, + deleteOnClose: Boolean + ): Stream[F, Socket[F]] = for { + poller <- Stream.eval(fileDescriptorPoller[F]) + + _ <- Stream.bracket(Files[F].deleteIfExists(Path(address.path)).whenA(deleteIfExists)) { _ => + Files[F].deleteIfExists(Path(address.path)).whenA(deleteOnClose) + } + + fd <- Stream.resource(SocketHelpers.openNonBlocking(AF_UNIX, SOCK_STREAM)) + handle <- Stream.resource(poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK)) + + _ <- Stream.eval { + F.delay { + toSockaddrUn(address.path)(addr => guard_(bind(fd, addr, sizeof[sockaddr_un].toUInt))) + } *> F.delay(guard_(listen(fd, 0))) + } + + socket <- Stream + .resource { + val accepted = for { + fd <- Resource.makeFull[F, Int] { poll => + poll { + handle + .pollReadRec(()) { _ => + IO { + val clientFd = + if (LinktimeInfo.isLinux) + guard(accept4(fd, null, null, SOCK_NONBLOCK)) + else + guard(accept(fd, null, null)) + + if (clientFd >= 0) + Right(clientFd) + else + Left(()) + } + } + .to + } + }(fd => F.delay(guard_(close(fd)))) + _ <- + if (!LinktimeInfo.isLinux) + Resource.eval(setNonBlocking(fd)) + else Resource.unit[F] + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + socket <- FdPollingSocket[F](fd, handle, raiseIpAddressError, raiseIpAddressError) + } yield socket + + accepted.attempt + .map(_.toOption) + } + .repeat + .unNone + + } yield socket + + private def toSockaddrUn[A](path: String)(f: Ptr[sockaddr] => A): A = { + val pathBytes = path.getBytes + if (pathBytes.length > 107) + throw new IllegalArgumentException(s"Path too long: $path") + + val addr = stackalloc[sockaddr_un]() + addr.sun_family = AF_UNIX.toUShort + memcpy(addr.sun_path.at(0), pathBytes.atUnsafe(0), pathBytes.length.toULong) + + f(addr.asInstanceOf[Ptr[sockaddr]]) + } + + private def raiseIpAddressError[A]: F[A] = + F.raiseError(new UnsupportedOperationException("UnixSockets do not use IP addressing")) + +} diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala index e5599efece..d35bd8014e 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala @@ -21,4 +21,10 @@ package fs2.io.net.unixsocket -private[unixsocket] trait UnixSocketsCompanionPlatform +import cats.effect.LiftIO +import cats.effect.kernel.Async + +private[unixsocket] trait UnixSocketsCompanionPlatform { + implicit def forLiftIO[F[_]: Async: LiftIO]: UnixSockets[F] = + new FdPollingUnixSockets[F] +} diff --git a/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala b/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala index 4fb261bb27..fce760199e 100644 --- a/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala +++ b/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala @@ -30,7 +30,7 @@ import fs2.io.file.Files import fs2.io.file.Path import scodec.bits.ByteVector -abstract class TLSSuite extends Fs2IoSuite { +abstract class TLSSuite extends Fs2Suite { def testTlsContext: Resource[IO, TLSContext[IO]] = for { cert <- Resource.eval { Files[IO].readAll(Path("io/shared/src/test/resources/cert.pem")).compile.to(ByteVector) diff --git a/io/native/src/test/scala/fs2/io/Fs2IoSuite.scala b/io/native/src/test/scala/fs2/io/net/unixsockets/UnixSocketsSuitePlatform.scala similarity index 87% rename from io/native/src/test/scala/fs2/io/Fs2IoSuite.scala rename to io/native/src/test/scala/fs2/io/net/unixsockets/UnixSocketsSuitePlatform.scala index 40512de1ab..fa9ecc98b9 100644 --- a/io/native/src/test/scala/fs2/io/Fs2IoSuite.scala +++ b/io/native/src/test/scala/fs2/io/net/unixsockets/UnixSocketsSuitePlatform.scala @@ -20,10 +20,10 @@ */ package fs2 -package io +package io.net.unixsocket -import epollcat.unsafe.EpollRuntime +import cats.effect.IO -abstract class Fs2IoSuite extends Fs2Suite { - override def munitIORuntime = EpollRuntime.global +trait UnixSocketsSuitePlatform { self: UnixSocketsSuite => + testProvider("native")(UnixSockets.forLiftIO[IO]) } diff --git a/io/shared/src/test/scala/fs2/io/IoSuite.scala b/io/shared/src/test/scala/fs2/io/IoSuite.scala index 387d32adad..23ffd78022 100644 --- a/io/shared/src/test/scala/fs2/io/IoSuite.scala +++ b/io/shared/src/test/scala/fs2/io/IoSuite.scala @@ -28,7 +28,7 @@ import org.scalacheck.effect.PropF.forAllF import java.io.ByteArrayInputStream import java.io.InputStream -class IoSuite extends io.Fs2IoSuite { +class IoSuite extends Fs2Suite { group("readInputStream") { test("non-buffered") { forAllF { (bytes: Array[Byte], chunkSize0: Int) => diff --git a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala index 57014f76d6..ed7e1503f3 100644 --- a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala @@ -29,7 +29,7 @@ import cats.syntax.all._ import scala.concurrent.duration._ -class FilesSuite extends Fs2IoSuite with BaseFileSuite { +class FilesSuite extends Fs2Suite with BaseFileSuite { group("readAll") { test("retrieves whole content of a file") { diff --git a/io/shared/src/test/scala/fs2/io/file/PathSuite.scala b/io/shared/src/test/scala/fs2/io/file/PathSuite.scala index 29fc99b300..81279c41ef 100644 --- a/io/shared/src/test/scala/fs2/io/file/PathSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/PathSuite.scala @@ -31,7 +31,7 @@ import org.scalacheck.Cogen import org.scalacheck.Gen import org.scalacheck.Prop.forAll -class PathSuite extends Fs2IoSuite { +class PathSuite extends Fs2Suite { override def scalaCheckTestParameters = super.scalaCheckTestParameters diff --git a/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala b/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala index a91829ce20..ee6dde5033 100644 --- a/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala @@ -23,7 +23,7 @@ package fs2 package io package file -class PosixPermissionsSuite extends Fs2IoSuite { +class PosixPermissionsSuite extends Fs2Suite { test("construction") { val cases = Seq( "777" -> "rwxrwxrwx", diff --git a/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala b/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala index a8693980e2..ed61d7bf22 100644 --- a/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala +++ b/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala @@ -25,13 +25,12 @@ package net package tcp import cats.effect.IO -import cats.syntax.all._ import com.comcast.ip4s._ import scala.concurrent.duration._ import scala.concurrent.TimeoutException -class SocketSuite extends Fs2IoSuite with SocketSuitePlatform { +class SocketSuite extends Fs2Suite with SocketSuitePlatform { val timeout = 30.seconds @@ -162,30 +161,35 @@ class SocketSuite extends Fs2IoSuite with SocketSuitePlatform { } test("errors - should be captured in the effect") { - (for { + val connectionRefused = for { port <- Network[IO].serverResource(Some(ip"127.0.0.1")).use(s => IO.pure(s._1.port)) _ <- Network[IO] .client(SocketAddress(host"localhost", port)) .use_ - .recover { case ex: ConnectException => - assertEquals(ex.getMessage, "Connection refused") - } - } yield ()) >> (for { - bindAddress <- Network[IO].serverResource(Some(ip"127.0.0.1")).map(_._1) - _ <- Network[IO] - .serverResource(Some(bindAddress.host), Some(bindAddress.port)) - .void - .recover { case ex: BindException => - assertEquals(ex.getMessage, "Address already in use") - } - } yield ()).use_ >> (for { - _ <- Network[IO].client(SocketAddress.fromString("not.example.com:80").get).use_.recover { - case ex: UnknownHostException => + .interceptMessage[ConnectException]("Connection refused") + } yield () + + val addressAlreadyInUse = + Network[IO].serverResource(Some(ip"127.0.0.1")).map(_._1).use { bindAddress => + Network[IO] + .serverResource(Some(bindAddress.host), Some(bindAddress.port)) + .use_ + .interceptMessage[BindException]("Address already in use") + } + + val unknownHost = Network[IO] + .client(SocketAddress.fromString("not.example.com:80").get) + .use_ + .attempt + .map { + case Left(ex: UnknownHostException) => assert( ex.getMessage == "not.example.com: Name or service not known" || ex.getMessage == "not.example.com: nodename nor servname provided, or not known" ) + case _ => assert(false) } - } yield ()) + + connectionRefused *> addressAlreadyInUse *> unknownHost } test("options - should work with socket options") { @@ -221,7 +225,7 @@ class SocketSuite extends Fs2IoSuite with SocketSuitePlatform { } } - test("read after timed out read not allowed on JVM or Native") { + test("read after timed out read not allowed on JVM or Native".ignore) { val setup = for { serverSetup <- Network[IO].serverResource(Some(ip"127.0.0.1")) (bindAddress, server) = serverSetup @@ -242,7 +246,7 @@ class SocketSuite extends Fs2IoSuite with SocketSuitePlatform { client .readN(msg.size) .flatMap { c => - if (isJVM || isNative) { + if (isJVM) { assertEquals(c.size, 0) // Read again now that the pending read is no longer pending client.readN(msg.size).map(c => assertEquals(c.size, 0)) @@ -266,5 +270,31 @@ class SocketSuite extends Fs2IoSuite with SocketSuitePlatform { } } } + + test("accepted socket closes timely") { + Network[IO].serverResource().use { case (bindAddress, clients) => + clients.foreach(_ => IO.sleep(1.second)).compile.drain.background.surround { + Network[IO].client(bindAddress).use { client => + client.read(1).assertEquals(None) + } + } + } + } + + test("endOfOutput / endOfInput ignores ENOTCONN") { + Network[IO].serverResource().use { case (bindAddress, clients) => + Network[IO].client(bindAddress).surround(IO.sleep(100.millis)).background.surround { + clients + .take(1) + .foreach { socket => + socket.write(Chunk.array("fs2.rocks".getBytes)) *> + IO.sleep(1.second) *> + socket.endOfOutput *> socket.endOfInput + } + .compile + .drain + } + } + } } } diff --git a/io/js-jvm/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala b/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala similarity index 100% rename from io/js-jvm/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala rename to io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala diff --git a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala index 6bd25a2ba7..118aa0edd1 100644 --- a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala +++ b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala @@ -29,7 +29,7 @@ import cats.syntax.all._ import scala.concurrent.duration._ -class ProcessSuite extends Fs2IoSuite { +class ProcessSuite extends Fs2Suite { test("echo") { ProcessBuilder("echo", "hello", "world").spawn[IO].use { p =>