Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorrect state possible after retrying ServiceDiscoverer events #3006

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static java.util.Objects.requireNonNull;

/**
* Provide a way to describe a partition using a collection of of attributes. Typically only a single type of any
* Provide a way to describe a partition using a collection of attributes. Typically only a single type of any
* particular {@link Key} exists in each {@link PartitionAttributes}. For example:
* <pre>
* { [Key(shard) = "shard X"], [Key(data center) = "data center X"], [Key(is main) = "false/true"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_INIT_DURATION;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_MAX_DELAY;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

@Deprecated // FIXME: 0.43 - remove deprecated class
final class DefaultPartitionedHttpClientBuilder<U, R> implements PartitionedHttpClientBuilder<U, R> {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPartitionedHttpClientBuilder.class);
private static final AtomicInteger CLIENT_ID = new AtomicInteger();

private final U address;
private final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory;
Expand Down Expand Up @@ -101,15 +101,11 @@ final class DefaultPartitionedHttpClientBuilder<U, R> implements PartitionedHttp

@Override
public StreamingHttpClient buildStreaming() {
final String targetResource = targetResource(address);
final HttpExecutionContext executionContext = executionContextBuilder.build();
BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
if (sdRetryStrategy == null) {
sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
}
final ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> psd =
new DefaultSingleAddressHttpClientBuilder.RetryingServiceDiscoverer<>(serviceDiscoverer,
sdRetryStrategy);
new RetryingServiceDiscoverer<>(targetResource, serviceDiscoverer, serviceDiscovererRetryStrategy,
executionContext, DefaultPartitionedHttpClientBuilder::makeUnavailable);

final PartitionedClientFactory<U, R, FilterableStreamingHttpClient> clientFactory = (pa, sd) -> {
// build new context, user may have changed anything on the builder from the filter
Expand Down Expand Up @@ -139,6 +135,30 @@ public StreamingHttpClient buildStreaming() {
return new FilterableClientToClient(partitionedClient, executionContext);
}

private static <U> String targetResource(final U address) {
return address + "/" + CLIENT_ID.incrementAndGet();
}

private static <R> PartitionedServiceDiscovererEvent<R> makeUnavailable(
final PartitionedServiceDiscovererEvent<R> event) {
return new PartitionedServiceDiscovererEvent<R>() {
@Override
public PartitionAttributes partitionAddress() {
return event.partitionAddress();
}

@Override
public R address() {
return event.address();
}

@Override
public Status status() {
return UNAVAILABLE;
}
};
}

private static final class DefaultPartitionedStreamingHttpClientFilter<U, R> implements
FilterableStreamingHttpClient {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.buffer.api.CharSequences;
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.DelegatingServiceDiscoverer;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.ServiceDiscoverer;
Expand Down Expand Up @@ -66,17 +67,17 @@

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.netty.util.NetUtil.toSocketAddressString;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
Expand All @@ -86,7 +87,6 @@
import static io.servicetalk.http.netty.StrategyInfluencerAwareConversions.toConditionalConnectionFilterFactory;
import static java.lang.Integer.parseInt;
import static java.time.Duration.ofMinutes;
import static java.time.Duration.ofSeconds;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -105,9 +105,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
new RetryingHttpRequesterFilter.Builder().build();
private static final StreamingHttpConnectionFilterFactory DEFAULT_IDLE_TIMEOUT_FILTER =
new IdleTimeoutConnectionFilter(ofMinutes(5));

static final Duration SD_RETRY_STRATEGY_INIT_DURATION = ofSeconds(2);
static final Duration SD_RETRY_STRATEGY_MAX_DELAY = ofSeconds(128);
private static final AtomicInteger CLIENT_ID = new AtomicInteger();

private final U address;
@Nullable
Expand All @@ -116,7 +114,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
final HttpExecutionContextBuilder executionContextBuilder;
private final ClientStrategyInfluencerChainBuilder strategyComputation;
private HttpLoadBalancerFactory<R> loadBalancerFactory;
private ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> serviceDiscoverer;
private ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer;
private Function<U, CharSequence> hostToCharSequenceFunction =
DefaultSingleAddressHttpClientBuilder::toAuthorityForm;
private boolean addHostHeaderFallbackFilter = true;
Expand All @@ -142,8 +140,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
executionContextBuilder = new HttpExecutionContextBuilder();
strategyComputation = new ClientStrategyInfluencerChainBuilder();
this.loadBalancerFactory = defaultLoadBalancer();
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);

this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER);
}

Expand Down Expand Up @@ -176,15 +173,15 @@ static <U, R> SingleAddressHttpClientBuilder<U, R> setExecutionContext(

private static final class HttpClientBuildContext<U, R> {
final DefaultSingleAddressHttpClientBuilder<U, R> builder;
private final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> sd;
private final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> sd;
private final SdStatusCompletable sdStatus;

@Nullable
private final BiIntFunction<Throwable, ? extends Completable> serviceDiscovererRetryStrategy;

HttpClientBuildContext(
final DefaultSingleAddressHttpClientBuilder<U, R> builder,
final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> sd,
final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> sd,
@Nullable final BiIntFunction<Throwable, ? extends Completable> serviceDiscovererRetryStrategy) {
this.builder = builder;
this.serviceDiscovererRetryStrategy = serviceDiscovererRetryStrategy;
Expand All @@ -200,17 +197,18 @@ HttpClientConfig httpConfig() {
return builder.config;
}

ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> serviceDiscoverer(
HttpExecutionContext executionContext) {
BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer(
final String targetResource, final HttpExecutionContext executionContext) {
final BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
if (sdRetryStrategy == HttpClients.NoRetriesStrategy.INSTANCE) {
return sd;
}
if (sdRetryStrategy == null) {
sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
}
return new RetryingServiceDiscoverer<>(new StatusAwareServiceDiscoverer<>(sd, sdStatus), sdRetryStrategy);
return new RetryingServiceDiscoverer<>(targetResource, new StatusAwareServiceDiscoverer<>(sd, sdStatus),
sdRetryStrategy, executionContext, HttpClientBuildContext::makeUnavailable);
}

private static <R> ServiceDiscovererEvent<R> makeUnavailable(final ServiceDiscovererEvent<R> event) {
return new DefaultServiceDiscovererEvent<>(event.address(), UNAVAILABLE);
}
}

Expand All @@ -220,6 +218,7 @@ public StreamingHttpClient buildStreaming() {
}

private static <U, R> StreamingHttpClient buildStreaming(final HttpClientBuildContext<U, R> ctx) {
final String targetResource = targetResource(ctx);
final ReadOnlyHttpClientConfig roConfig = ctx.httpConfig().asReadOnly();
final HttpExecutionContext builderExecutionContext = ctx.builder.executionContextBuilder.build();
final HttpExecutionStrategy computedStrategy =
Expand All @@ -236,7 +235,7 @@ public HttpExecutionStrategy executionStrategy() {
final CompositeCloseable closeOnException = newCompositeCloseable();
try {
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<R>>> sdEvents =
ctx.serviceDiscoverer(executionContext).discover(ctx.address());
ctx.serviceDiscoverer(targetResource, executionContext).discover(ctx.address());

ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> connectionFactoryFilter =
ctx.builder.connectionFactoryFilter;
Expand Down Expand Up @@ -304,9 +303,7 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(

final LoadBalancer<FilterableStreamingHttpLoadBalancedConnection> lb =
closeOnException.prepend(ctx.builder.loadBalancerFactory.newLoadBalancer(
sdEvents,
connectionFactory,
targetAddress(ctx)));
sdEvents, connectionFactory, targetResource));

ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory = ctx.builder.clientFilterFactory;

Expand Down Expand Up @@ -338,14 +335,14 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
builderStrategy.missing(computedStrategy) != offloadNone()) {
LOGGER.info("Client for {} created with the builder strategy {} but resulting computed strategy is " +
"{}. One of the filters enforces additional offloading. To find out what filter is " +
"it, enable debug level logging for {}.", targetAddress(ctx), builderStrategy,
"it, enable debug level logging for {}.", targetResource, builderStrategy,
computedStrategy, ClientStrategyInfluencerChainBuilder.class);
} else if (builderStrategy == computedStrategy) {
LOGGER.debug("Client for {} created with the execution strategy {}.",
targetAddress(ctx), computedStrategy);
targetResource, computedStrategy);
} else {
LOGGER.debug("Client for {} created with the builder strategy {}, resulting computed strategy is {}.",
targetAddress(ctx), builderStrategy, computedStrategy);
targetResource, builderStrategy, computedStrategy);
}
return new FilterableClientToClient(wrappedClient, executionContext);
} catch (final Throwable t) {
Expand Down Expand Up @@ -392,10 +389,14 @@ private static StreamingHttpRequestResponseFactory defaultReqRespFactory(ReadOnl
}
}

private static <U, R> String targetAddress(final HttpClientBuildContext<U, R> ctx) {
assert ctx.builder.address != null;
return ctx.builder.proxyAddress == null ?
ctx.builder.address.toString() : ctx.builder.address + " (via " + ctx.builder.proxyAddress + ")";
/**
* This method is used to create a "targetResource" identifier that helps us to correlate internal state of the
* ServiceDiscoveryRetryStrategy and LoadBalancer.
*/
private static <U, R> String targetResource(final HttpClientBuildContext<U, R> ctx) {
final String uniqueAddress = ctx.builder.address + "/" + CLIENT_ID.incrementAndGet();
return ctx.builder.proxyAddress == null ? uniqueAddress :
uniqueAddress + " (via " + ctx.builder.proxyAddress + ")";
}

private static ContextAwareStreamingHttpClientFilterFactory appendFilter(
Expand Down Expand Up @@ -601,7 +602,7 @@ public DefaultSingleAddressHttpClientBuilder<U, R> appendClientFilter(
@Override
public DefaultSingleAddressHttpClientBuilder<U, R> serviceDiscoverer(
final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> serviceDiscoverer) {
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
return this;
}

Expand Down Expand Up @@ -751,23 +752,6 @@ public Publisher<Collection<E>> discover(final U u) {
}
}

static final class RetryingServiceDiscoverer<U, R, E extends ServiceDiscovererEvent<R>>
extends DelegatingServiceDiscoverer<U, R, E> {
private final BiIntFunction<Throwable, ? extends Completable> retryStrategy;

RetryingServiceDiscoverer(final ServiceDiscoverer<U, R, E> delegate,
final BiIntFunction<Throwable, ? extends Completable> retryStrategy) {
super(delegate);
this.retryStrategy = requireNonNull(retryStrategy);
}

@Override
public Publisher<Collection<E>> discover(final U u) {
// terminateOnNextException false -> LB is after this operator, if LB throws do best effort retry.
return delegate().discover(u).retryWhen(false, retryStrategy);
}
}

private static final class AlpnReqRespFactoryFunc implements
Function<HttpProtocolVersion, StreamingHttpRequestResponseFactory> {
private final BufferAllocator allocator;
Expand Down Expand Up @@ -839,4 +823,47 @@ private static <ResolvedAddress> HttpLoadBalancerFactory<ResolvedAddress> defaul
RoundRobinLoadBalancers.<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection>builder(
DefaultHttpLoadBalancerFactory.class.getSimpleName()).build());
}

// Because of the change in https://github.com/apple/servicetalk/pull/2379, we should constrain the type back to
// ServiceDiscovererEvent without "? extends" to allow RetryingServiceDiscoverer to mark events as UNAVAILABLE.
Comment on lines +827 to +828
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can back out these changes? I think it's technically a breaking change but afaict we could never practically make use of them because there isn't a type parameter on the ClientBuilder that lets us bridge the type between the ServiceDiscoverer and the LB.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not about bridging the type between SD and LB, it's about a possibility to pass a custom SD with custom event subtype to HttpClients. Without this, it won't be possible to use those implementations, they will have to change to use ServiceDiscovererEvent instead.

private static final class CastedServiceDiscoverer<U, R>
implements ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> {

private final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> delegate;

private CastedServiceDiscoverer(final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> delegate) {
this.delegate = requireNonNull(delegate);
}

@Override
@SuppressWarnings("unchecked")
public Publisher<Collection<ServiceDiscovererEvent<R>>> discover(final U address) {
return delegate.discover(address).map(e -> (Collection<ServiceDiscovererEvent<R>>) e);
}

@Override
public Completable closeAsync() {
return delegate.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
return delegate.closeAsyncGracefully();
}

@Override
public Completable onClose() {
return delegate.onClose();
}

@Override
public Completable onClosing() {
return delegate.onClosing();
}

@Override
public String toString() {
return delegate.toString();
}
}
}
Loading
Loading