Skip to content

Commit

Permalink
capacity-limiter-api: observer all gradient limit changes (#3107)
Browse files Browse the repository at this point in the history
Motivation:

We currently see callbacks to Observer.onLimitChange(..) only for
the successful case, but it's perhaps even more important for the
limit decrease case.

Modifications:

- Call the callbacks whenever the limit changes.
- Also call them on the failure case. Since we don't use the
  gradient and RTT params, we pass in -1.0. This also requires
  a change in the javadoc.
- Add some tests to make sure the observers are called.
  • Loading branch information
bryce-anderson authored Nov 15, 2024
1 parent f783e7d commit 057cc56
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 19 deletions.
3 changes: 2 additions & 1 deletion servicetalk-capacity-limiter-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version")
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))
testImplementation project(":servicetalk-test-resources")
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.mockito:mockito-core:$mockitoCoreVersion"
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co
/**
* Needs to be called while holding the lock.
*/
private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) {
private double updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) {
assert lock.isHeldByCurrentThread();
if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) {
return -1;
Expand All @@ -169,47 +169,58 @@ private int updateLimit(final long timestampNs, final double shortLatencyMillis,
}

final double headroom = gradient >= 1 ? this.headroom.apply(gradient, limit) : 0;
final double oldLimit = limit;
final int newLimit = (int) (limit = min(max, max(min, (gradient * limit) + headroom)));
observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit);
return newLimit;
limit = min(max, max(min, (gradient * limit) + headroom));
return gradient;
}

private int onSuccess(final long durationNs) {
final long nowNs = timeSource.getAsLong();
final long rttMillis = NANOSECONDS.toMillis(durationNs);
int newPending;
int limit;
final double longLatencyMillis;
final double shortLatencyMillis;
final int newPending;
final double oldLimit;
final double newLimit;
double gradient = 0.0;
lock.lock();
try {
limit = (int) this.limit;
final double longLatencyMillis = longLatency.observe(nowNs, rttMillis);
final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis);
oldLimit = this.limit;
longLatencyMillis = longLatency.observe(nowNs, rttMillis);
shortLatencyMillis = shortLatency.observe(nowNs, rttMillis);

newPending = --pending;
if ((nowNs - lastSamplingNs) >= limitUpdateIntervalNs) {
limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis);
gradient = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis);
}
newLimit = limit;
} finally {
lock.unlock();
}

if (oldLimit != newLimit) {
observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit);
}
observer.onActiveRequestsDecr();
return limit - newPending;
return (int) (newLimit - newPending);
}

private int onDrop() {
int newPending;
double newLimit;

final double oldLimit;
final double newLimit;
lock.lock();
try {
oldLimit = limit;
newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss));
newPending = --pending;
} finally {
lock.unlock();
}

if (oldLimit != newLimit) {
// latencies and gradient were not involved in the calculations
observer.onLimitChange(
-1.0 /*longLatencyMillis*/, -1.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit);
}
observer.onActiveRequestsDecr();
return (int) (newLimit - newPending);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ private String name() {

/**
* A state observer for Gradient {@link CapacityLimiter} to monitor internal state changes.
*
* Note: callbacks are not guaranteed to be executed sequentially or in exactly the same order that state changes
* occurred.
*/
public interface Observer {

Expand All @@ -348,10 +351,13 @@ public interface Observer {
* <p>
* The rate of reporting to the observer is based on the rate of change to this
* {@link CapacityLimiter} and the {@link #limitUpdateInterval(Duration) sampling interval}.
* @param longRtt The exponential moving average stat of request response times.
* @param shortRtt The sampled response time that triggered the limit change.
* @param longRtt The exponential moving average stat of request response times. A negative value means
* response times were not used in the calculation.
* @param shortRtt The sampled response time that triggered the limit change. A negative value means
* * response times were not used in the calculation.
* @param gradient The response time gradient (delta) between the long exposed stat (see. longRtt)
* and the sampled response time (see. shortRtt).
* and the sampled response time (see. shortRtt). A negative value means the gradient was not used in the
* calculation.
* @param oldLimit The previous limit of the limiter.
* @param newLimit The current limit of the limiter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.capacity.limiter.api;

import io.servicetalk.capacity.limiter.api.GradientCapacityLimiterBuilder.Observer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -27,13 +29,20 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.AdditionalMatchers.not;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

class GradientCapacityLimiterTest {

private static final Exception SAD_EXCEPTION = new Exception("sad");

private static final Classification DEFAULT = () -> 0;

private final Observer observer = mock(Observer.class);
@Nullable
private CapacityLimiter capacityLimiter;
@Nullable
Expand All @@ -47,13 +56,21 @@ void setup() {
}
capacityLimiter = new GradientCapacityLimiterBuilder()
.timeSource(timeSource)
.limitUpdateInterval(Duration.ofMillis(50))
.observer(observer)
.build();
}

@Test
void canAcquireTicket() {
CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null);
assertThat(ticket, notNullValue());
verify(observer).onActiveRequestsIncr();

// release it and make sure we observed the release.
ticket.completed();
verify(observer).onActiveRequestsDecr();
verifyNoMoreInteractions(observer);
}

@Test
Expand Down Expand Up @@ -89,4 +106,20 @@ void canRejectTicketAcquisitions() {
lastTicket = capacityLimiter.tryAcquire(DEFAULT, null);
assertThat(lastTicket, notNullValue());
}

@Test
void observesLimitChanges() {
CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null);
currentTime += Duration.ofMillis(10).toNanos();
ticket.failed(SAD_EXCEPTION);

// ticket failure will not use gradient
verify(observer).onLimitChange(eq(-1.0), eq(-1.0), eq(-1.0), eq(100.0), eq(50.0));

ticket = capacityLimiter.tryAcquire(DEFAULT, null);
currentTime += Duration.ofMillis(50).toNanos();
// Ticket success should adjust observation upward.
ticket.completed();
verify(observer).onLimitChange(not(eq(-1.0)), not(eq(-1.0)), not(eq(-1.0)), anyDouble(), anyDouble());
}
}

0 comments on commit 057cc56

Please sign in to comment.