Skip to content

Commit

Permalink
Do not extract cause from execution exception + fix code analysis war…
Browse files Browse the repository at this point in the history
…nings
  • Loading branch information
emil-bar committed Dec 30, 2024
1 parent a9dc2b6 commit 2a4860a
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 147 deletions.
203 changes: 90 additions & 113 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public List<T> runToList() throws Exception {
* Required for creating async forks responsible for writing to channel
*/
public Source<T> runToChannel(UnsupervisedScope scope) {
return runToChannelInternal(scope, () -> Channel.withScopedBufferSize());
return runToChannelInternal(scope, Channel::withScopedBufferSize);
}

/** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result
Expand Down Expand Up @@ -138,7 +138,7 @@ public <U> U runFold(U zero, BiFunction<U, T, U> f) throws Exception {
* Ignores all elements emitted by the flow. Blocks until the flow completes.
*/
public void runDrain() throws Exception {
runForeach(t -> {});
runForeach(_ -> {});
}

/**
Expand Down Expand Up @@ -251,15 +251,11 @@ public List<T> runTakeLast(int n) throws Exception {
public Flow<T> buffer(int bufferCapacity) {
return usingEmit(emit -> {
Channel<T> ch = new Channel<>(bufferCapacity);
try {
unsupervised(scope -> {
runLastToChannelAsync(scope, ch);
FlowEmit.channelToEmit(ch, emit);
return null;
});
} catch (ExecutionException e) {
throw (Exception) e.getCause();
}
unsupervised(scope -> {
runLastToChannelAsync(scope, ch);
FlowEmit.channelToEmit(ch, emit);
return null;
});
});
}

Expand Down Expand Up @@ -349,7 +345,7 @@ public Flow<T> take(int n) {
* @param n The number of elements in a group.
*/
public Flow<List<T>> grouped(int n) {
return groupedWeighted(n, t -> 1L);
return groupedWeighted(n, _ -> 1L);
}

/**
Expand Down Expand Up @@ -387,9 +383,7 @@ public Flow<List<T>> groupedWeighted(long minWeight, Function<T, Long> costFn) {
* Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error).
*/
public Flow<Void> drain() {
return Flows.usingEmit(emit -> {
last.run(t -> {});
});
return Flows.usingEmit(_ -> last.run(_ -> {}));
}

/**
Expand Down Expand Up @@ -486,7 +480,7 @@ public Flow<T> throttle(int elements, Duration per) {
throw new IllegalArgumentException("requirement failed: per time must be >= 1 ms");
}
long emitEveryMillis = per.toMillis() / elements;
return tap(t -> {
return tap(_ -> {
try {
sleep(Duration.ofMillis(emitEveryMillis));
} catch (InterruptedException e) {
Expand Down Expand Up @@ -531,7 +525,6 @@ public Flow<T> takeWhile(Predicate<T> f, boolean includeFirstFailing) {
* The flow to be appended to this flow.
*/
public Flow<T> concat(Flow<T> other) {
//noinspection unchecked
return Flows.concat(this, other);
}

Expand Down Expand Up @@ -571,30 +564,26 @@ public Flow<T> drop(int n) {
*/
public Flow<T> merge(Flow<T> other, boolean propagateDoneLeft, boolean propagateDoneRight) {
return Flows.usingEmit(emit -> {
try {
unsupervised(scope -> {
Source<T> c1 = this.runToChannel(scope);
Source<T> c2 = other.runToChannel(scope);

boolean continueLoop = true;
while (continueLoop) {
switch (selectOrClosed(c1.receiveClause(), c2.receiveClause())) {
case ChannelDone done -> {
if (c1.isClosedForReceive()) {
if (!propagateDoneLeft) FlowEmit.channelToEmit(c2, emit);
} else if (!propagateDoneRight) FlowEmit.channelToEmit(c1, emit);
continueLoop = false;
}
case ChannelError error -> throw error.toException();
case Object r -> //noinspection unchecked
emit.apply((T) r);
unsupervised(scope -> {
Source<T> c1 = this.runToChannel(scope);
Source<T> c2 = other.runToChannel(scope);

boolean continueLoop = true;
while (continueLoop) {
switch (selectOrClosed(c1.receiveClause(), c2.receiveClause())) {
case ChannelDone _ -> {
if (c1.isClosedForReceive()) {
if (!propagateDoneLeft) FlowEmit.channelToEmit(c2, emit);
} else if (!propagateDoneRight) FlowEmit.channelToEmit(c1, emit);
continueLoop = false;
}
case ChannelError error -> throw error.toException();
case Object r -> //noinspection unchecked
emit.apply((T) r);
}
return null;
});
} catch (ExecutionException e) {
throw (Exception) e.getCause();
}
}
return null;
});
});
}

Expand Down Expand Up @@ -689,54 +678,49 @@ public <U> Flow<U> mapPar(int parallelism, Function<T, U> f) {
// creating a nested scope, so that in case of errors, we can clean up any mapping forks in a "local" fashion,
// that is without closing the main scope; any error management must be done in the forks, as the scope is
// unsupervised
try {
Scopes.unsupervised(scope -> {
// a fork which runs the `last` pipeline, and for each emitted element creates a fork
// notifying only the `results` channels, as it will cause the scope to end, and any other forks to be
// interrupted, including the inProgress-fork, which might be waiting on a join()
forkPropagate(scope, results, () -> {
last.run(value -> {
semaphore.acquire();
inProgress.sendOrClosed(forkMapping(scope, f, semaphore, value, results));
});
inProgress.doneOrClosed();
return null;
Scopes.unsupervised(scope -> {
// a fork which runs the `last` pipeline, and for each emitted element creates a fork
// notifying only the `results` channels, as it will cause the scope to end, and any other forks to be
// interrupted, including the inProgress-fork, which might be waiting on a join()
forkPropagate(scope, results, () -> {
last.run(value -> {
semaphore.acquire();
inProgress.sendOrClosed(forkMapping(scope, f, semaphore, value, results));
});
inProgress.doneOrClosed();
return null;
});

// a fork in which we wait for the created forks to finish (in sequence), and forward the mapped values to `results`
// this extra step is needed so that if there's an error in any of the mapping forks, it's discovered as quickly as
// possible in the main body
scope.forkUnsupervised((Callable<T>) () -> {
while (true) {
switch (inProgress.receiveOrClosed()) {
// in the fork's result is a `None`, the error is already propagated to the `results` channel
case ChannelDone ignored -> {
results.done();
// a fork in which we wait for the created forks to finish (in sequence), and forward the mapped values to `results`
// this extra step is needed so that if there's an error in any of the mapping forks, it's discovered as quickly as
// possible in the main body
scope.forkUnsupervised((Callable<T>) () -> {
while (true) {
switch (inProgress.receiveOrClosed()) {
// in the fork's result is a `None`, the error is already propagated to the `results` channel
case ChannelDone ignored -> {
results.done();
return null;
}
case ChannelError(Throwable e) -> throw new IllegalStateException("inProgress should never be closed with an error", e);
case Object fork -> {
//noinspection unchecked
Optional<U> result = ((Fork<Optional<U>>) fork).join();
if (result.isPresent()) {
results.sendOrClosed(result.get());
} else {
return null;
}
case ChannelError(Throwable e) -> throw new IllegalStateException("inProgress should never be closed with an error", e);
case Object fork -> {
//noinspection unchecked
Optional<U> result = ((Fork<Optional<U>>) fork).join();
if (result.isPresent()) {
results.sendOrClosed(result.get());
} else {
return null;
}
}
}
}
});

// in the main body, we call the `emit` methods using the (sequentially received) results; when an error occurs,
// the scope ends, interrupting any forks that are still running
FlowEmit.channelToEmit(results, emit);
return null;
}
});
} catch (ExecutionException e) {
// rethrowing original exception
throw (Exception) e.getCause();
}

// in the main body, we call the `emit` methods using the (sequentially received) results; when an error occurs,
// the scope ends, interrupting any forks that are still running
FlowEmit.channelToEmit(results, emit);
return null;
});
});
}

Expand All @@ -757,38 +741,33 @@ public <U> Flow<U> mapParUnordered(int parallelism, Function<T, U> f) {
return Flows.usingEmit(emit -> {
Channel<U> results = Channel.withScopedBufferSize();
Semaphore s = new Semaphore(parallelism);
try {
unsupervised(unsupervisedScope -> { // the outer scope, used for the fork which runs the `last` pipeline
forkPropagate(unsupervisedScope, results, () -> {
supervised(scope -> { // the inner scope, in which user forks are created, and which is used to wait for all to complete when done
try {
last.run(t -> {
s.acquire();
scope.forkUser(() -> {
try {
results.sendOrClosed(f.apply(t));
s.release();
} catch (Throwable cause) {
results.errorOrClosed(cause);
}
return null;
});
unsupervised(unsupervisedScope -> { // the outer scope, used for the fork which runs the `last` pipeline
forkPropagate(unsupervisedScope, results, () -> {
supervised(scope -> { // the inner scope, in which user forks are created, and which is used to wait for all to complete when done
try {
last.run(t -> {
s.acquire();
scope.forkUser(() -> {
try {
results.sendOrClosed(f.apply(t));
s.release();
} catch (Throwable cause) {
results.errorOrClosed(cause);
}
return null;
});
} catch (Exception e) {
results.errorOrClosed(e);
}
return null;
});
results.doneOrClosed();
});
} catch (Exception e) {
results.errorOrClosed(e);
}
return null;
});
FlowEmit.channelToEmit(results, emit);
results.doneOrClosed();
return null;
});
} catch (ExecutionException e) {
// rethrowing original exception
throw (Exception) e.getCause();
}
FlowEmit.channelToEmit(results, emit);
return null;
});
});
}

Expand Down Expand Up @@ -834,14 +813,13 @@ public Flow<List<T>> sliding(int n, int step) {
* Attaches the given {@link Sink} to this flow, meaning elements that pass through will also be sent to the sink. If emitting an
* element, or sending to the `other` sink blocks, no elements will be processed until both are done. The elements are first emitted by
* the flow and then, only if that was successful, to the `other` sink.
*
* <p>
* If this flow fails, then failure is passed to the `other` sink as well. If the `other` sink is failed or complete, this becomes a
* failure of the returned flow (contrary to {@link #alsoToTap} where it's ignored).
*
* @param other
* The sink to which elements from this flow will be sent.
* @see
* {@link #alsoToTap} for a version that drops elements when the `other` sink is not available for receive.
* @see #alsoToTap for a version that drops elements when the `other` sink is not available for receive.
*/
public Flow<T> alsoTo(Sink<T> other) {
return Flows.usingEmit(emit -> {
Expand All @@ -866,14 +844,13 @@ public Flow<T> alsoTo(Sink<T> other) {
/**
* Attaches the given {@link Sink} to this flow, meaning elements that pass through will also be sent to the sink. If the `other`
* sink is not available for receive, the elements are still emitted by the returned flow, but not sent to the `other` sink.
*
* <p>
* If this flow fails, then failure is passed to the `other` sink as well. If the `other` sink fails or closes, then failure or closure
* is ignored and it doesn't affect the resulting flow (contrary to {@link #alsoTo} where it's propagated).
*
* @param other
* The sink to which elements from this source will be sent.
* @see
* {@link #alsoTo} for a version that ensures that elements are emitted both by the returned flow and sent to the `other` sink.
* @see #alsoTo for a version that ensures that elements are emitted both by the returned flow and sent to the `other` sink.
*/
public Flow<T> alsoToTap(Sink<T> other) {
return Flows.usingEmit(emit -> {
Expand Down
15 changes: 5 additions & 10 deletions flows/src/main/java/com/softwaremill/jox/flows/Flows.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -178,7 +177,7 @@ public static <T> Flow<T> repeatEvalWhileDefined(Supplier<Optional<T>> supplierF

/** Create a flow which sleeps for the given `timeout` and then completes as done. */
public static <T> Flow<T> timeout(Duration timeout) {
return usingEmit(emit -> Thread.sleep(timeout.toMillis()));
return usingEmit(_ -> Thread.sleep(timeout.toMillis()));
}

/**
Expand All @@ -196,17 +195,13 @@ public static <T> Flow<T> concat(Flow<T>... flows) {

/** Creates an empty flow, which emits no elements and completes immediately. */
public static <T> Flow<T> empty() {
return usingEmit(emit -> {});
return usingEmit(_ -> {});
}

/** Creates a flow that emits a single element when `from` completes, or throws an exception when `from` fails. */
public static <T> Flow<T> fromCompletableFuture(CompletableFuture<T> from) {
return usingEmit(emit -> {
try {
emit.apply(from.get());
} catch (ExecutionException e) {
throw (Exception) e.getCause();
}
emit.apply(from.get());
});
}

Expand All @@ -226,7 +221,7 @@ public static <T> Flow<T> fromFutureSource(CompletableFuture<Source<T>> from) {
* The {@link java.lang.Exception} to fail with
*/
public static <T> Flow<T> failed(Exception t) {
return usingEmit(emit -> {
return usingEmit(_ -> {
throw t;
});
}
Expand Down Expand Up @@ -267,7 +262,7 @@ public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, bo

while (true) {
var received = availableSources.get(currentSourceIndex).receiveOrClosed();
if (received instanceof ChannelDone done) {
if (received instanceof ChannelDone) {
/// channel is done, remove it from the list of available sources
availableSources.remove(currentSourceIndex);
currentSourceIndex = currentSourceIndex == 0 ? availableSources.size() - 1 : currentSourceIndex - 1;
Expand Down
Loading

0 comments on commit 2a4860a

Please sign in to comment.