diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index 7e448c8e8fad..b23311a903c0 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -379,13 +379,13 @@ message MonitoringInfoSpecs { ] }]; - USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = { - urn: "beam:metric:user:per_worker_histogram_int64:v1", - type: "beam:metrics:per_worker_histogram_int64:v1", + USER_HISTOGRAM = 23 [(monitoring_info_spec) = { + urn: "beam:metric:user:histogram_int64:v1", + type: "beam:metrics:histogram_int64:v1", required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"], annotations: [{ key: "description", - value: "URN utilized to report per worker histogram metric." + value: "URN utilized to report histogram metric." }] }]; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java index f45dd154eb9e..5455ec21a82f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java @@ -43,19 +43,19 @@ public class DefaultMetricResults extends MetricResults { private final Iterable> distributions; private final Iterable> gauges; private final Iterable> stringSets; - private final Iterable> perWorkerHistograms; + private final Iterable> histograms; public DefaultMetricResults( Iterable> counters, Iterable> distributions, Iterable> gauges, Iterable> stringSets, - Iterable> perWorkerHistograms) { + Iterable> histograms) { this.counters = counters; this.distributions = distributions; this.gauges = gauges; this.stringSets = stringSets; - this.perWorkerHistograms = perWorkerHistograms; + this.histograms = histograms; } @Override @@ -68,7 +68,6 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { Iterables.filter( stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())), Iterables.filter( - perWorkerHistograms, - perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey()))); + histograms, histogram -> MetricFiltering.matches(filter, histogram.getKey()))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java index 7e876810c63a..e78d5900b8bb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java @@ -69,7 +69,7 @@ public static MetricUpdate create(MetricKey key, T update) { public abstract Iterable> stringSetUpdates(); /** All the histogram updates. */ - public abstract Iterable> perWorkerHistogramsUpdates(); + public abstract Iterable> histogramsUpdates(); /** Create a new {@link MetricUpdates} bundle. */ public static MetricUpdates create( @@ -77,13 +77,9 @@ public static MetricUpdates create( Iterable> distributionUpdates, Iterable> gaugeUpdates, Iterable> stringSetUpdates, - Iterable> perWorkerHistogramsUpdates) { + Iterable> histogramsUpdates) { return new AutoValue_MetricUpdates( - counterUpdates, - distributionUpdates, - gaugeUpdates, - stringSetUpdates, - perWorkerHistogramsUpdates); + counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates, histogramsUpdates); } /** Returns true if there are no updates in this MetricUpdates object. */ @@ -92,6 +88,6 @@ public boolean isEmpty() { && Iterables.isEmpty(distributionUpdates()) && Iterables.isEmpty(gaugeUpdates()) && Iterables.isEmpty(stringSetUpdates()) - && Iterables.isEmpty(perWorkerHistogramsUpdates()); + && Iterables.isEmpty(histogramsUpdates()); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 3faf36219c9c..bf2eac7b3e5d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.core.metrics; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE; +import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE; -import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter; @@ -94,9 +94,6 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { private MetricsMap stringSets = new MetricsMap<>(StringSetCell::new); - private MetricsMap, HistogramCell> perWorkerHistograms = - new MetricsMap<>(HistogramCell::new); - private MetricsMap, HistogramCell> histograms = new MetricsMap<>(HistogramCell::new); @@ -223,20 +220,8 @@ public StringSetCell getStringSet(MetricName metricName) { return stringSets.tryGet(metricName); } - /** - * Return the {@link Histogram} that should be used for implementing the given per-worker {@code - * metricName} in this container. - */ - @Override - public HistogramCell getPerWorkerHistogram( - MetricName metricName, HistogramData.BucketType bucketType) { - HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType)); - return val; - } - - public MetricsMap, HistogramCell> - getPerWorkerHistogram() { - return perWorkerHistograms; + public MetricsMap, HistogramCell> getHistogram() { + return histograms; } private > @@ -278,7 +263,7 @@ public MetricUpdates getUpdates() { extractUpdates(distributions), extractUpdates(gauges), extractUpdates(stringSets), - extractHistogramUpdates(perWorkerHistograms)); + extractHistogramUpdates(histograms)); } /** @return The MonitoringInfo metadata from the metric. */ @@ -400,8 +385,8 @@ public MetricUpdates getUpdates() { private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) { return metricToMonitoringMetadata( metricKey, - MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE, - MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM); + MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE, + MonitoringInfoConstants.Urns.USER_HISTOGRAM); } /** @@ -453,7 +438,7 @@ public Iterable getMonitoringInfos() { } } - for (MetricUpdate metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) { + for (MetricUpdate metricUpdate : metricUpdates.histogramsUpdates()) { MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate); if (mi != null) { monitoringInfos.add(mi); @@ -502,7 +487,7 @@ public Map getMonitoringData(ShortIdMap shortIds) { } } }); - perWorkerHistograms.forEach( + histograms.forEach( (metricName, histogramCell) -> { if (histogramCell.getDirty().beforeCommit()) { String shortId = @@ -547,7 +532,7 @@ public void commitUpdates() { distributions.forEachValue(distribution -> distribution.getDirty().afterCommit()); gauges.forEachValue(gauge -> gauge.getDirty().afterCommit()); stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit()); - perWorkerHistograms.forEachValue( + histograms.forEachValue( histogram -> { histogram.getDirty().afterCommit(); }); @@ -586,7 +571,7 @@ public MetricUpdates getCumulative() { extractCumulatives(distributions), extractCumulatives(gauges), extractCumulatives(stringSets), - extractHistogramCumulatives(perWorkerHistograms)); + extractHistogramCumulatives(histograms)); } /** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */ @@ -623,10 +608,10 @@ private void updateForStringSetType(MonitoringInfo monitoringInfo) { stringSet.update(decodeStringSet(monitoringInfo.getPayload())); } - private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) { + private void updateForHistogramInt64(MonitoringInfo monitoringInfo) { MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo); HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17); - Histogram histogram = getPerWorkerHistogram(metricName, buckets); + Histogram histogram = getHistogram(metricName, buckets); HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload()); histogram.update(data); } @@ -655,8 +640,8 @@ public void update(Iterable monitoringInfos) { updateForStringSetType(monitoringInfo); break; - case PER_WORKER_HISTOGRAM_TYPE: - updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info + case HISTOGRAM_TYPE: + updateForHistogramInt64(monitoringInfo); // use type, and not urn info break; default: LOG.warn("Unsupported metric type {}", monitoringInfo.getType()); @@ -701,14 +686,14 @@ public boolean equals(@Nullable Object object) { && Objects.equals(distributions, metricsContainerImpl.distributions) && Objects.equals(gauges, metricsContainerImpl.gauges) && Objects.equals(stringSets, metricsContainerImpl.stringSets) - && Objects.equals(perWorkerHistograms, metricsContainerImpl.perWorkerHistograms); + && Objects.equals(histograms, metricsContainerImpl.histograms); } return false; } @Override public int hashCode() { - return Objects.hash(stepName, counters, distributions, gauges, stringSets, perWorkerHistograms); + return Objects.hash(stepName, counters, distributions, gauges, stringSets, histograms); } /** @@ -830,21 +815,6 @@ public static MetricsContainerImpl deltaContainer( deltaValueCell.incTopBucketCount( currValue.getTopBucketCount() - prevValue.getTopBucketCount()); } - for (Map.Entry, HistogramCell> cell : - curr.perWorkerHistograms.entries()) { - HistogramData.BucketType bt = cell.getKey().getValue(); - HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative(); - HistogramData currValue = cell.getValue().getCumulative(); - HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey()); - deltaValueCell.incBottomBucketCount( - currValue.getBottomBucketCount() - prevValue.getBottomBucketCount()); - for (int i = 0; i < bt.getNumBuckets(); i++) { - Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i); - deltaValueCell.incBucketCount(i, bucketCountDelta); - } - deltaValueCell.incTopBucketCount( - currValue.getTopBucketCount() - prevValue.getTopBucketCount()); - } for (Map.Entry cell : curr.stringSets.entries()) { // Simply take the most recent value for stringSets, no need to count deltas. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java index cb74b26ff0bd..a4d831dd0d0d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java @@ -138,7 +138,7 @@ public static MetricResults asMetricResults( Map> distributions = new HashMap<>(); Map> gauges = new HashMap<>(); Map> sets = new HashMap<>(); - Map> perWorkerHistograms = new HashMap<>(); + Map> histograms = new HashMap<>(); attemptedMetricsContainers.forEachMetricContainer( container -> { @@ -148,8 +148,7 @@ public static MetricResults asMetricResults( distributions, cumulative.distributionUpdates(), DistributionData::combine); mergeAttemptedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine); mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); - mergeAttemptedResults( - perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); + mergeAttemptedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine); }); committedMetricsContainers.forEachMetricContainer( container -> { @@ -159,8 +158,7 @@ public static MetricResults asMetricResults( distributions, cumulative.distributionUpdates(), DistributionData::combine); mergeCommittedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine); mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); - mergeCommittedResults( - perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); + mergeCommittedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine); }); return new DefaultMetricResults( @@ -174,7 +172,7 @@ public static MetricResults asMetricResults( sets.values().stream() .map(result -> result.transform(StringSetData::extractResult)) .collect(toList()), - perWorkerHistograms.values().stream() + histograms.values().stream() .map(result -> result.transform(HistogramData::extractResult)) .collect(toList())); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java index a6168ea5a8fa..9707df451056 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java @@ -54,8 +54,7 @@ public static final class Urns { extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE); public static final String USER_SET_STRING = extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING); - public static final String USER_PER_WORKER_HISTOGRAM = - extractUrn(MonitoringInfoSpecs.Enum.USER_PER_WORKER_HISTOGRAM); + public static final String USER_HISTOGRAM = extractUrn(MonitoringInfoSpecs.Enum.USER_HISTOGRAM); public static final String SAMPLED_BYTE_SIZE = extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE); public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED); @@ -167,8 +166,7 @@ public static final class TypeUrns { public static final String BOTTOM_N_DOUBLE_TYPE = "beam:metrics:bottom_n_double:v1"; public static final String PROGRESS_TYPE = "beam:metrics:progress:v1"; public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1"; - public static final String PER_WORKER_HISTOGRAM_TYPE = - "beam:metrics:per_worker_histogram_int64:v1"; + public static final String HISTOGRAM_TYPE = "beam:metrics:histogram_int64:v1"; static { // Validate that compile time constants match the values stored in the protos. @@ -195,9 +193,7 @@ public static final class TypeUrns { BOTTOM_N_DOUBLE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOTTOM_N_DOUBLE_TYPE))); checkArgument(PROGRESS_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.PROGRESS_TYPE))); checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE))); - checkArgument( - PER_WORKER_HISTOGRAM_TYPE.equals( - getUrn(MonitoringInfoTypeUrns.Enum.PER_WORKER_HISTOGRAM))); + checkArgument(HISTOGRAM_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.HISTOGRAM))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java index b15ff242f12b..fc52db50dd89 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java @@ -162,12 +162,11 @@ public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData value) { } /** - * Encodes the value and sets the type to {@link - * MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM_TYPE}. + * Encodes the value and sets the type to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM_TYPE}. */ public SimpleMonitoringInfoBuilder setInt64HistogramValue(HistogramData data) { this.builder.setPayload(encodeInt64Histogram(data)); - this.builder.setType(MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE); + this.builder.setType(MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE); return this; } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index f212b54e05f6..5b3d71f4873e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -372,7 +372,6 @@ public void testDeltaCounters() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5); MetricName hName = MetricName.named("namespace", "histogram"); MetricName stringSetName = MetricName.named("namespace", "stringset"); - MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram"); MetricsContainerImpl prevContainer = new MetricsContainerImpl(null); prevContainer.getCounter(cName).inc(2L); @@ -384,10 +383,6 @@ public void testDeltaCounters() { prevContainer.getHistogram(hName, bucketType).update(3); prevContainer.getHistogram(hName, bucketType).update(20); - // Set PerWorkerBucketCounts to [0,1,1,0,0,0,0] - prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1); - prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3); - MetricsContainerImpl nextContainer = new MetricsContainerImpl(null); nextContainer.getCounter(cName).inc(9L); nextContainer.getGauge(gName).set(8L); @@ -406,10 +401,6 @@ public void testDeltaCounters() { nextContainer.getHistogram(hName, bucketType).update(20); nextContainer.getHistogram(hName, bucketType).update(20); - // Set PerWorkerBucketCounts to [1,0,0,0,0,0,1] - nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1); - nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20); - MetricsContainerImpl deltaContainer = MetricsContainerImpl.deltaContainer(prevContainer, nextContainer); // Expect counter value: 7 = 9 - 2 @@ -435,20 +426,6 @@ public void testDeltaCounters() { } assertEquals( 2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount()); - - // Expect per worker bucket counts: [1,0,0,0,0,0,1] - assertEquals( - 1, - deltaContainer - .getPerWorkerHistogram(pwhName, bucketType) - .getCumulative() - .getBottomBucketCount()); - assertEquals( - 1, - deltaContainer - .getPerWorkerHistogram(pwhName, bucketType) - .getCumulative() - .getTopBucketCount()); } @Test diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java index 5e3319a0f0df..7162a1a879bc 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java @@ -85,7 +85,7 @@ public Iterable> getStringSets() { } @Override - public Iterable> getPerWorkerHistograms() { + public Iterable> getHistograms() { return Collections.emptyList(); } } diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java index ed7b408926a3..328088415d31 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java @@ -94,7 +94,7 @@ public void testWriteMetricsWithCommittedSupported() throws Exception { + "\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd" + + "\"ns1\"},\"step\":\"s3\"}],\"histograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd" + "\"]},\"committed\":{\"stringSet\":[\"ab\"]},\"name\":{\"name\":\"n3\"," + "\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); @@ -116,7 +116,7 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception { + "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\"" + ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]}," + + "\"ns1\"},\"step\":\"s3\"}],\"histograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]}," + "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0)); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java index 281f903f2c54..b24f7ddf4dda 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java @@ -93,7 +93,7 @@ public Iterable> getStringSets() { } @Override - public Iterable> getPerWorkerHistograms() { + public Iterable> getHistograms() { return Collections.emptyList(); } }; diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java index fdedcc0086be..562eae22e6d4 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java @@ -106,7 +106,7 @@ private static class QueryResults extends MetricQueryResults { private final Iterable> distributions; private final Iterable> gauges; private final Iterable> stringSets; - private final Iterable> perWorkerHistograms; + private final Iterable> histograms; private QueryResults( Iterable> counters, @@ -117,7 +117,7 @@ private QueryResults( this.distributions = distributions; this.gauges = gauges; this.stringSets = stringSets; - this.perWorkerHistograms = Collections.emptyList(); // not implemented + this.histograms = Collections.emptyList(); // not implemented } @Override @@ -141,8 +141,8 @@ public Iterable> getStringSets() { } @Override - public Iterable> getPerWorkerHistograms() { - return perWorkerHistograms; + public Iterable> getHistograms() { + return histograms; } } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java index 09e19219ed9b..75b31712b693 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java @@ -156,7 +156,7 @@ public Iterable> stringSetUpdates() { } @Override - public Iterable> perWorkerHistogramsUpdates() { + public Iterable> histogramsUpdates() { return Collections.emptyList(); // not implemented } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java index 5e2605b2dc70..609b482c33c4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -39,7 +39,7 @@ public abstract class MetricQueryResults { public abstract Iterable> getStringSets(); /** Return the metric results for the sets that matched the filter. */ - public abstract Iterable> getPerWorkerHistograms(); + public abstract Iterable> getHistograms(); static void printMetrics(String type, Iterable> metrics, StringBuilder sb) { List> metricsList = ImmutableList.copyOf(metrics);