diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index e5c8f8eb..8b53452a 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -165,11 +165,8 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { private static > Stream virtualThreadsTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { var tests = of( - shouldCollect(collector, name, 1), - shouldCollect(collector, name, PARALLELISM), shouldCollectNElementsWithNParallelism(collector, name, 1), shouldCollectNElementsWithNParallelism(collector, name, PARALLELISM), - shouldCollectToEmpty(collector, name), shouldStartConsumingImmediately(collector, name), shouldNotBlockTheCallingThread(collector, name), shouldHandleThrowable(collector, name), @@ -185,11 +182,8 @@ private static > Stream virtualThread private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) { var tests = of( - shouldCollect(collector, name, 1), - shouldCollect(collector, name, PARALLELISM), shouldCollectNElementsWithNParallelism(collector, name, 1), shouldCollectNElementsWithNParallelism(collector, name, PARALLELISM), - shouldCollectToEmpty(collector, name), shouldStartConsumingImmediately(collector, name), shouldNotBlockTheCallingThread(collector, name), shouldHandleThrowable(collector, name), @@ -209,9 +203,6 @@ private static > Stream tests(Collect private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { var tests = of( - shouldCollect(collector, name, 1), - shouldCollect(collector, name, PARALLELISM), - shouldCollectToEmpty(collector, name), shouldStartConsumingImmediately(collector, name), shouldNotBlockTheCallingThread(collector, name), shouldHandleThrowable(collector, name), @@ -226,9 +217,6 @@ private static > Stream virtualThread private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { var tests = of( - shouldCollect(collector, name, 1), - shouldCollect(collector, name, PARALLELISM), - shouldCollectToEmpty(collector, name), shouldStartConsumingImmediately(collector, name), shouldNotBlockTheCallingThread(collector, name), shouldRespectParallelism(collector, name), @@ -268,14 +256,6 @@ private static > DynamicTest shouldNotBlockTheCall }); } - private static > DynamicTest shouldCollectToEmpty(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return dynamicTest(format("%s: should collect to empty", name), () -> { - withExecutor(e -> { - assertThat(Stream.empty().collect(collector.apply(i -> i, e, PARALLELISM)).join()).isEmpty(); - }); - }); - } - private static > DynamicTest shouldRespectParallelism(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return dynamicTest(format("%s: should respect parallelism", name), () -> { int parallelism = 2; @@ -333,20 +313,6 @@ private static > DynamicTest shouldProcessOnNThrea }); } - private static > DynamicTest shouldCollect(CollectorSupplier, Executor, Integer, Collector>> factory, String name, int parallelism) { - return dynamicTest(format("%s: should collect with parallelism %s", name, parallelism), () -> { - var elements = IntStream.range(0, 10).boxed().toList(); - - withExecutor(e -> { - Collector> ctor = factory.apply(i -> i, e, parallelism); - Collection result = elements.stream().collect(ctor) - .join(); - - assertThat(result).hasSameElementsAs(elements); - }); - }); - } - private static > DynamicTest shouldCollectNElementsWithNParallelism(CollectorSupplier, Executor, Integer, Collector>> factory, String name, int parallelism) { return dynamicTest(format("%s: should collect %s elements with parallelism %s", name, parallelism, parallelism), () -> { var elements = IntStream.iterate(0, i -> i + 1).limit(parallelism).boxed().toList(); diff --git a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java new file mode 100644 index 00000000..c60a7b14 --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java @@ -0,0 +1,69 @@ +package com.pivovarit.collectors.test; + +import com.pivovarit.collectors.ParallelCollectors; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static com.pivovarit.collectors.test.BasicParallelismTest.CollectorDefinition.collector; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +class BasicParallelismTest { + + private static Stream> allBounded() { + return Stream.of( + collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())), + collector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)), + collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)), + collector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)), + collector("parallelToStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p), Stream::toList)), + collector("parallelToOrderedStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p), Stream::toList)), + collector("parallelToOrderedStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p), Stream::toList)) + ); + } + + @TestFactory + Stream shouldProcessEmptyWithMaxParallelism() { + return Stream.of(1, 2, 4, 8, 16, 32, 64, 100) + .flatMap(p -> allBounded() + .map(c -> DynamicTest.dynamicTest("%s (parallelism: %d)".formatted(c.name(), p), () -> { + assertThat(Stream.empty().collect(c.factory().collector(i -> i, p))).isEmpty(); + }))); + } + + @TestFactory + Stream shouldProcessAllElementsWithMaxParallelism() { + return Stream.of(1, 2, 4, 8, 16, 32, 64, 100) + .flatMap(p -> allBounded() + .map(c -> DynamicTest.dynamicTest("%s (parallelism: %d)".formatted(c.name(), p), () -> { + var list = IntStream.range(0, 100).boxed().toList(); + List result = list.stream().collect(c.factory().collector(i -> i, p)); + assertThat(result).containsExactlyInAnyOrderElementsOf(list); + }))); + } + + protected record CollectorDefinition(String name, CollectorFactory factory) { + static CollectorDefinition collector(String name, CollectorFactory collector) { + return new CollectorDefinition<>(name, collector); + } + } + + @FunctionalInterface + interface CollectorFactory { + Collector> collector(Function f, Integer p); + } + + private static Executor e() { + return Executors.newCachedThreadPool(); + } +} diff --git a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java new file mode 100644 index 00000000..38d54a7b --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java @@ -0,0 +1,100 @@ +package com.pivovarit.collectors.test; + +import com.pivovarit.collectors.ParallelCollectors; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static com.pivovarit.collectors.test.BasicProcessingTest.CollectorDefinition.collector; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +class BasicProcessingTest { + + private static Stream> all() { + return Stream.of( + collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), + collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), + collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), + collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), + collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), + collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallelToStream()", f -> collectingAndThen(ParallelCollectors.parallelToStream(f), Stream::toList)), + collector("parallelToStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e()), Stream::toList)), + collector("parallelToStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p()), Stream::toList)), + collector("parallelToStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p()), Stream::toList)), + collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)), + collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)), + collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)), + collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList)) + ); + } + + public static Stream> allOrdered() { + return Stream.of( + collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), + collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), + collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), + collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), + collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), + collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)), + collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)), + collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)), + collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)), + collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList)) + ); + } + + @TestFactory + Stream shouldProcessEmpty() { + return all() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + assertThat(Stream.empty().collect(c.collector().apply(i -> i))).isEmpty(); + })); + } + + @TestFactory + Stream shouldProcessAllElements() { + return all() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var list = IntStream.range(0, 100).boxed().toList(); + List result = list.stream().collect(c.collector().apply(i -> i)); + assertThat(result).containsExactlyInAnyOrderElementsOf(list); + })); + } + + @TestFactory + Stream shouldProcessAllElementsInOrder() { + return allOrdered() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var list = IntStream.range(0, 100).boxed().toList(); + List result = list.stream().collect(c.collector().apply(i -> i)); + assertThat(result).containsAnyElementsOf(list); + })); + } + + record CollectorDefinition(String name, Function, Collector>> collector) { + static CollectorDefinition collector(String name, Function, Collector>> collector) { + return new CollectorDefinition<>(name, collector); + } + } + + private static Executor e() { + return Executors.newCachedThreadPool(); + } + + private static int p() { + return 4; + } +}