Skip to content

Commit

Permalink
update naming
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Jan 10, 2025
1 parent 3307bfc commit 932be6d
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,13 @@ message MonitoringInfoSpecs {
]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
USER_HISTOGRAM = 23 [(monitoring_info_spec) = {
urn: "beam:metric:user:histogram_int64:v1",
type: "beam:metrics:histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report per worker histogram metric."
value: "URN utilized to report histogram metric."
}]
}];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;
private final Iterable<MetricResult<HistogramData>> histograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
Iterable<MetricResult<HistogramData>> histograms) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.perWorkerHistograms = perWorkerHistograms;
this.histograms = histograms;
}

@Override
Expand All @@ -68,7 +68,6 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
perWorkerHistograms,
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
histograms, histogram -> MetricFiltering.matches(filter, histogram.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,17 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();

/** All the histogram updates. */
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();
public abstract Iterable<MetricUpdate<HistogramData>> histogramsUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
Iterable<MetricUpdate<HistogramData>> histogramsUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates,
distributionUpdates,
gaugeUpdates,
stringSetUpdates,
perWorkerHistogramsUpdates);
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates, histogramsUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
Expand All @@ -92,6 +88,6 @@ public boolean isEmpty() {
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates())
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
&& Iterables.isEmpty(histogramsUpdates());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.runners.core.metrics;

import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
Expand Down Expand Up @@ -94,9 +94,6 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {

private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);

Expand Down Expand Up @@ -223,20 +220,8 @@ public StringSetCell getStringSet(MetricName metricName) {
return stringSets.tryGet(metricName);
}

/**
* Return the {@link Histogram} that should be used for implementing the given per-worker {@code
* metricName} in this container.
*/
@Override
public HistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
return val;
}

public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
getPerWorkerHistogram() {
return perWorkerHistograms;
public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> getHistogram() {
return histograms;
}

private <UpdateT, CellT extends MetricCell<UpdateT>>
Expand Down Expand Up @@ -278,7 +263,7 @@ public MetricUpdates getUpdates() {
extractUpdates(distributions),
extractUpdates(gauges),
extractUpdates(stringSets),
extractHistogramUpdates(perWorkerHistograms));
extractHistogramUpdates(histograms));
}

/** @return The MonitoringInfo metadata from the metric. */
Expand Down Expand Up @@ -400,8 +385,8 @@ public MetricUpdates getUpdates() {
private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) {
return metricToMonitoringMetadata(
metricKey,
MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE,
MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM);
MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE,
MonitoringInfoConstants.Urns.USER_HISTOGRAM);
}

/**
Expand Down Expand Up @@ -453,7 +438,7 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
}
}

for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) {
for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.histogramsUpdates()) {
MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate);
if (mi != null) {
monitoringInfos.add(mi);
Expand Down Expand Up @@ -502,7 +487,7 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
}
}
});
perWorkerHistograms.forEach(
histograms.forEach(
(metricName, histogramCell) -> {
if (histogramCell.getDirty().beforeCommit()) {
String shortId =
Expand Down Expand Up @@ -547,7 +532,7 @@ public void commitUpdates() {
distributions.forEachValue(distribution -> distribution.getDirty().afterCommit());
gauges.forEachValue(gauge -> gauge.getDirty().afterCommit());
stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit());
perWorkerHistograms.forEachValue(
histograms.forEachValue(
histogram -> {
histogram.getDirty().afterCommit();
});
Expand Down Expand Up @@ -586,7 +571,7 @@ public MetricUpdates getCumulative() {
extractCumulatives(distributions),
extractCumulatives(gauges),
extractCumulatives(stringSets),
extractHistogramCumulatives(perWorkerHistograms));
extractHistogramCumulatives(histograms));
}

/** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */
Expand Down Expand Up @@ -623,10 +608,10 @@ private void updateForStringSetType(MonitoringInfo monitoringInfo) {
stringSet.update(decodeStringSet(monitoringInfo.getPayload()));
}

private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) {
private void updateForHistogramInt64(MonitoringInfo monitoringInfo) {
MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
Histogram histogram = getPerWorkerHistogram(metricName, buckets);
Histogram histogram = getHistogram(metricName, buckets);
HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload());
histogram.update(data);
}
Expand Down Expand Up @@ -655,8 +640,8 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
updateForStringSetType(monitoringInfo);
break;

case PER_WORKER_HISTOGRAM_TYPE:
updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info
case HISTOGRAM_TYPE:
updateForHistogramInt64(monitoringInfo); // use type, and not urn info
break;
default:
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
Expand Down Expand Up @@ -701,14 +686,14 @@ public boolean equals(@Nullable Object object) {
&& Objects.equals(distributions, metricsContainerImpl.distributions)
&& Objects.equals(gauges, metricsContainerImpl.gauges)
&& Objects.equals(stringSets, metricsContainerImpl.stringSets)
&& Objects.equals(perWorkerHistograms, metricsContainerImpl.perWorkerHistograms);
&& Objects.equals(histograms, metricsContainerImpl.histograms);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(stepName, counters, distributions, gauges, stringSets, perWorkerHistograms);
return Objects.hash(stepName, counters, distributions, gauges, stringSets, histograms);
}

/**
Expand Down Expand Up @@ -830,21 +815,6 @@ public static MetricsContainerImpl deltaContainer(
deltaValueCell.incTopBucketCount(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
curr.perWorkerHistograms.entries()) {
HistogramData.BucketType bt = cell.getKey().getValue();
HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative();
HistogramData currValue = cell.getValue().getCumulative();
HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey());
deltaValueCell.incBottomBucketCount(
currValue.getBottomBucketCount() - prevValue.getBottomBucketCount());
for (int i = 0; i < bt.getNumBuckets(); i++) {
Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i);
deltaValueCell.incBucketCount(i, bucketCountDelta);
}
deltaValueCell.incTopBucketCount(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}

for (Map.Entry<MetricName, StringSetCell> cell : curr.stringSets.entries()) {
// Simply take the most recent value for stringSets, no need to count deltas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static MetricResults asMetricResults(
Map<MetricKey, MetricResult<DistributionData>> distributions = new HashMap<>();
Map<MetricKey, MetricResult<GaugeData>> gauges = new HashMap<>();
Map<MetricKey, MetricResult<StringSetData>> sets = new HashMap<>();
Map<MetricKey, MetricResult<HistogramData>> perWorkerHistograms = new HashMap<>();
Map<MetricKey, MetricResult<HistogramData>> histograms = new HashMap<>();

attemptedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -148,8 +148,7 @@ public static MetricResults asMetricResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeAttemptedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
mergeAttemptedResults(
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
mergeAttemptedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine);
});
committedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -159,8 +158,7 @@ public static MetricResults asMetricResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeCommittedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
mergeCommittedResults(
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
mergeCommittedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine);
});

return new DefaultMetricResults(
Expand All @@ -174,7 +172,7 @@ public static MetricResults asMetricResults(
sets.values().stream()
.map(result -> result.transform(StringSetData::extractResult))
.collect(toList()),
perWorkerHistograms.values().stream()
histograms.values().stream()
.map(result -> result.transform(HistogramData::extractResult))
.collect(toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public static final class Urns {
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE);
public static final String USER_SET_STRING =
extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING);
public static final String USER_PER_WORKER_HISTOGRAM =
extractUrn(MonitoringInfoSpecs.Enum.USER_PER_WORKER_HISTOGRAM);
public static final String USER_HISTOGRAM = extractUrn(MonitoringInfoSpecs.Enum.USER_HISTOGRAM);
public static final String SAMPLED_BYTE_SIZE =
extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
Expand Down Expand Up @@ -167,8 +166,7 @@ public static final class TypeUrns {
public static final String BOTTOM_N_DOUBLE_TYPE = "beam:metrics:bottom_n_double:v1";
public static final String PROGRESS_TYPE = "beam:metrics:progress:v1";
public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1";
public static final String PER_WORKER_HISTOGRAM_TYPE =
"beam:metrics:per_worker_histogram_int64:v1";
public static final String HISTOGRAM_TYPE = "beam:metrics:histogram_int64:v1";

static {
// Validate that compile time constants match the values stored in the protos.
Expand All @@ -195,9 +193,7 @@ public static final class TypeUrns {
BOTTOM_N_DOUBLE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOTTOM_N_DOUBLE_TYPE)));
checkArgument(PROGRESS_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.PROGRESS_TYPE)));
checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE)));
checkArgument(
PER_WORKER_HISTOGRAM_TYPE.equals(
getUrn(MonitoringInfoTypeUrns.Enum.PER_WORKER_HISTOGRAM)));
checkArgument(HISTOGRAM_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.HISTOGRAM)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,11 @@ public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData value) {
}

/**
* Encodes the value and sets the type to {@link
* MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM_TYPE}.
* Encodes the value and sets the type to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM_TYPE}.
*/
public SimpleMonitoringInfoBuilder setInt64HistogramValue(HistogramData data) {
this.builder.setPayload(encodeInt64Histogram(data));
this.builder.setType(MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE);
this.builder.setType(MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ public void testDeltaCounters() {
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
MetricName hName = MetricName.named("namespace", "histogram");
MetricName stringSetName = MetricName.named("namespace", "stringset");
MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram");

MetricsContainerImpl prevContainer = new MetricsContainerImpl(null);
prevContainer.getCounter(cName).inc(2L);
Expand All @@ -384,10 +383,6 @@ public void testDeltaCounters() {
prevContainer.getHistogram(hName, bucketType).update(3);
prevContainer.getHistogram(hName, bucketType).update(20);

// Set PerWorkerBucketCounts to [0,1,1,0,0,0,0]
prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1);
prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3);

MetricsContainerImpl nextContainer = new MetricsContainerImpl(null);
nextContainer.getCounter(cName).inc(9L);
nextContainer.getGauge(gName).set(8L);
Expand All @@ -406,10 +401,6 @@ public void testDeltaCounters() {
nextContainer.getHistogram(hName, bucketType).update(20);
nextContainer.getHistogram(hName, bucketType).update(20);

// Set PerWorkerBucketCounts to [1,0,0,0,0,0,1]
nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1);
nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20);

MetricsContainerImpl deltaContainer =
MetricsContainerImpl.deltaContainer(prevContainer, nextContainer);
// Expect counter value: 7 = 9 - 2
Expand All @@ -435,20 +426,6 @@ public void testDeltaCounters() {
}
assertEquals(
2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount());

// Expect per worker bucket counts: [1,0,0,0,0,0,1]
assertEquals(
1,
deltaContainer
.getPerWorkerHistogram(pwhName, bucketType)
.getCumulative()
.getBottomBucketCount());
assertEquals(
1,
deltaContainer
.getPerWorkerHistogram(pwhName, bucketType)
.getCumulative()
.getTopBucketCount());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Iterable<MetricResult<StringSetResult>> getStringSets() {
}

@Override
public Iterable<MetricResult<HistogramData>> getPerWorkerHistograms() {
public Iterable<MetricResult<HistogramData>> getHistograms() {
return Collections.emptyList();
}
}
Loading

0 comments on commit 932be6d

Please sign in to comment.