Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allocate more resources than the maxTotal #394

Merged
merged 4 commits into from
Jun 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ lazy val core = crossProject(JVMPlatform, JSPlatform)
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.typelevel.keypool.KeyPool#KeyPoolConcrete.kpDestroy"
)
),
ProblemFilters
.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPoolBuilder.this"),
ProblemFilters
.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool#Builder.this")
)
)

Expand All @@ -59,8 +63,7 @@ lazy val commonSettings = Seq(
Test / parallelExecution := false,
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % catsV,
"org.typelevel" %%% "cats-effect-kernel" % catsEffectV,
"org.typelevel" %%% "cats-effect-std" % catsEffectV % Test,
"org.typelevel" %%% "cats-effect-std" % catsEffectV,
"org.scalameta" %%% "munit" % munitV % Test,
"org.typelevel" %%% "munit-cats-effect-3" % munitCatsEffectV % Test
)
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/scala/org/typelevel/keypool/KeyPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package org.typelevel.keypool
import cats._
import cats.effect.kernel._
import cats.effect.kernel.syntax.spawn._
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._
import org.typelevel.keypool.internal._
Expand Down Expand Up @@ -64,7 +65,9 @@ object KeyPool {
private[keypool] val kpRes: A => Resource[F, B],
private[keypool] val kpDefaultReuseState: Reusable,
private[keypool] val kpMaxPerKey: A => Int,
private[keypool] val kpMaxIdle: Int,
private[keypool] val kpMaxTotal: Int,
private[keypool] val kpMaxTotalSem: Semaphore[F],
private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
) extends KeyPool[F, A, B] {

Expand Down Expand Up @@ -255,7 +258,7 @@ object KeyPool {
pc match {
case p @ PoolClosed() => (p, destroy)
case p @ PoolOpen(idleCount, m) =>
if (idleCount > kp.kpMaxTotal) (p, destroy)
if (idleCount > kp.kpMaxIdle) (p, destroy)
else
m.get(k) match {
case None =>
Expand Down Expand Up @@ -293,6 +296,7 @@ object KeyPool {
}

for {
_ <- kp.kpMaxTotalSem.permit
rossabaker marked this conversation as resolved.
Show resolved Hide resolved
optR <- Resource.eval(kp.kpVar.modify(go))
releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState))
resource <- Resource.make(optR.fold(kp.kpRes(k).allocated)(r => Applicative[F].pure(r))) {
Expand All @@ -313,6 +317,7 @@ object KeyPool {
val kpDefaultReuseState: Reusable,
val idleTimeAllowedInPool: Duration,
val kpMaxPerKey: A => Int,
val kpMaxIdle: Int,
val kpMaxTotal: Int,
val onReaperException: Throwable => F[Unit]
) {
Expand All @@ -321,13 +326,15 @@ object KeyPool {
kpDefaultReuseState: Reusable = this.kpDefaultReuseState,
idleTimeAllowedInPool: Duration = this.idleTimeAllowedInPool,
kpMaxPerKey: A => Int = this.kpMaxPerKey,
kpMaxIdle: Int = this.kpMaxIdle,
kpMaxTotal: Int = this.kpMaxTotal,
onReaperException: Throwable => F[Unit] = this.onReaperException
): Builder[F, A, B] = new Builder[F, A, B](
kpRes,
kpDefaultReuseState,
idleTimeAllowedInPool,
kpMaxPerKey,
kpMaxIdle,
kpMaxTotal,
onReaperException
)
Expand All @@ -349,6 +356,9 @@ object KeyPool {
def withMaxPerKey(f: A => Int): Builder[F, A, B] =
copy(kpMaxPerKey = f)

def withMaxIdle(maxIdle: Int): Builder[F, A, B] =
copy(kpMaxIdle = maxIdle)

def withMaxTotal(total: Int): Builder[F, A, B] =
copy(kpMaxTotal = total)

Expand All @@ -362,6 +372,7 @@ object KeyPool {
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
val nanos = 0.seconds.max(fd)
Expand All @@ -373,7 +384,9 @@ object KeyPool {
kpRes,
kpDefaultReuseState,
kpMaxPerKey,
kpMaxIdle,
kpMaxTotal,
kpMaxTotalSem,
kpVar
)
}
Expand All @@ -388,6 +401,7 @@ object KeyPool {
Defaults.defaultReuseState,
Defaults.idleTimeAllowedInPool,
Defaults.maxPerKey,
Defaults.maxIdle,
Defaults.maxTotal,
Defaults.onReaperException[F]
)
Expand All @@ -402,6 +416,7 @@ object KeyPool {
val defaultReuseState = Reusable.Reuse
val idleTimeAllowedInPool = 30.seconds
def maxPerKey[K](k: K): Int = Function.const(100)(k)
val maxIdle = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have maxIdle equal to maxTotal always?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Basically, the rule should be this one: maxIdle <= maxTotal

So the builder method can be defined as the following:

def withMaxTotal(total: Int) = 
  copy(maxTotal = total, maxIdle = math.min(maxIdle, total))

Should withMaxIdle throw an exception when maxIdle > maxTotal?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commons-pool2 has three independent settings: maxTotal, maxIdle, and minIdle.

  • maxTotal is how many you can handle in a burst
  • maxIdle and minIdle let you tune how many you can afford to keep warm in order to usually have one ready under normal load

minIdle <= maxIdle <= maxTotal ought to always be true. minIdle == maxIdle == maxTotal will tend to be true when creating them is expensive and idling them is cheap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an ideal world, I would expect a builder to have validation logic under the hood.
Neither negative maxTotal nor maxIdle makes sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be surprising to me if withMaxTotal implemented the math.min. Imagine reading from a config, and withMaxTotal got called before withMaxIdle.

I can imagine implementing math.min at creation time, logging a warning (http4s-blaze does this with timeout values), or raising an error.

val maxTotal = 100
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
Function.const(Applicative[F].unit)(t)
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import cats._
import cats.syntax.all._
import cats.effect.kernel._
import cats.effect.kernel.syntax.spawn._
import cats.effect.std.Semaphore
import scala.concurrent.duration._

@deprecated("use KeyPool.Builder", "0.4.7")
Expand All @@ -35,6 +36,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
val kpDefaultReuseState: Reusable,
val idleTimeAllowedInPool: Duration,
val kpMaxPerKey: A => Int,
val kpMaxIdle: Int,
val kpMaxTotal: Int,
val onReaperException: Throwable => F[Unit]
) {
Expand All @@ -44,6 +46,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
kpDefaultReuseState: Reusable = this.kpDefaultReuseState,
idleTimeAllowedInPool: Duration = this.idleTimeAllowedInPool,
kpMaxPerKey: A => Int = this.kpMaxPerKey,
kpMaxIdle: Int = this.kpMaxIdle,
kpMaxTotal: Int = this.kpMaxTotal,
onReaperException: Throwable => F[Unit] = this.onReaperException
): KeyPoolBuilder[F, A, B] = new KeyPoolBuilder[F, A, B](
Expand All @@ -52,6 +55,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
kpDefaultReuseState,
idleTimeAllowedInPool,
kpMaxPerKey,
kpMaxIdle,
kpMaxTotal,
onReaperException
)
Expand All @@ -71,6 +75,9 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
def withMaxPerKey(f: A => Int): KeyPoolBuilder[F, A, B] =
copy(kpMaxPerKey = f)

def withMaxIdle(maxIdle: Int): KeyPoolBuilder[F, A, B] =
copy(kpMaxIdle = maxIdle)

def withMaxTotal(total: Int): KeyPoolBuilder[F, A, B] =
copy(kpMaxTotal = total)

Expand All @@ -84,6 +91,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
val nanos = 0.seconds.max(fd)
Expand All @@ -95,7 +103,9 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
(a: A) => Resource.make[F, B](kpCreate(a))(kpDestroy),
kpDefaultReuseState,
kpMaxPerKey,
kpMaxIdle,
kpMaxTotal,
kpMaxTotalSem,
kpVar
)
}
Expand All @@ -113,6 +123,7 @@ object KeyPoolBuilder {
Defaults.defaultReuseState,
Defaults.idleTimeAllowedInPool,
Defaults.maxPerKey,
Defaults.maxIdle,
Defaults.maxTotal,
Defaults.onReaperException[F]
)
Expand All @@ -121,6 +132,7 @@ object KeyPoolBuilder {
val defaultReuseState = Reusable.Reuse
val idleTimeAllowedInPool = 30.seconds
def maxPerKey[K](k: K): Int = Function.const(100)(k)
val maxIdle = 100
val maxTotal = 100
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
Function.const(Applicative[F].unit)(t)
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/org/typelevel/keypool/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,22 @@ object Pool {
val kpRes: Resource[F, B],
val kpDefaultReuseState: Reusable,
val idleTimeAllowedInPool: Duration,
val kpMaxIdle: Int,
val kpMaxTotal: Int,
val onReaperException: Throwable => F[Unit]
) {
private def copy(
kpRes: Resource[F, B] = this.kpRes,
kpDefaultReuseState: Reusable = this.kpDefaultReuseState,
idleTimeAllowedInPool: Duration = this.idleTimeAllowedInPool,
kpMaxIdle: Int = this.kpMaxIdle,
kpMaxTotal: Int = this.kpMaxTotal,
onReaperException: Throwable => F[Unit] = this.onReaperException
): Builder[F, B] = new Builder[F, B](
kpRes,
kpDefaultReuseState,
idleTimeAllowedInPool,
kpMaxIdle,
kpMaxTotal,
onReaperException
)
Expand All @@ -105,6 +108,9 @@ object Pool {
def withIdleTimeAllowedInPool(duration: Duration): Builder[F, B] =
copy(idleTimeAllowedInPool = duration)

def withMaxIdle(maxIdle: Int): Builder[F, B] =
copy(kpMaxIdle = maxIdle)

def withMaxTotal(total: Int): Builder[F, B] =
copy(kpMaxTotal = total)

Expand All @@ -117,6 +123,7 @@ object Pool {
kpDefaultReuseState = kpDefaultReuseState,
idleTimeAllowedInPool = idleTimeAllowedInPool,
kpMaxPerKey = _ => kpMaxTotal,
kpMaxIdle = kpMaxIdle,
kpMaxTotal = kpMaxTotal,
onReaperException = onReaperException
)
Expand All @@ -138,6 +145,7 @@ object Pool {
res,
Defaults.defaultReuseState,
Defaults.idleTimeAllowedInPool,
Defaults.maxIdle,
Defaults.maxTotal,
Defaults.onReaperException[F]
)
Expand All @@ -151,6 +159,7 @@ object Pool {
private object Defaults {
val defaultReuseState = Reusable.Reuse
val idleTimeAllowedInPool = 30.seconds
val maxIdle = 100
val maxTotal = 100
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
Function.const(Applicative[F].unit)(t)
Expand Down
41 changes: 30 additions & 11 deletions core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package org.typelevel.keypool

import cats.syntax.all._
import cats.effect._
import cats.effect.std.CountDownLatch
import scala.concurrent.duration._
import munit.CatsEffectSuite
import scala.concurrent.ExecutionContext
Expand All @@ -32,8 +33,6 @@ class KeypoolSpec extends CatsEffectSuite {
override val munitExecutionContext: ExecutionContext = ExecutionContext.global

test("Keep Resources marked to be kept") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -54,8 +53,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Delete Resources marked to be deleted") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -76,8 +73,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Delete Resource when pool is full") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -101,8 +96,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Used Resource Cleaned Up By Reaper") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -128,9 +121,6 @@ class KeypoolSpec extends CatsEffectSuite {
}

test("Used Resource Not Cleaned Up if Idle Time has not expired") {
def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void

KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
Expand All @@ -154,4 +144,33 @@ class KeypoolSpec extends CatsEffectSuite {
} yield assert(init === 1 && later === 1)
}
}

test("Do not allocate more resources than the maxTotal") {
val MaxTotal = 10

KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
nothing
)
.withMaxTotal(MaxTotal)
.build
.use { pool =>
for {
cdl <- CountDownLatch[IO](MaxTotal)
allocated <- IO.parReplicateAN(MaxTotal)(
MaxTotal,
pool.take(1).use(_ => cdl.release >> IO.never).start
)
_ <- cdl.await // make sure the pool is exhausted
attempt1 <- pool.take(1).use_.timeout(100.millis).attempt
_ <- allocated.traverse(_.cancel)
attempt2 <- pool.take(1).use_.timeout(100.millis).attempt
} yield assert(attempt1.isLeft && attempt2.isRight)
}
}

private def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void

}
26 changes: 26 additions & 0 deletions core/src/test/scala/org/typelevel/keypool/PoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package org.typelevel.keypool

import cats.syntax.all._
import cats.effect._
import cats.effect.std.CountDownLatch
import scala.concurrent.duration._
import munit.CatsEffectSuite
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -136,6 +137,31 @@ class PoolSpec extends CatsEffectSuite {
}
}

test("Do not allocate more resources than the maxTotal") {
val MaxTotal = 10

Pool
.Builder(
Ref.of[IO, Int](1),
nothing
)
.withMaxTotal(MaxTotal)
.build
.use { pool =>
for {
cdl <- CountDownLatch[IO](MaxTotal)
allocated <- IO.parReplicateAN(MaxTotal)(
MaxTotal,
pool.take.use(_ => cdl.release >> IO.never).start
)
_ <- cdl.await // make sure the pool is exhausted
attempt1 <- pool.take.use_.timeout(100.millis).attempt
_ <- allocated.traverse(_.cancel)
attempt2 <- pool.take.use_.timeout(100.millis).attempt
} yield assert(attempt1.isLeft && attempt2.isRight)
}
}

private def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void
}