Skip to content

Commit

Permalink
http-netty: Fix flaky FullDuplexAndSequentialModeTest (#3070)
Browse files Browse the repository at this point in the history
Motivation:

This test suite has been flaky for a while. As detailed in issue #1894,
this is because under some conditions the test running thread ends up
waiting on a latch that it is actually responsible to release.

Modifications:

Make sure the publishing of the InputStreamPublisher happens in on a
different thread so we don't end up in deadlock.

Fixes #1894
  • Loading branch information
bryce-anderson authored Sep 30, 2024
1 parent 19d0907 commit cc0fa91
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.StreamingHttpConnection;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.utils.EnforceSequentialModeRequesterFilter;
Expand Down Expand Up @@ -90,18 +91,23 @@ private static InputStream payload() {
return new BufferedInputStream(new ByteArrayInputStream(array));
}

private static Future<StreamingHttpResponse> stallingSendRequest(StreamingHttpConnection connection,
private Future<StreamingHttpResponse> stallingSendRequest(StreamingHttpConnection connection,
CountDownLatch continueRequest,
InputStream payload) {
return connection.request(connection.post(SVC_ECHO).payloadBody(fromInputStream(payload, CHUNK_SIZE)
final HttpExecutionContext ctx = connection.executionContext();
final String callerThreadName = Thread.currentThread().getName();
return connection.request(connection.post(SVC_ECHO).payloadBody(fromInputStream(payload, 1)
// We use `publishOn` to make sure we only publish from offload threads. Otherwise, in rare
// circumstances, we end up publishing from the test runner and end up deadlocked.
.publishOn(ctx.executor(), () -> callerThreadName.equals(Thread.currentThread().getName()))
.map(chunk -> {
try {
continueRequest.await(); // wait until the InputStream is closed
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throwException(ie);
}
return connection.executionContext().bufferAllocator().wrap(chunk);
return ctx.bufferAllocator().wrap(chunk);
}))).toFuture();
}
}

0 comments on commit cc0fa91

Please sign in to comment.