Skip to content

Commit

Permalink
capacity-limiter-api: Observe all gradient limit changes
Browse files Browse the repository at this point in the history
Motivation:

We currently see callbacks to Observer.onLimitChange(..) only for
the successful case, but it's perhaps even more important for the
limit decrease case.

Modifications:

- Call the callbacks whenever the limit changes.
- Also call them on the failure case. Since we don't use the
  gradient and RTT params, we pass in -1.0. This also requires
  a change in the javadoc.
- Add some tests to make sure the observers are called.
  • Loading branch information
bryce-anderson committed Nov 14, 2024
1 parent 4187c05 commit c12779b
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 20 deletions.
3 changes: 2 additions & 1 deletion servicetalk-capacity-limiter-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version")
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))
testImplementation project(":servicetalk-test-resources")
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.mockito:mockito-core:$mockitoCoreVersion"
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co
/**
* Needs to be called while holding the lock.
*/
private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) {
private double updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) {
assert lock.isHeldByCurrentThread();
if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) {
return -1;
Expand All @@ -169,47 +169,57 @@ private int updateLimit(final long timestampNs, final double shortLatencyMillis,
}

final double headroom = gradient >= 1 ? this.headroom.apply(gradient, limit) : 0;
final double oldLimit = limit;
final int newLimit = (int) (limit = min(max, max(min, (gradient * limit) + headroom)));
observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit);
return newLimit;
limit = min(max, max(min, (gradient * limit) + headroom));
return gradient;
}

/**
 * Handles the successful completion of a request.
 * <p>
 * While holding {@code lock}, the request duration (converted to milliseconds) is fed into both the long and
 * the short latency trackers, {@code pending} is decremented, and - once at least {@code limitUpdateIntervalNs}
 * has elapsed since {@code lastSamplingNs} - the limit is re-computed through {@code updateLimit}.
 * The observer is notified only after the lock has been released.
 * <p>
 * NOTE(review): this listing looks like a rendered diff in which the old and the new version of each changed
 * line are interleaved (e.g. {@code int newPending;} next to {@code final int newPending;}, and two
 * {@code return} statements). Confirm against the real file before compiling.
 *
 * @param durationNs how long the request took, in nanoseconds.
 * @return the limit minus the number of requests still pending after this one was released.
 */
private int onSuccess(final long durationNs) {
final long nowNs = timeSource.getAsLong();
final long rttMillis = NANOSECONDS.toMillis(durationNs);
int newPending;
int limit;
final double longLatencyMillis;
final double shortLatencyMillis;
final int newPending;
final double oldLimit;
final double newLimit;
double gradient = 0.0;
lock.lock();
try {
limit = (int) this.limit;
final double longLatencyMillis = longLatency.observe(nowNs, rttMillis);
final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis);
// Snapshot the limit before any update so a change can be detected after the lock is released.
oldLimit = this.limit;
longLatencyMillis = longLatency.observe(nowNs, rttMillis);
shortLatencyMillis = shortLatency.observe(nowNs, rttMillis);

newPending = --pending;
// Only re-compute the limit once the configured update interval has elapsed.
if ((nowNs - lastSamplingNs) >= limitUpdateIntervalNs) {
limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis);
gradient = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis);
}
newLimit = limit;
} finally {
lock.unlock();
}

// Report to the observer only when the limit actually moved.
if (oldLimit != newLimit) {
observer.onLimitChange(longLatencyMillis, shortLatencyMillis, gradient, oldLimit, newLimit);
}
observer.onActiveRequestsDecr();
return limit - newPending;
return (int) (newLimit - newPending);
}

/**
 * Handles a dropped request by backing off the limit.
 * <p>
 * While holding {@code lock}, the limit is multiplied by {@code backoffRatioOnLimit} when it is at or above
 * {@code max}, otherwise by {@code backoffRatioOnLoss}; the result is never allowed below {@code min}.
 * {@code pending} is decremented in the same critical section. The observer is notified only after the lock
 * has been released.
 * <p>
 * NOTE(review): this listing looks like a rendered diff in which the old and the new version of each changed
 * line are interleaved (e.g. {@code double newLimit;} next to {@code final double newLimit;}). Confirm
 * against the real file before compiling.
 * <p>
 * NOTE(review): the {@code Observer#onLimitChange} javadoc in this change describes a negative value as the
 * marker for "response times were not used", yet {@code 0.0} is passed for both latencies here (only the
 * gradient is {@code -1.0}). Confirm which of the two is intended.
 *
 * @return the backed-off limit minus the number of requests still pending after this one was released.
 */
private int onDrop() {
int newPending;
double newLimit;

final double oldLimit;
final double newLimit;
lock.lock();
try {
oldLimit = limit;
newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss));
newPending = --pending;
} finally {
lock.unlock();
}

// Report to the observer only when the limit actually moved.
if (oldLimit != newLimit) {
// latencies and gradient were not involved in the calculations
observer.onLimitChange(0.0 /*longLatencyMillis*/, 0.0 /*shortLatencyMillis*/, -1.0 /*gradient*/, oldLimit, newLimit);
}
observer.onActiveRequestsDecr();
return (int) (newLimit - newPending);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,13 @@ public interface Observer {
* <p>
* The rate of reporting to the observer is based on the rate of change to this
* {@link CapacityLimiter} and the {@link #limitUpdateInterval(Duration) sampling interval}.
* @param longRtt The exponential moving average stat of request response times.
* @param shortRtt The sampled response time that triggered the limit change.
* @param longRtt The exponential moving average stat of request response times. A negative value means
* response times were not used in the calculation.
* @param shortRtt The sampled response time that triggered the limit change. A negative value means
* response times were not used in the calculation.
* @param gradient The response time gradient (delta) between the long exposed stat (see. longRtt)
* and the sampled response time (see. shortRtt).
* and the sampled response time (see. shortRtt). A negative value means the gradient was not used in the
* calculation.
* @param oldLimit The previous limit of the limiter.
* @param newLimit The current limit of the limiter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.capacity.limiter.api;

import io.servicetalk.capacity.limiter.api.GradientCapacityLimiterBuilder.Observer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -27,13 +29,20 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.AdditionalMatchers.not;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

class GradientCapacityLimiterTest {

private static final Exception SAD_EXCEPTION = new Exception("sad");

private static final Classification DEFAULT = () -> 0;

private final Observer observer = mock(Observer.class);
@Nullable
private CapacityLimiter capacityLimiter;
@Nullable
Expand All @@ -47,25 +56,34 @@ void setup() {
}
capacityLimiter = new GradientCapacityLimiterBuilder()
.timeSource(timeSource)
.limitUpdateInterval(Duration.ofMillis(50))
.observer(observer)
.build();
}

@Test
void canAcquireTicket() {
    // Acquiring must hand out a ticket and report the increment to the observer.
    final CapacityLimiter.Ticket acquired = capacityLimiter.tryAcquire(DEFAULT, null);
    assertThat(acquired, notNullValue());
    verify(observer).onActiveRequestsIncr();

    // Completing the ticket must report the decrement and trigger no other observer callback.
    acquired.completed();
    verify(observer).onActiveRequestsDecr();
    verifyNoMoreInteractions(observer);
}

// Repeatedly acquires a ticket, advances the test clock and fails the ticket, looping until the capacity
// reported by the failure equals DEFAULT_MIN_LIMIT.
// NOTE(review): this listing looks like a rendered diff; the two consecutive `currentTime +=` lines appear
// to be the old (10ms) and new (50ms) version of the same statement - confirm against the real file.
@Test
void capacityCanDepleteToTheMinLimit() {
for (;;) {
CapacityLimiter.Ticket ticket = capacityLimiter.tryAcquire(DEFAULT, null);
currentTime += Duration.ofMillis(10).toNanos();
currentTime += Duration.ofMillis(50).toNanos();
// failed(..) returns the capacity as an int, which is the loop's exit condition.
int capacity = ticket.failed(SAD_EXCEPTION);
if (capacity == DEFAULT_MIN_LIMIT) {
break;
}
}

}

@Test
Expand All @@ -89,4 +107,20 @@ void canRejectTicketAcquisitions() {
lastTicket = capacityLimiter.tryAcquire(DEFAULT, null);
assertThat(lastTicket, notNullValue());
}

@Test
void observesLimitChanges() {
    // A failed ticket changes the limit without involving latencies or the gradient, so the observer
    // is expected to see the placeholder values together with the old and new limit.
    final CapacityLimiter.Ticket failedTicket = capacityLimiter.tryAcquire(DEFAULT, null);
    currentTime += Duration.ofMillis(10).toNanos();
    failedTicket.failed(SAD_EXCEPTION);
    verify(observer).onLimitChange(eq(0.0), eq(0.0), eq(-1.0), eq(100.0), eq(50.0));

    // A ticket completed after the clock advanced by 50ms is expected to report a limit change
    // carrying real (non-placeholder) latencies and gradient.
    final CapacityLimiter.Ticket completedTicket = capacityLimiter.tryAcquire(DEFAULT, null);
    currentTime += Duration.ofMillis(50).toNanos();
    completedTicket.completed();
    verify(observer).onLimitChange(not(eq(0.0)), not(eq(0.0)), not(eq(-1.0)), anyDouble(), anyDouble());
}
}

0 comments on commit c12779b

Please sign in to comment.