From 7c610bbbd5100ae20b8b03a67eae4c06c946247b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Aug 2024 21:20:33 +0000 Subject: [PATCH 1/6] Introduce `PollerProvider` --- .../cats/effect/unsafe/PollingSystem.scala | 22 +++++++++++++------ .../unsafe/IORuntimeCompanionPlatform.scala | 2 +- .../cats/effect/unsafe/SelectorSystem.scala | 14 ++++++------ .../cats/effect/unsafe/SleepSystem.scala | 2 +- .../unsafe/WorkStealingThreadPool.scala | 15 ++++++++++--- .../cats/effect/unsafe/WorkerThread.scala | 5 ++++- .../cats/effect/unsafe/EpollSystem.scala | 8 +++---- .../unsafe/IORuntimeCompanionPlatform.scala | 8 ++++++- .../cats/effect/unsafe/KqueueSystem.scala | 14 ++++++------ .../unsafe/PollingExecutorScheduler.scala | 2 +- .../cats/effect/unsafe/SleepSystem.scala | 2 +- .../cats/effect/IOPlatformSpecification.scala | 5 +++-- 12 files changed, 63 insertions(+), 36 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index cc36ef2b8d..3b3e27ac48 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -50,13 +50,8 @@ abstract class PollingSystem { /** * Creates a new instance of the user-facing interface. - * - * @param access - * callback to obtain a thread-local `Poller`. - * @return - * an instance of the user-facing interface `Api`. */ - def makeApi(access: (Poller => Unit) => Unit): Api + def makeApi(provider: PollerProvider[Poller]): Api /** * Creates a new instance of the thread-local data structure used for polling. @@ -109,7 +104,20 @@ abstract class PollingSystem { } -private object PollingSystem { +trait PollerProvider[P] { + + /** + * Register a callback to obtain a thread-local `Poller` + */ + def accessPoller(cb: P => Unit): Unit + + /** + * Returns `true` if it is safe to interact with this `Poller` + */ + def ownPoller(poller: P): Boolean +} + +object PollingSystem { /** * Type alias for a `PollingSystem` that has a specified `Poller` type. diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/jvm/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index cd321ab187..b72b91af5c 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -152,7 +152,7 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type ( threadPool, - pollingSystem.makeApi(threadPool.accessPoller), + pollingSystem.makeApi(threadPool), { () => unregisterMBeans() threadPool.shutdown() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index bbb853b947..28376e7a9a 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -24,16 +24,16 @@ import java.nio.channels.spi.{AbstractSelector, SelectorProvider} import SelectorSystem._ -final class SelectorSystem private (provider: SelectorProvider) extends PollingSystem { +final class SelectorSystem private (selectorProvider: SelectorProvider) extends PollingSystem { type Api = Selector def close(): Unit = () - def makeApi(access: (Poller => Unit) => Unit): Selector = - new SelectorImpl(access, provider) + def makeApi(provider: PollerProvider[Poller]): Selector = + new SelectorImpl(provider, selectorProvider) - def makePoller(): Poller = new Poller(provider.openSelector()) + def makePoller(): Poller = new Poller(selectorProvider.openSelector()) def closePoller(poller: Poller): Unit = poller.selector.close() @@ -107,15 +107,15 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS } final class SelectorImpl private[SelectorSystem] ( - access: (Poller => Unit) => Unit, + poller: PollerProvider[Poller], val provider: SelectorProvider ) extends Selector { def select(ch: SelectableChannel, ops: Int): IO[Int] = IO.async { selectCb => IO.async_[CallbackNode] { cb => - access { data => + poller.accessPoller { poller => try { - val selector = data.selector + val selector = poller.selector val key = ch.keyFor(selector) val node = if (key eq null) { // not yet registered on this selector diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 46ffa909e3..5ca8aae87f 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -26,7 +26,7 @@ object SleepSystem extends PollingSystem { def close(): Unit = () - def makeApi(access: (Poller => Unit) => Unit): Api = this + def makeApi(provider: PollerProvider[Poller]): Api = this def makePoller(): Poller = this diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 38ead18cb2..ea182f78f5 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -61,7 +61,7 @@ import WorkStealingThreadPool._ * contention. Work stealing is tried using a linear search starting from a random worker thread * index. */ -private[effect] final class WorkStealingThreadPool[P]( +private[effect] final class WorkStealingThreadPool[P <: AnyRef]( threadCount: Int, // number of worker threads private[unsafe] val threadPrefix: String, // prefix for the name of worker threads private[unsafe] val blockerThreadPrefix: String, // prefix for the name of worker threads currently in a blocking region @@ -71,7 +71,8 @@ private[effect] final class WorkStealingThreadPool[P]( system: PollingSystem.WithPoller[P], reportFailure0: Throwable => Unit ) extends ExecutionContextExecutor - with Scheduler { + with Scheduler + with PollerProvider[P] { import TracingConstants._ import WorkStealingThreadPoolConstants._ @@ -87,7 +88,7 @@ private[effect] final class WorkStealingThreadPool[P]( private[unsafe] val pollers: Array[P] = new Array[AnyRef](threadCount).asInstanceOf[Array[P]] - private[unsafe] def accessPoller(cb: P => Unit): Unit = { + def accessPoller(cb: P => Unit): Unit = { // figure out where we are val thread = Thread.currentThread() @@ -101,6 +102,14 @@ private[effect] final class WorkStealingThreadPool[P]( } else scheduleExternal(() => accessPoller(cb)) } + def ownPoller(poller: P): Boolean = { + val thread = Thread.currentThread() + if (thread.isInstanceOf[WorkerThread[_]]) { + val worker = thread.asInstanceOf[WorkerThread[P]] + worker.ownsPoller(poller) + } else false + } + /** * Atomic variable for used for publishing changes to the references in the `workerThreads` * array. Worker threads can be changed whenever blocking code is encountered on the pool. diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index c42d71c97f..a45c1babcc 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean * system when compared to a fixed size thread pool whose worker threads all draw tasks from a * single global work queue. */ -private final class WorkerThread[P]( +private final class WorkerThread[P <: AnyRef]( idx: Int, // Local queue instance with exclusive write access. private[this] var queue: LocalQueue, @@ -291,6 +291,9 @@ private final class WorkerThread[P]( foreign.toMap } + private[unsafe] def ownsPoller(poller: P): Boolean = + poller eq _poller + private[unsafe] def ownsTimers(timers: TimerHeap): Boolean = sleepers eq timers diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 9874edb58e..b32660aabf 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -47,8 +47,8 @@ object EpollSystem extends PollingSystem { def close(): Unit = () - def makeApi(access: (Poller => Unit) => Unit): Api = - new FileDescriptorPollerImpl(access) + def makeApi(provider: PollerProvider[Poller]): Api = + new FileDescriptorPollerImpl(provider) def makePoller(): Poller = { val fd = epoll_create1(0) @@ -67,7 +67,7 @@ object EpollSystem extends PollingSystem { def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () private final class FileDescriptorPollerImpl private[EpollSystem] ( - access: (Poller => Unit) => Unit) + provider: PollerProvider[Poller]) extends FileDescriptorPoller { def registerFileDescriptor( @@ -78,7 +78,7 @@ object EpollSystem extends PollingSystem { Resource { (Mutex[IO], Mutex[IO]).flatMapN { (readMutex, writeMutex) => IO.async_[(PollHandle, IO[Unit])] { cb => - access { epoll => + provider.accessPoller { epoll => val handle = new PollHandle(readMutex, writeMutex) epoll.register(fd, reads, writes, handle, cb) } diff --git a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index 075fed2b7e..d3f6a78fbf 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -30,7 +30,13 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type ): (ExecutionContext with Scheduler, system.Api, () => Unit) = { val loop = new EventLoopExecutorScheduler[system.Poller](64, system) val poller = loop.poller - (loop, system.makeApi(cb => cb(poller)), () => loop.shutdown()) + val api = system.makeApi( + new PollerProvider[system.Poller] { + def accessPoller(cb: system.Poller => Unit) = cb(poller) + def ownPoller(poller: system.Poller) = true + } + ) + (loop, api, () => loop.shutdown()) } def createDefaultPollingSystem(): PollingSystem = diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 3a26a4eb6d..028bca1540 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -46,8 +46,8 @@ object KqueueSystem extends PollingSystem { def close(): Unit = () - def makeApi(access: (Poller => Unit) => Unit): FileDescriptorPoller = - new FileDescriptorPollerImpl(access) + def makeApi(provider: PollerProvider[Poller]): FileDescriptorPoller = + new FileDescriptorPollerImpl(provider) def makePoller(): Poller = { val fd = kqueue() @@ -67,7 +67,7 @@ object KqueueSystem extends PollingSystem { def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () private final class FileDescriptorPollerImpl private[KqueueSystem] ( - access: (Poller => Unit) => Unit + provider: PollerProvider[Poller] ) extends FileDescriptorPoller { def registerFileDescriptor( fd: Int, @@ -76,7 +76,7 @@ object KqueueSystem extends PollingSystem { ): Resource[IO, FileDescriptorPollHandle] = Resource.eval { (Mutex[IO], Mutex[IO]).mapN { - new PollHandle(access, fd, _, _) + new PollHandle(provider, fd, _, _) } } } @@ -86,7 +86,7 @@ object KqueueSystem extends PollingSystem { (filter.toLong << 32) | ident.toLong private final class PollHandle( - access: (Poller => Unit) => Unit, + provider: PollerProvider[Poller], fd: Int, readMutex: Mutex[IO], writeMutex: Mutex[IO] @@ -101,7 +101,7 @@ object KqueueSystem extends PollingSystem { else IO.async[Unit] { kqcb => IO.async_[Option[IO[Unit]]] { cb => - access { kqueue => + provider.accessPoller { kqueue => kqueue.evSet(fd, EVFILT_READ, EV_ADD.toUShort, kqcb) cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_READ))))) } @@ -121,7 +121,7 @@ object KqueueSystem extends PollingSystem { else IO.async[Unit] { kqcb => IO.async_[Option[IO[Unit]]] { cb => - access { kqueue => + provider.accessPoller { kqueue => kqueue.evSet(fd, EVFILT_WRITE, EV_ADD.toUShort, kqcb) cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_WRITE))))) } diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index c37a16677f..617d876a8b 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -32,7 +32,7 @@ abstract class PollingExecutorScheduler(pollEvery: Int) type Poller = outer.type private[this] var needsPoll = true def close(): Unit = () - def makeApi(access: (Poller => Unit) => Unit): Api = outer + def makeApi(provider: PollerProvider[Poller]): Api = outer def makePoller(): Poller = outer def closePoller(poller: Poller): Unit = () def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index cea4bca406..c1bf404c94 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -24,7 +24,7 @@ object SleepSystem extends PollingSystem { def close(): Unit = () - def makeApi(access: (Poller => Unit) => Unit): Api = this + def makeApi(provider: PollerProvider[Poller]): Api = this def makePoller(): Poller = this diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index 63ad3e78ca..1603ff0b52 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -20,6 +20,7 @@ import cats.effect.std.Semaphore import cats.effect.unsafe.{ IORuntime, IORuntimeConfig, + PollerProvider, PollingSystem, SleepSystem, WorkStealingThreadPool @@ -513,10 +514,10 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala } } - def makeApi(access: (Poller => Unit) => Unit): DummySystem.Api = + def makeApi(provider: PollerProvider[Poller]): DummySystem.Api = new DummyPoller { def poll = IO.async_[Unit] { cb => - access { poller => + provider.accessPoller { poller => poller.getAndUpdate(cb :: _) () } From 5489d3a9bf861d83a4bd6f951eed0e4364fe8f2a Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 10 Sep 2024 00:59:41 +0000 Subject: [PATCH 2/6] Use `ownPoller` to avoid leaks in `SelectorSystem` --- .../cats/effect/unsafe/SelectorSystem.scala | 113 ++++++++++++------ 1 file changed, 74 insertions(+), 39 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index 28376e7a9a..c5f5e011ef 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -21,6 +21,7 @@ import scala.util.control.NonFatal import java.nio.channels.SelectableChannel import java.nio.channels.spi.{AbstractSelector, SelectorProvider} +import java.util.Iterator import SelectorSystem._ @@ -68,29 +69,21 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends val value = if (error ne null) Left(error) else Right(readyOps) - var head: CallbackNode = null - var prev: CallbackNode = null - var node = key.attachment().asInstanceOf[CallbackNode] - while (node ne null) { - val next = node.next + val callbacks = key.attachment().asInstanceOf[Callbacks] + val iter = callbacks.iterator() + while (iter.hasNext()) { + val node = iter.next() - if ((node.interest & readyOps) != 0) { // execute callback and drop this node + if ((node.interest & readyOps) != 0) { // drop this node and execute callback + node.remove() val cb = node.callback if (cb != null) { cb(value) polled = true } - if (prev ne null) prev.next = next - } else { // keep this node - prev = node - if (head eq null) - head = node } - - node = next } - key.attach(head) // if key was canceled this will null attachment () } @@ -107,42 +100,38 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends } final class SelectorImpl private[SelectorSystem] ( - poller: PollerProvider[Poller], + pollerProvider: PollerProvider[Poller], val provider: SelectorProvider ) extends Selector { def select(ch: SelectableChannel, ops: Int): IO[Int] = IO.async { selectCb => - IO.async_[CallbackNode] { cb => - poller.accessPoller { poller => + IO.async_[Option[IO[Unit]]] { cb => + pollerProvider.accessPoller { poller => try { val selector = poller.selector val key = ch.keyFor(selector) val node = if (key eq null) { // not yet registered on this selector - val node = new CallbackNode(ops, selectCb, null) - ch.register(selector, ops, node) - node + val cbs = new Callbacks + ch.register(selector, ops, cbs) + cbs.append(ops, selectCb) } else { // existing key // mixin the new interest key.interestOps(key.interestOps() | ops) - val node = - new CallbackNode(ops, selectCb, key.attachment().asInstanceOf[CallbackNode]) - key.attach(node) - node + val cbs = key.attachment().asInstanceOf[Callbacks] + cbs.append(ops, selectCb) + } + + val cancel = IO { + if (pollerProvider.ownPoller(poller)) + node.remove() + else + node.clear() } - cb(Right(node)) + cb(Right(Some(cancel))) } catch { case ex if NonFatal(ex) => cb(Left(ex)) } } - }.map { node => - Some { - IO { - // set all interest bits - node.interest = -1 - // clear for gc - node.callback = null - } - } } } @@ -161,9 +150,55 @@ object SelectorSystem { def apply(): SelectorSystem = apply(SelectorProvider.provider()) - private final class CallbackNode( - var interest: Int, - var callback: Either[Throwable, Int] => Unit, - var next: CallbackNode - ) + private final class Callbacks { + + private var head: Node = null + private var last: Node = null + + def append(interest: Int, callback: Either[Throwable, Int] => Unit): Node = { + val node = new Node(interest, callback) + if (last ne null) { + last.next = node + node.prev = last + } else { + head = node + } + last = node + node + } + + def iterator(): Iterator[Node] = new Iterator[Node] { + private var _next = head + + def hasNext() = _next ne null + + def next() = { + val next = _next + _next = next.next + next + } + } + + final class Node( + var interest: Int, + var callback: Either[Throwable, Int] => Unit + ) { + var prev: Node = null + var next: Node = null + + def remove(): Unit = { + if (prev ne null) prev.next = next + else head = next + + if (next ne null) next.prev = prev + else last = prev + } + + def clear(): Unit = { + interest = -1 // set all interest bits + callback = null // clear for gc + } + } + } + } From a2fd0cb0212a04954e365dfeec5a71cb31647410 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 23 Oct 2024 19:56:59 +0000 Subject: [PATCH 3/6] `ownPoller` is best-effort --- .../src/main/scala/cats/effect/unsafe/PollingSystem.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index 3b3e27ac48..5a11d598b2 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -112,7 +112,9 @@ trait PollerProvider[P] { def accessPoller(cb: P => Unit): Unit /** - * Returns `true` if it is safe to interact with this `Poller` + * Returns `true` if it is safe to interact with this `Poller`. Implementors of this method + * may be best-effort: it is always safe to return `false`, so callers must have an adequate + * fallback for the non-owning case. */ def ownPoller(poller: P): Boolean } From 26ef8276d9347d7fd1d169d175d7bfd5a0367830 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 27 Oct 2024 19:44:20 +0000 Subject: [PATCH 4/6] `PollerProvider` -> `PollingContext` --- .../scala/cats/effect/unsafe/PollingSystem.scala | 4 ++-- .../scala/cats/effect/unsafe/SelectorSystem.scala | 10 +++++----- .../scala/cats/effect/unsafe/SleepSystem.scala | 2 +- .../effect/unsafe/WorkStealingThreadPool.scala | 2 +- .../scala/cats/effect/unsafe/EpollSystem.scala | 8 ++++---- .../effect/unsafe/IORuntimeCompanionPlatform.scala | 2 +- .../scala/cats/effect/unsafe/KqueueSystem.scala | 14 +++++++------- .../effect/unsafe/PollingExecutorScheduler.scala | 2 +- .../scala/cats/effect/unsafe/SleepSystem.scala | 2 +- .../cats/effect/IOPlatformSpecification.scala | 6 +++--- 10 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index 5a11d598b2..17046cd6cd 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -51,7 +51,7 @@ abstract class PollingSystem { /** * Creates a new instance of the user-facing interface. */ - def makeApi(provider: PollerProvider[Poller]): Api + def makeApi(ctx: PollingContext[Poller]): Api /** * Creates a new instance of the thread-local data structure used for polling. @@ -104,7 +104,7 @@ abstract class PollingSystem { } -trait PollerProvider[P] { +trait PollingContext[P] { /** * Register a callback to obtain a thread-local `Poller` diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index c5f5e011ef..fbf6da0065 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -31,8 +31,8 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends def close(): Unit = () - def makeApi(provider: PollerProvider[Poller]): Selector = - new SelectorImpl(provider, selectorProvider) + def makeApi(ctx: PollingContext[Poller]): Selector = + new SelectorImpl(ctx, selectorProvider) def makePoller(): Poller = new Poller(selectorProvider.openSelector()) @@ -100,13 +100,13 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends } final class SelectorImpl private[SelectorSystem] ( - pollerProvider: PollerProvider[Poller], + ctx: PollingContext[Poller], val provider: SelectorProvider ) extends Selector { def select(ch: SelectableChannel, ops: Int): IO[Int] = IO.async { selectCb => IO.async_[Option[IO[Unit]]] { cb => - pollerProvider.accessPoller { poller => + ctx.accessPoller { poller => try { val selector = poller.selector val key = ch.keyFor(selector) @@ -123,7 +123,7 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends } val cancel = IO { - if (pollerProvider.ownPoller(poller)) + if (ctx.ownPoller(poller)) node.remove() else node.clear() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 5ca8aae87f..c64c86ee80 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -26,7 +26,7 @@ object SleepSystem extends PollingSystem { def close(): Unit = () - def makeApi(provider: PollerProvider[Poller]): Api = this + def makeApi(ctx: PollingContext[Poller]): Api = this def makePoller(): Poller = this diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index ea182f78f5..d715e51d5e 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -72,7 +72,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( reportFailure0: Throwable => Unit ) extends ExecutionContextExecutor with Scheduler - with PollerProvider[P] { + with PollingContext[P] { import TracingConstants._ import WorkStealingThreadPoolConstants._ diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index b32660aabf..fa7b299337 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -47,8 +47,8 @@ object EpollSystem extends PollingSystem { def close(): Unit = () - def makeApi(provider: PollerProvider[Poller]): Api = - new FileDescriptorPollerImpl(provider) + def makeApi(ctx: PollingContext[Poller]): Api = + new FileDescriptorPollerImpl(ctx) def makePoller(): Poller = { val fd = epoll_create1(0) @@ -67,7 +67,7 @@ object EpollSystem extends PollingSystem { def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () private final class FileDescriptorPollerImpl private[EpollSystem] ( - provider: PollerProvider[Poller]) + ctx: PollingContext[Poller]) extends FileDescriptorPoller { def registerFileDescriptor( @@ -78,7 +78,7 @@ object EpollSystem extends PollingSystem { Resource { (Mutex[IO], Mutex[IO]).flatMapN { (readMutex, writeMutex) => IO.async_[(PollHandle, IO[Unit])] { cb => - provider.accessPoller { epoll => + ctx.accessPoller { epoll => val handle = new PollHandle(readMutex, writeMutex) epoll.register(fd, reads, writes, handle, cb) } diff --git a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index d3f6a78fbf..79f1b823da 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -31,7 +31,7 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type val loop = new EventLoopExecutorScheduler[system.Poller](64, system) val poller = loop.poller val api = system.makeApi( - new PollerProvider[system.Poller] { + new PollingContext[system.Poller] { def accessPoller(cb: system.Poller => Unit) = cb(poller) def ownPoller(poller: system.Poller) = true } diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 028bca1540..aece3265c2 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -46,8 +46,8 @@ object KqueueSystem extends PollingSystem { def close(): Unit = () - def makeApi(provider: PollerProvider[Poller]): FileDescriptorPoller = - new FileDescriptorPollerImpl(provider) + def makeApi(ctx: PollingContext[Poller]): FileDescriptorPoller = + new FileDescriptorPollerImpl(ctx) def makePoller(): Poller = { val fd = kqueue() @@ -67,7 +67,7 @@ object KqueueSystem extends PollingSystem { def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () private final class FileDescriptorPollerImpl private[KqueueSystem] ( - provider: PollerProvider[Poller] + ctx: PollingContext[Poller] ) extends FileDescriptorPoller { def registerFileDescriptor( fd: Int, @@ -76,7 +76,7 @@ object KqueueSystem extends PollingSystem { ): Resource[IO, FileDescriptorPollHandle] = Resource.eval { (Mutex[IO], Mutex[IO]).mapN { - new PollHandle(provider, fd, _, _) + new PollHandle(ctx, fd, _, _) } } } @@ -86,7 +86,7 @@ object KqueueSystem extends PollingSystem { (filter.toLong << 32) | ident.toLong private final class PollHandle( - provider: PollerProvider[Poller], + ctx: PollingContext[Poller], fd: Int, readMutex: Mutex[IO], writeMutex: Mutex[IO] @@ -101,7 +101,7 @@ object KqueueSystem extends PollingSystem { else IO.async[Unit] { kqcb => IO.async_[Option[IO[Unit]]] { cb => - provider.accessPoller { kqueue => + ctx.accessPoller { kqueue => kqueue.evSet(fd, EVFILT_READ, EV_ADD.toUShort, kqcb) cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_READ))))) } @@ -121,7 +121,7 @@ object KqueueSystem extends PollingSystem { else IO.async[Unit] { kqcb => IO.async_[Option[IO[Unit]]] { cb => - provider.accessPoller { kqueue => + ctx.accessPoller { kqueue => kqueue.evSet(fd, EVFILT_WRITE, EV_ADD.toUShort, kqcb) cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_WRITE))))) } diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index 617d876a8b..29abf9f6c4 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -32,7 +32,7 @@ abstract class PollingExecutorScheduler(pollEvery: Int) type Poller = outer.type private[this] var needsPoll = true def close(): Unit = () - def makeApi(provider: PollerProvider[Poller]): Api = outer + def makeApi(ctx: PollingContext[Poller]): Api = outer def makePoller(): Poller = outer def closePoller(poller: Poller): Unit = () def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index c1bf404c94..05ec62ec72 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -24,7 +24,7 @@ object SleepSystem extends PollingSystem { def close(): Unit = () - def makeApi(provider: PollerProvider[Poller]): Api = this + def makeApi(ctx: PollingContext[Poller]): Api = this def makePoller(): Poller = this diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index 1603ff0b52..8bd2dfc1cd 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -20,7 +20,7 @@ import cats.effect.std.Semaphore import cats.effect.unsafe.{ IORuntime, IORuntimeConfig, - PollerProvider, + PollingContext, PollingSystem, SleepSystem, WorkStealingThreadPool @@ -514,10 +514,10 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala } } - def makeApi(provider: PollerProvider[Poller]): DummySystem.Api = + def makeApi(ctx: PollingContext[Poller]): DummySystem.Api = new DummyPoller { def poll = IO.async_[Unit] { cb => - provider.accessPoller { poller => + ctx.accessPoller { poller => poller.getAndUpdate(cb :: _) () } From c125e73a1e059880e878a18dba17c27708787417 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 27 Oct 2024 19:49:09 +0000 Subject: [PATCH 5/6] seal `PollingContext` --- .../src/main/scala/cats/effect/unsafe/PollingSystem.scala | 4 +++- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 2 +- .../scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index 17046cd6cd..0a74e76403 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -104,7 +104,7 @@ abstract class PollingSystem { } -trait PollingContext[P] { +sealed trait PollingContext[P] { /** * Register a callback to obtain a thread-local `Poller` @@ -119,6 +119,8 @@ trait PollingContext[P] { def ownPoller(poller: P): Boolean } +private[unsafe] trait UnsealedPollingContext[P] extends PollingContext[P] + object PollingSystem { /** diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index d715e51d5e..daa9e0f765 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -72,7 +72,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( reportFailure0: Throwable => Unit ) extends ExecutionContextExecutor with Scheduler - with PollingContext[P] { + with UnsealedPollingContext[P] { import TracingConstants._ import WorkStealingThreadPoolConstants._ diff --git a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index 79f1b823da..abc209d84f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -31,7 +31,7 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type val loop = new EventLoopExecutorScheduler[system.Poller](64, system) val poller = loop.poller val api = system.makeApi( - new PollingContext[system.Poller] { + new UnsealedPollingContext[system.Poller] { def accessPoller(cb: system.Poller => Unit) = cb(poller) def ownPoller(poller: system.Poller) = true } From cef80fe09f72461281713e0cefe58585a97e2715 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 29 Oct 2024 11:27:15 -0700 Subject: [PATCH 6/6] Revert superfluous changes --- .../src/main/scala/cats/effect/unsafe/SelectorSystem.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index fbf6da0065..c089e16b7f 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -25,16 +25,16 @@ import java.util.Iterator import SelectorSystem._ -final class SelectorSystem private (selectorProvider: SelectorProvider) extends PollingSystem { +final class SelectorSystem private (provider: SelectorProvider) extends PollingSystem { type Api = Selector def close(): Unit = () def makeApi(ctx: PollingContext[Poller]): Selector = - new SelectorImpl(ctx, selectorProvider) + new SelectorImpl(ctx, provider) - def makePoller(): Poller = new Poller(selectorProvider.openSelector()) + def makePoller(): Poller = new Poller(provider.openSelector()) def closePoller(poller: Poller): Unit = poller.selector.close()