Skip to content

Commit

Permalink
Add outstanding-request metrics per connected host
Browse files Browse the repository at this point in the history
  • Loading branch information
spkrka committed Jul 10, 2020
1 parent 60c0bb5 commit 329992a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
Expand Down Expand Up @@ -73,6 +75,8 @@ public class SemanticFolsomMetrics implements Metrics {
private final MetricId id;

private final Set<OutstandingRequestsGauge> gauges = new CopyOnWriteArraySet<>();
private final ConcurrentMap<String, HostMetrics> instanceMetrics = new ConcurrentHashMap<>();
private final MetricId outstandingRequestGaugeBaseId;

public SemanticFolsomMetrics(final SemanticMetricRegistry registry, final MetricId baseMetricId) {

Expand Down Expand Up @@ -142,12 +146,12 @@ protected Ratio getRatio() {
this.touchSuccesses = registry.meter(touchMetersId.tagged("result", "success"));
this.touchFailures = registry.meter(touchMetersId.tagged("result", "failure"));

final MetricId outstandingRequestGauge =
outstandingRequestGaugeBaseId =
id.tagged(
"what", "outstanding-requests",
"unit", "requests");
registry.register(
outstandingRequestGauge,
outstandingRequestGaugeBaseId.tagged("host", "all"),
(Gauge<Long>)
() ->
gauges.stream().mapToLong(OutstandingRequestsGauge::getOutstandingRequests).sum());
Expand Down Expand Up @@ -344,10 +348,47 @@ public RatioGauge getHitRatio() {
@Override
public void registerOutstandingRequestsGauge(final OutstandingRequestsGauge gauge) {
gauges.add(gauge);
final String hostName = gauge.getHostName();
instanceMetrics.computeIfAbsent(hostName, HostMetrics::new).add(gauge);
}

@Override
public void unregisterOutstandingRequestsGauge(OutstandingRequestsGauge gauge) {
public void unregisterOutstandingRequestsGauge(final OutstandingRequestsGauge gauge) {
final String hostName = gauge.getHostName();
gauges.remove(gauge);
instanceMetrics.computeIfAbsent(hostName, HostMetrics::new).remove(gauge);

// TODO: possibly clean up old hostnames from the map.
}

private class HostMetrics {
private final Set<OutstandingRequestsGauge> gauges = new CopyOnWriteArraySet<>();
private final MetricId metricId;
private boolean active = false;

private HostMetrics(final String hostname) {
metricId = outstandingRequestGaugeBaseId.tagged("host", hostname);
}

private synchronized void remove(final OutstandingRequestsGauge gauge) {
gauges.remove(gauge);

if (gauges.isEmpty() && active) {
active = false;
registry.remove(metricId);
}
}

private synchronized void add(final OutstandingRequestsGauge gauge) {
gauges.add(gauge);
if (!active) {
active = true;
registry.register(metricId, (Gauge<Long>) this::gaugeFunction);
}
}

private long gaugeFunction() {
return gauges.stream().mapToLong(OutstandingRequestsGauge::getOutstandingRequests).sum();
}
}
}
4 changes: 4 additions & 0 deletions folsom/src/main/java/com/spotify/folsom/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,9 @@ public interface Metrics {

interface OutstandingRequestsGauge {
int getOutstandingRequests();

default String getHostName() {
return "";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class DefaultRawMemcacheClient extends AbstractRawMemcacheClient {
? new EpollEventLoopGroup(0, THREAD_FACTORY)
: new NioEventLoopGroup(0, THREAD_FACTORY));
private final int pendingCounterLimit;
private final OutstandingRequestGauge outstandingRequestGauge = new OutstandingRequestGauge();

public static CompletionStage<RawMemcacheClient> connect(
final HostAndPort address,
Expand Down Expand Up @@ -210,7 +211,7 @@ private DefaultRawMemcacheClient(

GLOBAL_CONNECTION_COUNT.incrementAndGet();

metrics.registerOutstandingRequestsGauge(this::numPendingRequests);
metrics.registerOutstandingRequestsGauge(outstandingRequestGauge);

channel.pipeline().addLast("handler", new ConnectionHandler());
}
Expand Down Expand Up @@ -450,7 +451,7 @@ private void setDisconnected(String message) {
pendingCounter.set(pendingCounterLimit);
channel.close();
GLOBAL_CONNECTION_COUNT.decrementAndGet();
metrics.unregisterOutstandingRequestsGauge(this::numPendingRequests);
metrics.unregisterOutstandingRequestsGauge(outstandingRequestGauge);
notifyConnectionChange();
}
}
Expand All @@ -459,13 +460,21 @@ static int getGlobalConnectionCount() {
return GLOBAL_CONNECTION_COUNT.get();
}

private int numPendingRequests() {
final int counter = pendingCounter.get();
if (counter >= pendingCounterLimit) {
if (disconnectReason.get() != null) {
return 0; // Disconnected implies no pending requests
private class OutstandingRequestGauge implements Metrics.OutstandingRequestsGauge {
@Override
public int getOutstandingRequests() {
final int counter = pendingCounter.get();
if (counter >= pendingCounterLimit) {
if (disconnectReason.get() != null) {
return 0; // Disconnected implies no pending requests
}
}
return counter;
}

@Override
public String getHostName() {
return address.getHostText();
}
return counter;
}
}

0 comments on commit 329992a

Please sign in to comment.