diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java index a26ee1f296..8f8b7daa62 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java @@ -75,7 +75,8 @@ protected final Single noActiveHostsFailure(List selectFromHost(Host host, Predicate selector, + @Nullable + protected final Single selectFromHost(Host host, Predicate selector, boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) { // First see if we can get an existing connection regardless of health status. if (!forceNewConnectionAndReserve) { diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java index c264137aa8..6ff63dfb03 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java @@ -85,7 +85,6 @@ private enum State { private final Addr address; @Nullable private final HealthCheckConfig healthCheckConfig; - @Nullable private final ConnectionPoolStrategy connectionPoolStrategy; @Nullable private final HealthIndicator healthIndicator; @@ -97,8 +96,8 @@ private enum State { DefaultHost(final String lbDescription, final Addr address, final ConnectionPoolStrategy connectionPoolStrategy, final ConnectionFactory connectionFactory, - final HostObserver hostObserver, final @Nullable HealthCheckConfig healthCheckConfig, - final @Nullable HealthIndicator healthIndicator) { + final HostObserver hostObserver, @Nullable final HealthCheckConfig healthCheckConfig, + @Nullable final HealthIndicator healthIndicator) { this.lbDescription = requireNonNull(lbDescription, "lbDescription"); this.address = requireNonNull(address, "address"); this.healthIndicator = healthIndicator; @@ -179,14 +178,15 @@ public boolean markExpired() { } @Override - public @Nullable C pickConnection(Predicate selector, @Nullable final ContextMap context) { + @Nullable + public C pickConnection(Predicate selector, @Nullable final ContextMap context) { final List connections = connState.connections; return connectionPoolStrategy.select(connections, selector); } @Override public Single newConnection( - Predicate selector, final boolean forceNewConnectionAndReserve, final @Nullable ContextMap context) { + Predicate selector, final boolean forceNewConnectionAndReserve, @Nullable final ContextMap context) { return Single.defer(() -> { ContextMap actualContext = context; if (actualContext == null) { @@ -274,8 +274,6 @@ private void onConnectionError(Throwable cause) { assert healthCheckConfig != null; for (;;) { ConnState previous = connStateUpdater.get(this); - // TODO: if we have a failure, why does it matter if the connections are there? - // If we try to make a new connection (maybe the pool is small) it would likely fail. if (!previous.isActive() || !previous.connections.isEmpty() || cause instanceof ConnectionLimitReachedException) { LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.", @@ -292,7 +290,8 @@ private void onConnectionError(Throwable cause) { lbDescription, address, nextState.failedConnections, healthCheckConfig.failedThreshold, cause); } else { - assert nextState.state == State.UNHEALTHY; + // if we're unhealthy we also have a non-null healthCheck. + assert nextState.state == State.UNHEALTHY && nextState.healthCheck != null; LOGGER.info("{}: failed to open a new connection to the host on address {} " + "{} time(s) in a row. Error counting threshold reached, marking this host as " + "UNHEALTHY for the selection algorithm and triggering background health-checking.", @@ -318,7 +317,7 @@ public boolean canMakeNewConnections() { return state != State.EXPIRED && state != State.CLOSED; } - private boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { + private boolean addConnection(final C connection, @Nullable final HealthCheck currentHealthCheck) { int addAttempt = 0; for (;;) { final ConnState previous = connStateUpdater.get(this); @@ -446,6 +445,7 @@ private Completable doClose(final boolean graceful) { private void cancelIfHealthCheck(ConnState connState) { if (connState.isUnhealthy()) { LOGGER.debug("{}: health check cancelled for {}.", lbDescription, this); + assert connState.healthCheck != null; // guaranteed by `.isUnhealthy()` connState.healthCheck.cancel(); } } @@ -528,7 +528,8 @@ private final class ConnState { @Nullable HealthCheck healthCheck; - private ConnState(final List connections, State state, int failedConnections, HealthCheck healthCheck) { + private ConnState(final List connections, State state, int failedConnections, + @Nullable final HealthCheck healthCheck) { // These asserts codify the invariants of the state. // if the state is unhealthy there must be a healthcheck assert state != State.UNHEALTHY || healthCheck != null; @@ -543,6 +544,7 @@ private ConnState(final List connections, State state, int failedConnections, } ConnState toNextFailedConnection(Throwable cause) { + assert healthCheckConfig != null; final int nextFailedCount = addWithOverflowProtection(this.failedConnections, 1); if (state == State.ACTIVE && healthCheckConfig.failedThreshold <= nextFailedCount) { return new ConnState(connections, State.UNHEALTHY, nextFailedCount, new HealthCheck(cause)); diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java index a72fdfa410..5a6271b615 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java @@ -59,14 +59,14 @@ private List rebuildWithPriorities(final List // #zone-aware-load-balancing // Consolidate our hosts into their respective priority groups. Since we're going to use a map we must use // and ordered map (in this case a TreeMap) so that we can iterate in order of group priority. - TreeMap groups = new TreeMap<>(); + TreeMap> groups = new TreeMap<>(); for (T host : hosts) { if (host.priority() < 0) { LOGGER.warn("{}: Illegal priority: {} (expected priority >=0). Ignoring priority data.", lbDescription, host.priority()); return hosts; } - Group group = groups.computeIfAbsent(host.priority(), i -> new Group()); + Group group = groups.computeIfAbsent(host.priority(), i -> new Group<>()); if (host.isHealthy()) { group.healthyCount++; } @@ -81,7 +81,7 @@ private List rebuildWithPriorities(final List // Compute the health percentage for each group. int totalHealthPercentage = 0; - for (Group group : groups.values()) { + for (Group group : groups.values()) { group.healthPercentage = Math.min(100, overProvisionPercentage * group.healthyCount / group.hosts.size()); totalHealthPercentage = Math.min(100, totalHealthPercentage + group.healthPercentage); } @@ -95,7 +95,7 @@ private List rebuildWithPriorities(final List List weightedResults = new ArrayList<>(); int activeGroups = 0; int remainingProbability = 100; - for (Group group : groups.values()) { + for (Group group : groups.values()) { assert !group.hosts.isEmpty(); final int groupProbability = Math.min(remainingProbability, group.healthPercentage * 100 / totalHealthPercentage); @@ -109,7 +109,7 @@ private List rebuildWithPriorities(final List } } // We should have at least one host now: if all the hosts were unhealthy the `totalHealthyPercentage` would be - // zero and we would have bailed before re-weighting. If the weights of a group were all zero we should have + // zero, and we would have bailed before re-weighting. If the weights of a group were all zero we should have // re-weighted them all equally and added them. assert !weightedResults.isEmpty(); diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index 0024a29411..808e1810da 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -129,7 +129,6 @@ final class DefaultLoadBalancer createHost(ServiceDiscovererEven ResolvedAddress addr = event.address(); final LoadBalancerObserver.HostObserver hostObserver = loadBalancerObserver.hostObserver(addr); // All hosts will share the health check config of the parent load balancer. - final HealthIndicator indicator = outlierDetector.newHealthIndicator(addr, hostObserver); + final HealthIndicator indicator = + outlierDetector.newHealthIndicator(addr, hostObserver); // We don't need the host level health check if we are either not health checking at all or if the // failed connect threshold is negative, meaning disabled. final HealthCheckConfig hostHealthCheckConfig = @@ -421,9 +421,7 @@ private PrioritizedHostImpl createHost(ServiceDiscovererEven new DefaultHost<>(lbDescription, addr, connectionPoolStrategy, connectionFactory, hostObserver, hostHealthCheckConfig, indicator), eventWeight(event), eventPriority(event)); - if (indicator != null) { - indicator.setHost(host); - } + indicator.setHost(host); host.onClose().afterFinally(() -> sequentialExecutor.execute(() -> { final List> currentHosts = usedHosts; @@ -523,7 +521,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final Single result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve); return result.beforeOnError(exn -> { if (exn instanceof NoActiveHostException) { - if (!currentHostSelector.isHealthy()) { + if (healthCheckConfig != null && !currentHostSelector.isHealthy()) { final long currNextResubscribeTime = nextResubscribeTime; if (currNextResubscribeTime >= 0 && healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancerBuilder.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancerBuilder.java index db63e29fd0..e17e76d357 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancerBuilder.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancerBuilder.java @@ -59,7 +59,7 @@ public LoadBalancerBuilder loadBalancingPolicy( @Override public LoadBalancerBuilder loadBalancerObserver( @Nullable LoadBalancerObserver loadBalancerObserver) { - return loadBalancerObserver(ignored -> loadBalancerObserver); + return loadBalancerObserver(loadBalancerObserver == null ? null : ignored -> loadBalancerObserver); } @Override @@ -71,8 +71,7 @@ public LoadBalancerBuilder loadBalancerObserver( @Override public LoadBalancerBuilder outlierDetectorConfig(OutlierDetectorConfig outlierDetectorConfig) { - this.outlierDetectorConfig = outlierDetectorConfig == null ? - OutlierDetectorConfig.DEFAULT_CONFIG : outlierDetectorConfig; + this.outlierDetectorConfig = requireNonNull(outlierDetectorConfig, "outlierDetectorConfig"); return this; } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java index d6eb8ad183..29693dd45b 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java @@ -19,6 +19,7 @@ import io.servicetalk.client.api.ServiceDiscovererEvent; import java.util.Collection; +import javax.annotation.Nullable; final class NoopLoadBalancerObserver implements LoadBalancerObserver { @@ -78,7 +79,7 @@ public void onActiveHostRemoved(int connectionCount) { } @Override - public void onHostMarkedUnhealthy(Throwable cause) { + public void onHostMarkedUnhealthy(@Nullable Throwable cause) { // noop } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/SequentialExecutor.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/SequentialExecutor.java index 394e1e94c9..c54fb73381 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/SequentialExecutor.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/SequentialExecutor.java @@ -86,7 +86,6 @@ private void drain(Cell next) { final Thread thisThread = Thread.currentThread(); currentDrainingThread = thisThread; for (;;) { - assert next != null; try { next.runnable.run(); } catch (Throwable ex) { diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java index 5746f4b482..50aba4e6fa 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java @@ -125,7 +125,8 @@ public final boolean isHealthy() { // than the eviction time technically prescribes. if (evictedUntilNanos <= currentTimeNanos()) { sequentialExecutor.execute(() -> { - if (!cancelled && this.evictedUntilNanos != null && this.evictedUntilNanos <= currentTimeNanos()) { + final Long innerEvictedUntilNanos = this.evictedUntilNanos; + if (!cancelled && innerEvictedUntilNanos != null && innerEvictedUntilNanos <= currentTimeNanos()) { sequentialRevive(); } }); diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java index de3a2a0358..b02a6b94ce 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java @@ -111,7 +111,7 @@ public void cancel() { kernel.cancel(); sequentialExecutor.execute(() -> { List> indicatorList = new ArrayList<>(indicators); - for (XdsHealthIndicator indicator : indicatorList) { + for (XdsHealthIndicator indicator : indicatorList) { indicator.sequentialCancel(); } assert indicators.isEmpty(); @@ -248,7 +248,7 @@ private static final class AlwaysHealthyOutlierDetectorAlgorithm> indicators) { int unhealthy = 0; - for (XdsHealthIndicator indicator : indicators) { + for (XdsHealthIndicator indicator : indicators) { // Hosts can still be marked unhealthy due to consecutive failures. final boolean isHealthy = indicator.isHealthy(); if (isHealthy) {