Skip to content

Commit

Permalink
Mark connection as closing on exception caught
Browse files Browse the repository at this point in the history
Motivation:

`NettyChannelPublisher` will force closure of the `Channel` in case of
exception after the cause is propagated to users. In case users don't
have offloading, there is a risk to retry on the same IO thread.
We should notify `LoadBalancer` that this connection is closing to avoid
retrying on the same connection.

Modifications:

- Invoke `connection.notifyOnClosing()` inside `exceptionCaught` before
notifying `NettyChannelPublisher` about an error;
- Enhance `ReserveConnectionTest` to make sure reserved connection is
closed in case of `IOException`;
- Add `ConnectionClosedAfterIoExceptionTest` to make sure retries use
a different connection in case of `IOException` and the failed
connection gets closed;

Result:

Retries always select a different connection if existing connection
observes an exception.
  • Loading branch information
idelpivnitskiy committed Aug 18, 2023
1 parent 4837cd8 commit 5607727
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpResponse;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter.BackOffPolicy;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Completable.failed;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

/**
* Verifies that connection is marked as "closing" after it receives
* {@link ChannelInboundHandler#exceptionCaught(ChannelHandlerContext, Throwable)} event.
*/
class ConnectionClosedAfterIoExceptionTest {

@RegisterExtension
static final ExecutionContextExtension SERVER_CTX =
ExecutionContextExtension.cached("server-io", "server-executor")
.setClassLevel(true);
@RegisterExtension
static final ExecutionContextExtension CLIENT_CTX =
ExecutionContextExtension.cached("client-io", "client-executor")
.setClassLevel(true);

@ParameterizedTest(name = "{displayName} [{index}]: protocol={0}")
@EnumSource(HttpProtocol.class)
void test(HttpProtocol protocol) throws Exception {
AtomicReference<FilterableStreamingHttpConnection> firstConnection = new AtomicReference<>();
BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
try (ServerContext serverContext = BuilderUtils.newServerBuilder(SERVER_CTX, protocol)
// Fail only the first connect attempt
.appendEarlyConnectionAcceptor(conn -> errors.isEmpty() ? failed(DELIBERATE_EXCEPTION) : completed())
.listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok());
BlockingHttpClient client = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX, protocol)
.appendConnectionFactoryFilter(original -> new DelegatingConnectionFactory<InetSocketAddress,
FilterableStreamingHttpConnection>(original) {
@Override
public Single<FilterableStreamingHttpConnection> newConnection(InetSocketAddress address,
@Nullable ContextMap context, @Nullable TransportObserver observer) {
return delegate().newConnection(address, context, observer)
.whenOnSuccess(connection -> firstConnection.compareAndSet(null, connection));
}
})
.appendClientFilter(new RetryingHttpRequesterFilter.Builder()
.retryOther((metaData, t) -> {
errors.add(t);
return BackOffPolicy.ofImmediateBounded();
})
.build())
.buildBlocking()) {

HttpResponse response = client.request(client.get("/"));
assertThat(response.status(), is(OK));

assertThat("Unexpected number of errors, likely retried more than expected", errors, hasSize(1));
assertThat("Did not propagate original IoException", errors.poll(),
anyOf(instanceOf(IOException.class), not(instanceOf(ClosedChannelException.class))));

// Make sure that the first connection was properly closed:
final FilterableStreamingHttpConnection connection = firstConnection.get();
connection.onClose().toFuture().get();
connection.connectionContext().onClose().toFuture().get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,61 @@
package io.servicetalk.http.netty;

import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpRequest;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.HttpServerContext;
import io.servicetalk.http.api.ReservedBlockingHttpConnection;
import io.servicetalk.http.api.ReservedHttpConnection;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.http.netty.HttpsProxyTest.safeClose;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ReserveConnectionTest {

@RegisterExtension
static final ExecutionContextExtension SERVER_CTX =
ExecutionContextExtension.cached("server-io", "server-executor")
.setClassLevel(true);
@RegisterExtension
static final ExecutionContextExtension CLIENT_CTX =
ExecutionContextExtension.cached("client-io", "client-executor")
.setClassLevel(true);

private HttpServerContext serverContext;
private BlockingHttpClient httpClient;
private AtomicInteger createdConnections;

@BeforeEach
void setup() throws Exception {
createdConnections = new AtomicInteger(0);
private final AtomicInteger createdConnections = new AtomicInteger(0);

serverContext = HttpServers
.forAddress(localAddress(0))
.listenBlocking((ctx1, request, responseFactory) -> responseFactory.ok()).toFuture().get();
private void setup(HttpProtocol protocol, boolean reject) throws Exception {
HttpServerBuilder serverBuilder = BuilderUtils.newServerBuilder(SERVER_CTX, protocol);
if (reject) {
serverBuilder.appendEarlyConnectionAcceptor(conn -> Completable.failed(DELIBERATE_EXCEPTION));
}
serverContext = serverBuilder.listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok());

InetSocketAddress listenAddress = (InetSocketAddress) serverContext.listenAddress();
httpClient = HttpClients
.forSingleAddress(listenAddress.getHostName(), listenAddress.getPort())
httpClient = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX, protocol)
.appendConnectionFactoryFilter(o ->
new DelegatingConnectionFactory<InetSocketAddress, FilterableStreamingHttpConnection>(o) {
@Override
Expand All @@ -71,12 +88,14 @@ public Single<FilterableStreamingHttpConnection> newConnection(

@AfterEach
void cleanup() throws Exception {
httpClient.close();
serverContext.close();
safeClose(httpClient);
safeClose(serverContext);
}

@Test
void reusesConnectionOnReserve() throws Exception {
@ParameterizedTest(name = "{displayName} [{index}]: protocol={0}")
@EnumSource(HttpProtocol.class)
void reusesConnectionOnReserve(HttpProtocol protocol) throws Exception {
setup(protocol, false);
HttpRequest metaData = httpClient.get("/");

ReservedBlockingHttpConnection connection = httpClient.reserveConnection(metaData);
Expand All @@ -86,11 +105,13 @@ void reusesConnectionOnReserve() throws Exception {
connection = httpClient.reserveConnection(metaData);
connection.release();

assertEquals(1, createdConnections.get());
assertThat(createdConnections.get(), is(1));
}

@Test
void canForceNewConnection() throws Exception {
@ParameterizedTest(name = "{displayName} [{index}]: protocol={0}")
@EnumSource(HttpProtocol.class)
void canForceNewConnection(HttpProtocol protocol) throws Exception {
setup(protocol, false);
HttpRequest metaData = httpClient.get("/");
metaData.context().put(HttpContextKeys.HTTP_FORCE_NEW_CONNECTION, true);

Expand All @@ -101,6 +122,21 @@ void canForceNewConnection() throws Exception {
connection = httpClient.reserveConnection(metaData);
connection.release();

assertEquals(3, createdConnections.get());
assertThat(createdConnections.get(), is(3));
}

@ParameterizedTest(name = "{displayName} [{index}]: protocol={0}")
@EnumSource(HttpProtocol.class)
void reservedConnectionClosesOnExceptionCaught(HttpProtocol protocol) throws Exception {
setup(protocol, true);
HttpRequest metaData = httpClient.get("/");

ReservedHttpConnection connection = httpClient.reserveConnection(metaData).asConnection();
ExecutionException e = assertThrows(ExecutionException.class,
() -> connection.request(connection.get("/")).toFuture().get());
assertThat(e.getCause(), is(instanceOf(ClosedChannelException.class)));
connection.onClose().toFuture().get();

assertThat(createdConnections.get(), is(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,10 @@ public void handlerRemoved(ChannelHandlerContext ctx) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// NettyChannelPublisher will force closure of the channel in case of exception after the cause is
// propagated to users. In case users don't have offloading, there is a risk to retry on the same IO thread.
// We should notify LoadBalancer that this connection is closing to avoid retrying on the same connection.
connection.notifyOnClosing();
connection.nettyChannelPublisher.channelOnError(unwrapThrowable(cause));
}

Expand Down

0 comments on commit 5607727

Please sign in to comment.