Skip to content

Commit

Permalink
Do not allocate more resources than the maxTotal
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive committed Jun 7, 2022
1 parent 53e30cc commit a872544
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 13 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ lazy val commonSettings = Seq(
Test / parallelExecution := false,
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % catsV,
"org.typelevel" %%% "cats-effect-kernel" % catsEffectV,
"org.typelevel" %%% "cats-effect-std" % catsEffectV % Test,
"org.typelevel" %%% "cats-effect-std" % catsEffectV,
"org.scalameta" %%% "munit" % munitV % Test,
"org.typelevel" %%% "munit-cats-effect-3" % munitCatsEffectV % Test
)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/org/typelevel/keypool/KeyPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package org.typelevel.keypool

import cats._
import cats.effect.kernel._
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._
import org.typelevel.keypool.internal._
Expand Down Expand Up @@ -64,6 +65,7 @@ object KeyPool {
private[keypool] val kpDefaultReuseState: Reusable,
private[keypool] val kpMaxPerKey: A => Int,
private[keypool] val kpMaxTotal: Int,
private[keypool] val kpMaxTotalSem: Semaphore[F],
private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
) extends KeyPool[F, A, B] {

Expand Down Expand Up @@ -291,6 +293,7 @@ object KeyPool {
}

for {
_ <- kp.kpMaxTotalSem.permit
optR <- Resource.eval(kp.kpVar.modify(go))
releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState))
resource <- Resource.make(optR.fold(kp.kpRes(k).allocated)(r => Applicative[F].pure(r))) {
Expand Down Expand Up @@ -360,6 +363,7 @@ object KeyPool {
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
val nanos = 0.seconds.max(fd)
Expand All @@ -374,6 +378,7 @@ object KeyPool {
kpDefaultReuseState,
kpMaxPerKey,
kpMaxTotal,
kpMaxTotalSem,
kpVar
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import internal.{PoolList, PoolMap}
import cats._
import cats.syntax.all._
import cats.effect.kernel._
import cats.effect.std.Semaphore
import scala.concurrent.duration._

@deprecated("use KeyPool.Builder", "0.4.7")
Expand Down Expand Up @@ -83,6 +84,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
val nanos = 0.seconds.max(fd)
Expand All @@ -97,6 +99,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
kpDefaultReuseState,
kpMaxPerKey,
kpMaxTotal,
kpMaxTotalSem,
kpVar
)
}
Expand Down
41 changes: 30 additions & 11 deletions core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package org.typelevel.keypool

import cats.syntax.all._
import cats.effect._
import cats.effect.std.CountDownLatch
import scala.concurrent.duration._
import munit.CatsEffectSuite
import scala.concurrent.ExecutionContext
Expand All @@ -32,8 +33,6 @@ class KeypoolSpec extends CatsEffectSuite {
override val munitExecutionContext: ExecutionContext = ExecutionContext.global

test("Keep Resources marked to be kept") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -54,8 +53,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Delete Resources marked to be deleted") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -76,8 +73,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Delete Resource when pool is full") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -101,8 +96,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Used Resource Cleaned Up By Reaper") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -128,9 +121,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Used Resource Not Cleaned Up if Idle Time has not expired") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void

KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -154,4 +144,33 @@ class KeypoolSpec extends CatsEffectSuite {
} yield assert(init === 1 && later === 1)
}
}

test("Do not allocate more resources than the maxTotal") {
val MaxTotal = 10

KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
nothing
)
.withMaxTotal(MaxTotal)
.build
.use { pool =>
for {
cdl <- CountDownLatch[IO](MaxTotal)
allocated <- IO.parReplicateAN(MaxTotal)(
MaxTotal,
pool.take(1).use(_ => cdl.release >> IO.never).start
)
_ <- cdl.await // make sure the pool is exhausted
attempt1 <- pool.take(1).use_.timeout(100.millis).attempt
_ <- allocated.traverse(_.cancel)
attempt2 <- pool.take(1).use_.timeout(100.millis).attempt
} yield assert(attempt1.isLeft && attempt2.isRight)
}
}

private def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void

}
26 changes: 26 additions & 0 deletions core/src/test/scala/org/typelevel/keypool/PoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package org.typelevel.keypool

import cats.syntax.all._
import cats.effect._
import cats.effect.std.CountDownLatch
import scala.concurrent.duration._
import munit.CatsEffectSuite
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -136,6 +137,31 @@ class PoolSpec extends CatsEffectSuite {
}
}

test("Do not allocate more resources than the maxTotal") {
val MaxTotal = 10

Pool
.Builder(
Ref.of[IO, Int](1),
nothing
)
.withMaxTotal(MaxTotal)
.build
.use { pool =>
for {
cdl <- CountDownLatch[IO](MaxTotal)
allocated <- IO.parReplicateAN(MaxTotal)(
MaxTotal,
pool.take.use(_ => cdl.release >> IO.never).start
)
_ <- cdl.await // make sure the pool is exhausted
attempt1 <- pool.take.use_.timeout(100.millis).attempt
_ <- allocated.traverse(_.cancel)
attempt2 <- pool.take.use_.timeout(100.millis).attempt
} yield assert(attempt1.isLeft && attempt2.isRight)
}
}

private def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
}

0 comments on commit a872544

Please sign in to comment.