Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I/O polling #3240

Open
wants to merge 86 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
65e8dec
Bump to CE 3.5-4f9e57b
armanbilge Dec 17, 2022
073723b
Sketch `FdPollingSocket`
armanbilge Dec 17, 2022
ce77f22
Better error-handling in `ResizableBuffer`
armanbilge Dec 17, 2022
3bc1410
Impl socket close, add native util
armanbilge Dec 17, 2022
862ae58
Tidy unused type param
armanbilge Dec 17, 2022
9e475e8
Add socket address helpers
armanbilge Dec 17, 2022
f93c662
Implement `endOfInput`, `endOfOutput`
armanbilge Dec 17, 2022
06c0fe7
Wip socket reading
armanbilge Dec 17, 2022
a75a5ab
Take address as ctor args
armanbilge Dec 18, 2022
c104cd7
Implement reading
armanbilge Dec 18, 2022
c8e2316
Address warnings
armanbilge Dec 18, 2022
f0fd81d
Bikeshed
armanbilge Dec 18, 2022
86ceb06
Implement writing
armanbilge Dec 18, 2022
3eb0e18
Bump CE snapshot, adopt new fd polling api
armanbilge Dec 21, 2022
8889e05
First attempt at unix sockets
armanbilge Dec 22, 2022
5c62bc2
Cross-compile unixsockets tests
armanbilge Dec 22, 2022
adba432
Workaround another borked method
armanbilge Dec 22, 2022
0aa9260
Bump base version
armanbilge Dec 22, 2022
64d7a76
Implement non-blocking std{in,out,err}
armanbilge Dec 22, 2022
c27e03d
Fix unix socket error handling
armanbilge Dec 22, 2022
ebc0ca5
Simplify unix socket connect
armanbilge Dec 22, 2022
43b0c06
Fix simplified unix socket connect
armanbilge Dec 22, 2022
6e25eab
stackalloc `sockaddr_un` ftw
armanbilge Dec 22, 2022
1cf0db4
Add more socket option helpers
armanbilge Dec 22, 2022
74b80f9
`accept4` is a linux thing
armanbilge Dec 22, 2022
84fc9b1
First attempt at fd polling socket group
armanbilge Dec 22, 2022
1678d52
`raiseSocketError` -> `checkSocketError`
armanbilge Dec 22, 2022
b5b3a04
Expose new polling system `Network`
armanbilge Dec 22, 2022
e682a37
Fix exceptions, tweak test
armanbilge Dec 22, 2022
220f6bc
Add forgotten `guard`s
armanbilge Dec 22, 2022
ef299db
Workaround BSD `sa_family` quirk
armanbilge Dec 22, 2022
f6ecd2b
Unused import
armanbilge Dec 22, 2022
830564a
Fixing+debugging
armanbilge Dec 22, 2022
0ece5fc
Revert "Workaround BSD `sa_family` quirk"
armanbilge Dec 22, 2022
26a4e4f
Explicitly track if ipv4/ipv6 socket
armanbilge Dec 23, 2022
b0f71fe
Fix Scala 3 compile
armanbilge Dec 23, 2022
8c21312
Implement `SelectorPollingSocket`
armanbilge Dec 26, 2022
283eda4
Implement `SelectorPollingSocketGroup`
armanbilge Dec 26, 2022
f66cd37
Expose polling-based `Network`
armanbilge Dec 26, 2022
c4e0046
Coalesce `evalTap` / `evalMap`
armanbilge Dec 26, 2022
2b40f46
Fix accept cancelation
armanbilge Dec 26, 2022
ab235db
Fix `remoteAddress`
armanbilge Dec 26, 2022
1dbbdca
Ignore invalid test
armanbilge Dec 26, 2022
a02a2ed
Bump ce
armanbilge Dec 28, 2022
ac5c657
Bump base version
armanbilge Dec 28, 2022
93f8128
Merge remote-tracking branch 'upstream/main' into feature/jvm-polling…
armanbilge Apr 21, 2023
e1a3444
Bump CE snapshot
armanbilge Apr 27, 2023
51345a9
Bump CE snapshot
armanbilge Apr 28, 2023
e96c4e9
Merge branch 'feature/jvm-polling-system' into feature/polling
armanbilge Jun 14, 2023
2e815d6
Merge branch 'feature/native-polling-system' into feature/polling
armanbilge Jun 14, 2023
33d9f0c
Update to latest CE snapshot
armanbilge Jun 14, 2023
eb52f5e
Optimizations
armanbilge Jun 14, 2023
897ce2f
Set `SO_REUSEADDR=true` by default
armanbilge Jun 14, 2023
ab663db
Set socket options on accepted sockets
armanbilge Jun 14, 2023
5385c29
Fix method name
armanbilge Jun 14, 2023
97b3cdf
Remove flaky test
armanbilge Jun 14, 2023
1580d81
Fix Scala 3 compile
armanbilge Jun 14, 2023
895ea67
Fix connect error handling
armanbilge Jun 15, 2023
75a2246
Fix test
armanbilge Jun 15, 2023
3d9ccbb
Use Cirrus for testing ARM and macOS
armanbilge Jun 15, 2023
1400219
Fix ci task name
armanbilge Jun 15, 2023
02797f7
Fix Cirrus Dockerfile
armanbilge Jun 15, 2023
5072dd3
Install cmake in Dockerfile
armanbilge Jun 15, 2023
e358dce
Remove stray debug println
armanbilge Jun 15, 2023
1af22dd
Fix socket close leak
armanbilge Jun 15, 2023
21d7c00
Poke ci
armanbilge Jun 15, 2023
caed90f
Use custom docker image
armanbilge Jun 19, 2023
f0ef5da
Fix accept loop
armanbilge Jun 19, 2023
4b17c53
Install git in docker
armanbilge Jun 19, 2023
2a6cc31
Revert "Use custom docker image"
armanbilge Jun 22, 2023
21a9bfe
Install zlib
armanbilge Jun 22, 2023
80804cc
Install Node.js
armanbilge Jun 22, 2023
40241c8
Remove epollcat dep
armanbilge Jun 29, 2023
9f78bdd
Merge branch 'main' into feature/polling
armanbilge Aug 25, 2023
441eaa0
Ignore `ENOTCONN` on socket shutdown
armanbilge Aug 25, 2023
e4ffdd4
Merge remote-tracking branch 'upstream/main' into feature/polling
armanbilge Sep 5, 2023
500e545
Use `atUnsafe`
armanbilge Sep 5, 2023
1e8d405
Add nowarn
armanbilge Sep 5, 2023
30afaf4
Merge remote-tracking branch 'upstream/main' into feature/polling
armanbilge Sep 15, 2023
4b5f50b
Bump base version
armanbilge Sep 15, 2023
7515c87
Merge remote-tracking branch 'upstream/main' into feature/polling
armanbilge Oct 3, 2023
4ae0e16
Merge remote-tracking branch 'upstream/main' into feature/polling
armanbilge Jan 8, 2024
365636d
Bump CE
armanbilge Jan 8, 2024
17767c3
Merge branch 'main' into feature/polling
armanbilge Dec 27, 2024
518efae
No more Fs2IoSuite
armanbilge Dec 27, 2024
1a076cd
Delete .cirrus/Dockerfile
armanbilge Dec 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.scodec" %%% "scodec-bits" % "1.1.38",
"org.typelevel" %%% "cats-core" % "2.11.0",
"org.typelevel" %%% "cats-effect" % "3.5.7",
"org.typelevel" %%% "cats-effect-laws" % "3.5.7" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5.7" % Test,
"org.typelevel" %%% "cats-effect" % "3.6.0-RC1",
"org.typelevel" %%% "cats-effect-laws" % "3.6.0-RC1" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.6.0-RC1" % Test,
"org.typelevel" %%% "cats-laws" % "2.11.0" % Test,
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0" % Test,
Expand Down Expand Up @@ -370,9 +370,6 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform)
.nativeEnablePlugins(ScalaNativeBrewedConfigPlugin)
.nativeSettings(commonNativeSettings)
.nativeSettings(
libraryDependencies ++= Seq(
"com.armanbilge" %%% "epollcat" % "0.1.6" % Test
),
Test / nativeBrewFormulas += "s2n",
Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1")
)
Expand Down
22 changes: 0 additions & 22 deletions io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,15 @@
package fs2
package io

import cats._
import cats.effect.kernel.Sync
import cats.syntax.all._

import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import scala.reflect.ClassTag

private[fs2] trait iojvmnative {
type InterruptedIOException = java.io.InterruptedIOException
type ClosedChannelException = java.nio.channels.ClosedChannelException

//
// STDIN/STDOUT Helpers

/** Pipe of bytes that writes emitted values to standard output asynchronously. */
def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.out), false)

/** Pipe of bytes that writes emitted values to standard error asynchronously. */
def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.err), false)

/** Writes this stream to standard output asynchronously, converting each element to
* a sequence of bytes via `Show` and the given `Charset`.
*/
def stdoutLines[F[_]: Sync, O: Show](
charset: Charset = StandardCharsets.UTF_8
): Pipe[F, O, Nothing] =
_.map(_.show).through(text.encode(charset)).through(stdout)

/** Stream of bytes read asynchronously from the specified resource relative to the class `C`.
* @see [[readClassLoaderResource]] for a resource relative to a classloader.
*/
Expand Down
19 changes: 19 additions & 0 deletions io/jvm/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package fs2
package io

import cats.Show
import cats.effect.kernel.{Async, Outcome, Resource, Sync}
import cats.effect.kernel.implicits._
import cats.effect.kernel.Deferred
Expand All @@ -30,6 +31,8 @@ import fs2.internal.ThreadFactories
import fs2.io.internal.PipedStreamBuffer

import java.io.{InputStream, OutputStream}
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import java.util.concurrent.Executors

private[fs2] trait ioplatform extends iojvmnative {
Expand Down Expand Up @@ -71,6 +74,22 @@ private[fs2] trait ioplatform extends iojvmnative {
def stdinUtf8[F[_]](bufSize: Int, F: Sync[F]): Stream[F, String] =
stdin(bufSize, F).through(text.utf8.decode)

/** Pipe of bytes that writes emitted values to standard output asynchronously. */
def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.out), false)

/** Pipe of bytes that writes emitted values to standard error asynchronously. */
def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.err), false)

/** Writes this stream to standard output asynchronously, converting each element to
* a sequence of bytes via `Show` and the given `Charset`.
*/
def stdoutLines[F[_]: Sync, O: Show](
charset: Charset = StandardCharsets.UTF_8
): Pipe[F, O, Nothing] =
_.map(_.show).through(text.encode(charset)).through(stdout)

/** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`,
* that is closed whenever the resulting stream terminates.
*
Expand Down
61 changes: 57 additions & 4 deletions io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package net

import cats.effect.IO
import cats.effect.LiftIO
import cats.effect.Selector
import cats.effect.kernel.{Async, Resource}

import com.comcast.ip4s.{Dns, Host, IpAddress, Port, SocketAddress}
Expand Down Expand Up @@ -78,10 +79,62 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N

def forIO: Network[IO] = forLiftIO

implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] = {
val _ = LiftIO[F]
forAsync
}
implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] =
new UnsealedNetwork[F] {
private lazy val fallback = forAsync[F]

private def tryGetSelector =
IO.pollers.map(_.collectFirst { case selector: Selector => selector }).to[F]

private implicit def dns: Dns[F] = Dns.forAsync[F]

def socketGroup(threadCount: Int, threadFactory: ThreadFactory): Resource[F, SocketGroup[F]] =
Resource.eval(tryGetSelector).flatMap {
case Some(selector) => Resource.pure(new SelectingSocketGroup[F](selector))
case None => fallback.socketGroup(threadCount, threadFactory)
}

def datagramSocketGroup(threadFactory: ThreadFactory): Resource[F, DatagramSocketGroup[F]] =
fallback.datagramSocketGroup(threadFactory)

def client(
to: SocketAddress[Host],
options: List[SocketOption]
): Resource[F, Socket[F]] = Resource.eval(tryGetSelector).flatMap {
case Some(selector) => new SelectingSocketGroup(selector).client(to, options)
case None => fallback.client(to, options)
}

def server(
address: Option[Host],
port: Option[Port],
options: List[SocketOption]
): Stream[F, Socket[F]] = Stream.eval(tryGetSelector).flatMap {
case Some(selector) => new SelectingSocketGroup(selector).server(address, port, options)
case None => fallback.server(address, port, options)
}

def serverResource(
address: Option[Host],
port: Option[Port],
options: List[SocketOption]
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
Resource.eval(tryGetSelector).flatMap {
case Some(selector) =>
new SelectingSocketGroup(selector).serverResource(address, port, options)
case None => fallback.serverResource(address, port, options)
}

def openDatagramSocket(
address: Option[Host],
port: Option[Port],
options: List[SocketOption],
protocolFamily: Option[ProtocolFamily]
): Resource[F, DatagramSocket[F]] =
fallback.openDatagramSocket(address, port, options, protocolFamily)

def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync[F]
}

def forAsync[F[_]](implicit F: Async[F]): Network[F] =
forAsyncAndDns(F, Dns.forAsync(F))
Expand Down
102 changes: 102 additions & 0 deletions io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io.net

import cats.effect.LiftIO
import cats.effect.Selector
import cats.effect.kernel.Async
import cats.effect.std.Mutex
import cats.syntax.all._
import com.comcast.ip4s.IpAddress
import com.comcast.ip4s.SocketAddress

import java.nio.ByteBuffer
import java.nio.channels.SelectionKey.OP_READ
import java.nio.channels.SelectionKey.OP_WRITE
import java.nio.channels.SocketChannel

private final class SelectingSocket[F[_]: LiftIO] private (
selector: Selector,
ch: SocketChannel,
readMutex: Mutex[F],
writeMutex: Mutex[F],
val localAddress: F[SocketAddress[IpAddress]],
val remoteAddress: F[SocketAddress[IpAddress]]
)(implicit F: Async[F])
extends Socket.BufferedReads(readMutex) {

protected def readChunk(buf: ByteBuffer): F[Int] =
F.delay(ch.read(buf)).flatMap { readed =>
if (readed == 0) selector.select(ch, OP_READ).to *> readChunk(buf)
else F.pure(readed)
}

def write(bytes: Chunk[Byte]): F[Unit] = {
def go(buf: ByteBuffer): F[Unit] =
F.delay {
ch.write(buf)
buf.remaining()
}.flatMap { remaining =>
if (remaining > 0) {
selector.select(ch, OP_WRITE).to *> go(buf)
} else F.unit
}
writeMutex.lock.surround {
F.delay(bytes.toByteBuffer).flatMap(go)
}
}

def isOpen: F[Boolean] = F.delay(ch.isOpen)

def endOfOutput: F[Unit] =
F.delay {
ch.shutdownOutput(); ()
}

def endOfInput: F[Unit] =
F.delay {
ch.shutdownInput(); ()
}

}

private object SelectingSocket {
def apply[F[_]: LiftIO](
selector: Selector,
ch: SocketChannel,
localAddress: F[SocketAddress[IpAddress]],
remoteAddress: F[SocketAddress[IpAddress]]
)(implicit F: Async[F]): F[Socket[F]] =
(Mutex[F], Mutex[F]).flatMapN { (readMutex, writeMutex) =>
F.delay {
new SelectingSocket[F](
selector,
ch,
readMutex,
writeMutex,
localAddress,
remoteAddress
)
}
}
}
Loading
Loading