Skip to content

Commit

Permalink
Eliminate benign race condition in DirectStreamObserver (#33419)
Browse files Browse the repository at this point in the history
A dynamic race condition detector correct identifies an unsynchronized
read-after-write between the field initializer and the first call of onNext.

Additionally a minor refactor to make more clear what the increment/boundary
conditions are here.
  • Loading branch information
kennknowles authored Jan 22, 2025
1 parent 8c4bec8 commit 842e0e3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public final class DirectStreamObserver<T> implements StreamObserver<T> {
private final int maxMessagesBeforeCheck;

private final Object lock = new Object();
private int numMessages = -1;
private int numMessages;

public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserver) {
this(phaser, outboundObserver, DEFAULT_MAX_MESSAGES_BEFORE_CHECK);
Expand All @@ -69,7 +69,7 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
synchronized (lock) {
if (++numMessages >= maxMessagesBeforeCheck) {
if (numMessages >= maxMessagesBeforeCheck) {
numMessages = 0;
int waitSeconds = 1;
int totalSecondsWaited = 0;
Expand Down Expand Up @@ -114,6 +114,7 @@ public void onNext(T value) {
}
}
outboundObserver.onNext(value);
numMessages += 1;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -224,17 +225,19 @@ public void testIsReadyCheckDoesntBlockIfPhaserCallbackNeverHappens() throws Exc
public void testMessageCheckInterval() throws Exception {
final AtomicInteger index = new AtomicInteger();
ArrayListMultimap<Integer, String> values = ArrayListMultimap.create();

// An observer that is always ready but puts items into a new bucket each time it is queried
CallStreamObserver<String> bucketingObserver =
TestStreams.withOnNext((String t) -> assertTrue(values.put(index.get(), t)))
.withIsReady(
() -> {
index.incrementAndGet();
return true;
})
.build();

final DirectStreamObserver<String> streamObserver =
new DirectStreamObserver<>(
new AdvancingPhaser(1),
TestStreams.withOnNext((String t) -> assertTrue(values.put(index.get(), t)))
.withIsReady(
() -> {
index.incrementAndGet();
return true;
})
.build(),
10);
new DirectStreamObserver<>(new AdvancingPhaser(1), bucketingObserver, 10);

List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
List<Future<String>> results = new ArrayList<>();
Expand Down

0 comments on commit 842e0e3

Please sign in to comment.