Skip to content

Commit

Permalink
Bug fix: in some fringe cases the request may still be not fully comp…
Browse files Browse the repository at this point in the history
…leted while the response has already been fully committed by the HTTP/1.1 stream handler. Connections that cannot be kept alive must be closed only once both request and response streams are fully complete
  • Loading branch information
ok2c committed Jan 12, 2025
1 parent fd5c617 commit c897f5a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import org.apache.hc.core5.http.protocol.RequestTargetHost;
import org.apache.hc.core5.http.protocol.RequestValidateHost;
import org.apache.hc.core5.http.support.BasicRequestBuilder;
import org.apache.hc.core5.http.support.BasicResponseBuilder;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.testing.SSLTestContexts;
Expand Down Expand Up @@ -298,7 +299,7 @@ void testPostIdentityTransfer() throws Exception {
}

@Test
void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
void testPostIdentityTransferOutOfSequenceResponseNotOK() throws Exception {
final Http1TestServer server = resources.server();
final Http1TestClient client = resources.client();

Expand Down Expand Up @@ -336,6 +337,84 @@ void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
}
}

@Test
void testPostOutOfSequenceResponseOK() throws Exception {
final Http1TestServer server = resources.server();
final Http1TestClient client = resources.client();

server.register("/hello", () -> new ImmediateResponseExchangeHandler(200, "Welcome"));
final InetSocketAddress serverEndpoint = server.start();

final HttpHost target = target(serverEndpoint);

client.start();

final int reqNo = 5;

final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
final ClientSessionEndpoint streamEndpoint = connectFuture.get();

for (int i = 0; i < reqNo; i++) {
final BasicHttpRequest request = BasicRequestBuilder.post()
.setHttpHost(target)
.setPath("/hello")
.build();
final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
new BasicRequestProducer(request, new MultiLineEntityProducer("Hello", 512 * i)),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());

Assertions.assertNotNull(result);
final HttpResponse response = result.getHead();
final String entity = result.getBody();
Assertions.assertNotNull(response);
Assertions.assertEquals(200, response.getCode());
Assertions.assertEquals("Welcome", entity);
}
}

@Test
void testPostOutOfSequenceResponseOKConnectionClose() throws Exception {
final Http1TestServer server = resources.server();
final Http1TestClient client = resources.client();

server.register("/hello", () -> new ImmediateResponseExchangeHandler(
BasicResponseBuilder.create(200)
.addHeader(HttpHeaders.CONNECTION, "Close")
.build(),
"Welcome"));
final InetSocketAddress serverEndpoint = server.start();

final HttpHost target = target(serverEndpoint);

client.start();

final int reqNo = 5;

for (int i = 0; i < reqNo; i++) {
final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
final ClientSessionEndpoint streamEndpoint = connectFuture.get();

final BasicHttpRequest request = BasicRequestBuilder.post()
.setHttpHost(target)
.setPath("/hello")
.build();
final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
new BasicRequestProducer(request, new MultiLineEntityProducer("Hello", 512 * i)),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());

streamEndpoint.close();

Assertions.assertNotNull(result);
final HttpResponse response = result.getHead();
final String entity = result.getBody();
Assertions.assertNotNull(response);
Assertions.assertEquals(200, response.getCode());
Assertions.assertEquals("Welcome", entity);
}
}

@Test
void testSimpleGetsPipelined() throws Exception {
final Http1TestServer server = resources.server();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ public void requestOutput() {

@Override
public void endStream(final List<? extends Header> trailers) throws IOException {
outputChannel.complete(trailers);
requestState = MessageState.COMPLETE;
if (!keepAlive && responseState == MessageState.COMPLETE) {
outputChannel.close();
}
outputChannel.complete(trailers);
}

@Override
Expand Down Expand Up @@ -269,10 +272,10 @@ void dataEnd(final List<? extends Header> trailers) throws HttpException, IOExce
if (done.get() || responseState != MessageState.BODY) {
throw new ProtocolException("Unexpected message data");
}
if (!keepAlive) {
responseState = MessageState.COMPLETE;
if (!keepAlive && requestState == MessageState.COMPLETE) {
outputChannel.close();
}
responseState = MessageState.COMPLETE;
exchangeHandler.streamEnd(trailers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public void requestOutput() {
@Override
public void endStream(final List<? extends Header> trailers) throws IOException {
outputChannel.complete(trailers);
if (!keepAlive) {
responseState = MessageState.COMPLETE;
if (requestState == MessageState.COMPLETE && !keepAlive) {
outputChannel.close();
}
responseState = MessageState.COMPLETE;
}

@Override
Expand Down Expand Up @@ -324,6 +324,9 @@ void dataEnd(final List<? extends Header> trailers) throws HttpException, IOExce
throw new ProtocolException("Unexpected message data");
}
requestState = MessageState.COMPLETE;
if (responseState == MessageState.COMPLETE && !keepAlive) {
outputChannel.close();
}
exchangeHandler.streamEnd(trailers);
}

Expand Down

0 comments on commit c897f5a

Please sign in to comment.