Skip to content

Commit

Permalink
Implement more flows ops methods part 2
Browse files Browse the repository at this point in the history
* interleave
* mapConcat
* mapPar
* mapParUnordered
* sliding
* alsoTo
* alsoToTap
  • Loading branch information
emil-bar committed Dec 23, 2024
1 parent add2290 commit a9dc2b6
Show file tree
Hide file tree
Showing 8 changed files with 1,471 additions and 81 deletions.
297 changes: 297 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java

Large diffs are not rendered by default.

92 changes: 89 additions & 3 deletions flows/src/main/java/com/softwaremill/jox/flows/Flows.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
package com.softwaremill.jox.flows;

import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.Fork;
import static com.softwaremill.jox.structured.Scopes.unsupervised;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosed;
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.Fork;

public final class Flows {

private Flows() {}
Expand Down Expand Up @@ -222,4 +230,82 @@ public static <T> Flow<T> failed(Exception t) {
throw t;
});
}
}

/**
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
* of elements in all flows is preserved.
* <p>
* If any of the flows is done before the others, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow
* is completed immediately, otherwise the interleaving continues with the remaining non-completed flows. Once all but one flows are
* complete, the elements of the remaining non-complete flow are emitted by the returned flow.
* <p>
* The provided flows are run concurrently and asynchronously.
*
* @param flows
* The flows whose elements will be interleaved.
* @param segmentSize
* The number of elements sent from each flow before switching to the next one.
* @param eagerComplete
* If `true`, the returned flow is completed as soon as any of the flows completes. If `false`, the interleaving continues with the
* remaining non-completed flows.
*/
public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, boolean eagerComplete, int bufferCapacity) {
if (flows.isEmpty()) {
return Flows.empty();
} else if (flows.size() == 1) {
return flows.getFirst();
} else {
return usingEmit(emit -> {
Channel<T> results = new Channel<>(bufferCapacity);
unsupervised(scope -> {
scope.forkUnsupervised(() -> {
List<Source<T>> availableSources = new ArrayList<>(flows.stream()
.map(flow -> flow.runToChannel(scope))
.toList());
int currentSourceIndex = 0;
int elementsRead = 0;

while (true) {
var received = availableSources.get(currentSourceIndex).receiveOrClosed();
if (received instanceof ChannelDone done) {
/// channel is done, remove it from the list of available sources
availableSources.remove(currentSourceIndex);
currentSourceIndex = currentSourceIndex == 0 ? availableSources.size() - 1 : currentSourceIndex - 1;

// if all sources are done, or eagerComplete break the loop
if (eagerComplete || availableSources.isEmpty()) {
results.doneOrClosed();
break;
} else {
// switch to the next source
currentSourceIndex = (currentSourceIndex + 1) % availableSources.size();
elementsRead = 0;
}
} else if (received instanceof ChannelError(Throwable cause)) {
// if any source fails, propagate the error
results.errorOrClosed(cause);
break;
} else {
elementsRead++;

// switch to the next source when segmentSize is reached and there are more sources available
if (elementsRead == segmentSize && availableSources.size() > 1) {
currentSourceIndex = (currentSourceIndex + 1) % availableSources.size();
elementsRead = 0;
}
//noinspection unchecked
Object result = results.sendOrClosed((T) received);
if (result instanceof ChannelClosed) {
break;
}
}
}
return null;
});
FlowEmit.channelToEmit(results, emit);
return null;
});
});
}
}
}
244 changes: 244 additions & 0 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowAlsoToTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package com.softwaremill.jox.flows;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosedException;
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.structured.Scopes;
import org.junit.jupiter.api.Test;

public class FlowAlsoToTest {

@Test
void alsoTo_shouldSendToBothSinks() throws Exception {
// given
var c = new Channel<Integer>(10);

// when
List<Integer> result = Flows.fromValues(1, 2, 3)
.alsoTo(c)
.runToList();

// then
assertEquals(List.of(1, 2, 3), result);
assertEquals(List.of(1, 2, 3), c.toList());
}

@Test
void alsoTo_shouldSendToBothSinksAndNotHangWhenOtherSinkIsRendezvousChannel() throws Exception {
Scopes.supervised(scope -> {
// given
var c = new Channel<Integer>();
var f = scope.fork(c::toList);

// when
List<Integer> result = Flows.fromValues(1, 2, 3, 4, 5)
.alsoTo(c)
.runToList();

// then
assertEquals(List.of(1, 2, 3, 4, 5), result);
assertEquals(List.of(1, 2, 3, 4, 5), f.join());
return null;
});
}

@Test
void alsoTo_shouldCloseMainFlowWhenOtherCloses() throws Exception {
Scopes.supervised(scope -> {
// given
var c = new Channel<Integer>();
scope.fork(() -> {
var list = List.of(c.receiveOrClosed(), c.receiveOrClosed(), c.receiveOrClosed());
c.doneOrClosed();
// a send() from the main thread might be waiting - we need to consume that, and only then the main thread
// will discover that the channel is closed
c.receiveOrClosed();
return list;
});
Flow<Integer> flow = Flows.iterate(1, i -> i + 1)
.take(100)
.alsoTo(c);

// when & then
assertThrows(ChannelClosedException.class, flow::runToList);
return null;
});
}

@Test
void alsoTo_shouldCloseMainFlowWithErrorWhenOtherErrors() throws Exception {
Scopes.supervised(scope -> {
// given
var c = new Channel<Integer>(1);
var f = scope.fork(() -> {
c.receiveOrClosed();
c.receiveOrClosed();
c.receiveOrClosed();
c.errorOrClosed(new IllegalStateException());
return null;
});

Flow<Integer> flow = Flows.iterate(1, i -> i + 1)
.take(100)
.alsoTo(c);

// when & then
assertThrows(ChannelClosedException.class, flow::runToList);
f.join();
return null;
});
}

@Test
void alsoTo_shouldCloseOtherChannelWithErrorWhenMainErrors() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>(0);
var forkOther = scope.forkUnsupervised(other::toList);

Flow<Integer> flow = Flows.iterate(1, i -> i + 1)
.take(100)
.concat(Flows.failed(new IllegalStateException()))
.alsoTo(other);

// when & then
assertThrows(IllegalStateException.class, flow::runToList);
assertThrows(ExecutionException.class, forkOther::join);
return null;
});
}

@Test
void alsoToTap_shouldSendToBothSinksWhenOtherIsFaster() throws Exception {
// given
var other = new Channel<Integer>(10);
Flow<Integer> flow = Flows
.fromValues(1, 2, 3)
.alsoToTap(other)
.tap(v -> {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

// when & then
assertEquals(List.of(1, 2, 3), flow.runToList());
assertEquals(List.of(1, 2, 3), other.toList());
}

@Test
void alsoTapTo_shouldSendToBothSinksWhenOtherIsSlower() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>();
var slowConsumerFork = scope.fork(() -> {
var consumed = new LinkedList<>();
while (true) {
Thread.sleep(100);
var result = other.receiveOrClosed();
if (result instanceof ChannelDone || result instanceof ChannelError) {
break;
} else {
consumed.add(result);
}
}
return consumed;
});
var main = new Channel<Integer>();
scope.fork(() -> {
for (int i = 1; i <= 20; i++) {
main.send(i);
Thread.sleep(10);
}
main.done();
return null;
});

// when
List<Integer> result = Flows.fromSource(main).alsoToTap(other).runToList();

// then
assertEquals(IntStream.rangeClosed(1, 20).boxed().toList(), result);
assertThat(slowConsumerFork.join(), hasSize(lessThan(10)));
return null;
});
}

@Test
void alsoTapTo_shouldNotFailTheFlowWhenTheOtherSinkFails() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>();
var f = scope.fork(() -> {
var v = other.receiveOrClosed();
other.error(new RuntimeException("boom!"));
return v;
});

// when
List<Integer> result = Flows
.iterate(1, i -> i + 1)
.take(10)
.tap(v -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.alsoToTap(other)
.runToList();

// then
assertEquals(IntStream.rangeClosed(1, 10).boxed().toList(), result);
assertEquals(1, f.join());
return null;
});
}

@Test
void alsoTapTo_shouldNotCloseTheFlowWhenTheOtherSinkCloses() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>();
var f = scope.fork(() -> {
var v = other.receiveOrClosed();
other.done();
return v;
});

// when
List<Integer> result = Flows
.iterate(1, i -> i + 1)
.take(10)
.tap(v -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.alsoToTap(other)
.runToList();

// then
assertEquals(IntStream.rangeClosed(1, 10).boxed().toList(), result);
assertEquals(1, f.join());
return null;
});
}
}
Loading

0 comments on commit a9dc2b6

Please sign in to comment.