From d3e55abd2a75091ae955438b07ef0c82ca5e1c26 Mon Sep 17 00:00:00 2001
From: Idel Pivnitskiy
Date: Fri, 12 Jul 2024 10:42:35 -0700
Subject: [PATCH 1/5] Incorrect state possible after retrying
 `ServiceDiscoverer` events

Motivation:

Clients have a configurable `serviceDiscovererRetryStrategy` to guarantee a
steady stream of events to the `LoadBalancer` that never fails. It's
necessary at the client level to avoid hanging requests indefinitely and
let requests observe failures from ServiceDiscoverer. Also, for
`PartitionedHttpClient` it's necessary to guarantee that `GroupedPublisher`
never fails.

Retry is effectively a re-subscribe. According to `ServiceDiscoverer`
contract (clarified in #3002), each `Subscriber` receives a "state of the
world" as the first collection of events. The problem is that the state may
change significantly between retries, as a result unavailable addresses can
remain inside the `LoadBalancer` forever.

Example:
T1. SD delivers [a,b]
T1. LB receives [a,b]
T1. SD delivers error
T2. SD info changed ("a" got revoked)
T3. Client retries SD
T3. SD delivers [b]
T3. LB receives [b] (but still holds "a")

When we retry `ServiceDiscoverer` errors, we should keep pushing deltas
downstream or purge events that are not present in the new "state of the
world".

We previously had this protection but it was mistakenly removed in #1949 as
part of a broader refactoring around `ServiceDiscoverer` <-> `LoadBalancer`
contract.

Modifications:

- Add `RetryingServiceDiscoverer` that handles retries and keeps the state
  between retries.
- Use it in `DefaultSingleAddressHttpClientBuilder` and
  `DefaultPartitionedHttpClientBuilder`.
- Use `CastedServiceDiscoverer` to allow modifications for
  `ServiceDiscovererEvent` after we started to use a wildcard type in #2379.
- Pass consistent `targetResource` identifier to both
  `RetryingServiceDiscoverer` and `LoadBalancerFactory` to allow state
  correlation when inspecting heap dump.

Result:

Client keeps pushing deltas to `LoadBalancer` after retrying
`ServiceDiscoverer` errors, keeping its state consistent with
`ServiceDiscoverer`.
---
 .../api/partition/PartitionAttributes.java    |   2 +-
 .../DefaultPartitionedHttpClientBuilder.java  |  40 +++--
 ...DefaultSingleAddressHttpClientBuilder.java | 123 +++++++++------
 .../http/netty/RetryingServiceDiscoverer.java | 149 ++++++++++++++++++
 .../http/netty/PartitionedHttpClientTest.java |   1 +
 5 files changed, 256 insertions(+), 59 deletions(-)
 create mode 100644 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java

diff --git a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java
index 3020794411..b1382d650c 100644
--- a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java
+++ b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java
@@ -26,7 +26,7 @@
 import static java.util.Objects.requireNonNull;
 
 /**
- * Provide a way to describe a partition using a collection of of attributes. Typically only a single type of any
+ * Provide a way to describe a partition using a collection of attributes. Typically only a single type of any
  * particular {@link Key} exists in each {@link PartitionAttributes}. For example:
  * <pre>
  * { [Key(shard) = "shard X"], [Key(data center) = "data center X"], [Key(is main) = "false/true"] }
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java
index a964ef548b..faadce37b5 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java
@@ -55,17 +55,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
 import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
-import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
 import static io.servicetalk.concurrent.api.Single.defer;
 import static io.servicetalk.concurrent.api.Single.failed;
 import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
-import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_INIT_DURATION;
-import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_MAX_DELAY;
 import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
 import static java.util.Objects.requireNonNull;
 import static java.util.function.Function.identity;
@@ -73,6 +72,7 @@
 @Deprecated // FIXME: 0.43 - remove deprecated class
 final class DefaultPartitionedHttpClientBuilder implements PartitionedHttpClientBuilder {
     private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPartitionedHttpClientBuilder.class);
+    private static final AtomicInteger CLIENT_ID = new AtomicInteger();
 
     private final U address;
     private final Function partitionAttributesBuilderFactory;
@@ -101,15 +101,11 @@ final class DefaultPartitionedHttpClientBuilder implements PartitionedHttp
 
     @Override
     public StreamingHttpClient buildStreaming() {
+        final String targetResource = targetResource(address);
         final HttpExecutionContext executionContext = executionContextBuilder.build();
-        BiIntFunction sdRetryStrategy = serviceDiscovererRetryStrategy;
-        if (sdRetryStrategy == null) {
-            sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
-                    SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
-        }
         final ServiceDiscoverer> psd =
-                new DefaultSingleAddressHttpClientBuilder.RetryingServiceDiscoverer<>(serviceDiscoverer,
-                        sdRetryStrategy);
+                new RetryingServiceDiscoverer<>(targetResource, serviceDiscoverer, serviceDiscovererRetryStrategy,
+                        executionContext, DefaultPartitionedHttpClientBuilder::makeUnavailable);
 
         final PartitionedClientFactory clientFactory = (pa, sd) -> {
             // build new context, user may have changed anything on the builder from the filter
@@ -139,6 +135,30 @@ public StreamingHttpClient buildStreaming() {
         return new FilterableClientToClient(partitionedClient, executionContext);
     }
 
+    private static  String targetResource(final U address) {
+        return address + "/" + CLIENT_ID.incrementAndGet();
+    }
+
+    private static  PartitionedServiceDiscovererEvent makeUnavailable(
+            final PartitionedServiceDiscovererEvent event) {
+        return new PartitionedServiceDiscovererEvent() {
+            @Override
+            public PartitionAttributes partitionAddress() {
+                return event.partitionAddress();
+            }
+
+            @Override
+            public R address() {
+                return event.address();
+            }
+
+            @Override
+            public Status status() {
+                return UNAVAILABLE;
+            }
+        };
+    }
+
     private static final class DefaultPartitionedStreamingHttpClientFilter implements
                                                                                  FilterableStreamingHttpClient {
 
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java
index ce5e3cb94b..27a0e1d60a 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java
@@ -19,6 +19,7 @@
 import io.servicetalk.buffer.api.CharSequences;
 import io.servicetalk.client.api.ConnectionFactory;
 import io.servicetalk.client.api.ConnectionFactoryFilter;
+import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
 import io.servicetalk.client.api.DelegatingServiceDiscoverer;
 import io.servicetalk.client.api.LoadBalancer;
 import io.servicetalk.client.api.ServiceDiscoverer;
@@ -66,17 +67,17 @@
 
 import java.net.InetSocketAddress;
 import java.net.SocketOption;
-import java.time.Duration;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
 
 import static io.netty.util.NetUtil.toSocketAddressString;
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
 import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
 import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
-import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
 import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
 import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
 import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
@@ -86,7 +87,6 @@
 import static io.servicetalk.http.netty.StrategyInfluencerAwareConversions.toConditionalConnectionFilterFactory;
 import static java.lang.Integer.parseInt;
 import static java.time.Duration.ofMinutes;
-import static java.time.Duration.ofSeconds;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -105,9 +105,7 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress
             new RetryingHttpRequesterFilter.Builder().build();
     private static final StreamingHttpConnectionFilterFactory DEFAULT_IDLE_TIMEOUT_FILTER =
             new IdleTimeoutConnectionFilter(ofMinutes(5));
-
-    static final Duration SD_RETRY_STRATEGY_INIT_DURATION = ofSeconds(2);
-    static final Duration SD_RETRY_STRATEGY_MAX_DELAY = ofSeconds(128);
+    private static final AtomicInteger CLIENT_ID = new AtomicInteger();
 
     private final U address;
     @Nullable
@@ -116,7 +114,7 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress
     final HttpExecutionContextBuilder executionContextBuilder;
     private final ClientStrategyInfluencerChainBuilder strategyComputation;
     private HttpLoadBalancerFactory loadBalancerFactory;
-    private ServiceDiscoverer> serviceDiscoverer;
+    private ServiceDiscoverer> serviceDiscoverer;
     private Function hostToCharSequenceFunction =
             DefaultSingleAddressHttpClientBuilder::toAuthorityForm;
     private boolean addHostHeaderFallbackFilter = true;
@@ -142,8 +140,7 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress
         executionContextBuilder = new HttpExecutionContextBuilder();
         strategyComputation = new ClientStrategyInfluencerChainBuilder();
         this.loadBalancerFactory = defaultLoadBalancer();
-        this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
-
+        this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
         clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER);
     }
 
@@ -176,7 +173,7 @@ static  SingleAddressHttpClientBuilder setExecutionContext(
 
     private static final class HttpClientBuildContext {
         final DefaultSingleAddressHttpClientBuilder builder;
-        private final ServiceDiscoverer> sd;
+        private final ServiceDiscoverer> sd;
         private final SdStatusCompletable sdStatus;
 
         @Nullable
@@ -184,7 +181,7 @@ private static final class HttpClientBuildContext {
 
         HttpClientBuildContext(
                 final DefaultSingleAddressHttpClientBuilder builder,
-                final ServiceDiscoverer> sd,
+                final ServiceDiscoverer> sd,
                 @Nullable final BiIntFunction serviceDiscovererRetryStrategy) {
             this.builder = builder;
             this.serviceDiscovererRetryStrategy = serviceDiscovererRetryStrategy;
@@ -200,17 +197,18 @@ HttpClientConfig httpConfig() {
             return builder.config;
         }
 
-        ServiceDiscoverer> serviceDiscoverer(
-                HttpExecutionContext executionContext) {
-            BiIntFunction sdRetryStrategy = serviceDiscovererRetryStrategy;
+        ServiceDiscoverer> serviceDiscoverer(
+                final String targetResource, final HttpExecutionContext executionContext) {
+            final BiIntFunction sdRetryStrategy = serviceDiscovererRetryStrategy;
             if (sdRetryStrategy == HttpClients.NoRetriesStrategy.INSTANCE) {
                 return sd;
             }
-            if (sdRetryStrategy == null) {
-                sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
-                        SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
-            }
-            return new RetryingServiceDiscoverer<>(new StatusAwareServiceDiscoverer<>(sd, sdStatus), sdRetryStrategy);
+            return new RetryingServiceDiscoverer<>(targetResource, new StatusAwareServiceDiscoverer<>(sd, sdStatus),
+                    sdRetryStrategy, executionContext, HttpClientBuildContext::makeUnavailable);
+        }
+
+        private static  ServiceDiscovererEvent makeUnavailable(final ServiceDiscovererEvent event) {
+            return new DefaultServiceDiscovererEvent<>(event.address(), UNAVAILABLE);
         }
     }
 
@@ -220,6 +218,7 @@ public StreamingHttpClient buildStreaming() {
     }
 
     private static  StreamingHttpClient buildStreaming(final HttpClientBuildContext ctx) {
+        final String targetResource = targetResource(ctx);
         final ReadOnlyHttpClientConfig roConfig = ctx.httpConfig().asReadOnly();
         final HttpExecutionContext builderExecutionContext = ctx.builder.executionContextBuilder.build();
         final HttpExecutionStrategy computedStrategy =
@@ -236,7 +235,7 @@ public HttpExecutionStrategy executionStrategy() {
         final CompositeCloseable closeOnException = newCompositeCloseable();
         try {
             final Publisher>> sdEvents =
-                    ctx.serviceDiscoverer(executionContext).discover(ctx.address());
+                    ctx.serviceDiscoverer(targetResource, executionContext).discover(ctx.address());
 
             ConnectionFactoryFilter connectionFactoryFilter =
                     ctx.builder.connectionFactoryFilter;
@@ -304,9 +303,7 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
 
             final LoadBalancer lb =
                     closeOnException.prepend(ctx.builder.loadBalancerFactory.newLoadBalancer(
-                            sdEvents,
-                            connectionFactory,
-                            targetAddress(ctx)));
+                            sdEvents, connectionFactory, targetResource));
 
             ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory = ctx.builder.clientFilterFactory;
 
@@ -338,14 +335,14 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
                     builderStrategy.missing(computedStrategy) != offloadNone()) {
                 LOGGER.info("Client for {} created with the builder strategy {} but resulting computed strategy is " +
                                 "{}. One of the filters enforces additional offloading. To find out what filter is " +
-                                "it, enable debug level logging for {}.", targetAddress(ctx), builderStrategy,
+                                "it, enable debug level logging for {}.", targetResource, builderStrategy,
                         computedStrategy, ClientStrategyInfluencerChainBuilder.class);
             } else if (builderStrategy == computedStrategy) {
                 LOGGER.debug("Client for {} created with the execution strategy {}.",
-                        targetAddress(ctx), computedStrategy);
+                        targetResource, computedStrategy);
             } else {
                 LOGGER.debug("Client for {} created with the builder strategy {}, resulting computed strategy is {}.",
-                        targetAddress(ctx), builderStrategy, computedStrategy);
+                        targetResource, builderStrategy, computedStrategy);
             }
             return new FilterableClientToClient(wrappedClient, executionContext);
         } catch (final Throwable t) {
@@ -392,10 +389,14 @@ private static StreamingHttpRequestResponseFactory defaultReqRespFactory(ReadOnl
         }
     }
 
-    private static  String targetAddress(final HttpClientBuildContext ctx) {
-        assert ctx.builder.address != null;
-        return ctx.builder.proxyAddress == null ?
-                ctx.builder.address.toString() : ctx.builder.address + " (via " + ctx.builder.proxyAddress + ")";
+    /**
+     * This method is used to create a "targetResource" identifier that helps us to correlate internal state of the
+     * RetryingServiceDiscoverer and LoadBalancer.
+     */
+    private static  String targetResource(final HttpClientBuildContext ctx) {
+        final String uniqueAddress = ctx.builder.address + "/" + CLIENT_ID.incrementAndGet();
+        return ctx.builder.proxyAddress == null ? uniqueAddress :
+                uniqueAddress + " (via " + ctx.builder.proxyAddress + ")";
     }
 
     private static ContextAwareStreamingHttpClientFilterFactory appendFilter(
@@ -601,7 +602,7 @@ public DefaultSingleAddressHttpClientBuilder appendClientFilter(
     @Override
     public DefaultSingleAddressHttpClientBuilder serviceDiscoverer(
             final ServiceDiscoverer> serviceDiscoverer) {
-        this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
+        this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
         return this;
     }
 
@@ -751,23 +752,6 @@ public Publisher> discover(final U u) {
         }
     }
 
-    static final class RetryingServiceDiscoverer>
-            extends DelegatingServiceDiscoverer {
-        private final BiIntFunction retryStrategy;
-
-        RetryingServiceDiscoverer(final ServiceDiscoverer delegate,
-                                  final BiIntFunction retryStrategy) {
-            super(delegate);
-            this.retryStrategy = requireNonNull(retryStrategy);
-        }
-
-        @Override
-        public Publisher> discover(final U u) {
-            // terminateOnNextException false -> LB is after this operator, if LB throws do best effort retry.
-            return delegate().discover(u).retryWhen(false, retryStrategy);
-        }
-    }
-
     private static final class AlpnReqRespFactoryFunc implements
                                                   Function {
         private final BufferAllocator allocator;
@@ -839,4 +823,47 @@ private static  HttpLoadBalancerFactory defaul
                 RoundRobinLoadBalancers.builder(
                                 DefaultHttpLoadBalancerFactory.class.getSimpleName()).build());
     }
+
+    // Because of the change in https://github.com/apple/servicetalk/pull/2379, we should constrain the type back to
+    // ServiceDiscovererEvent without "? extends" to allow RetryingServiceDiscoverer to mark events as UNAVAILABLE.
+    private static final class CastedServiceDiscoverer
+            implements ServiceDiscoverer> {
+
+        private final ServiceDiscoverer> delegate;
+
+        private CastedServiceDiscoverer(final ServiceDiscoverer> delegate) {
+            this.delegate = requireNonNull(delegate);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Publisher>> discover(final U address) {
+            return delegate.discover(address).map(e -> (Collection>) e);
+        }
+
+        @Override
+        public Completable closeAsync() {
+            return delegate.closeAsync();
+        }
+
+        @Override
+        public Completable closeAsyncGracefully() {
+            return delegate.closeAsyncGracefully();
+        }
+
+        @Override
+        public Completable onClose() {
+            return delegate.onClose();
+        }
+
+        @Override
+        public Completable onClosing() {
+            return delegate.onClosing();
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+    }
 }
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
new file mode 100644
index 0000000000..eb6ea5d30f
--- /dev/null
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright © 2024 Apple Inc. and the ServiceTalk project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.servicetalk.http.netty;
+
+import io.servicetalk.client.api.DelegatingServiceDiscoverer;
+import io.servicetalk.client.api.ServiceDiscoverer;
+import io.servicetalk.client.api.ServiceDiscovererEvent;
+import io.servicetalk.concurrent.api.BiIntFunction;
+import io.servicetalk.concurrent.api.Completable;
+import io.servicetalk.concurrent.api.Publisher;
+import io.servicetalk.http.api.HttpExecutionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+import javax.annotation.Nullable;
+
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
+import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.emptyMap;
+
+final class RetryingServiceDiscoverer>
+        extends DelegatingServiceDiscoverer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RetryingServiceDiscoverer.class);
+
+    private static final Duration SD_RETRY_STRATEGY_INIT_DURATION = ofSeconds(2);
+    private static final Duration SD_RETRY_STRATEGY_MAX_DELAY = ofSeconds(128);
+
+    private final String targetResource;
+    private final BiIntFunction retryStrategy;
+    private final UnaryOperator makeUnavailable;
+
+    RetryingServiceDiscoverer(final String targetResource,
+                              final ServiceDiscoverer delegate,
+                              @Nullable BiIntFunction retryStrategy,
+                              final HttpExecutionContext executionContext,
+                              final UnaryOperator makeUnavailable) {
+        super(delegate);
+        this.targetResource = targetResource;
+        if (retryStrategy == null) {
+            retryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
+                    SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
+        }
+        this.retryStrategy = retryStrategy;
+        this.makeUnavailable = makeUnavailable;
+    }
+
+    @Override
+    public Publisher> discover(final U address) {
+        // The returned publisher is guaranteed to never fail because we retry all errors here. However, LoadBalancer
+        // can still cancel and re-subscribe in attempt to recover from unhealthy state. In this case, we need to
+        // re-initialize the ServiceDiscovererEventsCache and restart from an empty state.
+        return Publisher.defer(() -> {
+            final ServiceDiscovererEventsCache eventsCache =
+                    new ServiceDiscovererEventsCache<>(targetResource, makeUnavailable);
+            return delegate().discover(address)
+                    .map(eventsCache::consumeAndFilter)
+                    .beforeOnError(eventsCache::errorSeen)
+                    // terminateOnNextException false -> LB is after this operator, if LB throws do best effort retry
+                    .retryWhen(false, retryStrategy);
+        });
+    }
+
+    private static final class ServiceDiscovererEventsCache> {
+        @SuppressWarnings("rawtypes")
+        private static final Map NONE_RETAINED = emptyMap();
+
+        private final String targetResource;
+        private final UnaryOperator makeUnavailable;
+        private final Map currentState = new HashMap<>();
+        private Map retainedState = noneRetained();
+
+        private ServiceDiscovererEventsCache(final String targetResource, final UnaryOperator makeUnavailable) {
+            this.targetResource = targetResource;
+            this.makeUnavailable = makeUnavailable;
+        }
+
+        void errorSeen(final Throwable t) {
+            if (retainedState == NONE_RETAINED) {
+                retainedState = new HashMap<>(currentState);
+                currentState.clear();
+            }
+            LOGGER.debug("{} observed an error from ServiceDiscoverer", targetResource, t);
+        }
+
+        Collection consumeAndFilter(final Collection events) {
+            if (retainedState == NONE_RETAINED) {
+                for (E event : events) {
+                    if (UNAVAILABLE.equals(event.status())) {
+                        currentState.remove(event.address());
+                    } else {
+                        currentState.put(event.address(), event);
+                    }
+                }
+                return events;
+            }
+
+            // We have seen an error and re-subscribed upon retry. Based on the Publisher rule 1.10
+            // (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.10), each subscribe
+            // expects a different Subscriber. Therefore, discovery Publisher suppose to start from a fresh state. We
+            // should populate currentState with new addresses and deactivate the ones which are not present in the new
+            // collection, but were left in retainedState.
+            assert currentState.isEmpty();
+            final List toReturn = new ArrayList<>(events.size() + retainedState.size());
+            for (E event : events) {
+                final R address = event.address();
+                toReturn.add(event);
+                retainedState.remove(address);
+                if (!UNAVAILABLE.equals(event.status())) {
+                    currentState.put(address, event);
+                }
+            }
+
+            for (E event : retainedState.values()) {
+                assert event.status() != UNAVAILABLE;
+                toReturn.add(makeUnavailable.apply(event));
+            }
+
+            retainedState = noneRetained();
+            return toReturn;
+        }
+
+        @SuppressWarnings("unchecked")
+        private static > Map noneRetained() {
+            return NONE_RETAINED;
+        }
+    }
+}
diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java
index 674f1376ea..be8616722d 100644
--- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java
+++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java
@@ -67,6 +67,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("deprecation")
 class PartitionedHttpClientTest {
     private static final PartitionAttributes.Key SRV_NAME = PartitionAttributes.Key.newKey();
     private static final PartitionAttributes.Key SRV_LEADER = PartitionAttributes.Key.newKey();

From 04527680a6944781ddad9c1ac92ee151fc54a53f Mon Sep 17 00:00:00 2001
From: Idel Pivnitskiy 
Date: Fri, 12 Jul 2024 18:48:48 -0700
Subject: [PATCH 2/5] RetryingServiceDiscoverer: ctor arg fix and comment
 clarification

---
 .../servicetalk/http/netty/RetryingServiceDiscoverer.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
index eb6ea5d30f..5eca70581f 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
@@ -21,7 +21,8 @@
 import io.servicetalk.concurrent.api.BiIntFunction;
 import io.servicetalk.concurrent.api.Completable;
 import io.servicetalk.concurrent.api.Publisher;
-import io.servicetalk.http.api.HttpExecutionContext;
+import io.servicetalk.transport.api.ExecutionContext;
+import io.servicetalk.transport.api.ExecutionStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ final class RetryingServiceDiscoverer>
     RetryingServiceDiscoverer(final String targetResource,
                               final ServiceDiscoverer delegate,
                               @Nullable BiIntFunction retryStrategy,
-                              final HttpExecutionContext executionContext,
+                              final ExecutionContext executionContext,
                               final UnaryOperator makeUnavailable) {
         super(delegate);
         this.targetResource = targetResource;
@@ -120,7 +121,8 @@ Collection consumeAndFilter(final Collection events) {
             // (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.10), each subscribe
             // expects a different Subscriber. Therefore, discovery Publisher suppose to start from a fresh state. We
             // should populate currentState with new addresses and deactivate the ones which are not present in the new
-            // collection, but were left in retainedState.
+            // collection, but were left in retainedState. Original events are propagated as-is, even if they contain
+            // duplicate events because retry strategy should not alter the original flow from ServiceDiscoverer.
             assert currentState.isEmpty();
             final List toReturn = new ArrayList<>(events.size() + retainedState.size());
             for (E event : events) {

From 883ce990f5c54e9fcc3142556e4ac33d7f61a763 Mon Sep 17 00:00:00 2001
From: Idel Pivnitskiy 
Date: Fri, 12 Jul 2024 18:48:56 -0700
Subject: [PATCH 3/5] Add RetryingServiceDiscovererTest

---
 .../netty/RetryingServiceDiscovererTest.java  | 304 ++++++++++++++++++
 1 file changed, 304 insertions(+)
 create mode 100644 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java

diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java
new file mode 100644
index 0000000000..20b1f43223
--- /dev/null
+++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java
@@ -0,0 +1,304 @@
+/*
+ * Copyright © 2024 Apple Inc. and the ServiceTalk project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.servicetalk.http.netty;
+
+import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
+import io.servicetalk.client.api.ServiceDiscoverer;
+import io.servicetalk.client.api.ServiceDiscovererEvent;
+import io.servicetalk.client.api.ServiceDiscovererEvent.Status;
+import io.servicetalk.concurrent.api.Completable;
+import io.servicetalk.concurrent.api.Publisher;
+import io.servicetalk.concurrent.api.TestPublisher;
+import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;
+import io.servicetalk.transport.netty.internal.GlobalExecutionContext;
+
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
+import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
+import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
+import static io.servicetalk.concurrent.internal.TestTimeoutConstants.CI;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class RetryingServiceDiscovererTest {
+
+    private static final int DEFAULT_POLL_MILLIS = CI ? 30 : 10;
+
+    private final BlockingQueue>>> pubs =
+            new LinkedBlockingQueue<>();
+    private final TestPublisherSubscriber>> subscriber =
+            new TestPublisherSubscriber<>();
+    private final ServiceDiscoverer> sd;
+
+    RetryingServiceDiscovererTest() {
+        @SuppressWarnings("unchecked")
+        ServiceDiscoverer> delegate = mock(ServiceDiscoverer.class);
+        when(delegate.discover(any())).thenReturn(Publisher.defer(() -> {
+            TestPublisher>> pub = new TestPublisher<>();
+            pubs.add(pub);
+            return pub;
+        }));
+        sd = new RetryingServiceDiscoverer<>(RetryingServiceDiscovererTest.class.getSimpleName(), delegate,
+                (count, t) -> Completable.completed(), GlobalExecutionContext.globalExecutionContext(),
+                e -> new DefaultServiceDiscovererEvent<>(e.address(), UNAVAILABLE));
+    }
+
+    @BeforeEach
+    void setUp() {
+        toSource(sd.discover("any")).subscribe(subscriber);
+        subscriber.awaitSubscription().request(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        verifyNoEventsReceived();
+        assertThat("Unexpected publisher created", pubs, is(empty()));
+    }
+
+    @Test
+    void errorWithNoAddresses() throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+        sdEvents = triggerRetry(sdEvents);
+        verifyNoEventsReceived();
+        sendUpAndVerifyReceived("addr1", sdEvents);
+    }
+
+    @ParameterizedTest(name = "{displayName} [{index}] withDuplicateEvents={0}")
+    @ValueSource(booleans = {false, true})
+    void sameAddressPostRetry(boolean withDuplicateEvents) throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+        ServiceDiscovererEvent evt1 = sendUpAndVerifyReceived("addr1", sdEvents);
+
+        for (int i = 0; i < 3; i++) {
+            sdEvents = triggerRetry(sdEvents);
+            verifyNoEventsReceived();
+
+            if (withDuplicateEvents) {
+                ServiceDiscovererEvent evt1Un = flip(evt1);
+                sdEvents.onNext(asList(evt1, evt1Un, evt1));
+                verifyReceivedEvents(contains(evt1, evt1Un, evt1));
+            } else {
+                sendUpAndVerifyReceived(evt1.address(), sdEvents);
+            }
+        }
+    }
+
+    @ParameterizedTest(name = "{displayName} [{index}] withDuplicateEvents={0}")
+    @ValueSource(booleans = {false, true})
+    void newAddressPostRetry(boolean withDuplicateEvents) throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+        ServiceDiscovererEvent evt1 = sendUpAndVerifyReceived("addr9", sdEvents);
+
+        for (int i = 0; i < 3; i++) {
+            sdEvents = triggerRetry(sdEvents);
+            verifyNoEventsReceived();
+
+            ServiceDiscovererEvent evt2 = new DefaultServiceDiscovererEvent<>("addr" + i, AVAILABLE);
+            if (withDuplicateEvents) {
+                sdEvents.onNext(asList(evt2, flip(evt2), evt2));
+                verifyReceivedEvents(contains(evt2, flip(evt2), evt2, flip(evt1)));
+            } else {
+                sdEvents.onNext(singletonList(evt2));
+                verifyReceivedEvents(contains(evt2, flip(evt1)));
+            }
+            evt1 = evt2;
+        }
+    }
+
+    @ParameterizedTest(name = "{displayName} [{index}] withDuplicateEvents={0}")
+    @ValueSource(booleans = {false, true})
+    void overlapAddressPostRetry(boolean withDuplicateEvents) throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+
+        ServiceDiscovererEvent evt1 = sendUpAndVerifyReceived("addr1", sdEvents);
+        ServiceDiscovererEvent evt2 = sendUpAndVerifyReceived("addr2", sdEvents);
+        ServiceDiscovererEvent evt3 = sendUpAndVerifyReceived("addr3", sdEvents);
+
+        sdEvents = triggerRetry(sdEvents);
+        verifyNoEventsReceived();
+
+        ServiceDiscovererEvent evt4 = new DefaultServiceDiscovererEvent<>("addr4", AVAILABLE);
+        List> newState = withDuplicateEvents ?
+                asList(evt2, evt4, evt2, evt4) : asList(evt2, evt4);
+        sdEvents.onNext(newState);
+
+        Collection> events = receivedEvents(subscriber);
+        assertThat("Unexpected event received", events, hasSize(newState.size() + 2));
+        assertThat("Unexpected event received",
+                events.stream().limit(newState.size()).collect(toList()), contains(newState.toArray()));
+        assertThat("Unexpected event received",
+                events.stream().skip(newState.size()).collect(toList()), containsInAnyOrder(flip(evt1), flip(evt3)));
+
+        sdEvents = triggerRetry(sdEvents);
+        verifyNoEventsReceived();
+
+        ServiceDiscovererEvent evt5 = new DefaultServiceDiscovererEvent<>("addr5", AVAILABLE);
+        newState = withDuplicateEvents ? asList(evt2, evt3, evt5, evt5) : asList(evt2, evt3, evt5);
+        sdEvents.onNext(newState);
+
+        events = receivedEvents(subscriber);
+        assertThat("Unexpected event received", events, hasSize(newState.size() + 1));
+        assertThat("Unexpected event received",
+                events.stream().limit(newState.size()).collect(toList()), contains(newState.toArray()));
+        assertThat("Unexpected event received",
+                events.stream().skip(newState.size()).collect(toList()), containsInAnyOrder(flip(evt4)));
+    }
+
+    @Test
+    void addRemoveBeforeRetry() throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+        ServiceDiscovererEvent evt1 = sendUpAndVerifyReceived("addr1", sdEvents);
+        sendUpAndVerifyReceived(evt1.address(), UNAVAILABLE, sdEvents);
+
+        sdEvents = triggerRetry(sdEvents);
+        verifyNoEventsReceived();
+
+        sendUpAndVerifyReceived("addr1", sdEvents);
+    }
+
+    @Test
+    void removeAfterRetry() throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+        ServiceDiscovererEvent evt1 = sendUpAndVerifyReceived("addr1", sdEvents);
+
+        sdEvents = triggerRetry(sdEvents);
+        verifyNoEventsReceived();
+
+        sdEvents.onNext(asList(evt1, flip(evt1)));
+        verifyReceivedEvents(contains(evt1, flip(evt1)));
+    }
+
+    @Test
+    void addAndRemovePostRetry() throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+        ServiceDiscovererEvent evt1 = sendUpAndVerifyReceived("addr1", sdEvents);
+        ServiceDiscovererEvent evt2 = sendUpAndVerifyReceived("addr2", sdEvents);
+
+        sdEvents = triggerRetry(sdEvents);
+        verifyNoEventsReceived();
+
+        sdEvents.onNext(asList(evt1, flip(evt1)));
+        verifyReceivedEvents(contains(evt1, flip(evt1), flip(evt2)));
+    }
+
+    @Test
+    void noUnavailableEventsAfterCancel() throws Exception {
+        TestPublisher>> sdEvents = pubs.take();
+
+        sendUpAndVerifyReceived("addr1", sdEvents);
+        ServiceDiscovererEvent evt2 = sendUpAndVerifyReceived("addr2", sdEvents);
+        sendUpAndVerifyReceived("addr3", sdEvents);
+
+        triggerRetry(sdEvents);
+        verifyNoEventsReceived();
+
+        // Cancel and re-subscribe should not produce UNAVAILABLE events
+        subscriber.awaitSubscription().cancel();
+        TestPublisherSubscriber>> newSubscriber =
+                new TestPublisherSubscriber<>();
+        toSource(sd.discover("any")).subscribe(newSubscriber);
+        newSubscriber.awaitSubscription().request(Long.MAX_VALUE);
+        sdEvents = pubs.take();
+
+        ServiceDiscovererEvent evt4 = new DefaultServiceDiscovererEvent<>("addr4", AVAILABLE);
+        sdEvents.onNext(asList(evt2, evt4));
+        Collection> events = receivedEvents(newSubscriber);
+        assertThat("Unexpected event received", events, contains(evt2, evt4));
+        verifyNoEventsReceived(subscriber);
+        verifyNoEventsReceived(newSubscriber);
+    }
+
+    private TestPublisher>> triggerRetry(
+            TestPublisher>> sdEvents)
+            throws Exception {
+        sdEvents.onError(DELIBERATE_EXCEPTION);
+        return pubs.take();
+    }
+
+    private void verifyNoEventsReceived() {
+        verifyNoEventsReceived(subscriber);
+    }
+
+    private void verifyNoEventsReceived(
+            TestPublisherSubscriber>> subscriber) {
+        assertThat("Unexpected event received",
+                subscriber.pollOnNext(DEFAULT_POLL_MILLIS, MILLISECONDS), is(nullValue()));
+    }
+
+    private Collection> receivedEvents(
+            TestPublisherSubscriber>> subscriber) {
+        List>> items = subscriber.takeOnNext(1);
+        assertThat("Unexpected items received", items, hasSize(1));
+        return items.get(0);
+    }
+
+    private void verifyReceivedEvents(Matcher>> matcher) {
+        Collection> events = receivedEvents(subscriber);
+        assertThat("Unexpected event received", events, matcher);
+    }
+
+    private ServiceDiscovererEvent sendUpAndVerifyReceived(String addr,
+                TestPublisher>> sdEvents) {
+        return sendUpAndVerifyReceived(addr, AVAILABLE, sdEvents);
+    }
+
+    private ServiceDiscovererEvent sendUpAndVerifyReceived(String addr, Status status,
+                TestPublisher>> sdEvents) {
+        ServiceDiscovererEvent evt = new DefaultServiceDiscovererEvent<>(addr, status);
+        sdEvents.onNext(singletonList(evt));
+        Collection> received = subscriber.takeOnNext(1)
+                .stream()
+                .flatMap(Collection::stream)
+                .collect(toList());
+        assertThat("Unexpected number of events received", received, hasSize(1));
+        Iterator> iterator = received.iterator();
+        ServiceDiscovererEvent receivedEvt = iterator.next();
+        assertThat("Unexpected event received", receivedEvt.address(), is(addr));
+        assertThat("Unexpected event received", receivedEvt.status(), is(status));
+        assertThat("Unexpected iterator.hasNext()", iterator.hasNext(), is(false));
+        return evt;
+    }
+
+    private static ServiceDiscovererEvent flip(ServiceDiscovererEvent evt) {
+        Status flipped = AVAILABLE.equals(evt.status()) ? UNAVAILABLE : AVAILABLE;
+        return new DefaultServiceDiscovererEvent<>(evt.address(), flipped);
+    }
+}

From a9c3eb675381072a79b5c21ca935f5ee530993cd Mon Sep 17 00:00:00 2001
From: Idel Pivnitskiy 
Date: Mon, 15 Jul 2024 12:54:23 -0700
Subject: [PATCH 4/5] Add warning if UNAVAILABLE events are received after
 re-subscribe

---
 .../http/netty/RetryingServiceDiscoverer.java            | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
index 5eca70581f..38f7cb9390 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
@@ -125,15 +125,24 @@ Collection consumeAndFilter(final Collection events) {
             // duplicate events because retry strategy should not alter the original flow from ServiceDiscoverer.
             assert currentState.isEmpty();
             final List toReturn = new ArrayList<>(events.size() + retainedState.size());
+            int unavailableCounter = 0;
             for (E event : events) {
                 final R address = event.address();
                 toReturn.add(event);
                 retainedState.remove(address);
                 if (!UNAVAILABLE.equals(event.status())) {
                     currentState.put(address, event);
+                } else {
+                    ++unavailableCounter;
                 }
             }
 
+            if (unavailableCounter > 0) {
+                LOGGER.warn("{} received {} UNAVAILABLE events but expected a new 'state of the world'. This is an " +
+                        "indicator of a buggy ServiceDiscoverer implementation that doesn't honor the API contract.",
+                        targetResource, unavailableCounter);
+            }
+
             for (E event : retainedState.values()) {
                 assert event.status() != UNAVAILABLE;
                 toReturn.add(makeUnavailable.apply(event));

From 99ebe670f25664f06ea55a22a0bc56826b67f4a7 Mon Sep 17 00:00:00 2001
From: Idel Pivnitskiy 
Date: Thu, 18 Jul 2024 13:57:34 -0700
Subject: [PATCH 5/5] Fix noUnavailableEventsAfterCancel() test

The test should re-subscribe to the same publisher instead of calling
`discover` one more time.
---
 .../http/netty/RetryingServiceDiscovererTest.java           | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java
index 20b1f43223..62b393dfa3 100644
--- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java
+++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingServiceDiscovererTest.java
@@ -67,6 +67,7 @@ class RetryingServiceDiscovererTest {
     private final TestPublisherSubscriber>> subscriber =
             new TestPublisherSubscriber<>();
     private final ServiceDiscoverer> sd;
+    private final Publisher>> discoveryEvents;
 
     RetryingServiceDiscovererTest() {
         @SuppressWarnings("unchecked")
@@ -79,11 +80,12 @@ class RetryingServiceDiscovererTest {
         sd = new RetryingServiceDiscoverer<>(RetryingServiceDiscovererTest.class.getSimpleName(), delegate,
                 (count, t) -> Completable.completed(), GlobalExecutionContext.globalExecutionContext(),
                 e -> new DefaultServiceDiscovererEvent<>(e.address(), UNAVAILABLE));
+        discoveryEvents = sd.discover("any");
     }
 
     @BeforeEach
     void setUp() {
-        toSource(sd.discover("any")).subscribe(subscriber);
+        toSource(discoveryEvents).subscribe(subscriber);
         subscriber.awaitSubscription().request(Long.MAX_VALUE);
     }
 
@@ -234,7 +236,7 @@ void noUnavailableEventsAfterCancel() throws Exception {
         subscriber.awaitSubscription().cancel();
         TestPublisherSubscriber>> newSubscriber =
                 new TestPublisherSubscriber<>();
-        toSource(sd.discover("any")).subscribe(newSubscriber);
+        toSource(discoveryEvents).subscribe(newSubscriber);
         newSubscriber.awaitSubscription().request(Long.MAX_VALUE);
         sdEvents = pubs.take();