From 057cc56f7d63df4d234d93a640f9f318f3388a35 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 15 Nov 2024 10:18:08 -0700 Subject: [PATCH] capacity-limiter-api: observer all gradient limit changes (#3107) Motivation: We currently see callbacks to Observer.onLimitChange(..) only for the successful case, but it's perhaps even more important for the limit decrease case. Modifications: - Call the callbacks whenever the limit changes. - Also call them on the failure case. Since we don't use the gradient and RTT params, we pass in -1.0. This also requires a change in the javadoc. - Add some tests to make sure the observers are called. --- servicetalk-capacity-limiter-api/build.gradle | 3 +- .../limiter/api/GradientCapacityLimiter.java | 41 ++++++++++++------- .../api/GradientCapacityLimiterBuilder.java | 12 ++++-- .../api/GradientCapacityLimiterTest.java | 33 +++++++++++++++ 4 files changed, 70 insertions(+), 19 deletions(-) diff --git a/servicetalk-capacity-limiter-api/build.gradle b/servicetalk-capacity-limiter-api/build.gradle index 3976e6ea1c..141a14eb4c 100644 --- a/servicetalk-capacity-limiter-api/build.gradle +++ b/servicetalk-capacity-limiter-api/build.gradle @@ -26,6 +26,7 @@ dependencies { testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version") testImplementation testFixtures(project(":servicetalk-concurrent-internal")) testImplementation project(":servicetalk-test-resources") - testImplementation "org.junit.jupiter:junit-jupiter-api" testImplementation "org.hamcrest:hamcrest:$hamcrestVersion" + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.mockito:mockito-core:$mockitoCoreVersion" } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java index 84e18ff0c7..326451c393 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java @@ -149,7 +149,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co /** * Needs to be called while holding the lock. */ - private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) { + private double updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) { assert lock.isHeldByCurrentThread(); if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) { return -1; @@ -169,47 +169,58 @@ private int updateLimit(final long timestampNs, final double shortLatencyMillis, } final double headroom = gradient >= 1 ? this.headroom.apply(gradient, limit) : 0; - final double oldLimit = limit; - final int newLimit = (int) (limit = min(max, max(min, (gradient * limit) + headroom))); - observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit); - return newLimit; + limit = min(max, max(min, (gradient * limit) + headroom)); + return gradient; } private int onSuccess(final long durationNs) { final long nowNs = timeSource.getAsLong(); final long rttMillis = NANOSECONDS.toMillis(durationNs); - int newPending; - int limit; + final double longLatencyMillis; + final double shortLatencyMillis; + final int newPending; + final double oldLimit; + final double newLimit; + double gradient = 0.0; lock.lock(); try { - limit = (int) this.limit; - final double longLatencyMillis = longLatency.observe(nowNs, rttMillis); - final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis); + oldLimit = this.limit; + longLatencyMillis = longLatency.observe(nowNs, rttMillis); + shortLatencyMillis = shortLatency.observe(nowNs, rttMillis); newPending = --pending; if ((nowNs - lastSamplingNs) >= limitUpdateIntervalNs) { - limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis); + gradient = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis); } + newLimit = limit; } finally { lock.unlock(); } - + if (oldLimit != newLimit) { + observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit); + } observer.onActiveRequestsDecr(); - return limit - newPending; + return (int) (newLimit - newPending); } private int onDrop() { int newPending; - double newLimit; - + final double oldLimit; + final double newLimit; lock.lock(); try { + oldLimit = limit; newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss)); newPending = --pending; } finally { lock.unlock(); } + if (oldLimit != newLimit) { + // latencies and gradient were not involved in the calculations + observer.onLimitChange( + -1.0 /*longLatencyMillis*/, -1.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit); + } observer.onActiveRequestsDecr(); return (int) (newLimit - newPending); } diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java index a278001454..9acc0d7ad3 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterBuilder.java @@ -329,6 +329,9 @@ private String name() { /** * A state observer for Gradient {@link CapacityLimiter} to monitor internal state changes. + * + * Note: callbacks are not guaranteed to be executed sequentially or in exactly the same order that state changes + * occurred. */ public interface Observer { @@ -348,10 +351,13 @@ public interface Observer { *

* The rate of reporting to the observer is based on the rate of change to this * {@link CapacityLimiter} and the {@link #limitUpdateInterval(Duration) sampling interval}. - * @param longRtt The exponential moving average stat of request response times. - * @param shortRtt The sampled response time that triggered the limit change. + * @param longRtt The exponential moving average stat of request response times. A negative value means + * response times were not used in the calculation. + * @param shortRtt The sampled response time that triggered the limit change. A negative value means + * * response times were not used in the calculation. * @param gradient The response time gradient (delta) between the long exposed stat (see. longRtt) - * and the sampled response time (see. shortRtt). + * and the sampled response time (see. shortRtt). A negative value means the gradient was not used in the + * calculation. * @param oldLimit The previous limit of the limiter. * @param newLimit The current limit of the limiter. */ diff --git a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java index a117f31d02..644d33a9fd 100644 --- a/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java +++ b/servicetalk-capacity-limiter-api/src/test/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiterTest.java @@ -15,6 +15,8 @@ */ package io.servicetalk.capacity.limiter.api; +import io.servicetalk.capacity.limiter.api.GradientCapacityLimiterBuilder.Observer; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,6 +29,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.AdditionalMatchers.not; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; class GradientCapacityLimiterTest { @@ -34,6 +42,7 @@ class GradientCapacityLimiterTest { private static final Classification DEFAULT = () -> 0; + private final Observer observer = mock(Observer.class); @Nullable private CapacityLimiter capacityLimiter; @Nullable @@ -47,6 +56,8 @@ void setup() { } capacityLimiter = new GradientCapacityLimiterBuilder() .timeSource(timeSource) + .limitUpdateInterval(Duration.ofMillis(50)) + .observer(observer) .build(); } @@ -54,6 +65,12 @@ void setup() { void canAcquireTicket() { CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); assertThat(ticket, notNullValue()); + verify(observer).onActiveRequestsIncr(); + + // release it and make sure we observed the release. + ticket.completed(); + verify(observer).onActiveRequestsDecr(); + verifyNoMoreInteractions(observer); } @Test @@ -89,4 +106,20 @@ void canRejectTicketAcquisitions() { lastTicket = capacityLimiter.tryAcquire(DEFAULT, null); assertThat(lastTicket, notNullValue()); } + + @Test + void observesLimitChanges() { + CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null); + currentTime += Duration.ofMillis(10).toNanos(); + ticket.failed(SAD_EXCEPTION); + + // ticket failure will not use gradient + verify(observer).onLimitChange(eq(-1.0), eq(-1.0), eq(-1.0), eq(100.0), eq(50.0)); + + ticket = capacityLimiter.tryAcquire(DEFAULT, null); + currentTime += Duration.ofMillis(50).toNanos(); + // Ticket success should adjust observation upward. + ticket.completed(); + verify(observer).onLimitChange(not(eq(-1.0)), not(eq(-1.0)), not(eq(-1.0)), anyDouble(), anyDouble()); + } }