Skip to content

Commit

Permalink
Move lazy binding functionality to ObservableBase.
Browse files Browse the repository at this point in the history
Leaving nothing left in LazilyBoundStream => deleting.
  • Loading branch information
TomasMikula committed Dec 14, 2014
1 parent f825277 commit db1b847
Show file tree
Hide file tree
Showing 29 changed files with 118 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;

class AccumulateBetweenStream<T, A> extends LazilyBoundStream<T> {
class AccumulateBetweenStream<T, A> extends EventStreamBase<T> {
private final EventStream<T> source;
private final EventStream<?> ticks;
private final Function<? super T, ? extends A> initialTransformation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;

class AccumulateUntilLaterStream<T, A> extends LazilyBoundStream<T> {
class AccumulateUntilLaterStream<T, A> extends EventStreamBase<T> {
private final EventStream<T> source;
private final Function<? super T, ? extends A> initialTransformation;
private final BiFunction<? super A, ? super T, ? extends A> accumulation;
Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/main/java/org/reactfx/AccumulatingStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;

class AccumulatingStream<T, U> extends LazilyBoundStream<U> {
class AccumulatingStream<T, U> extends EventStreamBase<U> {
private final EventStream<T> input;
private final Function<? super T, ? extends U> initialTransformation;
private final BiFunction<? super U, ? super T, ? extends U> reduction;
Expand Down
16 changes: 8 additions & 8 deletions reactfx/src/main/java/org/reactfx/Await.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.reactfx.util.TriConsumer;
import org.reactfx.util.Try;

class Await<T, F, R> extends LazilyBoundStream<R> implements AwaitingEventStream<R> {
class Await<T, F, R> extends EventStreamBase<R> implements AwaitingEventStream<R> {

public static <T> AwaitingEventStream<T> awaitCompletionStage(
EventStream<CompletionStage<T>> source,
Expand Down Expand Up @@ -61,7 +61,7 @@ static <T> void addCompletionHandler(
t.addEventHandler(WORKER_STATE_CANCELLED, e -> handler.accept(null, null, true));
}

static <T> TriConsumer<LazilyBoundStream<T>, T, Throwable> reportingEmitter() {
static <T> TriConsumer<EventStreamBase<T>, T, Throwable> reportingEmitter() {
return (stream, value, error) -> {
if(error == null) {
stream.emit(value);
Expand All @@ -71,7 +71,7 @@ static <T> TriConsumer<LazilyBoundStream<T>, T, Throwable> reportingEmitter() {
};
}

static <T> TriConsumer<LazilyBoundStream<Try<T>>, T, Throwable> tryEmitter() {
static <T> TriConsumer<EventStreamBase<Try<T>>, T, Throwable> tryEmitter() {
return (stream, value, error) -> {
if(error == null) {
stream.emit(Try.success(value));
Expand All @@ -84,12 +84,12 @@ static <T> TriConsumer<LazilyBoundStream<Try<T>>, T, Throwable> tryEmitter() {
private final EventStream<F> source;
private final Indicator pending = new Indicator();
private final BiConsumer<F, TriConsumer<T, Throwable, Boolean>> addCompletionHandler;
private final TriConsumer<LazilyBoundStream<R>, T, Throwable> emitter;
private final TriConsumer<EventStreamBase<R>, T, Throwable> emitter;

private Await(
EventStream<F> source,
BiConsumer<F, TriConsumer<T, Throwable, Boolean>> addCompletionHandler,
TriConsumer<LazilyBoundStream<R>, T, Throwable> emitter) {
TriConsumer<EventStreamBase<R>, T, Throwable> emitter) {
this.source = source;
this.addCompletionHandler = addCompletionHandler;
this.emitter = emitter;
Expand Down Expand Up @@ -120,7 +120,7 @@ protected final Subscription subscribeToInputs() {
}


class AwaitLatest<T, F, R> extends LazilyBoundStream<R> implements AwaitingEventStream<R> {
class AwaitLatest<T, F, R> extends EventStreamBase<R> implements AwaitingEventStream<R> {

public static <T> AwaitingEventStream<T> awaitCompletionStage(
EventStream<CompletionStage<T>> source,
Expand Down Expand Up @@ -214,7 +214,7 @@ public static <T> AwaitingEventStream<Try<T>> tryAwaitTask(
private final EventStream<?> cancelImpulse;
private final Consumer<F> canceller;
private final BiConsumer<F, TriConsumer<T, Throwable, Boolean>> addCompletionHandler;
private final TriConsumer<LazilyBoundStream<R>, T, Throwable> emitter;
private final TriConsumer<EventStreamBase<R>, T, Throwable> emitter;

private long revision = 0;
private F expectedFuture = null;
Expand All @@ -226,7 +226,7 @@ private AwaitLatest(
EventStream<?> cancelImpulse,
Consumer<F> canceller,
BiConsumer<F, TriConsumer<T, Throwable, Boolean>> addCompletionHandler,
TriConsumer<LazilyBoundStream<R>, T, Throwable> emitter) {
TriConsumer<EventStreamBase<R>, T, Throwable> emitter) {
this.source = source;
this.cancelImpulse = cancelImpulse;
this.canceller = canceller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


public final class ConnectableEventSource<T>
extends LazilyBoundStream<T>
extends EventStreamBase<T>
implements ConnectableEventStream<T>, ConnectableEventSink<T> {

private MapHelper<EventStream<? extends T>, Subscription> subscriptions = null;
Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/main/java/org/reactfx/DistinctStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Objects;


class DistinctStream<T> extends LazilyBoundStream<T> {
class DistinctStream<T> extends EventStreamBase<T> {
static final Object NONE = new Object();
private final EventStream<T> input;
private Object previous = NONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import org.reactfx.util.Tuple2;

class EmitBothOnEachStream<A, I> extends LazilyBoundStream<Tuple2<A, I>> {
class EmitBothOnEachStream<A, I> extends EventStreamBase<Tuple2<A, I>> {
private final EventStream<A> source;
private final EventStream<I> impulse;

Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/main/java/org/reactfx/EmitOnEachStream.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.reactfx;


class EmitOnEachStream<T> extends LazilyBoundStream<T> {
class EmitOnEachStream<T> extends EventStreamBase<T> {
private final EventStream<T> source;
private final EventStream<?> impulse;

Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/main/java/org/reactfx/EmitOnStream.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.reactfx;


class EmitOnStream<T> extends LazilyBoundStream<T> {
class EmitOnStream<T> extends EventStreamBase<T> {
private final EventStream<T> source;
private final EventStream<?> impulse;

Expand Down
5 changes: 5 additions & 0 deletions reactfx/src/main/java/org/reactfx/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,9 @@ public class EventSource<T>
public final void push(T value) {
emit(value);
}

@Override
protected final Subscription subscribeToInputs() {
return Subscription.EMPTY;
}
}
8 changes: 4 additions & 4 deletions reactfx/src/main/java/org/reactfx/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ default <U> EventStream<U> flatMap(Function<? super T, ? extends EventStream<U>>
*/
default <U> EventStream<Either<T, U>> or(EventStream<? extends U> right) {
EventStream<T> left = this;
return new LazilyBoundStream<Either<T, U>>() {
return new EventStreamBase<Either<T, U>>() {
@Override
protected Subscription subscribeToInputs() {
return Subscription.multi(
Expand Down Expand Up @@ -1294,7 +1294,7 @@ default EventStream<T> guardedBy(Guardian... guardians) {
* thrown by its subscribers.
*/
default EventStream<Try<T>> materializeErrors() {
return new LazilyBoundStream<Try<T>>() {
return new EventStreamBase<Try<T>>() {
@Override
protected Subscription subscribeToInputs() {
return EventStream.this.subscribe(
Expand All @@ -1313,7 +1313,7 @@ protected Subscription subscribeToInputs() {
* by its subscribers.
*/
default EventStream<T> handleErrors(Consumer<? super Throwable> handler) {
return new LazilyBoundStream<T>() {
return new EventStreamBase<T>() {
@Override
protected Subscription subscribeToInputs() {
return EventStream.this.subscribe(this::emit, handler);
Expand All @@ -1325,7 +1325,7 @@ protected Subscription subscribeToInputs() {
* Returns a stream of errors reported by this event stream.
*/
default EventStream<Throwable> errors() {
return new LazilyBoundStream<Throwable>() {
return new EventStreamBase<Throwable>() {
@Override
protected Subscription subscribeToInputs() {
return EventStream.this.monitor(this::emit);
Expand Down
19 changes: 18 additions & 1 deletion reactfx/src/main/java/org/reactfx/EventStreamBase.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.reactfx;

import java.util.function.Consumer;


/**
* Base class for event streams.
* Adds support for error propagation on top of {@link ObservableBase}.
*
* @param <T> type of events
* @param <T> type of events emitted by this event stream.
*/
public abstract class EventStreamBase<T>
extends ObservableBase<Subscriber<? super T>, T>
Expand Down Expand Up @@ -56,6 +60,19 @@ protected final void reportError(Throwable thrown) {
}
}

/**
* Subscribes to the given event stream by the given subscriber and also
* forwards errors reported by the given stream to this stream. This is
* equivalent to {@code stream.subscribe(subscriber, this::reportError)}.
* @return subscription used to unsubscribe {@code subscriber} from
* {@code stream} and stop forwarding the errors.
*/
protected final <U> Subscription subscribeTo(
EventStream<U> stream,
Consumer<? super U> subscriber) {
return stream.subscribe(subscriber, this::reportError);
}

@Override
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return observe(subscriber);
Expand Down
Loading

0 comments on commit db1b847

Please sign in to comment.