From 2a4860a2093b93d24a8e73240d3da2b8f64136ef Mon Sep 17 00:00:00 2001 From: emilb Date: Mon, 30 Dec 2024 11:22:17 +0100 Subject: [PATCH] Do not extract cause from execution exception + fix code analysis warnings --- .../java/com/softwaremill/jox/flows/Flow.java | 203 ++++++++---------- .../com/softwaremill/jox/flows/Flows.java | 15 +- .../softwaremill/jox/flows/FlowMapTest.java | 19 +- .../com/softwaremill/jox/flows/FlowTest.java | 22 +- .../com/softwaremill/jox/flows/FlowsTest.java | 6 +- 5 files changed, 118 insertions(+), 147 deletions(-) diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java index cdc0bcb..6ad583f 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java @@ -88,7 +88,7 @@ public List runToList() throws Exception { * Required for creating async forks responsible for writing to channel */ public Source runToChannel(UnsupervisedScope scope) { - return runToChannelInternal(scope, () -> Channel.withScopedBufferSize()); + return runToChannelInternal(scope, Channel::withScopedBufferSize); } /** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result @@ -138,7 +138,7 @@ public U runFold(U zero, BiFunction f) throws Exception { * Ignores all elements emitted by the flow. Blocks until the flow completes. */ public void runDrain() throws Exception { - runForeach(t -> {}); + runForeach(_ -> {}); } /** @@ -251,15 +251,11 @@ public List runTakeLast(int n) throws Exception { public Flow buffer(int bufferCapacity) { return usingEmit(emit -> { Channel ch = new Channel<>(bufferCapacity); - try { - unsupervised(scope -> { - runLastToChannelAsync(scope, ch); - FlowEmit.channelToEmit(ch, emit); - return null; - }); - } catch (ExecutionException e) { - throw (Exception) e.getCause(); - } + unsupervised(scope -> { + runLastToChannelAsync(scope, ch); + FlowEmit.channelToEmit(ch, emit); + return null; + }); }); } @@ -349,7 +345,7 @@ public Flow take(int n) { * @param n The number of elements in a group. */ public Flow> grouped(int n) { - return groupedWeighted(n, t -> 1L); + return groupedWeighted(n, _ -> 1L); } /** @@ -387,9 +383,7 @@ public Flow> groupedWeighted(long minWeight, Function costFn) { * Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error). */ public Flow drain() { - return Flows.usingEmit(emit -> { - last.run(t -> {}); - }); + return Flows.usingEmit(_ -> last.run(_ -> {})); } /** @@ -486,7 +480,7 @@ public Flow throttle(int elements, Duration per) { throw new IllegalArgumentException("requirement failed: per time must be >= 1 ms"); } long emitEveryMillis = per.toMillis() / elements; - return tap(t -> { + return tap(_ -> { try { sleep(Duration.ofMillis(emitEveryMillis)); } catch (InterruptedException e) { @@ -531,7 +525,6 @@ public Flow takeWhile(Predicate f, boolean includeFirstFailing) { * The flow to be appended to this flow. */ public Flow concat(Flow other) { - //noinspection unchecked return Flows.concat(this, other); } @@ -571,30 +564,26 @@ public Flow drop(int n) { */ public Flow merge(Flow other, boolean propagateDoneLeft, boolean propagateDoneRight) { return Flows.usingEmit(emit -> { - try { - unsupervised(scope -> { - Source c1 = this.runToChannel(scope); - Source c2 = other.runToChannel(scope); - - boolean continueLoop = true; - while (continueLoop) { - switch (selectOrClosed(c1.receiveClause(), c2.receiveClause())) { - case ChannelDone done -> { - if (c1.isClosedForReceive()) { - if (!propagateDoneLeft) FlowEmit.channelToEmit(c2, emit); - } else if (!propagateDoneRight) FlowEmit.channelToEmit(c1, emit); - continueLoop = false; - } - case ChannelError error -> throw error.toException(); - case Object r -> //noinspection unchecked - emit.apply((T) r); + unsupervised(scope -> { + Source c1 = this.runToChannel(scope); + Source c2 = other.runToChannel(scope); + + boolean continueLoop = true; + while (continueLoop) { + switch (selectOrClosed(c1.receiveClause(), c2.receiveClause())) { + case ChannelDone _ -> { + if (c1.isClosedForReceive()) { + if (!propagateDoneLeft) FlowEmit.channelToEmit(c2, emit); + } else if (!propagateDoneRight) FlowEmit.channelToEmit(c1, emit); + continueLoop = false; } + case ChannelError error -> throw error.toException(); + case Object r -> //noinspection unchecked + emit.apply((T) r); } - return null; - }); - } catch (ExecutionException e) { - throw (Exception) e.getCause(); - } + } + return null; + }); }); } @@ -689,54 +678,49 @@ public Flow mapPar(int parallelism, Function f) { // creating a nested scope, so that in case of errors, we can clean up any mapping forks in a "local" fashion, // that is without closing the main scope; any error management must be done in the forks, as the scope is // unsupervised - try { - Scopes.unsupervised(scope -> { - // a fork which runs the `last` pipeline, and for each emitted element creates a fork - // notifying only the `results` channels, as it will cause the scope to end, and any other forks to be - // interrupted, including the inProgress-fork, which might be waiting on a join() - forkPropagate(scope, results, () -> { - last.run(value -> { - semaphore.acquire(); - inProgress.sendOrClosed(forkMapping(scope, f, semaphore, value, results)); - }); - inProgress.doneOrClosed(); - return null; + Scopes.unsupervised(scope -> { + // a fork which runs the `last` pipeline, and for each emitted element creates a fork + // notifying only the `results` channels, as it will cause the scope to end, and any other forks to be + // interrupted, including the inProgress-fork, which might be waiting on a join() + forkPropagate(scope, results, () -> { + last.run(value -> { + semaphore.acquire(); + inProgress.sendOrClosed(forkMapping(scope, f, semaphore, value, results)); }); + inProgress.doneOrClosed(); + return null; + }); - // a fork in which we wait for the created forks to finish (in sequence), and forward the mapped values to `results` - // this extra step is needed so that if there's an error in any of the mapping forks, it's discovered as quickly as - // possible in the main body - scope.forkUnsupervised((Callable) () -> { - while (true) { - switch (inProgress.receiveOrClosed()) { - // in the fork's result is a `None`, the error is already propagated to the `results` channel - case ChannelDone ignored -> { - results.done(); + // a fork in which we wait for the created forks to finish (in sequence), and forward the mapped values to `results` + // this extra step is needed so that if there's an error in any of the mapping forks, it's discovered as quickly as + // possible in the main body + scope.forkUnsupervised((Callable) () -> { + while (true) { + switch (inProgress.receiveOrClosed()) { + // in the fork's result is a `None`, the error is already propagated to the `results` channel + case ChannelDone ignored -> { + results.done(); + return null; + } + case ChannelError(Throwable e) -> throw new IllegalStateException("inProgress should never be closed with an error", e); + case Object fork -> { + //noinspection unchecked + Optional result = ((Fork>) fork).join(); + if (result.isPresent()) { + results.sendOrClosed(result.get()); + } else { return null; } - case ChannelError(Throwable e) -> throw new IllegalStateException("inProgress should never be closed with an error", e); - case Object fork -> { - //noinspection unchecked - Optional result = ((Fork>) fork).join(); - if (result.isPresent()) { - results.sendOrClosed(result.get()); - } else { - return null; - } - } } } - }); - - // in the main body, we call the `emit` methods using the (sequentially received) results; when an error occurs, - // the scope ends, interrupting any forks that are still running - FlowEmit.channelToEmit(results, emit); - return null; + } }); - } catch (ExecutionException e) { - // rethrowing original exception - throw (Exception) e.getCause(); - } + + // in the main body, we call the `emit` methods using the (sequentially received) results; when an error occurs, + // the scope ends, interrupting any forks that are still running + FlowEmit.channelToEmit(results, emit); + return null; + }); }); } @@ -757,38 +741,33 @@ public Flow mapParUnordered(int parallelism, Function f) { return Flows.usingEmit(emit -> { Channel results = Channel.withScopedBufferSize(); Semaphore s = new Semaphore(parallelism); - try { - unsupervised(unsupervisedScope -> { // the outer scope, used for the fork which runs the `last` pipeline - forkPropagate(unsupervisedScope, results, () -> { - supervised(scope -> { // the inner scope, in which user forks are created, and which is used to wait for all to complete when done - try { - last.run(t -> { - s.acquire(); - scope.forkUser(() -> { - try { - results.sendOrClosed(f.apply(t)); - s.release(); - } catch (Throwable cause) { - results.errorOrClosed(cause); - } - return null; - }); + unsupervised(unsupervisedScope -> { // the outer scope, used for the fork which runs the `last` pipeline + forkPropagate(unsupervisedScope, results, () -> { + supervised(scope -> { // the inner scope, in which user forks are created, and which is used to wait for all to complete when done + try { + last.run(t -> { + s.acquire(); + scope.forkUser(() -> { + try { + results.sendOrClosed(f.apply(t)); + s.release(); + } catch (Throwable cause) { + results.errorOrClosed(cause); + } + return null; }); - } catch (Exception e) { - results.errorOrClosed(e); - } - return null; - }); - results.doneOrClosed(); + }); + } catch (Exception e) { + results.errorOrClosed(e); + } return null; }); - FlowEmit.channelToEmit(results, emit); + results.doneOrClosed(); return null; }); - } catch (ExecutionException e) { - // rethrowing original exception - throw (Exception) e.getCause(); - } + FlowEmit.channelToEmit(results, emit); + return null; + }); }); } @@ -834,14 +813,13 @@ public Flow> sliding(int n, int step) { * Attaches the given {@link Sink} to this flow, meaning elements that pass through will also be sent to the sink. If emitting an * element, or sending to the `other` sink blocks, no elements will be processed until both are done. The elements are first emitted by * the flow and then, only if that was successful, to the `other` sink. - * + *

* If this flow fails, then failure is passed to the `other` sink as well. If the `other` sink is failed or complete, this becomes a * failure of the returned flow (contrary to {@link #alsoToTap} where it's ignored). * * @param other * The sink to which elements from this flow will be sent. - * @see - * {@link #alsoToTap} for a version that drops elements when the `other` sink is not available for receive. + * @see #alsoToTap for a version that drops elements when the `other` sink is not available for receive. */ public Flow alsoTo(Sink other) { return Flows.usingEmit(emit -> { @@ -866,14 +844,13 @@ public Flow alsoTo(Sink other) { /** * Attaches the given {@link Sink} to this flow, meaning elements that pass through will also be sent to the sink. If the `other` * sink is not available for receive, the elements are still emitted by the returned flow, but not sent to the `other` sink. - * + *

* If this flow fails, then failure is passed to the `other` sink as well. If the `other` sink fails or closes, then failure or closure * is ignored and it doesn't affect the resulting flow (contrary to {@link #alsoTo} where it's propagated). * * @param other * The sink to which elements from this source will be sent. - * @see - * {@link #alsoTo} for a version that ensures that elements are emitted both by the returned flow and sent to the `other` sink. + * @see #alsoTo for a version that ensures that elements are emitted both by the returned flow and sent to the `other` sink. */ public Flow alsoToTap(Sink other) { return Flows.usingEmit(emit -> { diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flows.java b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java index 9adf572..c24a535 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/Flows.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java @@ -9,7 +9,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -178,7 +177,7 @@ public static Flow repeatEvalWhileDefined(Supplier> supplierF /** Create a flow which sleeps for the given `timeout` and then completes as done. */ public static Flow timeout(Duration timeout) { - return usingEmit(emit -> Thread.sleep(timeout.toMillis())); + return usingEmit(_ -> Thread.sleep(timeout.toMillis())); } /** @@ -196,17 +195,13 @@ public static Flow concat(Flow... flows) { /** Creates an empty flow, which emits no elements and completes immediately. */ public static Flow empty() { - return usingEmit(emit -> {}); + return usingEmit(_ -> {}); } /** Creates a flow that emits a single element when `from` completes, or throws an exception when `from` fails. */ public static Flow fromCompletableFuture(CompletableFuture from) { return usingEmit(emit -> { - try { - emit.apply(from.get()); - } catch (ExecutionException e) { - throw (Exception) e.getCause(); - } + emit.apply(from.get()); }); } @@ -226,7 +221,7 @@ public static Flow fromFutureSource(CompletableFuture> from) { * The {@link java.lang.Exception} to fail with */ public static Flow failed(Exception t) { - return usingEmit(emit -> { + return usingEmit(_ -> { throw t; }); } @@ -267,7 +262,7 @@ public static Flow interleaveAll(List> flows, int segmentSize, bo while (true) { var received = availableSources.get(currentSourceIndex).receiveOrClosed(); - if (received instanceof ChannelDone done) { + if (received instanceof ChannelDone) { /// channel is done, remove it from the list of available sources availableSources.remove(currentSourceIndex); currentSourceIndex = currentSourceIndex == 0 ? availableSources.size() - 1 : currentSourceIndex - 1; diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowMapTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowMapTest.java index 45c2689..46c0cb4 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowMapTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowMapTest.java @@ -24,7 +24,6 @@ import java.util.function.Function; import com.softwaremill.jox.ChannelError; -import com.softwaremill.jox.ChannelErrorException; import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.Scopes; import org.junit.jupiter.api.Assertions; @@ -101,7 +100,7 @@ void shouldPropagateErrorFromFlatMap() { }); // then - assertThrows(RuntimeException.class, () -> mapped.runForeach(i -> {})); + assertThrows(RuntimeException.class, mapped::runDrain); } @@ -266,7 +265,7 @@ void mapPar_shouldCancelOtherRunningForksWhenThereIsAnError() throws ExecutionEx // then assertEquals(2, s2.receive()); assertEquals(4, s2.receive()); - assertEquals(boom, ((ChannelError) s2.receiveOrClosed()).cause().getCause()); + assertEquals(boom, ((ChannelError) s2.receiveOrClosed()).cause().getCause().getCause()); // checking if the forks aren't left running Thread.sleep(200); @@ -296,7 +295,7 @@ void mapPar_shouldPropagateErrors() { s2.runToList(); Assertions.fail("should have thrown"); } catch (Exception e) { - assertEquals(boom, e.getCause()); + assertEquals(boom, e.getCause().getCause()); assertThat(started.get(), allOf( greaterThanOrEqualTo(4), lessThanOrEqualTo(7) // 4 successful + at most 3 taking up all the permits @@ -387,8 +386,8 @@ void mapParUnordered_testPropagateErrors() { }); // then - Exception exception = assertThrows(ChannelErrorException.class, flow2::runToList); - assertEquals(boom, exception.getCause()); + ExecutionException exception = assertThrows(ExecutionException.class, flow2::runToList); + assertEquals(boom, exception.getCause().getCause()); assertThat(started.get(), allOf( greaterThanOrEqualTo(2), // 1 needs to start & finish; 2 other need to start; and then the failing one has to start & proceed lessThanOrEqualTo(7) // 4 successful + at most 3 taking up all the permits @@ -426,7 +425,7 @@ void mapParUnordered_testCompleteRunningForksAndNotStartNewOnesWhenMappingFuncti // then assertEquals(Set.of(2, 4), received); - assertEquals(boom, channelError.cause().getCause()); + assertEquals(boom, channelError.cause().getCause().getCause()); assertTrue(s2.isClosedForReceive()); // checking if the forks aren't left running @@ -456,8 +455,8 @@ void mapParUnordered_testCompleteRunningForksAndNotStartNewOnesWhenUpstreamFails }); // then - ChannelErrorException exception = assertThrows(ChannelErrorException.class, flow2::runToList); - assertInstanceOf(IllegalStateException.class, exception.getCause()); + ExecutionException exception = assertThrows(ExecutionException.class, flow2::runToList); + assertInstanceOf(IllegalStateException.class, exception.getCause().getCause()); // checking if the forks aren't left running TimeUnit.MILLISECONDS.sleep(200); @@ -500,7 +499,7 @@ void mapParUnordered_testCancelRunningForksWhenSurroundingScopeClosesDueToError( // then assertEquals(Set.of(2, 4), received); - assertEquals(boom, errorOrClosed.cause().getCause()); + assertEquals(boom, errorOrClosed.cause().getCause().getCause()); assertTrue(s2.isClosedForReceive()); // wait for all threads to finish diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java index 6711983..785aceb 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java @@ -14,9 +14,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import com.softwaremill.jox.ChannelClosedException; import com.softwaremill.jox.ChannelError; import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.Scopes; @@ -130,13 +130,13 @@ void shouldWorkWithMultipleAsyncBoundaries() throws Throwable { @Test void shouldPropagateErrorsWhenUsingBuffer() { - Exception exception = assertThrows(ChannelClosedException.class, () -> { + Exception exception = assertThrows(ExecutionException.class, () -> { Flows.fromValues(1, 2, 3) - .map(value -> { throw new IllegalStateException(); }) + .map(_ -> { throw new IllegalStateException(); }) .buffer(5) .runToList(); }); - assertInstanceOf(IllegalStateException.class, exception.getCause()); + assertInstanceOf(IllegalStateException.class, exception.getCause().getCause()); } @Test @@ -163,7 +163,7 @@ void shouldTap() throws Throwable { flow .tap(results::add) .map(i -> i * 2) - .runForeach(j -> { + .runForeach(_ -> { }); // then @@ -454,31 +454,31 @@ void shouldMergeWithATickFlow() throws Exception { @Test void shouldPropagateErrorFromTheLeft() { // given - var c1 = Flows.fromValues(1, 2, 3).concat(Flows.failed(new IllegalStateException())); + var c1 = Flows.fromValues(1, 2, 3).concat(Flows.failed(new IllegalStateException())); var c2 = Flows.fromValues(4, 5, 6); var s = c1.merge(c2, false, false); // when - var exception = assertThrows(ChannelClosedException.class, s::runToList); + var exception = assertThrows(ExecutionException.class, s::runToList); // then - assertInstanceOf(IllegalStateException.class, exception.getCause()); + assertInstanceOf(IllegalStateException.class, exception.getCause().getCause()); } @Test void shouldPropagateErrorFromTheRight() { // given var c1 = Flows.fromValues(1, 2, 3); - var c2 = Flows.fromValues(4, 5, 6).concat(Flows.failed(new IllegalStateException())); + var c2 = Flows.fromValues(4, 5, 6).concat(Flows.failed(new IllegalStateException())); var s = c1.merge(c2, false, false); // when - var exception = assertThrows(ChannelClosedException.class, s::runToList); + var exception = assertThrows(ExecutionException.class, s::runToList); // then - assertInstanceOf(IllegalStateException.class, exception.getCause()); + assertInstanceOf(IllegalStateException.class, exception.getCause().getCause()); } @Test diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java index cb979ac..2eb00df 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java @@ -76,9 +76,9 @@ void shouldReturnOriginalFutureFailureWhenFutureFails() { RuntimeException failure = new RuntimeException("future failed"); // when & then - assertThrows(RuntimeException.class, - () -> Flows.fromCompletableFuture(CompletableFuture.failedFuture(failure)).runToList(), - failure.getMessage()); + ExecutionException exception = assertThrows(ExecutionException.class, + () -> Flows.fromCompletableFuture(CompletableFuture.failedFuture(failure)).runToList()); + assertEquals(failure, exception.getCause()); } @Test