From b387721eeac33b99b8649a8704393eff2331cd60 Mon Sep 17 00:00:00 2001 From: Naireen Date: Tue, 5 Nov 2024 00:40:47 +0000 Subject: [PATCH] add ability to encode and decode histogram data to portable runners --- runners/core-java/build.gradle | 2 + .../core/metrics/MonitoringInfoEncodings.java | 101 ++++++++++++++++++ .../metrics/MonitoringInfoEncodingsTest.java | 44 ++++++++ sdks/java/core/build.gradle | 1 + .../apache/beam/sdk/util/HistogramData.java | 74 +++++++++++++ sdks/java/harness/build.gradle | 2 + 6 files changed, 224 insertions(+) diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle index b477dde91212..0f55c10b97f2 100644 --- a/runners/core-java/build.gradle +++ b/runners/core-java/build.gradle @@ -42,6 +42,7 @@ dependencies { implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:job-management", configuration: "shadow") + implementation library.java.google_api_services_dataflow implementation library.java.vendored_guava_32_1_2_jre implementation library.java.joda_time implementation library.java.vendored_grpc_1_60_1 @@ -52,5 +53,6 @@ dependencies { testImplementation library.java.junit testImplementation library.java.mockito_core testImplementation library.java.slf4j_api + testImplementation(library.java.google_api_services_dataflow) testRuntimeOnly library.java.slf4j_simple } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java index 433e7f4fb20b..c818e422f5e5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java @@ -17,8 +17,19 @@ */ package org.apache.beam.runners.core.metrics; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.dataflow.model.Base2Exponent; +import com.google.api.services.dataflow.model.BucketOptions; +import com.google.api.services.dataflow.model.DataflowHistogramValue; +import com.google.api.services.dataflow.model.Linear; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DoubleCoder; @@ -26,10 +37,14 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; +// TODO(naireenhussain): Refactor out DataflowHistogramValue to be runner agnostic, and rename to +// remove Dataflow reference. + /** A set of functions used to encode and decode common monitoring info types. */ public class MonitoringInfoEncodings { private static final Coder VARINT_CODER = VarLongCoder.of(); @@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString payload) { throw new RuntimeException(e); } } + + /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */ + public static ByteString encodeInt64Histogram(HistogramData inputHistogram) { + try { + int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets(); + + DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue(); + + if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) { + HistogramData.LinearBuckets buckets = + (HistogramData.LinearBuckets) inputHistogram.getBucketType(); + Linear linear = new Linear(); + linear.setNumberOfBuckets(numberOfBuckets); + linear.setWidth(buckets.getWidth()); + linear.setStart(buckets.getStart()); + outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear)); + } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) { + HistogramData.ExponentialBuckets buckets = + (HistogramData.ExponentialBuckets) inputHistogram.getBucketType(); + Base2Exponent base2Exp = new Base2Exponent(); + base2Exp.setNumberOfBuckets(numberOfBuckets); + base2Exp.setScale(buckets.getScale()); + outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp)); + } else { + throw new RuntimeException("Unable to parse histogram, bucket is not recognized"); + } + + outputHistogram2.setCount(inputHistogram.getTotalCount()); + + List bucketCounts = new ArrayList<>(); + + Arrays.stream(inputHistogram.getBucketCount()) + .forEach( + val -> { + bucketCounts.add(val); + }); + + outputHistogram2.setBucketCounts(bucketCounts); + + ObjectMapper objectMapper = new ObjectMapper(); + String jsonString = objectMapper.writeValueAsString(outputHistogram2); + + return ByteString.copyFromUtf8(jsonString); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */ + public static HistogramData decodeInt64Histogram(ByteString payload) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards + DataflowHistogramValue newHist = new DataflowHistogramValue(); + newHist.setCount(jsonNode.get("count").asLong()); + + List bucketCounts = new ArrayList<>(); + Iterator itr = jsonNode.get("bucketCounts").iterator(); + while (itr.hasNext()) { + Long item = itr.next().asLong(); + bucketCounts.add(item); + } + newHist.setBucketCounts(bucketCounts); + + if (jsonNode.get("bucketOptions").has("linear")) { + Linear linear = new Linear(); + JsonNode linearNode = jsonNode.get("bucketOptions").get("linear"); + linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt()); + linear.setWidth(linearNode.get("width").asDouble()); + linear.setStart(linearNode.get("start").asDouble()); + newHist.setBucketOptions(new BucketOptions().setLinear(linear)); + } else if (jsonNode.get("bucketOptions").has("exponential")) { + Base2Exponent base2Exp = new Base2Exponent(); + JsonNode expNode = jsonNode.get("bucketOptions").get("exponential"); + base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt()); + base2Exp.setScale(expNode.get("scale").asInt()); + newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp)); + } else { + throw new RuntimeException("Unable to parse histogram, bucket is not recognized"); + } + return new HistogramData(newHist); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java index 8a43eef5883d..a4d70e9eddcd 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java @@ -21,26 +21,38 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.junit.Assert.assertEquals; import java.util.Collections; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Tests for {@link MonitoringInfoEncodings}. */ @RunWith(JUnit4.class) public class MonitoringInfoEncodingsTest { + @Rule + public ExpectedLogs monitoringInfoCodingsExpectedLogs = + ExpectedLogs.none(MonitoringInfoEncodings.class); + + @Rule public ExpectedException thrown = ExpectedException.none(); + @Test public void testInt64DistributionEncoding() { DistributionData data = DistributionData.create(1L, 2L, 3L, 4L); @@ -105,4 +117,36 @@ public void testDoubleCounterEncoding() { assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload); assertEquals(1.0, decodeDoubleCounter(payload), 0.001); } + + @Test + public void testHistgramInt64EncodingLinearHist() { + HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5); + + HistogramData inputHistogram = new HistogramData(buckets); + inputHistogram.record(5, 10, 15, 20); + ByteString payload = encodeInt64Histogram(inputHistogram); + + assertEquals(inputHistogram, decodeInt64Histogram(payload)); + } + + @Test + public void testHistgramInt64EncodingExpHist() { + HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10); + HistogramData inputHistogram = new HistogramData(buckets); + inputHistogram.record(2, 4, 8, 16, 32); + ByteString payload = encodeInt64Histogram(inputHistogram); + assertEquals(inputHistogram, decodeInt64Histogram(payload)); + } + + @Test + public void testHistgramInt64EncodingUnsupportedBucket() { + thrown.expect(Exception.class); + thrown.expectMessage("Unable to parse histogram, bucket is not recognized"); + + HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of(); + + HistogramData inputHistogram = new HistogramData(buckets); + inputHistogram.record(2, 4, 8, 16, 32); + encodeInt64Histogram(inputHistogram); + } } diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index a8dfbf42f970..7423cb7c6b8e 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -75,6 +75,7 @@ dependencies { permitUnusedDeclared library.java.antlr permitUsedUndeclared library.java.antlr_runtime // Required to load constants from the model, e.g. max timestamp for global window + provided library.java.google_api_services_dataflow shadow project(path: ":model:pipeline", configuration: "shadow") shadow project(path: ":model:fn-execution", configuration: "shadow") shadow project(path: ":model:job-management", configuration: "shadow") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java index 65ccda06be65..516425108e68 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java @@ -74,6 +74,42 @@ public HistogramData(BucketType bucketType) { this.sumOfSquaredDeviations = 0; } + /** + * Create a histogram from DataflowHistogramValue proto. + * + * @param histogramProto DataflowHistogramValue proto used to populate stats for the histogram. + */ + public HistogramData( + com.google.api.services.dataflow.model.DataflowHistogramValue histogramProto) { + int numBuckets; + if (histogramProto.getBucketOptions().getLinear() != null) { + double start = histogramProto.getBucketOptions().getLinear().getStart(); + double width = histogramProto.getBucketOptions().getLinear().getWidth(); + numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets(); + this.bucketType = LinearBuckets.of(start, width, numBuckets); + this.buckets = new long[bucketType.getNumBuckets()]; + + int idx = 0; + for (long val : histogramProto.getBucketCounts()) { + this.buckets[idx] = val; + this.numBoundedBucketRecords += val; + idx++; + } + } else { + // Assume it's a exponential histogram if its not linear + int scale = histogramProto.getBucketOptions().getExponential().getScale(); + numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets(); + this.bucketType = ExponentialBuckets.of(scale, numBuckets); + this.buckets = new long[bucketType.getNumBuckets()]; + int idx = 0; + for (long val : histogramProto.getBucketCounts()) { + this.buckets[idx] = val; + this.numBoundedBucketRecords += val; + idx++; + } + } + } + public BucketType getBucketType() { return this.bucketType; } @@ -293,6 +329,10 @@ public synchronized long getTopBucketCount() { return numTopRecords; } + public synchronized long[] getBucketCount() { + return buckets; + } + public synchronized double getTopBucketMean() { return numTopRecords == 0 ? 0 : topRecordsSum / numTopRecords; } @@ -573,6 +613,40 @@ public double getRangeTo() { // Note: equals() and hashCode() are implemented by the AutoValue. } + // Used for testing unsupported Bucket formats + @AutoValue + public abstract static class UnsupportedBuckets implements BucketType { + + public static UnsupportedBuckets of() { + return new AutoValue_HistogramData_UnsupportedBuckets(0); + } + + @Override + public int getBucketIndex(double value) { + return 0; + } + + @Override + public double getBucketSize(int index) { + return 0; + } + + @Override + public double getAccumulatedBucketSize(int index) { + return 0; + } + + @Override + public double getRangeFrom() { + return 0; + } + + @Override + public double getRangeTo() { + return 0; + } + } + @Override public synchronized boolean equals(@Nullable Object object) { if (object instanceof HistogramData) { diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index 2de578cb32cf..ed3034c08612 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -30,6 +30,7 @@ dependencies { provided project(path: ":model:pipeline", configuration: "shadow") provided project(path: ":sdks:java:core", configuration: "shadow") provided project(path: ":sdks:java:transform-service:launcher") + provided library.java.google_api_services_dataflow provided library.java.avro provided library.java.jackson_databind provided library.java.joda_time @@ -79,4 +80,5 @@ dependencies { shadowTest project(path: ":sdks:java:core", configuration: "shadowTest") shadowTestRuntimeClasspath library.java.slf4j_jdk14 permitUnusedDeclared library.java.avro + permitUnusedDeclared library.java.google_api_services_dataflow }