From a8725446fdbd30da32a9c6f6cf7291bd77f69242 Mon Sep 17 00:00:00 2001 From: Maksim Ochenashko Date: Tue, 7 Jun 2022 13:05:36 +0300 Subject: [PATCH] Do not allocate more resources than the `maxTotal` --- build.sbt | 3 +- .../scala/org/typelevel/keypool/KeyPool.scala | 5 +++ .../typelevel/keypool/KeyPoolBuilder.scala | 3 ++ .../org/typelevel/keypool/KeyPoolSpec.scala | 41 ++++++++++++++----- .../org/typelevel/keypool/PoolSpec.scala | 26 ++++++++++++ 5 files changed, 65 insertions(+), 13 deletions(-) diff --git a/build.sbt b/build.sbt index 6cab399a..3e774010 100644 --- a/build.sbt +++ b/build.sbt @@ -59,8 +59,7 @@ lazy val commonSettings = Seq( Test / parallelExecution := false, libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % catsV, - "org.typelevel" %%% "cats-effect-kernel" % catsEffectV, - "org.typelevel" %%% "cats-effect-std" % catsEffectV % Test, + "org.typelevel" %%% "cats-effect-std" % catsEffectV, "org.scalameta" %%% "munit" % munitV % Test, "org.typelevel" %%% "munit-cats-effect-3" % munitCatsEffectV % Test ) diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala index abda211a..934353b1 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala @@ -23,6 +23,7 @@ package org.typelevel.keypool import cats._ import cats.effect.kernel._ +import cats.effect.std.Semaphore import cats.syntax.all._ import scala.concurrent.duration._ import org.typelevel.keypool.internal._ @@ -64,6 +65,7 @@ object KeyPool { private[keypool] val kpDefaultReuseState: Reusable, private[keypool] val kpMaxPerKey: A => Int, private[keypool] val kpMaxTotal: Int, + private[keypool] val kpMaxTotalSem: Semaphore[F], private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] ) extends KeyPool[F, A, B] { @@ -291,6 +293,7 @@ object KeyPool { } for { + _ <- kp.kpMaxTotalSem.permit optR <- Resource.eval(kp.kpVar.modify(go)) releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState)) resource <- Resource.make(optR.fold(kp.kpRes(k).allocated)(r => Applicative[F].pure(r))) { @@ -360,6 +363,7 @@ object KeyPool { kpVar <- Resource.make( Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) )(kpVar => KeyPool.destroy(kpVar)) + kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal)) _ <- idleTimeAllowedInPool match { case fd: FiniteDuration => val nanos = 0.seconds.max(fd) @@ -374,6 +378,7 @@ object KeyPool { kpDefaultReuseState, kpMaxPerKey, kpMaxTotal, + kpMaxTotalSem, kpVar ) } diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala index 8f914203..d8dc4765 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala @@ -25,6 +25,7 @@ import internal.{PoolList, PoolMap} import cats._ import cats.syntax.all._ import cats.effect.kernel._ +import cats.effect.std.Semaphore import scala.concurrent.duration._ @deprecated("use KeyPool.Builder", "0.4.7") @@ -83,6 +84,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( kpVar <- Resource.make( Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) )(kpVar => KeyPool.destroy(kpVar)) + kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal)) _ <- idleTimeAllowedInPool match { case fd: FiniteDuration => val nanos = 0.seconds.max(fd) @@ -97,6 +99,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( kpDefaultReuseState, kpMaxPerKey, kpMaxTotal, + kpMaxTotalSem, kpVar ) } diff --git a/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala b/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala index b8ced032..f83734c8 100644 --- a/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala +++ b/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala @@ -23,6 +23,7 @@ package org.typelevel.keypool import cats.syntax.all._ import cats.effect._ +import cats.effect.std.CountDownLatch import scala.concurrent.duration._ import munit.CatsEffectSuite import scala.concurrent.ExecutionContext @@ -32,8 +33,6 @@ class KeypoolSpec extends CatsEffectSuite { override val munitExecutionContext: ExecutionContext = ExecutionContext.global test("Keep Resources marked to be kept") { - def nothing(ref: Ref[IO, Int]): IO[Unit] = - ref.get.void KeyPool .Builder( (i: Int) => Ref.of[IO, Int](i), @@ -54,8 +53,6 @@ class KeypoolSpec extends CatsEffectSuite { } test("Delete Resources marked to be deleted") { - def nothing(ref: Ref[IO, Int]): IO[Unit] = - ref.get.void KeyPool .Builder( (i: Int) => Ref.of[IO, Int](i), @@ -76,8 +73,6 @@ class KeypoolSpec extends CatsEffectSuite { } test("Delete Resource when pool is full") { - def nothing(ref: Ref[IO, Int]): IO[Unit] = - ref.get.void KeyPool .Builder( (i: Int) => Ref.of[IO, Int](i), @@ -101,8 +96,6 @@ class KeypoolSpec extends CatsEffectSuite { } test("Used Resource Cleaned Up By Reaper") { - def nothing(ref: Ref[IO, Int]): IO[Unit] = - ref.get.void KeyPool .Builder( (i: Int) => Ref.of[IO, Int](i), @@ -128,9 +121,6 @@ class KeypoolSpec extends CatsEffectSuite { } test("Used Resource Not Cleaned Up if Idle Time has not expired") { - def nothing(ref: Ref[IO, Int]): IO[Unit] = - ref.get.void - KeyPool .Builder( (i: Int) => Ref.of[IO, Int](i), @@ -154,4 +144,33 @@ class KeypoolSpec extends CatsEffectSuite { } yield assert(init === 1 && later === 1) } } + + test("Do not allocate more resources than the maxTotal") { + val MaxTotal = 10 + + KeyPool + .Builder( + (i: Int) => Ref.of[IO, Int](i), + nothing + ) + .withMaxTotal(MaxTotal) + .build + .use { pool => + for { + cdl <- CountDownLatch[IO](MaxTotal) + allocated <- IO.parReplicateAN(MaxTotal)( + MaxTotal, + pool.take(1).use(_ => cdl.release >> IO.never).start + ) + _ <- cdl.await // make sure the pool is exhausted + attempt1 <- pool.take(1).use_.timeout(100.millis).attempt + _ <- allocated.traverse(_.cancel) + attempt2 <- pool.take(1).use_.timeout(100.millis).attempt + } yield assert(attempt1.isLeft && attempt2.isRight) + } + } + + private def nothing(ref: Ref[IO, Int]): IO[Unit] = + ref.get.void + } diff --git a/core/src/test/scala/org/typelevel/keypool/PoolSpec.scala b/core/src/test/scala/org/typelevel/keypool/PoolSpec.scala index 2d1112d1..f52541cf 100644 --- a/core/src/test/scala/org/typelevel/keypool/PoolSpec.scala +++ b/core/src/test/scala/org/typelevel/keypool/PoolSpec.scala @@ -23,6 +23,7 @@ package org.typelevel.keypool import cats.syntax.all._ import cats.effect._ +import cats.effect.std.CountDownLatch import scala.concurrent.duration._ import munit.CatsEffectSuite import scala.concurrent.ExecutionContext @@ -136,6 +137,31 @@ class PoolSpec extends CatsEffectSuite { } } + test("Do not allocate more resources than the maxTotal") { + val MaxTotal = 10 + + Pool + .Builder( + Ref.of[IO, Int](1), + nothing + ) + .withMaxTotal(MaxTotal) + .build + .use { pool => + for { + cdl <- CountDownLatch[IO](MaxTotal) + allocated <- IO.parReplicateAN(MaxTotal)( + MaxTotal, + pool.take.use(_ => cdl.release >> IO.never).start + ) + _ <- cdl.await // make sure the pool is exhausted + attempt1 <- pool.take.use_.timeout(100.millis).attempt + _ <- allocated.traverse(_.cancel) + attempt2 <- pool.take.use_.timeout(100.millis).attempt + } yield assert(attempt1.isLeft && attempt2.isRight) + } + } + private def nothing(ref: Ref[IO, Int]): IO[Unit] = ref.get.void }