Skip to content

Commit

Permalink
Print errors to stderr if no handler provided.
Browse files Browse the repository at this point in the history
Rename watch(onEvent, onError) to subscribe(...).

Implement subscribe(onEvent) as
    subscribe(onEvent, Throwable::printStackTrace)
  • Loading branch information
TomasMikula committed Aug 26, 2014
1 parent 484d70e commit 54810d4
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 42 deletions.
17 changes: 13 additions & 4 deletions reactfx/src/main/java/org/reactfx/BiEventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@

public interface BiEventStream<A, B> extends EventStream<Tuple2<A, B>> {

Subscription subscribe(BiConsumer<? super A, ? super B> subscriber);
Subscription subscribe(
BiConsumer<? super A, ? super B> subscriber,
Consumer<? super Throwable> onError);

default Subscription subscribe(BiConsumer<? super A, ? super B> subscriber) {
return subscribe(subscriber, Throwable::printStackTrace);
}

@Override
default Subscription subscribe(Consumer<? super Tuple2<A, B>> subscriber) {
return subscribe((a, b) -> subscriber.accept(t(a, b)));
default Subscription subscribe(
Consumer<? super Tuple2<A, B>> subscriber,
Consumer<? super Throwable> onError) {
return subscribe((a, b) -> subscriber.accept(t(a, b)), onError);
}

@Deprecated
default Subscription watch(
BiConsumer<? super A, ? super B> subscriber,
Consumer<? super Throwable> monitor) {
return monitor(monitor).and(subscribe(subscriber));
return subscribe(subscriber, monitor);
}

default Subscription feedTo2(BiEventSink<? super A, ? super B> sink) {
Expand Down
8 changes: 5 additions & 3 deletions reactfx/src/main/java/org/reactfx/EitherEventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ public interface EitherEventStream<L, R> extends EventStream<Either<L, R>> {

default Subscription subscribe(
Consumer<? super L> leftSubscriber,
Consumer<? super R> rightSubscriber) {
Consumer<? super R> rightSubscriber,
Consumer<? super Throwable> onError) {
return subscribe(either -> {
either.ifLeft(leftSubscriber);
either.ifRight(rightSubscriber);
});
}, onError);
}

@Deprecated
default Subscription watch(
Consumer<? super L> leftSubscriber,
Consumer<? super R> rightSubscriber,
Consumer<? super Throwable> monitor) {
return subscribe(leftSubscriber, rightSubscriber).and(monitor(monitor));
return subscribe(leftSubscriber, rightSubscriber, monitor);
}

default EventStream<L> left() {
Expand Down
40 changes: 26 additions & 14 deletions reactfx/src/main/java/org/reactfx/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,49 @@
*/
public interface EventStream<T> {

/**
* Get notified every time this event stream emits a value or encounters
* an error. An error is encountered when a user provided function (e.g.
* an event subscriber or an argument to a stream combinator, such as
* {@link #map(Function)}), throws an exception.
* @param subscriber function to call on the emitted value.
* @param onError function to call for the encountered error.
* @return subscription that can be used to stop observing
* this event stream.
*/
Subscription subscribe(
Consumer<? super T> subscriber,
Consumer<? super Throwable> onError);

/**
* Get notified every time this event stream emits a value.
* @param subscriber function to call on the emitted value.
* @return subscription that can be used to stop observing
* this event stream.
*/
Subscription subscribe(Consumer<? super T> subscriber);
default Subscription subscribe(Consumer<? super T> subscriber) {
return subscribe(subscriber, Throwable::printStackTrace);
}

/**
* Get notified every time this event stream encounters an error. An error
* is encountered when a user provided function (e.g. an event subscriber
* or an argument to a stream combinator, such as {@link #map(Function)}),
* throws an exception.
* @param monitor function to call for the encountered error.
* @param onError function to call for the encountered error.
* @return subscription that can be used to stop monitoring this event
* stream.
*/
Subscription monitor(Consumer<? super Throwable> monitor);
Subscription monitor(Consumer<? super Throwable> onError);

/**
* Convenient method to subscribe to and monitor this stream. Is equivalent
* to {@code monitor(monitor).and(subscribe(subscriber))}.
* @see #subscribe(Consumer)
* @see #monitor(Consumer)
* @deprecated renamed to {@link #subscribe(Consumer, Consumer)}.
*/
@Deprecated
default Subscription watch(
Consumer<? super T> subscriber,
Consumer<? super Throwable> monitor) {
return monitor(monitor).and(subscribe(subscriber));
return subscribe(subscriber, monitor);
}

/**
Expand Down Expand Up @@ -1199,9 +1213,9 @@ default EventStream<Try<T>> materializeErrors() {
return new LazilyBoundStream<Try<T>>() {
@Override
protected Subscription subscribeToInputs() {
Subscription s2 = EventStream.this.monitor(er -> emit(Try.failure(er)));
Subscription s1 = EventStream.this.subscribe(t -> emit(Try.success(t)));
return s1.and(s2);
return EventStream.this.subscribe(
t -> emit(Try.success(t)),
er -> emit(Try.failure(er)));
}
};
}
Expand All @@ -1218,9 +1232,7 @@ default EventStream<T> handleErrors(Consumer<? super Throwable> handler) {
return new LazilyBoundStream<T>() {
@Override
protected Subscription subscribeToInputs() {
Subscription s2 = EventStream.this.monitor(handler);
Subscription s1 = EventStream.this.subscribe(this::emit);
return s1.and(s2);
return EventStream.this.subscribe(this::emit, handler);
}
};
}
Expand Down
21 changes: 13 additions & 8 deletions reactfx/src/main/java/org/reactfx/EventStreamBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,49 +115,54 @@ protected void noSubscribers() {
/**
* Subscribes to the given event stream by the given subscriber and also
* forwards errors reported by the given stream to this stream. This is
* equivalent to {@code stream.watch(subscriber, this::reportError)}.
* equivalent to {@code stream.subscribe(subscriber, this::reportError)}.
* @return subscription used to unsubscribe {@code subscriber} from
* {@code stream} and stop forwarding the errors.
*/
protected final <T> Subscription subscribeTo(
EventStream<T> stream,
Consumer<? super T> subscriber) {
return stream.watch(subscriber, this::reportError);
return stream.subscribe(subscriber, this::reportError);
}

/**
* Subscribes to the given event stream by the given subscriber and also
* forwards errors reported by the given stream to this stream. This is
* equivalent to {@code stream.watch(subscriber, this::reportError)}.
* equivalent to {@code stream.subscribe(subscriber, this::reportError)}.
* @return subscription used to unsubscribe {@code subscriber} from
* {@code stream} and stop forwarding the errors.
*/
protected final <A, B> Subscription subscribeToBi(
BiEventStream<A, B> stream,
BiConsumer<? super A, ? super B> subscriber) {
return stream.watch(subscriber, this::reportError);
return stream.subscribe(subscriber, this::reportError);
}

/**
* Subscribes to the given event stream by the given subscriber and also
* forwards errors reported by the given stream to this stream. This is
* equivalent to {@code stream.watch(subscriber, this::reportError)}.
* equivalent to {@code stream.subscribe(subscriber, this::reportError)}.
* @return subscription used to unsubscribe {@code subscriber} from
* {@code stream} and stop forwarding the errors.
*/
protected final <A, B, C> Subscription subscribeToTri(
TriEventStream<A, B, C> stream,
TriConsumer<? super A, ? super B, ? super C> subscriber) {
return stream.watch(subscriber, this::reportError);
return stream.subscribe(subscriber, this::reportError);
}

public final Subscription subscribe(S subscriber) {
public final Subscription subscribe(
S subscriber,
Consumer<? super Throwable> onError) {
Subscription s1 = monitor(onError);

subscribers = ListHelper.add(subscribers, subscriber);
if(ListHelper.size(subscribers) == 1) {
firstSubscriber();
}
newSubscriber(subscriber);
return () -> unsubscribe(subscriber);

return s1.and(() -> unsubscribe(subscriber));
}

public final Subscription monitor(Consumer<? super Throwable> monitor) {
Expand Down
6 changes: 4 additions & 2 deletions reactfx/src/main/java/org/reactfx/EventStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ public class EventStreams {
private static final EventStream<?> NEVER = new EventStream<Object>() {

@Override
public Subscription subscribe(Consumer<? super Object> subscriber) {
public Subscription subscribe(
Consumer<? super Object> subscriber,
Consumer<? super Throwable> onError) {
return Subscription.EMPTY;
}

@Override
public Subscription monitor(Consumer<? super Throwable> subscriber) {
public Subscription monitor(Consumer<? super Throwable> onError) {
return Subscription.EMPTY;
}
};
Expand Down
13 changes: 9 additions & 4 deletions reactfx/src/main/java/org/reactfx/PoorMansNStream.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
package org.reactfx;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.reactfx.util.TriConsumer;

interface PoorMansBiStream<A, B> extends BiEventStream<A, B> {

@Override
default Subscription subscribe(BiConsumer<? super A, ? super B> subscriber) {
return subscribe(t -> subscriber.accept(t._1, t._2));
default Subscription subscribe(
BiConsumer<? super A, ? super B> subscriber,
Consumer<? super Throwable> onError) {
return subscribe(t -> subscriber.accept(t._1, t._2), onError);
}
}

interface PoorMansTriStream<A, B, C> extends TriEventStream<A, B, C> {

@Override
default Subscription subscribe(TriConsumer<? super A, ? super B, ? super C> subscriber) {
return subscribe(t -> subscriber.accept(t._1, t._2, t._3));
default Subscription subscribe(
TriConsumer<? super A, ? super B, ? super C> subscriber,
Consumer<? super Throwable> onError) {
return subscribe(t -> subscriber.accept(t._1, t._2, t._3), onError);
}
}
17 changes: 13 additions & 4 deletions reactfx/src/main/java/org/reactfx/TriEventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@

public interface TriEventStream<A, B, C> extends EventStream<Tuple3<A, B, C>> {

Subscription subscribe(TriConsumer<? super A, ? super B, ? super C> subscriber);
Subscription subscribe(
TriConsumer<? super A, ? super B, ? super C> subscriber,
Consumer<? super Throwable> onError);

default Subscription subscribe(TriConsumer<? super A, ? super B, ? super C> subscriber) {
return subscribe(subscriber, Throwable::printStackTrace);
}

@Override
default Subscription subscribe(Consumer<? super Tuple3<A, B, C>> subscriber) {
return subscribe((a, b, c) -> subscriber.accept(t(a, b, c)));
default Subscription subscribe(
Consumer<? super Tuple3<A, B, C>> subscriber,
Consumer<? super Throwable> onError) {
return subscribe((a, b, c) -> subscriber.accept(t(a, b, c)), onError);
}

@Deprecated
default Subscription watch(
TriConsumer<? super A, ? super B, ? super C> subscriber,
Consumer<? super Throwable> monitor) {
return monitor(monitor).and(subscribe(subscriber));
return subscribe(subscriber, monitor);
}

default Subscription feedTo3(TriEventSink<? super A, ? super B, ? super C> sink) {
Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/test/java/org/reactfx/ErrorReportingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void property_values_stream_with_faulty_first_value_test2() {
}
return String.valueOf(i);
})
.watch(emitted::add, errors::add);
.subscribe(emitted::add, errors::add);

intProperty.set(10);
intProperty.set(-2);
Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/test/java/org/reactfx/HookTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void recursionPreventionTest() {
EventCounter eventCounter = new EventCounter();
EventCounter errorCounter = new EventCounter();
EventSource<Integer> source = new EventSource<>();
source.hook(i -> source.push(i-1)).watch(eventCounter, errorCounter);
source.hook(i -> source.push(i-1)).subscribe(eventCounter, errorCounter);
source.push(5);
assertEquals(0, eventCounter.get());
assertEquals(1, errorCounter.get());
Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/test/java/org/reactfx/RecursionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void allowRecursionWithOneSubscriber() {
List<Integer> emitted = new ArrayList<>();
List<Throwable> errors = new ArrayList<>();
EventSource<Integer> source = new EventSource<>();
source.hook(emitted::add).watch(
source.hook(emitted::add).subscribe(
i -> { if(i > 0) source.push(i-1); },
errors::add);
source.push(5);
Expand Down

0 comments on commit 54810d4

Please sign in to comment.