Skip to content

Commit

Permalink
Bug fix: Corrected the state management in the async HTTP/1.1 protoco…
Browse files Browse the repository at this point in the history
…l handler when committing requests asynchronously
  • Loading branch information
ok2c committed Jan 21, 2025
1 parent 81a3c82 commit b3b467d
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.testing.extension;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public class ExecutorResource implements AfterEachCallback {

private final ExecutorService executorService;

public ExecutorResource(final ExecutorService executorService) {
this.executorService = executorService;
}

public ExecutorResource(final int nThreads) {
this.executorService = Executors.newFixedThreadPool(nThreads);
}

public ExecutorService getExecutorService() {
return executorService;
}

@Override
public void afterEach(final ExtensionContext extensionContext) throws Exception {
executorService.shutdownNow();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.NHttpMessageParser;
import org.apache.hc.core5.http.nio.NHttpMessageWriter;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
Expand Down Expand Up @@ -127,6 +128,7 @@
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.testing.SSLTestContexts;
import org.apache.hc.core5.testing.extension.ExecutorResource;
import org.apache.hc.core5.testing.extension.nio.Http1TestResources;
import org.apache.hc.core5.util.CharArrayBuffer;
import org.apache.hc.core5.util.TextUtils;
Expand All @@ -142,13 +144,15 @@ abstract class Http1IntegrationTest {
private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);

private final URIScheme scheme;
private final ReentrantLock lock = new ReentrantLock();
@RegisterExtension
private final Http1TestResources resources;
@RegisterExtension
final ExecutorResource executorResource;

public Http1IntegrationTest(final URIScheme scheme) {
this.scheme = scheme;
this.resources = new Http1TestResources(scheme, TIMEOUT);
this.executorResource = new ExecutorResource(5);
}

private HttpHost target(final InetSocketAddress serverEndpoint) {
Expand Down Expand Up @@ -1094,6 +1098,7 @@ void testDelayedExpectationVerification() throws Exception {

server.register("*", () -> new AsyncServerExchangeHandler() {

private final ReentrantLock lock = new ReentrantLock();
private final Random random = new Random(System.currentTimeMillis());
private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
"All is well");
Expand All @@ -1105,7 +1110,7 @@ public void handleRequest(
final ResponseChannel responseChannel,
final HttpContext context) throws HttpException, IOException {

Executors.newSingleThreadExecutor().execute(() -> {
executorResource.getExecutorService().execute(() -> {
try {
if (entityDetails != null) {
final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
Expand Down Expand Up @@ -2107,4 +2112,82 @@ void testHeaderTooLargePost() throws Exception {
Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
}

@Test
void testDelayedRequestSubmission() throws Exception {
final Http1TestServer server = resources.server();
final Http1TestClient client = resources.client();

server.register("/hello", () -> new SingleLineResponseHandler("All is well"));
final InetSocketAddress serverEndpoint = server.start();

final HttpHost target = target(serverEndpoint);

client.start();

final Future<ClientSessionEndpoint> connectFuture = client.connect(target, TIMEOUT);
final ClientSessionEndpoint streamEndpoint = connectFuture.get();

final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
for (int i = 0; i < 3; i++) {
final BasicHttpRequest request = BasicRequestBuilder.post()
.setHttpHost(target)
.setPath("/hello")
.build();
final AsyncEntityProducer entityProducer = AsyncEntityProducers.create("Some important message");
queue.add(streamEndpoint.execute(
new AsyncRequestProducer() {

private final Random random = new Random(System.currentTimeMillis());

@Override
public void sendRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
executorResource.getExecutorService().execute(() -> {
try {
Thread.sleep(random.nextInt(1000));
channel.sendRequest(request, entityProducer, context);
} catch (final Exception ignore) {
// ignore
}
});
}

@Override
public boolean isRepeatable() {
return entityProducer.isRepeatable();
}

@Override
public int available() {
return entityProducer.available();
}

@Override
public void produce(final DataStreamChannel channel) throws IOException {
entityProducer.produce(channel);
}

@Override
public void failed(final Exception cause) {
entityProducer.failed(cause);
}

@Override
public void releaseResources() {
entityProducer.releaseResources();
}

},
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
}
while (!queue.isEmpty()) {
final Future<Message<HttpResponse, String>> future = queue.remove();
final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
Assertions.assertNotNull(result);
final HttpResponse response = result.getHead();
Assertions.assertNotNull(response);
Assertions.assertEquals(200, response.getCode());
Assertions.assertEquals("All is well", result.getBody());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ String getRequestMethod() {
boolean isOutputReady() {
switch (requestState) {
case IDLE:
case ACK:
return true;
case HEADERS:
case ACK:
return false;
case BODY:
return exchangeHandler.available() > 0;
default:
Expand Down Expand Up @@ -186,6 +188,7 @@ void produceOutput() throws HttpException, IOException {
switch (requestState) {
case IDLE:
requestState = MessageState.HEADERS;
outputChannel.suspendOutput();
exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
break;
case ACK:
Expand Down

0 comments on commit b3b467d

Please sign in to comment.