Skip to content

Commit

Permalink
Add Publisher.switchMap
Browse files Browse the repository at this point in the history
Motivation:
Publisher.switchMap can be used to flatten an async stream of
publishers while always taking results from the latest publisher
and cancelling the previous Publisher.
  • Loading branch information
Scottmitch committed Aug 20, 2023
1 parent 4837cd8 commit 8b5047c
Show file tree
Hide file tree
Showing 4 changed files with 1,062 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,6 @@ public final Publisher<T> onErrorResume(Predicate<? super Throwable> predicate,
* return results;
* }</pre>
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* each mapped {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A new {@link Publisher} which flattens the emissions from all mapped {@link Publisher}s.
* @see <a href="https://reactivex.io/documentation/operators/flatmap.html">ReactiveX flatMap operator.</a>
Expand Down Expand Up @@ -871,7 +870,6 @@ public final <R> Publisher<R> flatMapMerge(Function<? super T, ? extends Publish
* createAndThrowACompositeException(errors);
* }</pre>
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* each mapped {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A new {@link Publisher} which flattens the emissions from all mapped {@link Publisher}s.
* @see <a href="https://reactivex.io/documentation/operators/flatmap.html">ReactiveX flatMap operator.</a>
Expand Down Expand Up @@ -1618,6 +1616,86 @@ public final <R> Publisher<R> flatMapConcatIterable(Function<? super T, ? extend
return new PublisherConcatMapIterable<>(this, mapper);
}

/**
* Return a {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled. Both upstream and the last switched {@link Publisher} must complete
* before the returned {@link Publisher} completes. If either upstream or the currently active {@link Publisher}
* terminate in error the returned {@link Publisher} is terminated with that error.
* <pre>{@code
* ExecutorService e = ...;
* List<Future<List<R>>> futures = ...; // assume this is thread safe
*
* for (T t : resultOfThisPublisher()) {
* // Note that flatMap process results in parallel.
* futures.add(e.submit(() -> {
* // Approximation: control flow is simplified here but when a later mapper is applied any incomplete
* // results from a previous mapper are cancelled and result in empty results.
* return mapper.apply(t); // Asynchronous result is flatten into a value by this operator.
* }));
* }
* List<R> results = new ArrayList<>(futures.size());
* // This is an approximation, this operator does not provide any ordering guarantees for the results.
* for (Future<List<R>> future : futures) {
* List<R> rList = future.get(); // Throws if the processing for this item failed.
* results.addAll(rList);
* }
* return results;
* }</pre>
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled.
* @see <a href="https://reactivex.io/documentation/operators/switch.html">ReactiveX switch operator.</a>
* @see <a href=
"https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html">
Kotlin flatMapLatest</a>
* @see #switchMapDelayError(Function)
*/
public final <R> Publisher<R> switchMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return new PublisherSwitchMap<>(this, 0, mapper);
}

/**
* Return a {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled. Both upstream and the last switched {@link Publisher} must terminate
* before the returned {@link Publisher} terminates (including errors).
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* @param <R> The type of mapped {@link Publisher}.
* @return A {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled.
* @see <a href="https://reactivex.io/documentation/operators/switch.html">ReactiveX switch operator.</a>
* @see <a href=
"https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html">
Kotlin flatMapLatest</a>
* @see #switchMap(Function)
* @see #switchMapDelayError(Function, int)
*/
public final <R> Publisher<R> switchMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return new PublisherSwitchMap<>(this, true, mapper);
}

/**
* Return a {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled. Both upstream and the last switched {@link Publisher} must terminate
* before the returned {@link Publisher} terminates (including errors).
* @param mapper Convert each item emitted by this {@link Publisher} into another {@link Publisher}.
* @param maxDelayedErrorsHint The maximum amount of errors that will be queued. After this point exceptions maybe
* discarded to reduce memory consumption.
* @param <R> The type of mapped {@link Publisher}.
* @return A {@link Publisher} that will switch to the latest {@link Publisher} emitted from {@code mapper} and the
* prior {@link Publisher} will be cancelled.
* @see <a href="https://reactivex.io/documentation/operators/switch.html">ReactiveX switch operator.</a>
* @see <a href=
"https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html">
Kotlin flatMapLatest</a>
* @see #switchMap(Function)
* @see #switchMapDelayError(Function)
*/
public final <R> Publisher<R> switchMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper,
int maxDelayedErrorsHint) {
return new PublisherSwitchMap<>(this, maxDelayedErrorsHint, mapper);
}

/**
* Merge two {@link Publisher}s together. There is no guaranteed ordering of events emitted from the returned
* {@link Publisher}.
Expand Down
Loading

0 comments on commit 8b5047c

Please sign in to comment.