Skip to content

Commit

Permalink
Add forkPropagate, use in stream operators (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Aug 6, 2024
1 parent 1ef58a9 commit 7cab06e
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 176 deletions.
23 changes: 5 additions & 18 deletions core/src/main/scala/ox/channels/SourceCompanionIOOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import scala.util.control.NonFatal

trait SourceCompanionIOOps:

Expand All @@ -25,7 +24,7 @@ trait SourceCompanionIOOps:
*/
def fromInputStream(is: InputStream, chunkSize: Int = 1024)(using Ox, StageCapacity, IO): Source[Chunk[Byte]] =
val chunks = StageCapacity.newChannel[Chunk[Byte]]
fork {
forkPropagate(chunks) {
try
repeatWhile {
val buf = new Array[Byte](chunkSize)
Expand All @@ -37,14 +36,7 @@ trait SourceCompanionIOOps:
if readBytes > 0 then chunks.send(if readBytes == chunkSize then Chunk.fromArray(buf) else Chunk.fromArray(buf.take(readBytes)))
true
}
catch
case t: Throwable =>
chunks.errorOrClosed(t).discard
finally
try is.close()
catch
case t: Throwable =>
chunks.errorOrClosed(t).discard
finally is.close()
}
chunks

Expand Down Expand Up @@ -72,8 +64,8 @@ trait SourceCompanionIOOps:
// Some file systems don't support file channels
Files.newByteChannel(path, StandardOpenOption.READ)

fork {
try {
forkPropagate(chunks) {
try
repeatWhile {
val buf = ByteBuffer.allocate(chunkSize)
val readBytes = jFileChannel.read(buf)
Expand All @@ -84,11 +76,6 @@ trait SourceCompanionIOOps:
if readBytes > 0 then chunks.send(Chunk.fromArray(if readBytes == chunkSize then buf.array else buf.array.take(readBytes)))
true
}
} catch case t: Throwable => chunks.errorOrClosed(t).discard
finally
try jFileChannel.close()
catch
case NonFatal(closeException) =>
chunks.errorOrClosed(closeException).discard
finally jFileChannel.close()
}
chunks
126 changes: 55 additions & 71 deletions core/src/main/scala/ox/channels/SourceCompanionOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,70 +19,60 @@ trait SourceCompanionOps:

def fromIterator[T](it: => Iterator[T])(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
fork {
forkPropagate(c) {
val theIt = it
try
while theIt.hasNext do c.sendOrClosed(theIt.next()).discard
c.doneOrClosed()
catch case t: Throwable => c.errorOrClosed(t)
while theIt.hasNext do c.sendOrClosed(theIt.next()).discard
c.doneOrClosed().discard
}
c

def fromFork[T](f: Fork[T])(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
fork {
try
c.sendOrClosed(f.join())
c.doneOrClosed()
catch case t: Throwable => c.errorOrClosed(t)
forkPropagate(c) {
c.sendOrClosed(f.join())
c.doneOrClosed().discard
}
c

def iterate[T](zero: T)(f: T => T)(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
fork {
forkPropagate(c) {
var t = zero
try
forever {
c.sendOrClosed(t)
t = f(t)
}
catch case t: Throwable => c.errorOrClosed(t)
forever {
c.sendOrClosed(t)
t = f(t)
}
}
c

/** A range of number, from `from`, to `to` (inclusive), stepped by `step`. */
def range(from: Int, to: Int, step: Int)(using Ox, StageCapacity): Source[Int] =
val c = StageCapacity.newChannel[Int]
fork {
forkPropagate(c) {
var t = from
try
repeatWhile {
c.sendOrClosed(t)
t = t + step
t <= to
}
c.doneOrClosed()
catch case t: Throwable => c.errorOrClosed(t)
repeatWhile {
c.sendOrClosed(t)
t = t + step
t <= to
}
c.doneOrClosed().discard
}
c

def unfold[S, T](initial: S)(f: S => Option[(T, S)])(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
fork {
forkPropagate(c) {
var s = initial
try
repeatWhile {
f(s) match
case Some((value, next)) =>
c.sendOrClosed(value)
s = next
true
case None =>
c.doneOrClosed()
false
}
catch case t: Throwable => c.errorOrClosed(t)
repeatWhile {
f(s) match
case Some((value, next)) =>
c.sendOrClosed(value)
s = next
true
case None =>
c.doneOrClosed()
false
}
}
c

Expand Down Expand Up @@ -147,12 +137,10 @@ trait SourceCompanionOps:
*/
def repeatEval[T](f: => T)(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
fork {
try
forever {
c.sendOrClosed(f).discard
}
catch case t: Throwable => c.errorOrClosed(t)
forkPropagate(c) {
forever {
c.sendOrClosed(f).discard
}
}
c

Expand All @@ -168,14 +156,12 @@ trait SourceCompanionOps:
*/
def repeatEvalWhileDefined[T](f: => Option[T])(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
fork {
try
repeatWhile {
f match
case Some(value) => c.sendOrClosed(value); true
case None => c.doneOrClosed(); false
}
catch case t: Throwable => c.errorOrClosed(t)
forkPropagate(c) {
repeatWhile {
f match
case Some(value) => c.sendOrClosed(value); true
case None => c.doneOrClosed(); false
}
}
c

Expand All @@ -190,27 +176,25 @@ trait SourceCompanionOps:

def concat[T](sources: Seq[() => Source[T]])(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
fork {
forkPropagate(c) {
var currentSource: Option[Source[T]] = None
val sourcesIterator = sources.iterator
var continue = true
try
while continue do
currentSource match
case None if sourcesIterator.hasNext => currentSource = Some(sourcesIterator.next()())
case None =>
c.doneOrClosed()
continue = false
case Some(source) =>
source.receiveOrClosed() match
case ChannelClosed.Done =>
currentSource = None
case ChannelClosed.Error(r) =>
c.errorOrClosed(r)
continue = false
case t: T @unchecked =>
c.sendOrClosed(t).discard
catch case t: Throwable => c.errorOrClosed(t)
while continue do
currentSource match
case None if sourcesIterator.hasNext => currentSource = Some(sourcesIterator.next()())
case None =>
c.doneOrClosed()
continue = false
case Some(source) =>
source.receiveOrClosed() match
case ChannelClosed.Done =>
currentSource = None
case ChannelClosed.Error(r) =>
c.errorOrClosed(r)
continue = false
case t: T @unchecked =>
c.sendOrClosed(t).discard
}
c

Expand Down
Loading

0 comments on commit 7cab06e

Please sign in to comment.