From 35732ce8083c6cdce5342edf7f478b1e18216826 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 9 Jan 2025 23:48:29 -0500 Subject: [PATCH] Support use staged harness for Dataflow runner v2 job (#33508) --- sdks/java/container/boot.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index c23e50dcf1b0..7fd87b3524e2 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -134,21 +134,31 @@ func main() { } const jarsDir = "/opt/apache/beam/jars" + const javaHarnessJar = "beam-sdks-java-harness.jar" cp := []string{ filepath.Join(jarsDir, "slf4j-api.jar"), filepath.Join(jarsDir, "slf4j-jdk14.jar"), filepath.Join(jarsDir, "jcl-over-slf4j.jar"), filepath.Join(jarsDir, "log4j-over-slf4j.jar"), filepath.Join(jarsDir, "log4j-to-slf4j.jar"), - filepath.Join(jarsDir, "beam-sdks-java-harness.jar"), + filepath.Join(jarsDir, javaHarnessJar), } var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar") for _, a := range artifacts { name, _ := artifact.MustExtractFilePayload(a) if hasWorkerExperiment { - if strings.HasPrefix(name, "beam-runners-google-cloud-dataflow-java-fn-api-worker") { - continue + if strings.HasPrefix(name, "beam-sdks-java-harness") { + // Remove system "beam-sdks-java-harness.jar". User-provided jar will be + // added to classpath as a normal user jar further below. + for i, cl := range cp { + if !strings.HasSuffix(cl, javaHarnessJar) { + continue + } + logger.Printf(ctx, "Using staged java harness: %v", name) + cp = append(cp[:i], cp[i+1:]...) + break + } } if name == "dataflow-worker.jar" { continue