Skip to content

Commit

Permalink
fix: Added priority queue sink and source extending queue, fixing ano…
Browse files Browse the repository at this point in the history
…ther mima error check
  • Loading branch information
neomaclin committed Jan 5, 2024
1 parent 385bc7e commit 452289a
Showing 1 changed file with 45 additions and 4 deletions.
49 changes: 45 additions & 4 deletions std/shared/src/main/scala/cats/effect/std/PQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,47 @@ object PQueue {
}

trait PQueueSource[F[_], A] extends QueueSource[F, A] {
override def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] =
super.tryTakeN(maxN)

/**
* Attempts to dequeue elements from the PQueue, if they are available without semantically
* blocking. This is a convenience method that recursively runs `tryTake`. It does not provide
* any additional performance benefits.
*
* @param maxN
* The max elements to dequeue. Passing `None` will try to dequeue the whole queue.
*
* @return
* an effect that contains the dequeued elements from the PQueue
*
* Note: If there are multiple elements with least priority, the order in which they are
* dequeued is undefined.
*/
override def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = {
PQueueSource.assertMaxNPositive(maxN)

def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] =
if (i >= limit)
F.pure(acc.reverse)
else
tryTake flatMap {
case Some(a) => loop(i + 1, limit, a :: acc)
case None => F.pure(acc.reverse)
}

maxN match {
case Some(limit) => loop(0, limit, Nil)
case None => loop(0, Int.MaxValue, Nil)
}
}

}

object PQueueSource {
private def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match {
case Some(n) if n <= 0 =>
throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n")
case _ => ()
}

implicit def catsFunctorForPQueueSource[F[_]: Functor]: Functor[PQueueSource[F, *]] =
new Functor[PQueueSource[F, *]] {
Expand All @@ -238,8 +273,14 @@ object PQueueSource {

trait PQueueSink[F[_], A] extends QueueSink[F, A] {

override def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] =
super.tryOfferN(list)
override def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match {
case Nil => F.pure(list)
case h :: t =>
tryOffer(h).ifM(
tryOfferN(t),
F.pure(list)
)
}

}

Expand Down

0 comments on commit 452289a

Please sign in to comment.