Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve ExitCase in resources + fix memory leak #535

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
ProblemFilters.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool.destroy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool.reap"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool.put"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.typelevel.keypool.KeyPool.put"),
ProblemFilters
.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool#KeyPoolConcrete.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
Expand Down
114 changes: 65 additions & 49 deletions core/src/main/scala/org/typelevel/keypool/KeyPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._
import org.typelevel.keypool.internal._
import cats.effect.kernel.Resource.ExitCase

/**
* This pools internal guarantees are that the max number of values are in the pool at any time, not
Expand Down Expand Up @@ -68,7 +69,7 @@ object KeyPool {
private[keypool] val kpMaxIdle: Int,
private[keypool] val kpMaxTotal: Int,
private[keypool] val kpMaxTotalSem: Semaphore[F],
private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
private[keypool] val kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]]
) extends KeyPool[F, A, B] {

def take(k: A): Resource[F, Managed[F, B]] =
Expand Down Expand Up @@ -115,37 +116,39 @@ object KeyPool {
* Make a 'KeyPool' inactive and destroy all idle resources.
*/
private[keypool] def destroy[F[_]: MonadThrow, A, B](
kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
): F[Unit] = for {
m <- kpVar.getAndSet(PoolMap.closed[A, (B, F[Unit])])
_ <- m match {
case PoolClosed() => Applicative[F].unit
case PoolOpen(_, m2) =>
m2.toList.traverse_ { case (_, pl) =>
pl.toList
.traverse_ { case (_, r) =>
r._2.attempt.void
}
}
}
} yield ()
kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]],
exit: ExitCase
): F[Unit] =
for {
m <- kpVar.getAndSet(PoolMap.closed[A, (B, ExitCase => F[Unit])])
_ <- m match {
case PoolClosed() => Applicative[F].unit
case PoolOpen(_, m2) =>
m2.toList.traverse_ { case (_, pl) =>
pl.toList
.traverse_ { case (_, r) =>
r._2(exit).attempt.void
}
}
}
} yield ()

/**
* Run a reaper thread, which will destroy old resources. It will stop running once our pool
* switches to PoolClosed.
*/
private[keypool] def reap[F[_], A, B](
idleTimeAllowedInPoolNanos: FiniteDuration,
kpVar: Ref[F, PoolMap[A, (B, F[Unit])]],
kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]],
onReaperException: Throwable => F[Unit]
)(implicit F: Temporal[F]): F[Unit] = {
// We are going to do non-referentially transparent things as we may be waiting for our modification to go through
// which may change the state depending on when the modification block is running atomically at the moment
def findStale(
now: FiniteDuration,
idleCount: Int,
m: Map[A, PoolList[(B, F[Unit])]]
): (PoolMap[A, (B, F[Unit])], List[(A, (B, F[Unit]))]) = {
m: Map[A, PoolList[(B, ExitCase => F[Unit])]]
): (PoolMap[A, (B, ExitCase => F[Unit])], List[(A, (B, ExitCase => F[Unit]))]) = {
val isNotStale: FiniteDuration => Boolean =
time =>
time + idleTimeAllowedInPoolNanos >= now // Time value is alright inside the KeyPool in nanos.
Expand All @@ -157,19 +160,24 @@ object KeyPool {
// (Map key (PoolList resource), [resource])
@annotation.tailrec
def findStale_(
toKeep: List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])],
toDestroy: List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))],
l: List[(A, PoolList[(B, F[Unit])])]
): (Map[A, PoolList[(B, F[Unit])]], List[(A, (B, F[Unit]))]) = {
toKeep: List[(A, PoolList[(B, ExitCase => F[Unit])])] => List[
(A, PoolList[(B, ExitCase => F[Unit])])
],
toDestroy: List[(A, (B, ExitCase => F[Unit]))] => List[(A, (B, ExitCase => F[Unit]))],
l: List[(A, PoolList[(B, ExitCase => F[Unit])])]
): (Map[A, PoolList[(B, ExitCase => F[Unit])]], List[(A, (B, ExitCase => F[Unit]))]) = {
l match {
case Nil => (toKeep(List.empty).toMap, toDestroy(List.empty))
case (key, pList) :: rest =>
// Can use span since we know everything will be ordered as the time is
// when it is placed back into the pool.
val (notStale, stale) = pList.toList.span(r => isNotStale(r._1))
val toDestroy_ : List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))] = l =>
toDestroy(stale.map(t => key -> t._2) ++ l)
val toKeep_ : List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])] =
val toDestroy_ : List[(A, (B, ExitCase => F[Unit]))] => List[
(A, (B, ExitCase => F[Unit]))
] = l => toDestroy(stale.map(t => key -> t._2) ++ l)
val toKeep_ : List[(A, PoolList[(B, ExitCase => F[Unit])])] => List[
(A, PoolList[(B, ExitCase => F[Unit])])
] =
l =>
PoolList.fromList(notStale) match {
case None => toKeep(l)
Expand All @@ -187,9 +195,8 @@ object KeyPool {
val sleep = Temporal[F].sleep(5.seconds).void

// Wait 5 Seconds
def loop: F[Unit] = for {
now <- Temporal[F].monotonic
_ <- {
def loop: F[Unit] = Temporal[F].monotonic
Copy link
Member Author

@kubukoz kubukoz Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: this change also avoids a memory leak on Scala 3 (due to better-monadic-for not eliminating the final map).

tl;dr previously it was

def loop = for {
  x <- fx
  _ <- (somethingSomething >> loop)
} yield ()

which desugars to

fx.flatMap { x => (somethingSomething >> loop).map(_ => ()) }

which allocates a Map node on every run and never cleans it up (the same goes for the top-level flatMap, and apparently the lambdas too).

Proof: run this for a minute or so.

//> using dep "org.typelevel::cats-effect:3.5.3"
import cats.effect.*

def loop: IO[Unit] = for {
  _ <- IO.unit
  _ <- IO.unit >> loop
} yield ()

object CatsDemo extends IOApp.Simple {
  def run = loop
}
image

.flatMap { now =>
kpVar.tryModify {
case p @ PoolClosed() => (p, F.unit)
case p @ PoolOpen(idleCount, m) =>
Expand All @@ -199,25 +206,26 @@ object KeyPool {
val (m_, toDestroy) = findStale(now, idleCount, m)
(
m_,
toDestroy.traverse_(_._2._2).attempt.flatMap {
// In this context, we're closing the resource due to it not being used for a while - hence a Succeeded exit case.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: special case here: everywhere else we have an ExitCase to propagate, but the reaper deliberately closes resources out of thin air: I figured Succeeded makes the most sense for a resource that's no longer necessary. Canceled didn't sound right, but happy to discuss if anyone disagrees

toDestroy.traverse_(_._2._2(ExitCase.Succeeded)).attempt.flatMap {
case Left(t) => onReaperException(t)
// .handleErrorWith(t => F.delay(t.printStackTrace())) // CHEATING?
case Right(()) => F.unit
}
)
}
}
}.flatMap {
}
.flatMap {
case Some(act) => act >> sleep >> loop
case None => loop
}
} yield ()

loop
}

private[keypool] def state[F[_]: Functor, A, B](
kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]]
): F[(Int, Map[A, Int])] =
kpVar.get.map {
case PoolClosed() =>
Expand All @@ -237,8 +245,8 @@ object KeyPool {
kp: KeyPoolConcrete[F, A, B],
k: A,
r: B,
destroy: F[Unit]
): F[Unit] = {
destroy: ExitCase => F[Unit]
): ExitCase => F[Unit] = {
def addToList[Z](
now: FiniteDuration,
maxCount: Int,
Expand All @@ -254,7 +262,10 @@ object KeyPool {
else (l, Some(x))
}
}
def go(now: FiniteDuration, pc: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], F[Unit]) =
def go(
now: FiniteDuration,
pc: PoolMap[A, (B, ExitCase => F[Unit])]
): (PoolMap[A, (B, ExitCase => F[Unit])], ExitCase => F[Unit]) =
pc match {
case p @ PoolClosed() => (p, destroy)
case p @ PoolOpen(idleCount, m) =>
Expand All @@ -264,25 +275,28 @@ object KeyPool {
case None =>
val cnt_ = idleCount + 1
val m_ = PoolMap.open(cnt_, m + (k -> One((r, destroy), now)))
(m_, Applicative[F].pure(()))
(m_, Function.const[F[Unit], ExitCase](Applicative[F].unit))
case Some(l) =>
val (l_, mx) = addToList(now, kp.kpMaxPerKey(k), (r, destroy), l)
val cnt_ = idleCount + mx.fold(1)(_ => 0)
val m_ = PoolMap.open(cnt_, m + (k -> l_))
(m_, mx.fold(Applicative[F].unit)(_ => destroy))
(m_, mx.fold((_: ExitCase) => Applicative[F].unit)(_ => destroy))
}
}

Clock[F].monotonic.flatMap { now =>
kp.kpVar.modify(pm => go(now, pm)).flatten
}
(exit: ExitCase) =>
Clock[F].monotonic.flatMap { now =>
kp.kpVar.modify(pm => go(now, pm)).flatMap(_(exit))
}
}

private[keypool] def take[F[_]: Temporal, A, B](
kp: KeyPoolConcrete[F, A, B],
k: A
): Resource[F, Managed[F, B]] = {
def go(pm: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], Option[(B, F[Unit])]) =
def go(
pm: PoolMap[A, (B, ExitCase => F[Unit])]
): (PoolMap[A, (B, ExitCase => F[Unit])], Option[(B, ExitCase => F[Unit])]) =
pm match {
case p @ PoolClosed() => (p, None)
case pOrig @ PoolOpen(idleCount, m) =>
Expand All @@ -299,14 +313,14 @@ object KeyPool {
_ <- kp.kpMaxTotalSem.permit
optR <- Resource.eval(kp.kpVar.modify(go))
releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState))
resource <- Resource.makeFull[F, (B, F[Unit])] { poll =>
optR.fold(poll(kp.kpRes(k).allocated))(r => Applicative[F].pure(r))
} { resource =>
resource <- Resource.makeCaseFull[F, (B, ExitCase => F[Unit])] { poll =>
optR.fold(poll(kp.kpRes(k).allocatedCase))(r => Applicative[F].pure(r))
} { (resource, exitCase) =>
for {
reusable <- releasedState.get
out <- reusable match {
case Reusable.Reuse => put(kp, k, resource._1, resource._2).attempt.void
case Reusable.DontReuse => resource._2.attempt.void
case Reusable.Reuse => put(kp, k, resource._1, resource._2).apply(exitCase).attempt.void
case Reusable.DontReuse => resource._2(exitCase).attempt.void
}
} yield out
}
Expand Down Expand Up @@ -370,9 +384,11 @@ object KeyPool {
def keepRunning[Z](fa: F[Z]): F[Z] =
fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa)
for {
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpVar <- Resource.makeCase(
Ref[F].of[PoolMap[A, (B, ExitCase => F[Unit])]](
PoolMap.open(0, Map.empty[A, PoolList[(B, ExitCase => F[Unit])]])
)
)(KeyPool.destroy)
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import cats.effect.kernel._
import cats.effect.kernel.syntax.spawn._
import cats.effect.std.Semaphore
import scala.concurrent.duration._
import cats.effect.kernel.Resource.ExitCase

@deprecated("use KeyPool.Builder", "0.4.7")
final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
Expand Down Expand Up @@ -88,9 +89,11 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
def keepRunning[Z](fa: F[Z]): F[Z] =
fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa)
for {
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpVar <- Resource.makeCase(
Ref[F].of[PoolMap[A, (B, ExitCase => F[Unit])]](
PoolMap.open(0, Map.empty[A, PoolList[(B, ExitCase => F[Unit])]])
)
)(KeyPool.destroy)
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
Expand Down