Skip to content

Commit

Permalink
loadbalancer-experimental: fix broken assert in XdsOutlierDetector (#…
Browse files Browse the repository at this point in the history
…2886)

Motivation:

When cancelling the whole xDS outlier detector it first enters the
sequential executor then has each child indicator cancel itself. It
does this synchronously and then checks that the indicator set is
empty. This doesn't work like that because each indicator tries to
bounce itself through the sequential executor, so it won't actually
be empty until after the method exits.

Modifications:

- Use the `sequentialCancel()` method instead: we know that we're
  in the sequential executor already.
- Add some tests.

Result:

What is the result of this change?
  • Loading branch information
bryce-anderson authored Apr 2, 2024
1 parent 1aafb7a commit 49d966b
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public final void cancel() {
sequentialExecutor.execute(this::sequentialCancel);
}

private void sequentialCancel() {
void sequentialCancel() {
assert sequentialExecutor.isCurrentThreadDraining();
if (cancelled) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,18 @@ final class XdsOutlierDetector<ResolvedAddress, C extends LoadBalancedConnection
// reads and writes are protected by `sequentialExecutor`.
private int ejectedHostCount;

XdsOutlierDetector(final Executor executor, final OutlierDetectorConfig config, final String lbDescription,
SequentialExecutor.ExceptionHandler exceptionHandler) {
this.sequentialExecutor = new SequentialExecutor(exceptionHandler);
this.executor = requireNonNull(executor, "executor");
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
this.kernel = new Kernel(config);
}

XdsOutlierDetector(final Executor executor, final OutlierDetectorConfig config, final String lbDescription) {
this.sequentialExecutor = new SequentialExecutor((uncaughtException) ->
LOGGER.error("{}: Uncaught exception in " + this.getClass().getSimpleName(), this, uncaughtException));
SequentialExecutor.ExceptionHandler exceptionHandler = (uncaughtException) ->
LOGGER.error("{}: Uncaught exception in {}", this, getClass().getSimpleName(), uncaughtException);
this.sequentialExecutor = new SequentialExecutor(exceptionHandler);
this.executor = requireNonNull(executor, "executor");
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
this.kernel = new Kernel(config);
Expand All @@ -97,13 +106,18 @@ public void cancel() {
sequentialExecutor.execute(() -> {
List<XdsHealthIndicator<ResolvedAddress, C>> indicatorList = new ArrayList<>(indicators);
for (XdsHealthIndicator indicator : indicatorList) {
indicator.cancel();
indicator.sequentialCancel();
}
assert indicators.isEmpty();
assert indicatorCount.get() == 0;
});
}

// Exposed for testing. Not thread safe.
int ejectedHostCount() {
return ejectedHostCount;
}

private final class XdsHealthIndicatorImpl extends XdsHealthIndicator<ResolvedAddress, C> {

XdsHealthIndicatorImpl(final ResolvedAddress address, OutlierDetectorConfig outlierDetectorConfig,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.concurrent.api.TestExecutor;

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class XdsOutlierDetectorTest {

private final TestExecutor executor = new TestExecutor();
OutlierDetectorConfig config = new OutlierDetectorConfig.Builder()
.failureDetectorInterval(Duration.ofSeconds(5), Duration.ZERO)
.maxEjectionTime(Duration.ofMinutes(5), Duration.ZERO)
.build();

@Nullable
XdsOutlierDetector<String, TestLoadBalancedConnection> xdsOutlierDetector;

private void init() {
xdsOutlierDetector = new XdsOutlierDetector<>(
new NormalizedTimeSourceExecutor(executor), config, "lb-description", exn -> {
// just rethrow and it should surface to the tests.
throw new RuntimeException("Unexpected exception", exn);
});
}

@Test
void outlierDetectorCancellation() {
init();
HealthIndicator<String, TestLoadBalancedConnection> indicator = xdsOutlierDetector.newHealthIndicator(
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
xdsOutlierDetector.cancel();
assertThat(indicator.isHealthy(), equalTo(true));
}

@Test
void cancellationOfEvictedHealthIndicatorMarksHostUnejected() {
init();
HealthIndicator<String, TestLoadBalancedConnection> healthIndicator = xdsOutlierDetector.newHealthIndicator(
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
eject(healthIndicator);
assertThat(healthIndicator.isHealthy(), equalTo(false));
assertThat(xdsOutlierDetector.ejectedHostCount(), equalTo(1));
healthIndicator.cancel();
assertThat(xdsOutlierDetector.ejectedHostCount(), equalTo(0));
}

@Test
void maxHostRemovalIsHonored() {
config = new OutlierDetectorConfig.Builder(config)
.maxEjectionPercentage(50)
.build();
init();

HealthIndicator<String, TestLoadBalancedConnection> indicator1 = xdsOutlierDetector.newHealthIndicator(
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
HealthIndicator<String, TestLoadBalancedConnection> indicator2 = xdsOutlierDetector.newHealthIndicator(
"addr-2", NoopLoadBalancerObserver.instance().hostObserver("addr-2"));
eject(indicator1);
assertThat(xdsOutlierDetector.ejectedHostCount(), equalTo(1));
assertThat(indicator1.isHealthy(), equalTo(false));
eject(indicator2);
assertThat(xdsOutlierDetector.ejectedHostCount(), equalTo(1));
assertThat(indicator2.isHealthy(), equalTo(true));

// revive indicator1
executor.advanceTimeBy(config.baseEjectionTime().toNanos(), TimeUnit.NANOSECONDS);
assertThat(indicator1.isHealthy(), equalTo(true));

// eject indicator2 and then indicator1. They should only require one bad request to eject again.
indicator2.onRequestError(indicator2.beforeConnectStart(), ErrorClass.EXT_ORIGIN_REQUEST_FAILED);
assertThat(indicator2.isHealthy(), equalTo(false));
// should be allowed to be ejected
indicator1.onRequestError(indicator1.beforeConnectStart(), ErrorClass.EXT_ORIGIN_REQUEST_FAILED);
assertThat(indicator1.isHealthy(), equalTo(true));
}

@Test
void hostRevival() {
init();
HealthIndicator<String, TestLoadBalancedConnection> indicator = xdsOutlierDetector.newHealthIndicator(
"addr-1", NoopLoadBalancerObserver.instance().hostObserver("addr-1"));
eject(indicator);
assertThat(indicator.isHealthy(), equalTo(false));
executor.advanceTimeBy(config.baseEjectionTime().toNanos(), TimeUnit.NANOSECONDS);
assertThat(indicator.isHealthy(), equalTo(true));
}

private void eject(HealthIndicator<String, TestLoadBalancedConnection> indicator) {
for (int i = 0; i < config.consecutive5xx(); i++) {
indicator.onRequestError(indicator.beforeConnectStart(), ErrorClass.EXT_ORIGIN_REQUEST_FAILED);
}
}
}

0 comments on commit 49d966b

Please sign in to comment.