Skip to content

Commit

Permalink
Merge pull request #1996 from mpilquist/topic/extend-scope-to
Browse files Browse the repository at this point in the history
Add Pull.extendScopeTo combinator
  • Loading branch information
mpilquist authored Aug 18, 2020
2 parents 4fb37b0 + 8e21716 commit 2f10feb
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
21 changes: 21 additions & 0 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fs2

import cats.{Eval => _, _}
import cats.effect._
import cats.implicits._
import fs2.internal._
import fs2.internal.FreeC.{Eval, Result}

Expand Down Expand Up @@ -119,6 +120,19 @@ object Pull extends PullLowPriority {
def eval[F[_], R](fr: F[R]): Pull[F, INothing, R] =
new Pull(Eval[F, R](fr))

/**
* Extends the scope of the currently open resources to the specified stream, preventing them
* from being finalized until after `s` completes execution, even if the returned pull is converted
* to a stream, compiled, and evaluated before `s` is compiled and evaluated.
*/
def extendScopeTo[F[_], O](
s: Stream[F, O]
)(implicit F: MonadError[F, Throwable]): Pull[F, INothing, Stream[F, O]] =
for {
scope <- Pull.getScope[F]
lease <- Pull.eval(scope.leaseOrError)
} yield s.onFinalize(lease.cancel.redeem(F.raiseError(_), _ => F.unit))

/**
* Repeatedly uses the output of the pull as input for the next step of the pull.
* Halts when a step terminates with `None` or `Pull.raiseError`.
Expand Down Expand Up @@ -164,6 +178,13 @@ object Pull extends PullLowPriority {
*/
def fromEither[F[x]] = new PartiallyAppliedFromEither[F]

/**
* Gets the current scope, allowing manual leasing or interruption.
* This is a low-level method and generally should not be used by user code.
*/
def getScope[F[_]]: Pull[F, INothing, Scope[F]] =
new Pull(FreeC.GetScope[F]())

/**
* Returns a pull that evaluates the supplied by-name each time the pull is used,
* allowing use of a mutable value in pull computations.
Expand Down
10 changes: 10 additions & 0 deletions core/shared/src/main/scala/fs2/Scope.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package fs2

import cats.MonadError
import cats.implicits._

/**
* Represents a period of stream execution in which resources are acquired and released.
*
Expand Down Expand Up @@ -27,6 +30,13 @@ abstract class Scope[F[_]] {
*/
def lease: F[Option[Scope.Lease[F]]]

/** Like [[lease]], but fails with an error if the scope is closed. */
def leaseOrError(implicit F: MonadError[F, Throwable]): F[Scope.Lease[F]] =
lease.flatMap {
case Some(l) => F.pure(l)
case None => F.raiseError(new Throwable("Scope closed at time of lease"))
}

/**
* Interrupts evaluation of the current scope. Only scopes previously indicated with Stream.interruptScope may be interrupted.
* For other scopes this will fail.
Expand Down

0 comments on commit 2f10feb

Please sign in to comment.