Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement more flows ops methods part 2 #75

Merged
merged 2 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
352 changes: 313 additions & 39 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java

Large diffs are not rendered by default.

105 changes: 93 additions & 12 deletions flows/src/main/java/com/softwaremill/jox/flows/Flows.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package com.softwaremill.jox.flows;

import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.Fork;
import static com.softwaremill.jox.structured.Scopes.unsupervised;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosed;
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.Fork;

public final class Flows {

private Flows() {}
Expand Down Expand Up @@ -170,7 +177,7 @@ public static <T> Flow<T> repeatEvalWhileDefined(Supplier<Optional<T>> supplierF

/** Create a flow which sleeps for the given `timeout` and then completes as done. */
public static <T> Flow<T> timeout(Duration timeout) {
return usingEmit(emit -> Thread.sleep(timeout.toMillis()));
return usingEmit(_ -> Thread.sleep(timeout.toMillis()));
}

/**
Expand All @@ -188,17 +195,13 @@ public static <T> Flow<T> concat(Flow<T>... flows) {

/** Creates an empty flow, which emits no elements and completes immediately. */
public static <T> Flow<T> empty() {
return usingEmit(emit -> {});
return usingEmit(_ -> {});
}

/** Creates a flow that emits a single element when `from` completes, or throws an exception when `from` fails. */
public static <T> Flow<T> fromCompletableFuture(CompletableFuture<T> from) {
return usingEmit(emit -> {
try {
emit.apply(from.get());
} catch (ExecutionException e) {
throw (Exception) e.getCause();
}
emit.apply(from.get());
});
}

Expand All @@ -218,8 +221,86 @@ public static <T> Flow<T> fromFutureSource(CompletableFuture<Source<T>> from) {
* The {@link java.lang.Exception} to fail with
*/
public static <T> Flow<T> failed(Exception t) {
return usingEmit(emit -> {
return usingEmit(_ -> {
throw t;
});
}
}

/**
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
* of elements in all flows is preserved.
* <p>
* If any of the flows is done before the others, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow
* is completed immediately, otherwise the interleaving continues with the remaining non-completed flows. Once all but one flows are
* complete, the elements of the remaining non-complete flow are emitted by the returned flow.
* <p>
* The provided flows are run concurrently and asynchronously.
*
* @param flows
* The flows whose elements will be interleaved.
* @param segmentSize
* The number of elements sent from each flow before switching to the next one.
* @param eagerComplete
* If `true`, the returned flow is completed as soon as any of the flows completes. If `false`, the interleaving continues with the
* remaining non-completed flows.
*/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must have created this method before adding scope value for buffer capacity. I'll overload the method in the next PR

public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, boolean eagerComplete, int bufferCapacity) {
if (flows.isEmpty()) {
return Flows.empty();
} else if (flows.size() == 1) {
return flows.getFirst();
} else {
return usingEmit(emit -> {
Channel<T> results = new Channel<>(bufferCapacity);
unsupervised(scope -> {
scope.forkUnsupervised(() -> {
List<Source<T>> availableSources = new ArrayList<>(flows.stream()
.map(flow -> flow.runToChannel(scope))
.toList());
int currentSourceIndex = 0;
int elementsRead = 0;

while (true) {
var received = availableSources.get(currentSourceIndex).receiveOrClosed();
if (received instanceof ChannelDone) {
/// channel is done, remove it from the list of available sources
availableSources.remove(currentSourceIndex);
currentSourceIndex = currentSourceIndex == 0 ? availableSources.size() - 1 : currentSourceIndex - 1;

// if all sources are done, or eagerComplete break the loop
if (eagerComplete || availableSources.isEmpty()) {
results.doneOrClosed();
break;
} else {
// switch to the next source
currentSourceIndex = (currentSourceIndex + 1) % availableSources.size();
elementsRead = 0;
}
} else if (received instanceof ChannelError(Throwable cause)) {
// if any source fails, propagate the error
results.errorOrClosed(cause);
break;
} else {
elementsRead++;

// switch to the next source when segmentSize is reached and there are more sources available
if (elementsRead == segmentSize && availableSources.size() > 1) {
currentSourceIndex = (currentSourceIndex + 1) % availableSources.size();
elementsRead = 0;
}
//noinspection unchecked
Object result = results.sendOrClosed((T) received);
if (result instanceof ChannelClosed) {
break;
}
}
}
return null;
});
FlowEmit.channelToEmit(results, emit);
return null;
});
});
}
}
}
244 changes: 244 additions & 0 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowAlsoToTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package com.softwaremill.jox.flows;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosedException;
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.structured.Scopes;
import org.junit.jupiter.api.Test;

public class FlowAlsoToTest {

@Test
void alsoTo_shouldSendToBothSinks() throws Exception {
// given
var c = new Channel<Integer>(10);

// when
List<Integer> result = Flows.fromValues(1, 2, 3)
.alsoTo(c)
.runToList();

// then
assertEquals(List.of(1, 2, 3), result);
assertEquals(List.of(1, 2, 3), c.toList());
}

@Test
void alsoTo_shouldSendToBothSinksAndNotHangWhenOtherSinkIsRendezvousChannel() throws Exception {
Scopes.supervised(scope -> {
// given
var c = new Channel<Integer>();
var f = scope.fork(c::toList);

// when
List<Integer> result = Flows.fromValues(1, 2, 3, 4, 5)
.alsoTo(c)
.runToList();

// then
assertEquals(List.of(1, 2, 3, 4, 5), result);
assertEquals(List.of(1, 2, 3, 4, 5), f.join());
return null;
});
}

@Test
void alsoTo_shouldCloseMainFlowWhenOtherCloses() throws Exception {
Scopes.supervised(scope -> {
// given
var c = new Channel<Integer>();
scope.fork(() -> {
var list = List.of(c.receiveOrClosed(), c.receiveOrClosed(), c.receiveOrClosed());
c.doneOrClosed();
// a send() from the main thread might be waiting - we need to consume that, and only then the main thread
// will discover that the channel is closed
c.receiveOrClosed();
return list;
});
Flow<Integer> flow = Flows.iterate(1, i -> i + 1)
.take(100)
.alsoTo(c);

// when & then
assertThrows(ChannelClosedException.class, flow::runToList);
return null;
});
}

@Test
void alsoTo_shouldCloseMainFlowWithErrorWhenOtherErrors() throws Exception {
Scopes.supervised(scope -> {
// given
var c = new Channel<Integer>(1);
var f = scope.fork(() -> {
c.receiveOrClosed();
c.receiveOrClosed();
c.receiveOrClosed();
c.errorOrClosed(new IllegalStateException());
return null;
});

Flow<Integer> flow = Flows.iterate(1, i -> i + 1)
.take(100)
.alsoTo(c);

// when & then
assertThrows(ChannelClosedException.class, flow::runToList);
f.join();
return null;
});
}

@Test
void alsoTo_shouldCloseOtherChannelWithErrorWhenMainErrors() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>(0);
var forkOther = scope.forkUnsupervised(other::toList);

Flow<Integer> flow = Flows.iterate(1, i -> i + 1)
.take(100)
.concat(Flows.failed(new IllegalStateException()))
.alsoTo(other);

// when & then
assertThrows(IllegalStateException.class, flow::runToList);
assertThrows(ExecutionException.class, forkOther::join);
return null;
});
}

@Test
void alsoToTap_shouldSendToBothSinksWhenOtherIsFaster() throws Exception {
// given
var other = new Channel<Integer>(10);
Flow<Integer> flow = Flows
.fromValues(1, 2, 3)
.alsoToTap(other)
.tap(v -> {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

// when & then
assertEquals(List.of(1, 2, 3), flow.runToList());
assertEquals(List.of(1, 2, 3), other.toList());
}

@Test
void alsoTapTo_shouldSendToBothSinksWhenOtherIsSlower() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>();
var slowConsumerFork = scope.fork(() -> {
var consumed = new LinkedList<>();
while (true) {
Thread.sleep(100);
var result = other.receiveOrClosed();
if (result instanceof ChannelDone || result instanceof ChannelError) {
break;
} else {
consumed.add(result);
}
}
return consumed;
});
var main = new Channel<Integer>();
scope.fork(() -> {
for (int i = 1; i <= 20; i++) {
main.send(i);
Thread.sleep(10);
}
main.done();
return null;
});

// when
List<Integer> result = Flows.fromSource(main).alsoToTap(other).runToList();

// then
assertEquals(IntStream.rangeClosed(1, 20).boxed().toList(), result);
assertThat(slowConsumerFork.join(), hasSize(lessThan(10)));
return null;
});
}

@Test
void alsoTapTo_shouldNotFailTheFlowWhenTheOtherSinkFails() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>();
var f = scope.fork(() -> {
var v = other.receiveOrClosed();
other.error(new RuntimeException("boom!"));
return v;
});

// when
List<Integer> result = Flows
.iterate(1, i -> i + 1)
.take(10)
.tap(v -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.alsoToTap(other)
.runToList();

// then
assertEquals(IntStream.rangeClosed(1, 10).boxed().toList(), result);
assertEquals(1, f.join());
return null;
});
}

@Test
void alsoTapTo_shouldNotCloseTheFlowWhenTheOtherSinkCloses() throws Exception {
Scopes.supervised(scope -> {
// given
var other = new Channel<Integer>();
var f = scope.fork(() -> {
var v = other.receiveOrClosed();
other.done();
return v;
});

// when
List<Integer> result = Flows
.iterate(1, i -> i + 1)
.take(10)
.tap(v -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.alsoToTap(other)
.runToList();

// then
assertEquals(IntStream.rangeClosed(1, 10).boxed().toList(), result);
assertEquals(1, f.join());
return null;
});
}
}
Loading
Loading