From 452289ab869b717d6ff812524b058700ad8a9bfc Mon Sep 17 00:00:00 2001 From: Neo Lin Date: Fri, 5 Jan 2024 10:39:59 -0500 Subject: [PATCH] fix: Added priority queue sink and source extending queue, fixing another mima error check --- .../main/scala/cats/effect/std/PQueue.scala | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/PQueue.scala b/std/shared/src/main/scala/cats/effect/std/PQueue.scala index 00e1e75f2c..b9204fd7b4 100644 --- a/std/shared/src/main/scala/cats/effect/std/PQueue.scala +++ b/std/shared/src/main/scala/cats/effect/std/PQueue.scala @@ -215,12 +215,47 @@ object PQueue { } trait PQueueSource[F[_], A] extends QueueSource[F, A] { - override def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = - super.tryTakeN(maxN) + + /** + * Attempts to dequeue elements from the PQueue, if they are available without semantically + * blocking. This is a convenience method that recursively runs `tryTake`. It does not provide + * any additional performance benefits. + * + * @param maxN + * The max elements to dequeue. Passing `None` will try to dequeue the whole queue. + * + * @return + * an effect that contains the dequeued elements from the PQueue + * + * Note: If there are multiple elements with least priority, the order in which they are + * dequeued is undefined. + */ + override def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = { + PQueueSource.assertMaxNPositive(maxN) + + def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] = + if (i >= limit) + F.pure(acc.reverse) + else + tryTake flatMap { + case Some(a) => loop(i + 1, limit, a :: acc) + case None => F.pure(acc.reverse) + } + + maxN match { + case Some(limit) => loop(0, limit, Nil) + case None => loop(0, Int.MaxValue, Nil) + } + } } object PQueueSource { + private def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match { + case Some(n) if n <= 0 => + throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n") + case _ => () + } implicit def catsFunctorForPQueueSource[F[_]: Functor]: Functor[PQueueSource[F, *]] = new Functor[PQueueSource[F, *]] { @@ -238,8 +273,14 @@ object PQueueSource { trait PQueueSink[F[_], A] extends QueueSink[F, A] { - override def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = - super.tryOfferN(list) + override def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match { + case Nil => F.pure(list) + case h :: t => + tryOffer(h).ifM( + tryOfferN(t), + F.pure(list) + ) + } }