From 75a0c7955cc06b1166ddb78893d792de1ed7b058 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 23 Apr 2024 19:34:02 +0000 Subject: [PATCH 1/6] Add `PollingSystem#steal` --- .../cats/effect/unsafe/PollingSystem.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index cc36ef2b8d..1587799ec1 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -91,6 +91,23 @@ abstract class PollingSystem { */ def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean + /** + * Makes a best-effort to steal completed I/O events. Not all polling systems support this. + * + * This method is safe to call concurrently from threads that do not own the poller. + * + * @param poller + * the thread-local [[Poller]] used to poll events. + * + * @param reportFailure + * callback that handles any failures that occur during stealing. + * + * @return + * whether any events were stolen. e.g. if the method returned due to timeout, this should + * be `false`. + */ + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean + /** * @return * whether poll should be called again (i.e., there are more events to be polled) From 6685820ea2ed7d596fc106c204a4b4dc457d76dc Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 3 Aug 2024 00:07:37 +0000 Subject: [PATCH 2/6] Implement `SleepSystem.steal` --- core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 46ffa909e3..af0cfc1869 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -42,6 +42,8 @@ object SleepSystem extends PollingSystem { false } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = false def interrupt(targetThread: Thread, targetPoller: Poller): Unit = From 729ffe6ce0baa8436f7e4984ee2ee2c665fd40d4 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 5 Aug 2024 19:40:25 +0000 Subject: [PATCH 3/6] Implement `SelectorSystem#steal` --- .../cats/effect/unsafe/SelectorSystem.scala | 60 ++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index bbb853b947..c7b52d6d68 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -19,8 +19,9 @@ package unsafe import scala.util.control.NonFatal -import java.nio.channels.SelectableChannel +import java.nio.channels.{ClosedSelectorException, SelectableChannel} import java.nio.channels.spi.{AbstractSelector, SelectorProvider} +import java.util.ConcurrentModificationException import SelectorSystem._ @@ -63,7 +64,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS } catch { case ex if NonFatal(ex) => error = ex - readyOps = -1 // interest all waiters + readyOps = -1 // notify all waiters } val value = if (error ne null) Left(error) else Right(readyOps) @@ -98,6 +99,61 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS } else false } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = { + val selector = poller.selector + + val keys = + try { + if (selector.isOpen()) { + selector.selectNow() + selector.selectedKeys() + } else return false + } catch { // selector closed concurrently + case _: ClosedSelectorException => return false + } + + var polled = false + val ready = keys.iterator() + + try { + while (ready.hasNext()) { + val key = ready.next() + + var readyOps = 0 + var error: Throwable = null + try { + readyOps = key.readyOps() + } catch { + case ex if NonFatal(ex) => + error = ex + readyOps = -1 // notify all waiters + } + + val value = if (error ne null) Left(error) else Right(readyOps) + + var node = key.attachment().asInstanceOf[CallbackNode] + while (node ne null) { + val next = node.next + + if ((node.interest & readyOps) != 0) { // execute callback + val cb = node.callback + if (cb != null) { + cb(value) + polled = true + } + } + + node = next + } + } + } catch { + case _: ConcurrentModificationException => + // owner thread concurrently processing selected keys, so suppress and exit loop + } + + polled + } + def needsPoll(poller: Poller): Boolean = !poller.selector.keys().isEmpty() From 80be0d0c0307b190bb7c337fcec723e43161b746 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 5 Aug 2024 20:02:43 +0000 Subject: [PATCH 4/6] Add `SelectorSystemSpec` --- .../cats/effect/IOPlatformSpecification.scala | 2 ++ .../effect/unsafe/SelectorSystemSpec.scala | 34 +++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index 63ad3e78ca..cbd448c127 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -513,6 +513,8 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala } } + def steal(poller: Poller, reportFailure: Throwable => Unit) = false + def makeApi(access: (Poller => Unit) => Unit): DummySystem.Api = new DummyPoller { def poll = IO.async_[Unit] { cb => diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala new file mode 100644 index 0000000000..70c0906a14 --- /dev/null +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +class SelectorSystemSpec extends BaseSpec { + + "SelectorSystem" should { + "not blocker stealer when owner is polling" in real { + IO(SelectorSystem()).flatMap { s => + IO(s.makePoller()).flatMap { p => + IO.interruptible(s.poll(p, -1, _ => ())).background.surround { + IO(s.steal(p, _ => ()) should beFalse) + } + } + } + } + } + +} From cc14b5e334378fdb2745e9abd55a01cfb9823ec3 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 6 Aug 2024 19:03:43 +0000 Subject: [PATCH 5/6] Implement no-op `steal`s on Native --- core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala | 2 ++ .../native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala | 2 ++ .../scala/cats/effect/unsafe/PollingExecutorScheduler.scala | 1 + core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala | 2 ++ 4 files changed, 7 insertions(+) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 9874edb58e..c415c6b9f5 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -62,6 +62,8 @@ object EpollSystem extends PollingSystem { def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = poller.poll(nanos) + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = poller.needsPoll() def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 3a26a4eb6d..54cca6f15c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -61,6 +61,8 @@ object KqueueSystem extends PollingSystem { def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = poller.poll(nanos) + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = poller.needsPoll() diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index c37a16677f..6f3ac8fb0e 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -43,6 +43,7 @@ abstract class PollingExecutorScheduler(pollEvery: Int) poller.poll(nanos.nanos) true } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false def needsPoll(poller: Poller) = needsPoll def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () } diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index cea4bca406..6a3ead7c36 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -36,6 +36,8 @@ object SleepSystem extends PollingSystem { false } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = false def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () From 38694245759729dc4c1d1a6f07b82c50c0c30dad Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 6 Aug 2024 19:34:36 +0000 Subject: [PATCH 6/6] Remove broken `SelectorSystem#steal` impl --- .../cats/effect/unsafe/SelectorSystem.scala | 58 +------------------ .../effect/unsafe/SelectorSystemSpec.scala | 34 ----------- 2 files changed, 2 insertions(+), 90 deletions(-) delete mode 100644 tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index c7b52d6d68..2bdc432ff4 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -19,9 +19,8 @@ package unsafe import scala.util.control.NonFatal -import java.nio.channels.{ClosedSelectorException, SelectableChannel} +import java.nio.channels.SelectableChannel import java.nio.channels.spi.{AbstractSelector, SelectorProvider} -import java.util.ConcurrentModificationException import SelectorSystem._ @@ -99,60 +98,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS } else false } - def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = { - val selector = poller.selector - - val keys = - try { - if (selector.isOpen()) { - selector.selectNow() - selector.selectedKeys() - } else return false - } catch { // selector closed concurrently - case _: ClosedSelectorException => return false - } - - var polled = false - val ready = keys.iterator() - - try { - while (ready.hasNext()) { - val key = ready.next() - - var readyOps = 0 - var error: Throwable = null - try { - readyOps = key.readyOps() - } catch { - case ex if NonFatal(ex) => - error = ex - readyOps = -1 // notify all waiters - } - - val value = if (error ne null) Left(error) else Right(readyOps) - - var node = key.attachment().asInstanceOf[CallbackNode] - while (node ne null) { - val next = node.next - - if ((node.interest & readyOps) != 0) { // execute callback - val cb = node.callback - if (cb != null) { - cb(value) - polled = true - } - } - - node = next - } - } - } catch { - case _: ConcurrentModificationException => - // owner thread concurrently processing selected keys, so suppress and exit loop - } - - polled - } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false def needsPoll(poller: Poller): Boolean = !poller.selector.keys().isEmpty() diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala deleted file mode 100644 index 70c0906a14..0000000000 --- a/tests/jvm/src/test/scala/cats/effect/unsafe/SelectorSystemSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2020-2024 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package unsafe - -class SelectorSystemSpec extends BaseSpec { - - "SelectorSystem" should { - "not blocker stealer when owner is polling" in real { - IO(SelectorSystem()).flatMap { s => - IO(s.makePoller()).flatMap { p => - IO.interruptible(s.poll(p, -1, _ => ())).background.surround { - IO(s.steal(p, _ => ()) should beFalse) - } - } - } - } - } - -}