Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added priority queue sink and source extending queue #3930

95 changes: 8 additions & 87 deletions std/shared/src/main/scala/cats/effect/std/PQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -214,37 +214,7 @@ object PQueue {
else ()
}

trait PQueueSource[F[_], A] {

/**
* Dequeues the least element from the PQueue, possibly fiber blocking until an element
* becomes available.
*
* O(log(n))
*
* Note: If there are multiple elements with least priority, the order in which they are
* dequeued is undefined. If you want to break ties with FIFO order you will need an
* additional `Ref[F, Long]` to track insertion, and embed that information into your instance
* for `Order[A]`.
*/
def take: F[A]

/**
* Attempts to dequeue the least element from the PQueue, if one is available without fiber
* blocking.
*
* O(log(n))
*
* @return
* an effect that describes whether the dequeueing of an element from the PQueue succeeded
* without blocking, with `None` denoting that no element was available
*
* Note: If there are multiple elements with least priority, the order in which they are
* dequeued is undefined. If you want to break ties with FIFO order you will need an
* additional `Ref[F, Long]` to track insertion, and embed that information into your instance
* for `Order[A]`.
*/
def tryTake: F[Option[A]]
trait PQueueSource[F[_], A] extends QueueSource[F, A] {

/**
* Attempts to dequeue elements from the PQueue, if they are available without semantically
Expand All @@ -260,33 +230,12 @@ trait PQueueSource[F[_], A] {
* Note: If there are multiple elements with least priority, the order in which they are
* dequeued is undefined.
*/
def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = {
PQueueSource.assertMaxNPositive(maxN)

def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] =
if (i >= limit)
F.pure(acc.reverse)
else
tryTake flatMap {
case Some(a) => loop(i + 1, limit, a :: acc)
case None => F.pure(acc.reverse)
}

maxN match {
case Some(limit) => loop(0, limit, Nil)
case None => loop(0, Int.MaxValue, Nil)
}
}
override def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] =
QueueSource.tryTakeN[F, A](maxN, tryTake)

def size: F[Int]
}

object PQueueSource {
private def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match {
case Some(n) if n <= 0 =>
throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n")
case _ => ()
}

implicit def catsFunctorForPQueueSource[F[_]: Functor]: Functor[PQueueSource[F, *]] =
new Functor[PQueueSource[F, *]] {
Expand All @@ -302,31 +251,7 @@ object PQueueSource {
}
}

trait PQueueSink[F[_], A] {

/**
* Enqueues the given element, possibly fiber blocking until sufficient capacity becomes
* available.
*
* O(log(n))
*
* @param a
* the element to be put in the PQueue
*/
def offer(a: A): F[Unit]

/**
* Attempts to enqueue the given element without fiber blocking.
*
* O(log(n))
*
* @param a
* the element to be put in the PQueue
* @return
* an effect that describes whether the enqueuing of the given element succeeded without
* blocking
*/
def tryOffer(a: A): F[Boolean]
trait PQueueSink[F[_], A] extends QueueSink[F, A] {

/**
* Attempts to enqueue the given elements without semantically blocking. If an item in the
Expand All @@ -339,17 +264,13 @@ trait PQueueSink[F[_], A] {
* @return
* an effect that contains the remaining valus that could not be offered.
*/
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match {
case Nil => F.pure(list)
case h :: t =>
tryOffer(h).ifM(
tryOfferN(t),
F.pure(list)
)
}
override def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] =
QueueSink.tryOfferN(list, tryOffer)

}

object PQueueSink {

implicit def catsContravariantForPQueueSink[F[_]]: Contravariant[PQueueSink[F, *]] =
new Contravariant[PQueueSink[F, *]] {
override def contramap[A, B](fa: PQueueSink[F, A])(f: B => A): PQueueSink[F, B] =
Expand Down
29 changes: 20 additions & 9 deletions std/shared/src/main/scala/cats/effect/std/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,16 @@ trait QueueSource[F[_], A] {
* @return
* an effect that contains the dequeued elements
*/
def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] = {
def tryTakeN(maxN: Option[Int])(implicit F: Monad[F]): F[List[A]] =
QueueSource.tryTakeN[F, A](maxN, tryTake)

def size: F[Int]
}

object QueueSource {

private[std] def tryTakeN[F[_], A](maxN: Option[Int], tryTake: F[Option[A]])(
implicit F: Monad[F]): F[List[A]] = {
QueueSource.assertMaxNPositive(maxN)

def loop(i: Int, limit: Int, acc: List[A]): F[List[A]] =
Expand All @@ -1153,10 +1162,6 @@ trait QueueSource[F[_], A] {
}
}

def size: F[Int]
}

object QueueSource {
private[std] def assertMaxNPositive(maxN: Option[Int]): Unit = maxN match {
case Some(n) if n <= 0 =>
throw new IllegalArgumentException(s"Provided maxN parameter must be positive, was $n")
Expand Down Expand Up @@ -1212,17 +1217,23 @@ trait QueueSink[F[_], A] {
* @return
* an effect that contains the remaining valus that could not be offered.
*/
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] = list match {
def tryOfferN(list: List[A])(implicit F: Monad[F]): F[List[A]] =
QueueSink.tryOfferN[F, A](list, tryOffer)

}

object QueueSink {

private[std] def tryOfferN[F[_], A](list: List[A], tryOffer: A => F[Boolean])(
implicit F: Monad[F]): F[List[A]] = list match {
case Nil => F.pure(list)
case h :: t =>
tryOffer(h).ifM(
tryOfferN(t),
tryOfferN(t, tryOffer),
F.pure(list)
)
}
}

object QueueSink {
implicit def catsContravariantForQueueSink[F[_]]: Contravariant[QueueSink[F, *]] =
new Contravariant[QueueSink[F, *]] {
override def contramap[A, B](fa: QueueSink[F, A])(f: B => A): QueueSink[F, B] =
Expand Down
Loading