Skip to content

Commit

Permalink
loadbalancer-experimental: fix some linter warnings/errors (#2997)
Browse files Browse the repository at this point in the history
Motivation:

We have a few warnings and errors that are popping up that can
be addressed. Most revolve around the use of raw types and
nullability.

Modifications:

Fix the warnings and errors.
  • Loading branch information
bryce-anderson authored Jul 9, 2024
1 parent 3c6dbfc commit 991a400
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ protected final Single<C> noActiveHostsFailure(List<? extends Host<ResolvedAddre
}

// This method assumes the host is considered healthy.
protected final @Nullable Single<C> selectFromHost(Host<ResolvedAddress, C> host, Predicate<C> selector,
@Nullable
protected final Single<C> selectFromHost(Host<ResolvedAddress, C> host, Predicate<C> selector,
boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) {
// First see if we can get an existing connection regardless of health status.
if (!forceNewConnectionAndReserve) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ private enum State {
private final Addr address;
@Nullable
private final HealthCheckConfig healthCheckConfig;
@Nullable
private final ConnectionPoolStrategy<C> connectionPoolStrategy;
@Nullable
private final HealthIndicator<Addr, C> healthIndicator;
Expand All @@ -97,8 +96,8 @@ private enum State {
DefaultHost(final String lbDescription, final Addr address,
final ConnectionPoolStrategy<C> connectionPoolStrategy,
final ConnectionFactory<Addr, ? extends C> connectionFactory,
final HostObserver hostObserver, final @Nullable HealthCheckConfig healthCheckConfig,
final @Nullable HealthIndicator healthIndicator) {
final HostObserver hostObserver, @Nullable final HealthCheckConfig healthCheckConfig,
@Nullable final HealthIndicator<Addr, C> healthIndicator) {
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
this.address = requireNonNull(address, "address");
this.healthIndicator = healthIndicator;
Expand Down Expand Up @@ -179,14 +178,15 @@ public boolean markExpired() {
}

@Override
public @Nullable C pickConnection(Predicate<C> selector, @Nullable final ContextMap context) {
@Nullable
public C pickConnection(Predicate<C> selector, @Nullable final ContextMap context) {
final List<C> connections = connState.connections;
return connectionPoolStrategy.select(connections, selector);
}

@Override
public Single<C> newConnection(
Predicate<C> selector, final boolean forceNewConnectionAndReserve, final @Nullable ContextMap context) {
Predicate<C> selector, final boolean forceNewConnectionAndReserve, @Nullable final ContextMap context) {
return Single.defer(() -> {
ContextMap actualContext = context;
if (actualContext == null) {
Expand Down Expand Up @@ -274,8 +274,6 @@ private void onConnectionError(Throwable cause) {
assert healthCheckConfig != null;
for (;;) {
ConnState previous = connStateUpdater.get(this);
// TODO: if we have a failure, why does it matter if the connections are there?
// If we try to make a new connection (maybe the pool is small) it would likely fail.
if (!previous.isActive() || !previous.connections.isEmpty()
|| cause instanceof ConnectionLimitReachedException) {
LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.",
Expand All @@ -292,7 +290,8 @@ private void onConnectionError(Throwable cause) {
lbDescription, address, nextState.failedConnections,
healthCheckConfig.failedThreshold, cause);
} else {
assert nextState.state == State.UNHEALTHY;
// if we're unhealthy we also have a non-null healthCheck.
assert nextState.state == State.UNHEALTHY && nextState.healthCheck != null;
LOGGER.info("{}: failed to open a new connection to the host on address {} " +
"{} time(s) in a row. Error counting threshold reached, marking this host as " +
"UNHEALTHY for the selection algorithm and triggering background health-checking.",
Expand All @@ -318,7 +317,7 @@ public boolean canMakeNewConnections() {
return state != State.EXPIRED && state != State.CLOSED;
}

private boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) {
private boolean addConnection(final C connection, @Nullable final HealthCheck currentHealthCheck) {
int addAttempt = 0;
for (;;) {
final ConnState previous = connStateUpdater.get(this);
Expand Down Expand Up @@ -446,6 +445,7 @@ private Completable doClose(final boolean graceful) {
private void cancelIfHealthCheck(ConnState connState) {
if (connState.isUnhealthy()) {
LOGGER.debug("{}: health check cancelled for {}.", lbDescription, this);
assert connState.healthCheck != null; // guaranteed by `.isUnhealthy()`
connState.healthCheck.cancel();
}
}
Expand Down Expand Up @@ -528,7 +528,8 @@ private final class ConnState {
@Nullable
HealthCheck healthCheck;

private ConnState(final List<C> connections, State state, int failedConnections, HealthCheck healthCheck) {
private ConnState(final List<C> connections, State state, int failedConnections,
@Nullable final HealthCheck healthCheck) {
// These asserts codify the invariants of the state.
// if the state is unhealthy there must be a healthcheck
assert state != State.UNHEALTHY || healthCheck != null;
Expand All @@ -543,6 +544,7 @@ private ConnState(final List<C> connections, State state, int failedConnections,
}

ConnState toNextFailedConnection(Throwable cause) {
assert healthCheckConfig != null;
final int nextFailedCount = addWithOverflowProtection(this.failedConnections, 1);
if (state == State.ACTIVE && healthCheckConfig.failedThreshold <= nextFailedCount) {
return new ConnState(connections, State.UNHEALTHY, nextFailedCount, new HealthCheck(cause));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ private <T extends PrioritizedHost> List<T> rebuildWithPriorities(final List<T>
// #zone-aware-load-balancing
// Consolidate our hosts into their respective priority groups. Since we're going to use a map we must use
// and ordered map (in this case a TreeMap) so that we can iterate in order of group priority.
TreeMap<Integer, Group> groups = new TreeMap<>();
TreeMap<Integer, Group<T>> groups = new TreeMap<>();
for (T host : hosts) {
if (host.priority() < 0) {
LOGGER.warn("{}: Illegal priority: {} (expected priority >=0). Ignoring priority data.",
lbDescription, host.priority());
return hosts;
}
Group group = groups.computeIfAbsent(host.priority(), i -> new Group());
Group<T> group = groups.computeIfAbsent(host.priority(), i -> new Group<>());
if (host.isHealthy()) {
group.healthyCount++;
}
Expand All @@ -81,7 +81,7 @@ private <T extends PrioritizedHost> List<T> rebuildWithPriorities(final List<T>

// Compute the health percentage for each group.
int totalHealthPercentage = 0;
for (Group group : groups.values()) {
for (Group<T> group : groups.values()) {
group.healthPercentage = Math.min(100, overProvisionPercentage * group.healthyCount / group.hosts.size());
totalHealthPercentage = Math.min(100, totalHealthPercentage + group.healthPercentage);
}
Expand All @@ -95,7 +95,7 @@ private <T extends PrioritizedHost> List<T> rebuildWithPriorities(final List<T>
List<T> weightedResults = new ArrayList<>();
int activeGroups = 0;
int remainingProbability = 100;
for (Group group : groups.values()) {
for (Group<T> group : groups.values()) {
assert !group.hosts.isEmpty();
final int groupProbability = Math.min(remainingProbability,
group.healthPercentage * 100 / totalHealthPercentage);
Expand All @@ -109,7 +109,7 @@ private <T extends PrioritizedHost> List<T> rebuildWithPriorities(final List<T>
}
}
// We should have at least one host now: if all the hosts were unhealthy the `totalHealthyPercentage` would be
// zero and we would have bailed before re-weighting. If the weights of a group were all zero we should have
// zero, and we would have bailed before re-weighting. If the weights of a group were all zero we should have
// re-weighted them all equally and added them.
assert !weightedResults.isEmpty();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
* are unable to have a connection established. Providing {@code null} disables this mechanism (meaning the host
* continues being eligible for connecting on the request path).
* @param outlierDetectorFactory outlier detector factory.
* @see RoundRobinLoadBalancerFactory
*/
DefaultLoadBalancer(
final String id,
Expand Down Expand Up @@ -412,7 +411,8 @@ private PrioritizedHostImpl<ResolvedAddress, C> createHost(ServiceDiscovererEven
ResolvedAddress addr = event.address();
final LoadBalancerObserver.HostObserver hostObserver = loadBalancerObserver.hostObserver(addr);
// All hosts will share the health check config of the parent load balancer.
final HealthIndicator indicator = outlierDetector.newHealthIndicator(addr, hostObserver);
final HealthIndicator<ResolvedAddress, C> indicator =
outlierDetector.newHealthIndicator(addr, hostObserver);
// We don't need the host level health check if we are either not health checking at all or if the
// failed connect threshold is negative, meaning disabled.
final HealthCheckConfig hostHealthCheckConfig =
Expand All @@ -421,9 +421,7 @@ private PrioritizedHostImpl<ResolvedAddress, C> createHost(ServiceDiscovererEven
new DefaultHost<>(lbDescription, addr, connectionPoolStrategy,
connectionFactory, hostObserver, hostHealthCheckConfig, indicator),
eventWeight(event), eventPriority(event));
if (indicator != null) {
indicator.setHost(host);
}
indicator.setHost(host);
host.onClose().afterFinally(() ->
sequentialExecutor.execute(() -> {
final List<PrioritizedHostImpl<ResolvedAddress, C>> currentHosts = usedHosts;
Expand Down Expand Up @@ -523,7 +521,7 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
Single<C> result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
return result.beforeOnError(exn -> {
if (exn instanceof NoActiveHostException) {
if (!currentHostSelector.isHealthy()) {
if (healthCheckConfig != null && !currentHostSelector.isHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(
@Override
public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
@Nullable LoadBalancerObserver loadBalancerObserver) {
return loadBalancerObserver(ignored -> loadBalancerObserver);
return loadBalancerObserver(loadBalancerObserver == null ? null : ignored -> loadBalancerObserver);
}

@Override
Expand All @@ -71,8 +71,7 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(

@Override
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(OutlierDetectorConfig outlierDetectorConfig) {
this.outlierDetectorConfig = outlierDetectorConfig == null ?
OutlierDetectorConfig.DEFAULT_CONFIG : outlierDetectorConfig;
this.outlierDetectorConfig = requireNonNull(outlierDetectorConfig, "outlierDetectorConfig");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.client.api.ServiceDiscovererEvent;

import java.util.Collection;
import javax.annotation.Nullable;

final class NoopLoadBalancerObserver implements LoadBalancerObserver {

Expand Down Expand Up @@ -78,7 +79,7 @@ public void onActiveHostRemoved(int connectionCount) {
}

@Override
public void onHostMarkedUnhealthy(Throwable cause) {
public void onHostMarkedUnhealthy(@Nullable Throwable cause) {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ private void drain(Cell next) {
final Thread thisThread = Thread.currentThread();
currentDrainingThread = thisThread;
for (;;) {
assert next != null;
try {
next.runnable.run();
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ public final boolean isHealthy() {
// than the eviction time technically prescribes.
if (evictedUntilNanos <= currentTimeNanos()) {
sequentialExecutor.execute(() -> {
if (!cancelled && this.evictedUntilNanos != null && this.evictedUntilNanos <= currentTimeNanos()) {
final Long innerEvictedUntilNanos = this.evictedUntilNanos;
if (!cancelled && innerEvictedUntilNanos != null && innerEvictedUntilNanos <= currentTimeNanos()) {
sequentialRevive();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void cancel() {
kernel.cancel();
sequentialExecutor.execute(() -> {
List<XdsHealthIndicator<ResolvedAddress, C>> indicatorList = new ArrayList<>(indicators);
for (XdsHealthIndicator indicator : indicatorList) {
for (XdsHealthIndicator<ResolvedAddress, C> indicator : indicatorList) {
indicator.sequentialCancel();
}
assert indicators.isEmpty();
Expand Down Expand Up @@ -248,7 +248,7 @@ private static final class AlwaysHealthyOutlierDetectorAlgorithm<ResolvedAddress
public void detectOutliers(final OutlierDetectorConfig config,
final Collection<? extends XdsHealthIndicator<ResolvedAddress, C>> indicators) {
int unhealthy = 0;
for (XdsHealthIndicator indicator : indicators) {
for (XdsHealthIndicator<ResolvedAddress, C> indicator : indicators) {
// Hosts can still be marked unhealthy due to consecutive failures.
final boolean isHealthy = indicator.isHealthy();
if (isHealthy) {
Expand Down

0 comments on commit 991a400

Please sign in to comment.