From c12779bf081072a89a6711090a49924ac6b1ed8e Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 14 Nov 2024 16:25:03 -0700 Subject: [PATCH] capacity-limiter-api: Observe all gradient limit changes Motivation: We currently see callbacks to Observer.onLimitChange(..) only for the successful case, but it's perhaps even more important for the limit decrease case. Modifications: - Call the callbacks whenever the limit changes. - Also call them on the failure case. Since the RTT and gradient params are not used there, we pass in 0.0 for the RTT params and -1.0 for the gradient. This also requires a change in the javadoc. - Add some tests to make sure the observers are called. --- servicetalk-capacity-limiter-api/build.gradle | 3 +- .../limiter/api/GradientCapacityLimiter.java | 40 ++++++++++++------- .../api/GradientCapacityLimiterBuilder.java | 9 +++-- .../api/GradientCapacityLimiterTest.java | 36 ++++++++++++++++- 4 files changed, 68 insertions(+), 20 deletions(-) diff --git a/servicetalk-capacity-limiter-api/build.gradle b/servicetalk-capacity-limiter-api/build.gradle index 3976e6ea1c..141a14eb4c 100644 --- a/servicetalk-capacity-limiter-api/build.gradle +++ b/servicetalk-capacity-limiter-api/build.gradle @@ -26,6 +26,7 @@ dependencies { testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version") testImplementation testFixtures(project(":servicetalk-concurrent-internal")) testImplementation project(":servicetalk-test-resources") - testImplementation "org.junit.jupiter:junit-jupiter-api" testImplementation "org.hamcrest:hamcrest:$hamcrestVersion" + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.mockito:mockito-core:$mockitoCoreVersion" } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java index 84e18ff0c7..3c8d99adad 100644 --- 
a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java @@ -149,7 +149,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co /** * Needs to be called while holding the lock. */ - private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) { + private double updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) { assert lock.isHeldByCurrentThread(); if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) { return -1; @@ -169,47 +169,57 @@ private int updateLimit(final long timestampNs, final double shortLatencyMillis, } final double headroom = gradient >= 1 ? this.headroom.apply(gradient, limit) : 0; - final double oldLimit = limit; - final int newLimit = (int) (limit = min(max, max(min, (gradient * limit) + headroom))); - observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit); - return newLimit; + limit = min(max, max(min, (gradient * limit) + headroom)); + return gradient; } private int onSuccess(final long durationNs) { final long nowNs = timeSource.getAsLong(); final long rttMillis = NANOSECONDS.toMillis(durationNs); - int newPending; - int limit; + final double longLatencyMillis; + final double shortLatencyMillis; + final int newPending; + final double oldLimit; + final double newLimit; + double gradient = 0.0; lock.lock(); try { - limit = (int) this.limit; - final double longLatencyMillis = longLatency.observe(nowNs, rttMillis); - final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis); + oldLimit = this.limit; + longLatencyMillis = longLatency.observe(nowNs, rttMillis); + shortLatencyMillis = shortLatency.observe(nowNs, rttMillis); newPending = --pending; if ((nowNs - 
lastSamplingNs) >= limitUpdateIntervalNs) { - limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis); + gradient = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis); } + newLimit = limit; } finally { lock.unlock(); } - + if (oldLimit != newLimit) { + observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit); + } observer.onActiveRequestsDecr(); - return limit - newPending; + return (int) (newLimit - newPending); } private int onDrop() { int newPending; - double newLimit; - + final double oldLimit; + final double newLimit; lock.lock(); try { + oldLimit = limit; newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss)); newPending = --pending; } finally { lock.unlock(); } + if (oldLimit != newLimit) { + // latencies and gradient were not involved in the calculations + observer.onLimitChange(0.0 /*longLatencyMillis*/, 0.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit); + } observer.onActiveRequestsDecr(); return (int) (newLimit - newPending); } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java index a278001454..850e9c229b 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java @@ -348,10 +348,13 @@ public interface Observer { *

* The rate of reporting to the observer is based on the rate of change to this * {@link CapacityLimiter} and the {@link #limitUpdateInterval(Duration) sampling interval}. - * @param longRtt The exponential moving average stat of request response times. - * @param shortRtt The sampled response time that triggered the limit change. + * @param longRtt The exponential moving average stat of request response times. A non-positive value means + * response times were not used in the calculation. + * @param shortRtt The sampled response time that triggered the limit change. A non-positive value means + * response times were not used in the calculation. * @param gradient The response time gradient (delta) between the long exposed stat (see. longRtt) - * and the sampled response time (see. shortRtt). + * and the sampled response time (see. shortRtt). A negative value means the gradient was not used in the + * calculation. * @param oldLimit The previous limit of the limiter. * @param newLimit The current limit of the limiter. 
*/ diff --git a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java index a117f31d02..3aacda9336 100644 --- a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java +++ b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java @@ -15,6 +15,8 @@ */ package io.servicetalk.capacity.limiter.api; +import io.servicetalk.capacity.limiter.api.GradientCapacityLimiterBuilder.Observer; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,6 +29,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.AdditionalMatchers.not; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; class GradientCapacityLimiterTest { @@ -34,6 +42,7 @@ class GradientCapacityLimiterTest { private static final Classification DEFAULT = () -> 0; + private final Observer observer = mock(Observer.class); @Nullable private CapacityLimiter capacityLimiter; @Nullable @@ -47,6 +56,8 @@ void setup() { } capacityLimiter = new GradientCapacityLimiterBuilder() .timeSource(timeSource) + .limitUpdateInterval(Duration.ofMillis(50)) + .observer(observer) .build(); } @@ -54,18 +65,25 @@ void setup() { void canAcquireTicket() { CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); assertThat(ticket, notNullValue()); + verify(observer).onActiveRequestsIncr(); + + // release it and make sure we observed the release. 
+ ticket.completed(); + verify(observer).onActiveRequestsDecr(); + verifyNoMoreInteractions(observer); } @Test void capacityCanDepleteToTheMinLimit() { for (;;) { CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); - currentTime += Duration.ofMillis(10).toNanos(); + currentTime += Duration.ofMillis(50).toNanos(); int capacity = ticket.failed(SAD_EXCEPTION); if (capacity == DEFAULT_MIN_LIMIT) { break; } } + } @Test @@ -89,4 +107,20 @@ void canRejectTicketAcquisitions() { lastTicket = capacityLimiter.tryAcquire(DEFAULT, null); assertThat(lastTicket, notNullValue()); } + + @Test + void observesLimitChanges() { + CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); + currentTime += Duration.ofMillis(10).toNanos(); + ticket.failed(SAD_EXCEPTION); + + // ticket failure will not use gradient + verify(observer).onLimitChange(eq(0.0), eq(0.0), eq(-1.0), eq(100.0), eq(50.0)); + + ticket = capacityLimiter.tryAcquire(DEFAULT, null); + currentTime += Duration.ofMillis(50).toNanos(); + // Ticket success should adjust observation upward. + ticket.completed(); + verify(observer).onLimitChange(not(eq(0.0)), not(eq(0.0)), not(eq(-1.0)), anyDouble(), anyDouble()); + } }