diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java index 11da75f1..ab7f4a04 100644 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java @@ -106,6 +106,13 @@ private static CompletableFuture> combine(List(mapper, Dispatcher.virtual(parallelism), Function.identity()); } + static Collector>> collectingToStream(Function mapper, Executor executor) { + requireNonNull(executor, "executor can't be null"); + requireNonNull(mapper, "mapper can't be null"); + + return new AsyncParallelCollector<>(mapper, Dispatcher.from(executor), Function.identity()); + } + static Collector>> collectingToStream(Function mapper, Executor executor, int parallelism) { requireNonNull(executor, "executor can't be null"); requireNonNull(mapper, "mapper can't be null"); @@ -133,6 +140,14 @@ private static CompletableFuture> combine(List(mapper, Dispatcher.virtual(parallelism), s -> s.collect(collector)); } + static Collector> collectingWithCollector(Collector collector, Function mapper, Executor executor) { + requireNonNull(collector, "collector can't be null"); + requireNonNull(executor, "executor can't be null"); + requireNonNull(mapper, "mapper can't be null"); + + return new AsyncParallelCollector<>(mapper, Dispatcher.from(executor), s -> s.collect(collector)); + } + static Collector> collectingWithCollector(Collector collector, Function mapper, Executor executor, int parallelism) { requireNonNull(collector, "collector can't be null"); requireNonNull(executor, "executor can't be null"); diff --git a/src/main/java/com/pivovarit/collectors/Dispatcher.java b/src/main/java/com/pivovarit/collectors/Dispatcher.java index a0d0ae40..de4db63a 100644 --- a/src/main/java/com/pivovarit/collectors/Dispatcher.java +++ b/src/main/java/com/pivovarit/collectors/Dispatcher.java @@ -47,6 +47,15 @@ private Dispatcher(int permits) { this.limiter = new Semaphore(permits); } + private Dispatcher(Executor executor) { + this.executor = executor; + this.limiter = null; + } + + static Dispatcher from(Executor executor) { + return new Dispatcher<>(executor); + } + static Dispatcher from(Executor executor, int permits) { return new Dispatcher<>(executor, permits); } diff --git a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java index 2a3a35b3..0ea272bd 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java +++ b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java @@ -95,6 +95,32 @@ private ParallelCollectors() { return AsyncParallelCollector.collectingWithCollector(collector, mapper, executor, parallelism); } + /** + * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism + * and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}. + * + *
+ * Example: + *
{@code
+     * CompletableFuture> result = Stream.of(1, 2, 3)
+     *   .collect(parallel(i -> foo(i), toList(), executor));
+     * }
+ * + * @param mapper a transformation to be performed in parallel + * @param collector the {@code Collector} describing the reduction + * @param executor the {@code Executor} to use for asynchronous execution + * @param the type of the collected elements + * @param the result returned by {@code mapper} + * @param the reduction result {@code collector} + * + * @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel + * + * @since 3.3.0 + */ + public static Collector> parallel(Function mapper, Collector collector, Executor executor) { + return AsyncParallelCollector.collectingWithCollector(collector, mapper, executor); + } + /** * A convenience {@link Collector} used for executing parallel computations using Virtual Threads * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements. @@ -175,6 +201,33 @@ private ParallelCollectors() { return AsyncParallelCollector.collectingToStream(mapper, executor, parallelism); } + /** + * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism + * and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements. + * + *

+ * The collector maintains the order of processed {@link Stream}. Instances should not be reused. + * + *
+ * Example: + *
{@code
+     * CompletableFuture> result = Stream.of(1, 2, 3)
+     *   .collect(parallel(i -> foo(), executor));
+     * }
+ * + * @param mapper a transformation to be performed in parallel + * @param executor the {@code Executor} to use for asynchronous execution + * @param the type of the collected elements + * @param the result returned by {@code mapper} + * + * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel + * + * @since 3.3.0 + */ + public static Collector>> parallel(Function mapper, Executor executor) { + return AsyncParallelCollector.collectingToStream(mapper, executor); + } + /** * A convenience {@link Collector} used for executing parallel computations using Virtual Threads * and returning a {@link Stream} instance returning results as they arrive. @@ -228,6 +281,33 @@ private ParallelCollectors() { return ParallelStreamCollector.streaming(mapper, parallelism); } + /** + * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism + * and returning a {@link Stream} instance returning results as they arrive. + *

+ * For the parallelism of 1, the stream is executed by the calling thread. + * + *
+ * Example: + *

{@code
+     * Stream.of(1, 2, 3)
+     *   .collect(parallelToStream(i -> foo(), executor, 2))
+     *   .forEach(System.out::println);
+     * }
+ * + * @param mapper a transformation to be performed in parallel + * @param executor the {@code Executor} to use for asynchronous execution + * @param the type of the collected elements + * @param the result returned by {@code mapper} + * + * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel + * + * @since 3.3.0 + */ + public static Collector> parallelToStream(Function mapper, Executor executor) { + return ParallelStreamCollector.streaming(mapper, executor); + } + /** * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive. @@ -309,6 +389,33 @@ private ParallelCollectors() { return ParallelStreamCollector.streamingOrdered(mapper, parallelism); } + /** + * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism + * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order. + *

+ * For the parallelism of 1, the stream is executed by the calling thread. + * + *
+ * Example: + *

{@code
+     * Stream.of(1, 2, 3)
+     *   .collect(parallelToOrderedStream(i -> foo(), executor))
+     *   .forEach(System.out::println);
+     * }
+ * + * @param mapper a transformation to be performed in parallel + * @param executor the {@code Executor} to use for asynchronous execution + * @param the type of the collected elements + * @param the result returned by {@code mapper} + * + * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel + * + * @since 3.3.0 + */ + public static Collector> parallelToOrderedStream(Function mapper, Executor executor) { + return ParallelStreamCollector.streamingOrdered(mapper, executor); + } + /** * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order. diff --git a/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java b/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java index eaf0b30b..64fe8a84 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java +++ b/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java @@ -97,6 +97,13 @@ public Set characteristics() { return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual(parallelism)); } + static Collector> streaming(Function mapper, Executor executor) { + requireNonNull(executor, "executor can't be null"); + requireNonNull(mapper, "mapper can't be null"); + + return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.from(executor)); + } + static Collector> streaming(Function mapper, Executor executor, int parallelism) { requireNonNull(executor, "executor can't be null"); requireNonNull(mapper, "mapper can't be null"); @@ -118,6 +125,13 @@ public Set characteristics() { return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual(parallelism)); } + static Collector> streamingOrdered(Function mapper, Executor executor) { + requireNonNull(executor, "executor can't be null"); + requireNonNull(mapper, "mapper can't be null"); + + return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.from(executor)); + } + static Collector> streamingOrdered(Function mapper, Executor executor, int parallelism) { requireNonNull(executor, "executor can't be null"); diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index 468cfe43..1fb5dbc4 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -72,20 +72,22 @@ Stream collectors() { virtualThreadsTests((m, e, p) -> adapt(parallel(m)), "ParallelCollectors.parallel() [virtual]", true), virtualThreadsTests((m, e, p) -> adapt(parallel(m, p)), "ParallelCollectors.parallel() [virtual]", true), // platform threads - tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true), - tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false), - tests((m, e, p) -> parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.parallel(toCollection(), p=%d)", PARALLELISM), true), - tests((m, e, p) -> adapt(parallel(m, e, p)), format("ParallelCollectors.parallel(p=%d)", PARALLELISM), true) + tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true, true), + tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false, true), + tests((m, e, p) -> parallel(m, toList(), e), "ParallelCollectors.parallel(toList(), p=inf)", true, false), + tests((m, e, p) -> parallel(m, toSet(), e), "ParallelCollectors.parallel(toSet(), p=inf)", false, false), + tests((m, e, p) -> parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.parallel(toCollection(), p=%d)", PARALLELISM), true, true), + tests((m, e, p) -> adapt(parallel(m, e, p)), format("ParallelCollectors.parallel(p=%d)", PARALLELISM), true, true) ).flatMap(i -> i); } @TestFactory Stream batching_collectors() { return of( - batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true), - batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false), - batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true), - batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true) + batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true, true), + batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false, true), + batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true, true), + batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true, true) ).flatMap(i -> i); } @@ -186,7 +188,7 @@ private static > Stream virtualThread : tests; } - private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { + private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) { var tests = of( shouldCollect(collector, name, 1), shouldCollect(collector, name, PARALLELISM), @@ -195,19 +197,19 @@ private static > Stream tests(Collect shouldCollectToEmpty(collector, name), shouldStartConsumingImmediately(collector, name), shouldNotBlockTheCallingThread(collector, name), - shouldRespectParallelism(collector, name), shouldHandleThrowable(collector, name), shouldShortCircuitOnException(collector, name), shouldInterruptOnException(collector, name), shouldHandleRejectedExecutionException(collector, name), shouldRemainConsistent(collector, name), - shouldRejectInvalidParallelism(collector, name), shouldHandleExecutorRejection(collector, name) ); - return maintainsOrder - ? Stream.concat(tests, of(shouldMaintainOrder(collector, name))) - : tests; + tests = maintainsOrder ? Stream.concat(tests, of(shouldMaintainOrder(collector, name))) : tests; + tests = limitedParallelism ? of(shouldRespectParallelism(collector, name)) : tests; + tests = limitedParallelism ? of(shouldRejectInvalidParallelism(collector, name)) : tests; + + return tests; } private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { @@ -249,9 +251,9 @@ private static > Stream streamingTest : tests; } - private static > Stream batchTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { + private static > Stream batchTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) { return Stream.concat( - tests(collector, name, maintainsOrder), + tests(collector, name, maintainsOrder, limitedParallelism), of(shouldProcessOnNThreadsETParallelism(collector, name))); }