Skip to content

Commit

Permalink
PollerProvider -> PollingContext
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Oct 27, 2024
1 parent a2fd0cb commit 26ef827
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ abstract class PollingSystem {
/**
* Creates a new instance of the user-facing interface.
*/
def makeApi(provider: PollerProvider[Poller]): Api
def makeApi(ctx: PollingContext[Poller]): Api

/**
* Creates a new instance of the thread-local data structure used for polling.
Expand Down Expand Up @@ -104,7 +104,7 @@ abstract class PollingSystem {

}

trait PollerProvider[P] {
trait PollingContext[P] {

/**
* Register a callback to obtain a thread-local `Poller`
Expand Down
10 changes: 5 additions & 5 deletions core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends

def close(): Unit = ()

def makeApi(provider: PollerProvider[Poller]): Selector =
new SelectorImpl(provider, selectorProvider)
def makeApi(ctx: PollingContext[Poller]): Selector =
new SelectorImpl(ctx, selectorProvider)

def makePoller(): Poller = new Poller(selectorProvider.openSelector())

Expand Down Expand Up @@ -100,13 +100,13 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends
}

final class SelectorImpl private[SelectorSystem] (
pollerProvider: PollerProvider[Poller],
ctx: PollingContext[Poller],
val provider: SelectorProvider
) extends Selector {

def select(ch: SelectableChannel, ops: Int): IO[Int] = IO.async { selectCb =>
IO.async_[Option[IO[Unit]]] { cb =>
pollerProvider.accessPoller { poller =>
ctx.accessPoller { poller =>
try {
val selector = poller.selector
val key = ch.keyFor(selector)
Expand All @@ -123,7 +123,7 @@ final class SelectorSystem private (selectorProvider: SelectorProvider) extends
}

val cancel = IO {
if (pollerProvider.ownPoller(poller))
if (ctx.ownPoller(poller))
node.remove()
else
node.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object SleepSystem extends PollingSystem {

def close(): Unit = ()

def makeApi(provider: PollerProvider[Poller]): Api = this
def makeApi(ctx: PollingContext[Poller]): Api = this

def makePoller(): Poller = this

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
reportFailure0: Throwable => Unit
) extends ExecutionContextExecutor
with Scheduler
with PollerProvider[P] {
with PollingContext[P] {

import TracingConstants._
import WorkStealingThreadPoolConstants._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ object EpollSystem extends PollingSystem {

def close(): Unit = ()

def makeApi(provider: PollerProvider[Poller]): Api =
new FileDescriptorPollerImpl(provider)
def makeApi(ctx: PollingContext[Poller]): Api =
new FileDescriptorPollerImpl(ctx)

def makePoller(): Poller = {
val fd = epoll_create1(0)
Expand All @@ -67,7 +67,7 @@ object EpollSystem extends PollingSystem {
def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()

private final class FileDescriptorPollerImpl private[EpollSystem] (
provider: PollerProvider[Poller])
ctx: PollingContext[Poller])
extends FileDescriptorPoller {

def registerFileDescriptor(
Expand All @@ -78,7 +78,7 @@ object EpollSystem extends PollingSystem {
Resource {
(Mutex[IO], Mutex[IO]).flatMapN { (readMutex, writeMutex) =>
IO.async_[(PollHandle, IO[Unit])] { cb =>
provider.accessPoller { epoll =>
ctx.accessPoller { epoll =>
val handle = new PollHandle(readMutex, writeMutex)
epoll.register(fd, reads, writes, handle, cb)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
val loop = new EventLoopExecutorScheduler[system.Poller](64, system)
val poller = loop.poller
val api = system.makeApi(
new PollerProvider[system.Poller] {
new PollingContext[system.Poller] {
def accessPoller(cb: system.Poller => Unit) = cb(poller)
def ownPoller(poller: system.Poller) = true
}
Expand Down
14 changes: 7 additions & 7 deletions core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ object KqueueSystem extends PollingSystem {

def close(): Unit = ()

def makeApi(provider: PollerProvider[Poller]): FileDescriptorPoller =
new FileDescriptorPollerImpl(provider)
def makeApi(ctx: PollingContext[Poller]): FileDescriptorPoller =
new FileDescriptorPollerImpl(ctx)

def makePoller(): Poller = {
val fd = kqueue()
Expand All @@ -67,7 +67,7 @@ object KqueueSystem extends PollingSystem {
def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()

private final class FileDescriptorPollerImpl private[KqueueSystem] (
provider: PollerProvider[Poller]
ctx: PollingContext[Poller]
) extends FileDescriptorPoller {
def registerFileDescriptor(
fd: Int,
Expand All @@ -76,7 +76,7 @@ object KqueueSystem extends PollingSystem {
): Resource[IO, FileDescriptorPollHandle] =
Resource.eval {
(Mutex[IO], Mutex[IO]).mapN {
new PollHandle(provider, fd, _, _)
new PollHandle(ctx, fd, _, _)
}
}
}
Expand All @@ -86,7 +86,7 @@ object KqueueSystem extends PollingSystem {
(filter.toLong << 32) | ident.toLong

private final class PollHandle(
provider: PollerProvider[Poller],
ctx: PollingContext[Poller],
fd: Int,
readMutex: Mutex[IO],
writeMutex: Mutex[IO]
Expand All @@ -101,7 +101,7 @@ object KqueueSystem extends PollingSystem {
else
IO.async[Unit] { kqcb =>
IO.async_[Option[IO[Unit]]] { cb =>
provider.accessPoller { kqueue =>
ctx.accessPoller { kqueue =>
kqueue.evSet(fd, EVFILT_READ, EV_ADD.toUShort, kqcb)
cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_READ)))))
}
Expand All @@ -121,7 +121,7 @@ object KqueueSystem extends PollingSystem {
else
IO.async[Unit] { kqcb =>
IO.async_[Option[IO[Unit]]] { cb =>
provider.accessPoller { kqueue =>
ctx.accessPoller { kqueue =>
kqueue.evSet(fd, EVFILT_WRITE, EV_ADD.toUShort, kqcb)
cb(Right(Some(IO(kqueue.removeCallback(fd, EVFILT_WRITE)))))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ abstract class PollingExecutorScheduler(pollEvery: Int)
type Poller = outer.type
private[this] var needsPoll = true
def close(): Unit = ()
def makeApi(provider: PollerProvider[Poller]): Api = outer
def makeApi(ctx: PollingContext[Poller]): Api = outer
def makePoller(): Poller = outer
def closePoller(poller: Poller): Unit = ()
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object SleepSystem extends PollingSystem {

def close(): Unit = ()

def makeApi(provider: PollerProvider[Poller]): Api = this
def makeApi(ctx: PollingContext[Poller]): Api = this

def makePoller(): Poller = this

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import cats.effect.std.Semaphore
import cats.effect.unsafe.{
IORuntime,
IORuntimeConfig,
PollerProvider,
PollingContext,
PollingSystem,
SleepSystem,
WorkStealingThreadPool
Expand Down Expand Up @@ -514,10 +514,10 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala
}
}

def makeApi(provider: PollerProvider[Poller]): DummySystem.Api =
def makeApi(ctx: PollingContext[Poller]): DummySystem.Api =
new DummyPoller {
def poll = IO.async_[Unit] { cb =>
provider.accessPoller { poller =>
ctx.accessPoller { poller =>
poller.getAndUpdate(cb :: _)
()
}
Expand Down

0 comments on commit 26ef827

Please sign in to comment.