Skip to content

Commit

Permalink
loadbalancer-experimental: make connection selection strategy modular (
Browse files Browse the repository at this point in the history
…#2815)

Motivation:

The configurable linearSearchSpace is used as a strategy
for picking among sessions in a particular host but it is not
highly used and mayber for good reason: sessions that are beyond
the max size are always in the odd position of getting picked a
lot or seldom, depending on how many sessions there are. There is
not way to set how many times it will attempt to pick a random
entry after the linear search space so if you exceed the linear
space by just 1 the extra connection will be re-attempted 64
times in a row before failing. There are potentially better ways to
select hosts such as encouraging a core pool of connections, etc.

Modifications:

Make the connection selection policy modular and have multiple
supported patterns that users can experiment with.
  • Loading branch information
bryce-anderson authored Apr 2, 2024
1 parent 49d966b commit 71b5fa6
Show file tree
Hide file tree
Showing 17 changed files with 922 additions and 98 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;

/**
* Configuration of the strategy for selecting connections from a pool to the same endpoint.
*/
public abstract class ConnectionPoolConfig {

static final int DEFAULT_MAX_EFFORT = 5;
static final int DEFAULT_LINEAR_SEARCH_SPACE = 16;

private ConnectionPoolConfig() {
// only instances are in this class.
}

/**
* A connection selection strategy that prioritizes a configurable "core" pool.
* <p>
* This {@link ConnectionPoolStrategy} attempts to emulate the pooling behavior often seen in thread pools.
* Specifically it allows for the configuration of a "core pool" size which are intended to be long-lived.
* Iteration starts in the core pool at a random position and then iterates through the entire core pool before
* moving to an overflow pool. Because iteration of this core pool starts at a random position the core connections
* will get an even traffic load and, because they are equally selectable, will tend not to be removed due to
* idleness.
* <p>
* If the core pool cannot satisfy the load traffic can spill over to extra connections which are selected in-order.
* This has the property of minimizing traffic to the latest elements added outside the core pool size, thus let
* them idle out of the pool once they're no longer necessary.
*
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @return the configured {@link ConnectionPoolConfig}.
*/
public static ConnectionPoolConfig corePool(final int corePoolSize, final boolean forceCorePool) {
return new CorePoolStrategy(corePoolSize, forceCorePool);
}

/**
* A connection selection strategy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolStrategy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
* @return the configured {@link ConnectionPoolConfig}.
*/
public static ConnectionPoolConfig linearSearch() {
return linearSearch(DEFAULT_LINEAR_SEARCH_SPACE);
}

/**
* A connection selection strategy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolStrategy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
* @param linearSearchSpace the space to search linearly before resorting to random selection for remaining
* connections.
* @return the configured {@link ConnectionPoolConfig}.
*/
public static ConnectionPoolConfig linearSearch(int linearSearchSpace) {
return new LinearSearchStrategy(linearSearchSpace);
}

/**
* A {@link ConnectionPoolStrategy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C strategy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* - Randomly select two connections from the 'core pool' (pick-two).
* - Try to select the 'best' of the two connections.
* - If we fail to select the best connection, try the other connection.
* - If both connections fail, repeat the pick-two operation for up to maxEffort attempts, begin linear iteration
* through the remaining connections searching for an acceptable connection.
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @return the configured {@link ConnectionPoolConfig}.
*/
public static ConnectionPoolConfig p2c(int corePoolSize, boolean forceCorePool) {
return p2c(DEFAULT_MAX_EFFORT, corePoolSize, forceCorePool);
}

/**
* A {@link ConnectionPoolStrategy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C strategy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* - Randomly select two connections from the 'core pool' (pick-two).
* - Try to select the 'best' of the two connections.
* - If we fail to select the best connection, try the other connection.
* - If both connections fail, repeat the pick-two operation for up to maxEffort attempts, begin linear iteration
* through the remaining connections searching for an acceptable connection.
* @param maxEffort the maximum number of attempts to pick a healthy connection from the core pool.
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @return the configured {@link ConnectionPoolConfig}.
*/
public static ConnectionPoolConfig p2c(int maxEffort, int corePoolSize, boolean forceCorePool) {
return new P2CStrategy(maxEffort, corePoolSize, forceCorePool);
}

// instance types
static final class CorePoolStrategy extends ConnectionPoolConfig {
final int corePoolSize;
final boolean forceCorePool;

CorePoolStrategy(final int corePoolSize, final boolean forceCorePool) {
this.corePoolSize = ensurePositive(corePoolSize, "corePoolSize");
this.forceCorePool = forceCorePool;
}
}

static final class P2CStrategy extends ConnectionPoolConfig {
final int maxEffort;
final int corePoolSize;
final boolean forceCorePool;

P2CStrategy(final int maxEffort, final int corePoolSize, final boolean forceCorePool) {
this.maxEffort = ensurePositive(maxEffort, "maxEffort");
this.corePoolSize = ensurePositive(corePoolSize, "corePoolSize");
this.forceCorePool = forceCorePool;
}
}

static final class LinearSearchStrategy extends ConnectionPoolConfig {
final int linearSearchSpace;

LinearSearchStrategy(int linearSearchSpace) {
this.linearSearchSpace = ensurePositive(linearSearchSpace, "linearSearchSpace");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;

import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
* A strategy for selecting connections at the {@link Host} level connection pool.
* @param <C> the concrete type of the connection.
*/
interface ConnectionPoolStrategy<C extends LoadBalancedConnection> {

/**
* Select a connection from the ordered list of connections.
* @param connections the list of connections to pick from.
* @param selector a predicate used to test a connection.
* @return the selected connection, or {@code null} if no existing connection was selected.
*/
@Nullable
C select(List<C> connections, Predicate<C> selector);

/**
* The factory of {@link ConnectionPoolStrategy} instances.
* @param <C> the least specific connection type necessary for properly implementing the strategy.
* @see ConnectionPoolStrategy for available strategies.
*/

interface ConnectionPoolStrategyFactory<C extends LoadBalancedConnection> {

/**
* Provide an instance of the {@link ConnectionPoolStrategy} to use with a {@link Host}.
* @param targetResource the resource name, used for logging purposes.
* @return an instance of the {@link ConnectionPoolStrategy} to use with a {@link Host}.
*/
ConnectionPoolStrategy<C> buildStrategy(String targetResource);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static java.lang.Math.min;

/**
* A connection selection strategy that prioritizes a configurable "core" pool.
* <p>
* This {@link ConnectionPoolStrategy} attempts to emulate the pooling behavior often seen in thread pools.
* Specifically it allows for the configuration of a "core pool" size which are intended to be long-lived.
* Iteration starts in the core pool at a random position and then iterates through the entire core pool before
* moving to an overflow pool. Because iteration of this core pool starts at a random position the core connections
* will get an even traffic load and, because they are equally selectable, will tend not to be removed due to idleness.
* <p>
* If the core pool cannot satisfy the load traffic can spill over to extra connections which are selected in-order.
* This has the property of minimizing traffic to the latest elements added outside the core pool size, thus let
* them idle out of the pool once they're no longer necessary.
*
* @param <C> the concrete type of the {@link LoadBalancedConnection}.
*/
final class CorePoolConnectionPoolStrategy<C extends LoadBalancedConnection>
implements ConnectionPoolStrategy<C> {

private final int corePoolSize;
private final boolean forceCorePool;

private CorePoolConnectionPoolStrategy(final int corePoolSize, final boolean forceCorePool) {
this.corePoolSize = ensurePositive(corePoolSize, "corePoolSize");
this.forceCorePool = forceCorePool;
}

@Nullable
@Override
public C select(List<C> connections, Predicate<C> selector) {
final int connectionCount = connections.size();
if (forceCorePool && connectionCount < corePoolSize) {
// return null so the Host will create a new connection and thus populate the connection pool.
return null;
}
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
final int randomSearchSpace = min(connectionCount, corePoolSize);
final int offset = rnd.nextInt(randomSearchSpace);
for (int i = 0; i < randomSearchSpace; i++) {
int ii = offset + i;
if (ii >= randomSearchSpace) {
ii -= randomSearchSpace;
}
final C connection = connections.get(ii);
if (selector.test(connection)) {
return connection;
}
}
// Didn't succeed in the core pool. Linear search through the overflow pool (if it exists).
for (int i = randomSearchSpace; i < connectionCount; i++) {
final C connection = connections.get(i);
if (selector.test(connection)) {
return connection;
}
}
// So sad, we didn't find a healthy connection.
return null;
}

static <C extends LoadBalancedConnection> ConnectionPoolStrategyFactory<C> factory(
int corePoolSize, boolean forceCorePool) {
ensurePositive(corePoolSize, "corePoolSize");
return (targetResource) -> new CorePoolConnectionPoolStrategy<>(corePoolSize, forceCorePool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand All @@ -50,30 +49,12 @@
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static io.servicetalk.loadbalancer.RequestTracker.REQUEST_TRACKER_KEY;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;

final class DefaultHost<Addr, C extends LoadBalancedConnection> implements Host<Addr, C> {

/**
* With a relatively small number of connections we can minimize connection creation under moderate concurrency by
* exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per
* selection attempt.
*/
private static final int MIN_RANDOM_SEARCH_SPACE = 64;

/**
* For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for
* trying to locate an available connection when most connections are in use. This increases tail latencies, thus
* after some number of failed attempts it appears to be more beneficial to open a new connection instead.
* <p>
* The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection
* counts, larger connection counts, low connection churn, high connection churn.
*/
private static final float RANDOM_SEARCH_FACTOR = 0.75f;

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHost.class);

private enum State {
Expand Down Expand Up @@ -101,22 +82,23 @@ private enum State {
@Nullable
private final HealthCheckConfig healthCheckConfig;
@Nullable
private final ConnectionPoolStrategy<C> connectionPoolStrategy;
@Nullable
private final HealthIndicator<Addr, C> healthIndicator;
private final LoadBalancerObserver.HostObserver hostObserver;
private final ConnectionFactory<Addr, ? extends C> connectionFactory;
private final int linearSearchSpace;
private final ListenableAsyncCloseable closeable;
private volatile ConnState connState = new ConnState(emptyList(), State.ACTIVE, 0, null);

DefaultHost(final String lbDescription, final Addr address,
final ConnectionPoolStrategy<C> connectionPoolStrategy,
final ConnectionFactory<Addr, ? extends C> connectionFactory,
final int linearSearchSpace, final HostObserver hostObserver,
final @Nullable HealthCheckConfig healthCheckConfig,
final @Nullable HealthIndicator<Addr, C> healthIndicator) {
final HostObserver hostObserver, final @Nullable HealthCheckConfig healthCheckConfig,
final @Nullable HealthIndicator healthIndicator) {
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
this.address = requireNonNull(address, "address");
this.linearSearchSpace = linearSearchSpace;
this.healthIndicator = healthIndicator;
this.connectionPoolStrategy = requireNonNull(connectionPoolStrategy, "connectionPoolStrategy");
requireNonNull(connectionFactory, "connectionFactory");
this.connectionFactory = healthIndicator == null ? connectionFactory :
new InstrumentedConnectionFactory<>(connectionFactory, healthIndicator);
Expand Down Expand Up @@ -195,31 +177,7 @@ public boolean markExpired() {
@Override
public @Nullable C pickConnection(Predicate<C> selector, @Nullable final ContextMap context) {
final List<C> connections = connState.connections;
// Exhaust the linear search space first:
final int linearAttempts = min(connections.size(), linearSearchSpace);
for (int j = 0; j < linearAttempts; ++j) {
final C connection = connections.get(j);
if (selector.test(connection)) {
return connection;
}
}
// Try other connections randomly:
if (connections.size() > linearAttempts) {
final int diff = connections.size() - linearAttempts;
// With small enough search space, attempt number of times equal to number of remaining connections.
// Back off after exploring most of the search space, it gives diminishing returns.
final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff :
(int) (diff * RANDOM_SEARCH_FACTOR);
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int j = 0; j < randomAttempts; ++j) {
final C connection = connections.get(rnd.nextInt(linearAttempts, connections.size()));
if (selector.test(connection)) {
return connection;
}
}
}
// So sad, we didn't find a healthy connection.
return null;
return connectionPoolStrategy.select(connections, selector);
}

@Override
Expand Down
Loading

0 comments on commit 71b5fa6

Please sign in to comment.