From d02f8668611273f88e34ac39dcc69be07754bf44 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 3 May 2024 11:39:47 -0600 Subject: [PATCH] loadbalancer-experimental: add support for prioritization (#2905) Motivation: We want to support priority groups in the LB. Modifications: Support it. --- .../DefaultHostPriorityStrategy.java | 162 +++++++++++ .../loadbalancer/DefaultLoadBalancer.java | 201 +++++++++++-- .../loadbalancer/HostPriorityStrategy.java | 30 ++ .../loadbalancer/NoopOutlierDetector.java | 6 + .../loadbalancer/OutlierDetector.java | 3 + .../loadbalancer/PrioritizedHost.java | 46 +++ .../loadbalancer/XdsHealthIndicator.java | 2 +- .../loadbalancer/XdsOutlierDetector.java | 31 +- .../DefaultHostPriorityStrategyTest.java | 271 ++++++++++++++++++ .../loadbalancer/DefaultLoadBalancerTest.java | 6 + .../loadbalancer/P2CSelectorTest.java | 3 +- 11 files changed, 736 insertions(+), 25 deletions(-) create mode 100644 servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java create mode 100644 servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostPriorityStrategy.java create mode 100644 servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/PrioritizedHost.java create mode 100644 servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategyTest.java diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java new file mode 100644 index 0000000000..e8921bd547 --- /dev/null +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategy.java @@ -0,0 +1,162 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; + +final class DefaultHostPriorityStrategy implements HostPriorityStrategy { + + static final HostPriorityStrategy INSTANCE = new DefaultHostPriorityStrategy(); + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHostPriorityStrategy.class); + private static final int DEFAULT_OVER_PROVISION_FACTOR = 140; + + + private final int overProvisionPercentage; + + DefaultHostPriorityStrategy() { + this(DEFAULT_OVER_PROVISION_FACTOR); + } + + // exposed for testing + DefaultHostPriorityStrategy(final int overProvisionPercentage) { + this.overProvisionPercentage = ensurePositive(overProvisionPercentage, "overProvisionPercentage"); + } + + @Override + public List prioritize(List hosts) { + // no need to compute priorities if there are no hosts. + return hosts.isEmpty() ? hosts : rebuildWithPriorities(hosts); + } + + private List rebuildWithPriorities(final List hosts) { + assert !hosts.isEmpty(); + + // TODO: this precludes having an expected amount of traffic favor local zones and the rest routed to + // remote zones intentionally even if all hosts are well. + // https://www.envoyproxy.io/docs/envoy/latest/configuration/upstream/cluster_manager/cluster_runtime + // #zone-aware-load-balancing + // The behavior could be structured more as a tree, but it's not obvious how to feed such a tree into the load + // balancer. + List groups = new ArrayList<>(); + // First consolidate our hosts into their respective priority groups. + for (T host : hosts) { + if (host.priority() < 0) { + LOGGER.warn("Found illegal priority: {}. Dropping priority grouping data.", host.priority()); + return hosts; + } + Group group = getGroup(groups, host.priority()); + if (host.isHealthy()) { + group.healthyCount++; + } + group.hosts.add(host); + } + + // If there is only a single group we don't need to adjust weights. + if (groups.size() == 1) { + return hosts; + } + + // Compute the health percentage for each group. + int totalHealthPercentage = 0; + for (Group group : groups) { + group.healthPercentage = Math.min(100, overProvisionPercentage * group.healthyCount / group.hosts.size()); + totalHealthPercentage = Math.min(100, totalHealthPercentage + group.healthPercentage); + } + if (totalHealthPercentage == 0) { + // nothing is considered healthy so everything is considered healthy. + return hosts; + } + + // We require that we have a continuous priority set. We could relax this if we wanted by using a tree map to + // traverse in order. However, I think it's also a requirement of other xDS compatible implementations. + List weightedResults = new ArrayList<>(); + int remainingProbability = 100; + for (int i = 0; i < groups.size() && remainingProbability > 0; i++) { + Group group = groups.get(i); + if (group.hosts.isEmpty()) { + // we don't have a continuous priority group. Warn and rebuild without priorities. + LOGGER.warn("Non-continuous priority groups: {} total groups but missing group {}. " + + "Dropping priority grouping data.", groups.size(), i); + return hosts; + } + + // We need to compute the weight for the group. + final int groupProbability = Math.min(remainingProbability, + group.healthPercentage * 100 / totalHealthPercentage); + if (groupProbability == 0) { + // TODO: this means all hosts for this group are unhealthy. This may be worth some logging. + } else { + remainingProbability -= groupProbability; + group.addToResults(groupProbability, weightedResults); + } + } + // What to do if we don't have any healthy nodes at all? + if (remainingProbability > 0) { + // TODO: this is an awkward situation. Should we panic and just discard priority information? + } + return weightedResults; + } + + private Group getGroup(List groups, int priority) { + while (groups.size() < priority + 1) { + groups.add(new Group()); + } + return groups.get(priority); + } + + private static class Group { + final List hosts = new ArrayList<>(); + int healthyCount; + int healthPercentage; + + private void addToResults(int groupProbability, List results) { + // Add all the members of the group after we recompute their weights. To recompute the weights we're going + // to normalize against their group probability. + double groupTotalWeight = totalWeight(hosts); + if (groupTotalWeight == 0) { + double weight = ((double) groupProbability) / hosts.size(); + for (H host : hosts) { + host.loadBalancingWeight(weight); + results.add(host); + } + } else { + double scalingFactor = groupProbability / groupTotalWeight; + for (H host : hosts) { + double hostWeight = host.loadBalancingWeight() * scalingFactor; + host.loadBalancingWeight(hostWeight); + if (hostWeight > 0) { + results.add(host); + } + } + } + } + } + + private static double totalWeight(Iterable hosts) { + double sum = 0; + for (PrioritizedHost host : hosts) { + sum += host.loadBalancingWeight(); + } + return sum; + } +} diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index b389b9414c..d82135d110 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -62,6 +62,7 @@ import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.utils.internal.NumberUtils.ensureNonNegative; import static java.lang.Integer.toHexString; import static java.lang.System.identityHashCode; import static java.util.Collections.emptyList; @@ -91,7 +92,7 @@ final class DefaultLoadBalancer hostSelector; // reads and writes are protected by `sequentialExecutor`. - private List> usedHosts = emptyList(); + private List> usedHosts = emptyList(); // reads and writes are protected by `sequentialExecutor`. private boolean isClosed; @@ -106,6 +107,7 @@ final class DefaultLoadBalancer connectionFactory; @Nullable private final HealthCheckConfig healthCheckConfig; + private final HostPriorityStrategy priorityStrategy; private final OutlierDetector outlierDetector; private final LoadBalancerObserver loadBalancerObserver; private final ListenableAsyncCloseable asyncCloseable; @@ -136,6 +138,7 @@ final class DefaultLoadBalancer + sequentialExecutor.execute(() -> sequentialUpdateUsedHosts(usedHosts))); } private void subscribeToEvents(boolean resubscribe) { @@ -180,7 +187,7 @@ private Completable doClose(final boolean graceful) { outlierDetector.cancel(); } isClosed = true; - List> currentList = usedHosts; + List> currentList = usedHosts; final CompositeCloseable compositeCloseable = newCompositeCloseable() .appendAll(currentList) .appendAll(connectionFactory); @@ -278,8 +285,9 @@ private void sequentialOnNext(Collection> nextHosts = new ArrayList<>(usedHosts.size() + events.size()); - final List> oldUsedHosts = usedHosts; + final List> nextHosts = new ArrayList<>( + usedHosts.size() + events.size()); + final List> oldUsedHosts = usedHosts; // First we make a map of addresses to events so that we don't get quadratic behavior for diffing. final Map> eventMap = new HashMap<>(); for (ServiceDiscovererEvent event : events) { @@ -294,12 +302,22 @@ private void sequentialOnNext(Collection host : oldUsedHosts) { + for (PrioritizedHostImpl host : oldUsedHosts) { ServiceDiscovererEvent event = eventMap.remove(host.address()); if (event == null) { // Host doesn't have a SD update so just copy it over. nextHosts.add(host); - } else if (AVAILABLE.equals(event.status())) { + continue; + } + // Set the new weight and priority of the host. + double oldSDWeight = host.serviceDiscoveryWeight(); + int oldPriority = host.priority(); + host.serviceDiscoveryWeight(eventWeight(event)); + host.priority(eventPriority(event)); + hostSetChanged = hostSetChanged + || oldPriority != host.priority() || oldSDWeight != host.serviceDiscoveryWeight(); + + if (AVAILABLE.equals(event.status())) { // We only send the ready event if the previous host list was empty. sendReadyEvent = oldUsedHosts.isEmpty(); // If the host is already in CLOSED state, we should discard it and create a new entry. @@ -309,7 +327,7 @@ private void sequentialOnNext(Collection createHost(ResolvedAddress addr) { + private PrioritizedHostImpl createHost(ServiceDiscovererEvent event) { + ResolvedAddress addr = event.address(); final LoadBalancerObserver.HostObserver hostObserver = loadBalancerObserver.hostObserver(addr); // All hosts will share the health check config of the parent load balancer. final HealthIndicator indicator = outlierDetector.newHealthIndicator(addr, hostObserver); @@ -388,19 +407,22 @@ private Host createHost(ResolvedAddress addr) { // failed connect threshold is negative, meaning disabled. final HealthCheckConfig hostHealthCheckConfig = healthCheckConfig == null || healthCheckConfig.failedThreshold < 0 ? null : healthCheckConfig; - final Host host = new DefaultHost<>(lbDescription, addr, connectionPoolStrategy, - connectionFactory, hostObserver, hostHealthCheckConfig, indicator); + final PrioritizedHostImpl host = new PrioritizedHostImpl<>( + new DefaultHost<>(lbDescription, addr, connectionPoolStrategy, + connectionFactory, hostObserver, hostHealthCheckConfig, indicator), + eventWeight(event), eventPriority(event)); if (indicator != null) { indicator.setHost(host); } host.onClose().afterFinally(() -> sequentialExecutor.execute(() -> { - final List> currentHosts = usedHosts; + final List> currentHosts = usedHosts; if (currentHosts.isEmpty()) { // Can't remove an entry from an empty list. return; } - final List> nextHosts = listWithHostRemoved(currentHosts, host); + final List> nextHosts = listWithHostRemoved( + currentHosts, host); // we only need to do anything else if we actually removed the host if (nextHosts.size() != currentHosts.size()) { sequentialUpdateUsedHosts(nextHosts); @@ -413,8 +435,9 @@ private Host createHost(ResolvedAddress addr) { return host; } - private List> listWithHostRemoved(List> oldHostsTyped, - Host toRemove) { + private List> listWithHostRemoved( + List> oldHostsTyped, + PrioritizedHostImpl toRemove) { final int index = oldHostsTyped.indexOf(toRemove); if (index < 0) { // Element doesn't exist: just return the old list. @@ -425,7 +448,8 @@ private List> listWithHostRemoved(List> newHosts = new ArrayList<>(oldHostsTyped.size() - 1); + final List> newHosts = + new ArrayList<>(oldHostsTyped.size() - 1); for (int i = 0; i < oldHostsTyped.size(); ++i) { if (i != index) { newHosts.add(oldHostsTyped.get(i)); @@ -441,7 +465,7 @@ public void onError(final Throwable t) { // Terminate processor only if we will never re-subscribe eventStreamProcessor.onError(t); } - List> hosts = usedHosts; + List> hosts = usedHosts; LOGGER.error( "{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.", DefaultLoadBalancer.this, eventPublisher, hosts.size(), hosts, t); @@ -451,7 +475,7 @@ public void onError(final Throwable t) { @Override public void onComplete() { sequentialExecutor.execute(() -> { - List> hosts = usedHosts; + List> hosts = usedHosts; if (healthCheckConfig == null) { // Terminate processor only if we will never re-subscribe eventStreamProcessor.onComplete(); @@ -463,9 +487,9 @@ public void onComplete() { } // must be called from within the SequentialExecutor - private void sequentialUpdateUsedHosts(List> nextHosts) { + private void sequentialUpdateUsedHosts(List> nextHosts) { this.usedHosts = nextHosts; - this.hostSelector = hostSelector.rebuildWithHosts(usedHosts); + this.hostSelector = hostSelector.rebuildWithHosts(priorityStrategy.prioritize(usedHosts)); } @Override @@ -550,7 +574,8 @@ public List>> usedAddresses() { // must be called from within the sequential executor. private List>> sequentialUsedAddresses() { - return usedHosts.stream().map(host -> ((DefaultHost) host).asEntry()).collect(toList()); + return usedHosts.stream().map(host -> + ((DefaultHost) host.delegate()).asEntry()).collect(toList()); } private String makeDescription(String id, String targetResource) { @@ -582,4 +607,138 @@ public int hostSetSize() { return 0; } } + + // TODO: weight and priority need representation on the ServiceDiscovererEvent. + private static double eventWeight(ServiceDiscovererEvent event) { + assert event != null; // to make PMD happy. + return 1.0; + } + + private static int eventPriority(ServiceDiscovererEvent event) { + assert event != null; // to make PMD happy. + return 0; + } + + static final class PrioritizedHostImpl + implements Host, PrioritizedHost { + private final Host delegate; + private int priority; + private double serviceDiscoveryWeight; + private double loadBalancingWeight; + + PrioritizedHostImpl(final Host delegate, final double serviceDiscoveryWeight, + final int priority) { + this.delegate = requireNonNull(delegate, "delegate"); + this.priority = ensureNonNegative(priority, "priority"); + this.serviceDiscoveryWeight = serviceDiscoveryWeight; + this.loadBalancingWeight = serviceDiscoveryWeight; + } + + Host delegate() { + return delegate; + } + + @Override + public int priority() { + return priority; + } + + void priority(final int priority) { + this.priority = priority; + } + + // Set the intrinsic weight of the host. This is the information from service discovery. + // When this is set it also overwrites the load balancing weight which must then be recalculated. + void serviceDiscoveryWeight(final double weight) { + this.serviceDiscoveryWeight = weight; + this.loadBalancingWeight = weight; + } + + double serviceDiscoveryWeight() { + return serviceDiscoveryWeight; + } + + // Set the weight to use in load balancing. This includes derived weight information such as prioritization + // and is what the host selectors will use when picking hosts. + @Override + public void loadBalancingWeight(final double weight) { + this.loadBalancingWeight = weight; + } + + @Override + public double loadBalancingWeight() { + return loadBalancingWeight; + } + + @Override + public int score() { + return delegate.score(); + } + + @Override + public Completable closeAsync() { + return delegate.closeAsync(); + } + + @Override + public Completable closeAsyncGracefully() { + return delegate.closeAsyncGracefully(); + } + + @Override + public Completable onClose() { + return delegate.onClose(); + } + + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + + @Nullable + @Override + public C pickConnection(Predicate selector, @Nullable ContextMap context) { + return delegate.pickConnection(selector, context); + } + + @Override + public Single newConnection(Predicate selector, boolean forceNewConnectionAndReserve, + @Nullable ContextMap context) { + return delegate.newConnection(selector, forceNewConnectionAndReserve, context); + } + + @Override + public ResolvedAddress address() { + return delegate.address(); + } + + @Override + public boolean isHealthy() { + return delegate.isHealthy(); + } + + @Override + public boolean canMakeNewConnections() { + return delegate.canMakeNewConnections(); + } + + @Override + public boolean markActiveIfNotClosed() { + return delegate.markActiveIfNotClosed(); + } + + @Override + public boolean markExpired() { + return delegate.markExpired(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(priority: " + priority + + ", intrinsicWeight: " + serviceDiscoveryWeight + + ", loadBalancedWeight: " + loadBalancingWeight + + ", host: " + delegate + + ")"; + } + } } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostPriorityStrategy.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostPriorityStrategy.java new file mode 100644 index 0000000000..9a5d0b241a --- /dev/null +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostPriorityStrategy.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import java.util.List; + +@FunctionalInterface +interface HostPriorityStrategy { + + /** + * Adjust the host set to account for priority. + * @param hosts the set of hosts to prioritize. + * @return the collection of hosts that should be used and have had their weights adjusted. + * @param the refined type of the {@link PrioritizedHost}. + */ + List prioritize(List hosts); +} diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopOutlierDetector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopOutlierDetector.java index fe65125091..dcb0b1953a 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopOutlierDetector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopOutlierDetector.java @@ -17,6 +17,7 @@ import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.Publisher; import java.util.concurrent.TimeUnit; @@ -46,6 +47,11 @@ public HealthIndicator newHealthIndicator( return new BasicHealthIndicator(); } + @Override + public Publisher healthStatusChanged() { + return Publisher.never(); + } + private final class BasicHealthIndicator extends DefaultRequestTracker implements HealthIndicator { diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/OutlierDetector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/OutlierDetector.java index 8654dfa08e..f7a77617cf 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/OutlierDetector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/OutlierDetector.java @@ -17,6 +17,7 @@ import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.loadbalancer.LoadBalancerObserver.HostObserver; /** @@ -33,4 +34,6 @@ interface OutlierDetector ext * @return new {@link HealthIndicator}. */ HealthIndicator newHealthIndicator(ResolvedAddress address, HostObserver hostObserver); + + Publisher healthStatusChanged(); } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/PrioritizedHost.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/PrioritizedHost.java new file mode 100644 index 0000000000..637c927141 --- /dev/null +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/PrioritizedHost.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +/** + * Handles for determining the weight of a host based on it's underlying weight and priority. + */ +interface PrioritizedHost { + + /** + * The current priority of the host. + * @return the current priority of the host. + */ + int priority(); + + /** + * Whether the host is considered healthy or not. + * @return whether the host is considered healthy or not. + */ + boolean isHealthy(); + + /** + * The weight of the host to use for load balancing. + * @return the weight of the host to use for load balancing. + */ + double loadBalancingWeight(); + + /** + * Set the weight of the host to use during load balancing. + * @param weight the weight of the host to use during load balancing. + */ + void loadBalancingWeight(double weight); +} diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java index 6b405741ff..7583f48a80 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java @@ -209,7 +209,7 @@ public final boolean updateOutlierStatus(OutlierDetectorConfig config, boolean i if (evictedUntilNanos <= currentTimeNanos()) { sequentialRevive(); } - // If we are evicted or just transitioned out of eviction we shouldn't be marked as an outlier this round. + // If we are evicted or just transitioned out of eviction we shouldn't be marked as an outlier this round. // Note that this differs from the envoy behavior. If we want to mimic it, then I think we need to just // fall through and maybe attempt to eject again. LOGGER.trace("{}-{}: markAsOutlier(..) resulted in host revival.", lbDescription, address); diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java index b3d9092e7a..9322058b37 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsOutlierDetector.java @@ -17,7 +17,10 @@ import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.PublisherSource; import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.SourceAdapters; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.loadbalancer.LoadBalancerObserver.HostObserver; import io.servicetalk.utils.internal.RandomUtils; @@ -27,12 +30,13 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; import static java.lang.Math.max; import static java.util.Objects.requireNonNull; @@ -67,10 +71,13 @@ final class XdsOutlierDetector healthStatusChangeProcessor = + newPublisherProcessorDropHeadOnOverflow(1); private final Kernel kernel; private final AtomicInteger indicatorCount = new AtomicInteger(); // Protected by `sequentialExecutor`. - private final Set> indicators = new HashSet<>(); + // Note that this is a LinkedHashSet so as to preserve the iteration order. + private final Set> indicators = new LinkedHashSet<>(); // reads and writes are protected by `sequentialExecutor`. private int ejectedHostCount; @@ -110,9 +117,15 @@ public void cancel() { } assert indicators.isEmpty(); assert indicatorCount.get() == 0; + healthStatusChangeProcessor.onComplete(); }); } + @Override + public Publisher healthStatusChanged() { + return SourceAdapters.fromSource(healthStatusChangeProcessor); + } + // Exposed for testing. Not thread safe. int ejectedHostCount() { return ejectedHostCount; @@ -188,10 +201,24 @@ private Cancellable scheduleNextOutliersCheck(OutlierDetectorConfig currentConfi private void sequentialCheckOutliers() { assert sequentialExecutor.isCurrentThreadDraining(); + boolean[] beforeState = new boolean[indicators.size()]; + int i = 0; + for (HealthIndicator indicator : indicators) { + beforeState[i++] = indicator.isHealthy(); + } for (XdsOutlierDetectorAlgorithm outlierDetector : algorithms) { outlierDetector.detectOutliers(config, indicators); } cancellable.nextCancellable(scheduleNextOutliersCheck(config)); + + // now check to see if any of our health states changed + i = 0; + for (HealthIndicator indicator : indicators) { + if (beforeState[i++] != indicator.isHealthy()) { + healthStatusChangeProcessor.onNext(null); + break; + } + } } } diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategyTest.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategyTest.java new file mode 100644 index 0000000000..e397ea58c6 --- /dev/null +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultHostPriorityStrategyTest.java @@ -0,0 +1,271 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import org.hamcrest.Matcher; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; + +class DefaultHostPriorityStrategyTest { + + private final HostPriorityStrategy hostPriorityStrategy = new DefaultHostPriorityStrategy(100); + + @Test + void noPriorities() { + List hosts = makeHosts(4); + List result = hostPriorityStrategy.prioritize(hosts); + assertThat(result.size(), equalTo(hosts.size())); + + for (int i = 0; i < hosts.size(); i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), equalTo(result.get(0).loadBalancedWeight())); + } + } + + @Test + void noPrioritiesWithWeights() { + List hosts = makeHosts(4); + for (int i = 0; i < hosts.size(); i++) { + hosts.get(i).loadBalancingWeight(i + 1d); + } + List result = hostPriorityStrategy.prioritize(hosts); + assertThat(result.size(), equalTo(hosts.size())); + + for (int i = 0; i < hosts.size(); i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), approxEqual(result.get(0).loadBalancedWeight() * (i + 1))); + } + } + + @Test + void twoPrioritiesNoWeights() { + List hosts = makeHosts(6); + for (int i = 3; i < 6; i++) { + hosts.get(i).priority(1); + } + List result = hostPriorityStrategy.prioritize(hosts); + assertThat(result.size(), equalTo(3)); + + for (int i = 0; i < 3; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), equalTo(result.get(0).loadBalancedWeight())); + } + } + + @Test + void twoPrioritiesWithWeights() { + List hosts = makeHosts(6); + for (int i = 0; i < hosts.size(); i++) { + hosts.get(i).loadBalancingWeight(i + 1d); + if (i >= 3) { + hosts.get(i).priority(1); + } + } + List result = hostPriorityStrategy.prioritize(hosts); + assertThat(result.size(), equalTo(3)); + + // We should only have the first three hosts because they were all healthy, so they are the only group. + for (int i = 0; i < 3; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), approxEqual(result.get(0).loadBalancedWeight() * (i + 1))); + } + } + + @Test + void priorityGroupsWithoutUnhealthyNodes() { + List hosts = makeHosts(6); + for (int i = 3; i < hosts.size(); i++) { + hosts.get(i).priority(1); + } + List result = hostPriorityStrategy.prioritize(hosts); + + assertThat(result.size(), equalTo(3)); + + // We should only have the first three hosts because they were all healthy, so they are the only group. + for (int i = 0; i < 3; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), approxEqual(result.get(0).loadBalancedWeight())); + } + } + + @Test + void priorityGroupsWithUnhealthyNodes() { + List hosts = makeHosts(6); + hosts.get(0).isHealthy(false); + for (int i = 3; i < hosts.size(); i++) { + hosts.get(i).priority(1); + } + List result = hostPriorityStrategy.prioritize(hosts); + + assertThat(result.size(), equalTo(6)); + + // We should only have the first three hosts because they were all healthy, so they are the only group. + for (int i = 0; i < 3; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), approxEqual(result.get(0).loadBalancedWeight())); + } + for (int i = 3; i < 6; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), approxEqual(result.get(3).loadBalancedWeight())); + } + // Now the relative weights between the two groups should be 66 / 34 as the first group will have 66% health + // and the second, while having 100% healthy, will only be able to pick up the slack. + assertThat(result.get(0).loadBalancedWeight(), approxEqual(result.get(3).loadBalancedWeight() * 66 / 34)); + } + + @Test + void priorityGroupsWithUnhealthyNodesTotallingLessThan100Percent() { + List hosts = makeHosts(6); + for (int i = 0; i < hosts.size(); i++) { + if (i >= 3) { + hosts.get(i).priority(1); + } + hosts.get(i).isHealthy(false); + } + hosts.get(0).isHealthy(true); + List result = hostPriorityStrategy.prioritize(hosts); + + assertThat(result.size(), equalTo(3)); + + // We should only have the first three hosts because while they didn't form a full healthy set the P1 group + // didn't provide _any_ healthy nodes, so no need to spill over. + for (int i = 0; i < 3; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), approxEqual(result.get(0).loadBalancedWeight())); + } + } + + @Test + void priorityGroupsWithWeightedUnhealthyNodes() { + List hosts = makeHosts(6); + hosts.get(0).isHealthy(false); + for (int i = 0; i < hosts.size(); i++) { + if (i >= 3) { + hosts.get(i).loadBalancingWeight(i - 3 + 1d); + hosts.get(i).priority(1); + } else { + hosts.get(i).loadBalancingWeight(i + 1d); + } + } + List result = hostPriorityStrategy.prioritize(hosts); + + assertThat(result.size(), equalTo(6)); + + // We should only have the first three hosts because they were all healthy, so they are the only group. + for (int i = 0; i < 3; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), approxEqual(result.get(0).loadBalancedWeight() * (i + 1))); + } + for (int i = 3; i < 6; i++) { + assertThat(result.get(i).address(), equalTo(hosts.get(i).address())); + // It doesn't matter what they are exactly so long as all weights are equal. + assertThat(result.get(i).loadBalancedWeight(), + approxEqual(result.get(3).loadBalancedWeight() * (i + 1 - 3))); + } + // Now the relative weights between the two groups should be 66 / 34 as the first group will have 66% health + // and the second, while having 100% healthy, will only be able to pick up the slack. + for (int i = 0; i < 3; i++) { + assertThat(result.get(i).loadBalancedWeight(), + approxEqual(result.get(i + 3).loadBalancedWeight() * 66 / 34)); + } + } + + private static List makeHosts(int count) { + String[] addresses = new String[count]; + for (int i = 0; i < count; i++) { + addresses[i] = "addr-" + i; + } + return makeHosts(addresses); + } + + private static List makeHosts(String... addresses) { + List results = new ArrayList<>(); + for (String address : addresses) { + results.add(new TestPrioritizedHost(address)); + } + return results; + } + + private static Matcher approxEqual(double expected) { + return closeTo(expected, 0.001); + } + + private static class TestPrioritizedHost implements PrioritizedHost { + + private final String address; + + private boolean isHealthy = true; + private int priority; + private double loadBalancedWeight = 1; + + TestPrioritizedHost(String address) { + this.address = address; + } + + String address() { + return address; + } + + @Override + public int priority() { + return priority; + } + + void priority(final int priority) { + this.priority = priority; + } + + // Set the weight to use in load balancing. This includes derived weight information such as prioritization + // and is what the host selectors will use when picking endpoints. + @Override + public void loadBalancingWeight(final double weight) { + this.loadBalancedWeight = weight; + } + + double loadBalancedWeight() { + return loadBalancedWeight; + } + + @Override + public boolean isHealthy() { + return isHealthy; + } + + public void isHealthy(final boolean isHealthy) { + this.isHealthy = isHealthy; + } + + @Override + public double loadBalancingWeight() { + return loadBalancedWeight; + } + } +} diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java index 7d1516f682..d58af13e36 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java @@ -16,6 +16,7 @@ package io.servicetalk.loadbalancer; import io.servicetalk.client.api.ServiceDiscovererEvent; +import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.context.api.ContextMap; @@ -294,6 +295,11 @@ List getIndicators() { return new ArrayList<>(indicatorSet); } } + + @Override + public Publisher healthStatusChanged() { + return Publisher.never(); + } } private static class TestLoadBalancerPolicy extends LoadBalancingPolicy { diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java index 07212f7f41..abb3067707 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/P2CSelectorTest.java @@ -330,7 +330,8 @@ private void checkProbabilities(List> h int[] counts = runIterations(hosts); double totalProbability = hosts.stream().map(Host::weight).reduce(0d, (a, b) -> a + b); - Integer[] expected = hosts.stream().map(host -> (int) (ITERATIONS * (host.weight() / totalProbability))) + Integer[] expected = hosts.stream() + .map(host -> (int) (ITERATIONS * (host.weight() / totalProbability))) .toArray(Integer[]::new); // calculate the rough counts we should expect