From 63d91f579d575674be78bf03e988821d73dba673 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Mon, 30 Sep 2024 09:34:28 -0400 Subject: [PATCH 1/2] Scio execution graph --- .../scala/com/spotify/scio/ScioContext.scala | 45 ++++++++++++------- .../com/spotify/scio/graph/StepInfo.scala | 39 ++++++++++++++++ .../scala/com/spotify/scio/io/TextIO.scala | 3 +- .../scio/testing/TestDataManager.scala | 3 +- .../syntax/SCollectionSafeSyntax.scala | 2 +- .../scio/values/PCollectionWrapper.scala | 3 ++ .../com/spotify/scio/values/SCollection.scala | 8 +++- .../scio/values/SCollectionOutput.scala | 19 ++++++++ 8 files changed, 100 insertions(+), 22 deletions(-) create mode 100644 scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala create mode 100644 scio-core/src/main/scala/com/spotify/scio/values/SCollectionOutput.scala diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index ce3e169b6a..b09d8e0fbb 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -22,6 +22,7 @@ import java.io.File import java.net.URI import java.nio.file.Files import com.spotify.scio.coders.{Coder, CoderMaterializer, KVCoder} +import com.spotify.scio.graph.{CustomInput, Parallelize, StepInfo, TransformStep, UnionAll} import com.spotify.scio.io._ import com.spotify.scio.metrics.Metrics import com.spotify.scio.options.ScioOptions @@ -524,8 +525,8 @@ class ScioContext private[scio] ( private var _onClose: Unit => Unit = identity /** Wrap a [[org.apache.beam.sdk.values.PCollection PCollection]]. */ - def wrap[T](p: PCollection[T]): SCollection[T] = - new SCollectionImpl[T](p, this) + def wrap[T](p: PCollection[T], step: StepInfo): SCollection[T] = + new SCollectionImpl[T](p, this, step) /** Add callbacks calls when the context is closed. */ private[scio] def onClose(f: Unit => Unit): Unit = @@ -687,25 +688,33 @@ class ScioContext private[scio] ( private[scio] def applyTransform[U]( name: Option[String], - root: PTransform[_ >: PBegin, PCollection[U]] + root: PTransform[_ >: PBegin, PCollection[U]], + step: StepInfo ): SCollection[U] = - wrap(applyInternal(name, root)) + wrap(applyInternal(name, root), step) private[scio] def applyTransform[U]( - root: PTransform[_ >: PBegin, PCollection[U]] + root: PTransform[_ >: PBegin, PCollection[U]], + step: StepInfo ): SCollection[U] = - applyTransform(None, root) + applyTransform(None, root, step) private[scio] def applyTransform[U]( name: String, - root: PTransform[_ >: PBegin, PCollection[U]] + root: PTransform[_ >: PBegin, PCollection[U]], + step: StepInfo ): SCollection[U] = - applyTransform(Option(name), root) + applyTransform(Option(name), root, step) def transform[U](f: ScioContext => SCollection[U]): SCollection[U] = transform(this.tfName)(f) - def transform[U](name: String)(f: ScioContext => SCollection[U]): SCollection[U] = - wrap(transform_(name)(f(_).internal)) + def transform[U](name: String)(f: ScioContext => SCollection[U]): SCollection[U] = { + val transformed = transform_(name)(sc => SCollectionOutput(f(sc))) + wrap( + transformed.scioCollection.internal, + TransformStep(name, transformed.scioCollection.step) + ) + } private[scio] def transform_[U <: POutput](f: ScioContext => U): U = transform_(tfName)(f) @@ -760,7 +769,7 @@ class ScioContext private[scio] ( if (this.isTest) { TestDataManager.getInput(testId.get)(CustomIO[T](name)).toSCollection(this) } else { - applyTransform(name, transform) + applyTransform(name, transform, CustomInput(name)) } } @@ -788,13 +797,15 @@ class ScioContext private[scio] ( // `T: Coder` context bound is required since `scs` might be empty. def unionAll[T: Coder](scs: => Iterable[SCollection[T]]): SCollection[T] = { val tfName = this.tfName // evaluate eagerly to avoid overriding `scs` names - scs match { + scs.toList match { case Nil => empty() case contents => + val sources = contents.map(_.step) wrap( PCollectionList .of(contents.map(_.internal).asJava) - .apply(tfName, Flatten.pCollections()) + .apply(tfName, Flatten.pCollections()), + UnionAll(tfName, sources) ) } } @@ -809,7 +820,7 @@ class ScioContext private[scio] ( def parallelize[T: Coder](elems: Iterable[T]): SCollection[T] = requireNotClosed { val coder = CoderMaterializer.beam(this, Coder[T]) - this.applyTransform(Create.of(elems.asJava).withCoder(coder)) + this.applyTransform(Create.of(elems.asJava).withCoder(coder), Parallelize) } /** @@ -822,7 +833,7 @@ class ScioContext private[scio] ( requireNotClosed { val coder = CoderMaterializer.beam(this, KVCoder(koder, voder)) this - .applyTransform(Create.of(elems.asJava).withCoder(coder)) + .applyTransform(Create.of(elems.asJava).withCoder(coder), Parallelize) .map(kv => (kv.getKey, kv.getValue)) } @@ -834,7 +845,7 @@ class ScioContext private[scio] ( requireNotClosed { val coder = CoderMaterializer.beam(this, Coder[T]) val v = elems.map(t => TimestampedValue.of(t._1, t._2)) - this.applyTransform(Create.timestamped(v.asJava).withCoder(coder)) + this.applyTransform(Create.timestamped(v.asJava).withCoder(coder), Parallelize) } /** @@ -848,7 +859,7 @@ class ScioContext private[scio] ( requireNotClosed { val coder = CoderMaterializer.beam(this, Coder[T]) val v = elems.zip(timestamps).map(t => TimestampedValue.of(t._1, t._2)) - this.applyTransform(Create.timestamped(v.asJava).withCoder(coder)) + this.applyTransform(Create.timestamped(v.asJava).withCoder(coder), Parallelize) } // ======================================================================= diff --git a/scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala b/scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala new file mode 100644 index 0000000000..d637a3c6b4 --- /dev/null +++ b/scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala @@ -0,0 +1,39 @@ +package com.spotify.scio.graph + +private[scio] trait StepInfo { + val name: String + val sources: List[StepInfo] +} + +private[scio] case class TransformStep(name: String, sources: List[StepInfo]) extends StepInfo + +private[scio] object TransformStep { + def apply(name: String, source: StepInfo): TransformStep = TransformStep(name, List(source)) +} + +// preserve link to an original PTransform? +private[scio] case class CustomInput(name: String) extends StepInfo { + val sources: List[StepInfo] = List.empty +} + +private[scio] case class UnionAll(name: String, sources: List[StepInfo]) extends StepInfo + +private[scio] object Parallelize extends StepInfo { + override val name: String = "parallelize" + override val sources: List[StepInfo] = List.empty +} + +private[scio] case class ReadTextIO(filePattern: String) extends StepInfo { + override val name: String = null + override val sources: List[StepInfo] = List() +} + +private[scio] case class TestInput(kind: String) extends StepInfo { + override val name: String = kind + override val sources: List[StepInfo] = List() +} + +private[scio] case class FlatMap(source: StepInfo) extends StepInfo { + val name: String = null + override val sources: List[StepInfo] = List(source) +} diff --git a/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala b/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala index 1429ccb430..3005e93c2b 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala @@ -22,6 +22,7 @@ import java.nio.channels.Channels import java.util.Collections import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.graph.ReadTextIO import com.spotify.scio.util.ScioUtil import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.SCollection @@ -51,7 +52,7 @@ final case class TextIO(path: String) extends ScioIO[String] { .withCompression(params.compression) .withEmptyMatchTreatment(params.emptyMatchTreatment) - sc.applyTransform(t) + sc.applyTransform(t, ReadTextIO(filePattern)) .setCoder(coder) } diff --git a/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala b/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala index b4504873a3..90e4280f75 100644 --- a/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala +++ b/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala @@ -18,6 +18,7 @@ package com.spotify.scio.testing import com.spotify.scio.coders.Coder +import com.spotify.scio.graph.TestInput import com.spotify.scio.io.ScioIO import com.spotify.scio.values.SCollection import com.spotify.scio.{ScioContext, ScioResult} @@ -46,7 +47,7 @@ final private[scio] case class TestStreamInputSource[T]( ) override def toSCollection(sc: ScioContext): SCollection[T] = - sc.applyTransform(stream) + sc.applyTransform(stream, TestInput("TestStream")) override def toString: String = s"TestStream(${stream.getEvents})" } diff --git a/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala b/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala index 591c6c4bc7..76bb4173b3 100644 --- a/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala +++ b/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala @@ -75,7 +75,7 @@ trait SCollectionSafeSyntax { tuple .get(errorTag) .setCoder(CoderMaterializer.beam(self.context, Coder[(T, Throwable)])) - (self.context.wrap(main), self.context.wrap(errorPipe)) + (self.context.wrap(main, FlatMap()), self.context.wrap(errorPipe)) } } } diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala b/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala index a23d1b741f..da22b7fbaa 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala @@ -19,6 +19,7 @@ package com.spotify.scio.values import com.spotify.scio.ScioContext import com.spotify.scio.coders.{BeamCoders, Coder} +import com.spotify.scio.graph.StepInfo import org.apache.beam.sdk.transforms.PTransform import org.apache.beam.sdk.values.{PCollection, POutput} @@ -27,6 +28,8 @@ private[values] trait PCollectionWrapper[T] extends TransformNameable { /** The [[org.apache.beam.sdk.values.PCollection PCollection]] being wrapped internally. */ val internal: PCollection[T] + val step: StepInfo + implicit def coder: Coder[T] = BeamCoders.getCoder(internal) /** diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 245818eed9..27ceb4bb61 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -27,6 +27,7 @@ import com.spotify.scio.estimators.{ ApproximateUniqueCounter, ApproximateUniqueCounterByError } +import com.spotify.scio.graph.StepInfo import com.spotify.scio.io._ import com.spotify.scio.schemas.{Schema, SchemaMaterializer} import com.spotify.scio.testing.TestDataManager @@ -1763,5 +1764,8 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { io.writeWithContext(this, ()) } -private[scio] class SCollectionImpl[T](val internal: PCollection[T], val context: ScioContext) - extends SCollection[T] {} +private[scio] class SCollectionImpl[T]( + val internal: PCollection[T], + val context: ScioContext, + val step: StepInfo +) extends SCollection[T] {} diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollectionOutput.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionOutput.scala new file mode 100644 index 0000000000..265cb56e1d --- /dev/null +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollectionOutput.scala @@ -0,0 +1,19 @@ +package com.spotify.scio.values; + +import org.apache.beam.sdk.Pipeline +import org.apache.beam.sdk.transforms.PTransform +import org.apache.beam.sdk.values.{PInput, POutput, PValue, TupleTag} + +import java.util; + +case class SCollectionOutput[T](scioCollection: SCollection[T]) extends POutput { + override def getPipeline: Pipeline = scioCollection.internal.getPipeline + + override def expand(): util.Map[TupleTag[_], PValue] = scioCollection.internal.expand() + + override def finishSpecifyingOutput( + transformName: String, + input: PInput, + transform: PTransform[_, _] + ): Unit = scioCollection.internal.finishSpecifyingOutput(transformName, input, transform) +} From 2be611e17f07812b9f572fd89b7a693a0928985d Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Wed, 2 Oct 2024 21:32:47 -0400 Subject: [PATCH 2/2] Redesigned graph model --- .../scala/com/spotify/scio/ScioContext.scala | 46 ++++--- .../spotify/scio/graph/ScioGraphNode.scala | 127 ++++++++++++++++++ .../com/spotify/scio/graph/StepInfo.scala | 39 ------ .../main/scala/com/spotify/scio/io/Tap.scala | 3 +- .../scala/com/spotify/scio/io/TextIO.scala | 7 +- .../scio/testing/TestDataManager.scala | 9 +- .../syntax/SCollectionSafeSyntax.scala | 10 +- .../com/spotify/scio/util/ArtisanJoin.scala | 2 +- .../scio/values/PCollectionWrapper.scala | 4 +- .../com/spotify/scio/values/SCollection.scala | 4 +- .../scio/parquet/avro/ParquetAvroIO.scala | 3 + 11 files changed, 180 insertions(+), 74 deletions(-) create mode 100644 scio-core/src/main/scala/com/spotify/scio/graph/ScioGraphNode.scala delete mode 100644 scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index b09d8e0fbb..885158b92c 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -22,7 +22,7 @@ import java.io.File import java.net.URI import java.nio.file.Files import com.spotify.scio.coders.{Coder, CoderMaterializer, KVCoder} -import com.spotify.scio.graph.{CustomInput, Parallelize, StepInfo, TransformStep, UnionAll} +import com.spotify.scio.graph.{NodeIO, NodeType, ScioGraphNode} import com.spotify.scio.io._ import com.spotify.scio.metrics.Metrics import com.spotify.scio.options.ScioOptions @@ -525,7 +525,7 @@ class ScioContext private[scio] ( private var _onClose: Unit => Unit = identity /** Wrap a [[org.apache.beam.sdk.values.PCollection PCollection]]. */ - def wrap[T](p: PCollection[T], step: StepInfo): SCollection[T] = + def wrap[T](p: PCollection[T], step: ScioGraphNode): SCollection[T] = new SCollectionImpl[T](p, this, step) /** Add callbacks calls when the context is closed. */ @@ -689,30 +689,31 @@ class ScioContext private[scio] ( private[scio] def applyTransform[U]( name: Option[String], root: PTransform[_ >: PBegin, PCollection[U]], - step: StepInfo + step: ScioGraphNode ): SCollection[U] = wrap(applyInternal(name, root), step) private[scio] def applyTransform[U]( root: PTransform[_ >: PBegin, PCollection[U]], - step: StepInfo + step: ScioGraphNode ): SCollection[U] = applyTransform(None, root, step) private[scio] def applyTransform[U]( name: String, root: PTransform[_ >: PBegin, PCollection[U]], - step: StepInfo + step: ScioGraphNode ): SCollection[U] = applyTransform(Option(name), root, step) - def transform[U](f: ScioContext => SCollection[U]): SCollection[U] = transform(this.tfName)(f) + def transform[U: ClassTag](f: ScioContext => SCollection[U]): SCollection[U] = + transform(this.tfName)(f) - def transform[U](name: String)(f: ScioContext => SCollection[U]): SCollection[U] = { + def transform[U: ClassTag](name: String)(f: ScioContext => SCollection[U]): SCollection[U] = { val transformed = transform_(name)(sc => SCollectionOutput(f(sc))) wrap( transformed.scioCollection.internal, - TransformStep(name, transformed.scioCollection.step) + ScioGraphNode.node[U](name, NodeType.Transform, List(transformed.scioCollection)) ) } @@ -769,7 +770,7 @@ class ScioContext private[scio] ( if (this.isTest) { TestDataManager.getInput(testId.get)(CustomIO[T](name)).toSCollection(this) } else { - applyTransform(name, transform, CustomInput(name)) + applyTransform(name, transform, ScioGraphNode.read(name, NodeIO.CustomInput)) } } @@ -795,32 +796,31 @@ class ScioContext private[scio] ( /** Create a union of multiple SCollections. Supports empty lists. */ // `T: Coder` context bound is required since `scs` might be empty. - def unionAll[T: Coder](scs: => Iterable[SCollection[T]]): SCollection[T] = { + def unionAll[T: Coder: ClassTag](scs: => Iterable[SCollection[T]]): SCollection[T] = { val tfName = this.tfName // evaluate eagerly to avoid overriding `scs` names scs.toList match { case Nil => empty() case contents => - val sources = contents.map(_.step) wrap( PCollectionList .of(contents.map(_.internal).asJava) .apply(tfName, Flatten.pCollections()), - UnionAll(tfName, sources) + ScioGraphNode.node[T](tfName, NodeType.UnionAll, contents) ) } } /** Form an empty SCollection. */ - def empty[T: Coder](): SCollection[T] = parallelize(Nil) + def empty[T: Coder: ClassTag](): SCollection[T] = parallelize(Nil) /** * Distribute a local Scala `Iterable` to form an SCollection. * @group in_memory */ - def parallelize[T: Coder](elems: Iterable[T]): SCollection[T] = + def parallelize[T: Coder: ClassTag](elems: Iterable[T]): SCollection[T] = requireNotClosed { val coder = CoderMaterializer.beam(this, Coder[T]) - this.applyTransform(Create.of(elems.asJava).withCoder(coder), Parallelize) + this.applyTransform(Create.of(elems.asJava).withCoder(coder), ScioGraphNode.parallelize[T]) } /** @@ -833,7 +833,7 @@ class ScioContext private[scio] ( requireNotClosed { val coder = CoderMaterializer.beam(this, KVCoder(koder, voder)) this - .applyTransform(Create.of(elems.asJava).withCoder(coder), Parallelize) + .applyTransform(Create.of(elems.asJava).withCoder(coder), ScioGraphNode.parallelize[(K, V)]) .map(kv => (kv.getKey, kv.getValue)) } @@ -841,25 +841,31 @@ class ScioContext private[scio] ( * Distribute a local Scala `Iterable` with timestamps to form an SCollection. * @group in_memory */ - def parallelizeTimestamped[T: Coder](elems: Iterable[(T, Instant)]): SCollection[T] = + def parallelizeTimestamped[T: Coder: ClassTag](elems: Iterable[(T, Instant)]): SCollection[T] = requireNotClosed { val coder = CoderMaterializer.beam(this, Coder[T]) val v = elems.map(t => TimestampedValue.of(t._1, t._2)) - this.applyTransform(Create.timestamped(v.asJava).withCoder(coder), Parallelize) + this.applyTransform( + Create.timestamped(v.asJava).withCoder(coder), + ScioGraphNode.parallelize[T] + ) } /** * Distribute a local Scala `Iterable` with timestamps to form an SCollection. * @group in_memory */ - def parallelizeTimestamped[T: Coder]( + def parallelizeTimestamped[T: Coder: ClassTag]( elems: Iterable[T], timestamps: Iterable[Instant] ): SCollection[T] = requireNotClosed { val coder = CoderMaterializer.beam(this, Coder[T]) val v = elems.zip(timestamps).map(t => TimestampedValue.of(t._1, t._2)) - this.applyTransform(Create.timestamped(v.asJava).withCoder(coder), Parallelize) + this.applyTransform( + Create.timestamped(v.asJava).withCoder(coder), + ScioGraphNode.parallelize[T] + ) } // ======================================================================= diff --git a/scio-core/src/main/scala/com/spotify/scio/graph/ScioGraphNode.scala b/scio-core/src/main/scala/com/spotify/scio/graph/ScioGraphNode.scala new file mode 100644 index 0000000000..3153be87ec --- /dev/null +++ b/scio-core/src/main/scala/com/spotify/scio/graph/ScioGraphNode.scala @@ -0,0 +1,127 @@ +package com.spotify.scio.graph + +import com.spotify.scio.values.SCollection + +import scala.reflect.ClassTag + +private[scio] case class ScioGraphNode( + name: String, + `type`: String, + io: Option[String], + dataClass: Class[_], + schemaPath: Option[String], + sources: List[ScioGraphNode], + properties: Map[String, Any] +) + +object NodeType { + val Parallelize: String = "Parallelize" + val Transform: String = "Transform" + val UnionAll: String = "UnionAll" + val FlatMap: String = "FlatMap" +} + +object NodeIO { + val Text: String = "Text" + val CustomInput: String = "CustomInput" +} + +object ScioGraphNode { + def parallelize[T](implicit ct: ClassTag[T]) = + node[T](null, NodeType.Parallelize, List()) + + def node[T]( + name: String, + `type`: String, + sources: List[SCollection[_]], + properties: Map[String, Any] = Map.empty + )(implicit + ct: ClassTag[T] + ): ScioGraphNode = { + ScioGraphNode( + name, + `type`, + None, + ct.runtimeClass, + None, + sources.map(_.step), + properties + ) + } + + def read[T](name: String, io: String, properties: Map[String, Any] = Map.empty)(implicit + ct: ClassTag[T] + ): ScioGraphNode = { + ScioGraphNode( + name, + "read", + Some(io), + ct.runtimeClass, + None, + List(), + properties + ) + } + + def write[T]( + name: String, + io: String, + source: SCollection[_], + properties: Map[String, Any] = Map.empty + )(implicit + ct: ClassTag[T] + ): ScioGraphNode = { + ScioGraphNode( + name, + "write", + Some(io), + ct.runtimeClass, + None, + List(source.step), + properties + ) + } +} + +//private[scio] case class SingleSourceNode( +// name: String, +// source: ScioGraphNode, +// dataClass: Class[_], +// schemaPath: Option[String] = None +//) extends ScioGraphNode { +// override val sources: List[ScioGraphNode] = List(source) +//} +// +//private[scio] case class TransformStep(name: String, sources: List[ScioGraphNode]) +// extends ScioGraphNode +// +//private[scio] object TransformStep { +// def apply(name: String, source: ScioGraphNode): TransformStep = TransformStep(name, List(source)) +//} + +// preserve link to an original PTransform? +//private[scio] case class CustomInput(name: String) extends ScioGraphNode { +// val sources: List[ScioGraphNode] = List.empty +//} + +//private[scio] case class UnionAll(name: String, sources: List[ScioGraphNode]) extends ScioGraphNode + +//private[scio] object Parallelize extends ScioGraphNode { +// override val name: String = "parallelize" +// override val sources: List[ScioGraphNode] = List.empty +//} + +//private[scio] case class ReadTextIO(filePattern: String) extends ScioGraphNode { +// override val name: String = null +// override val sources: List[ScioGraphNode] = List() +//} +// +//private[scio] case class TestInput(kind: String) extends ScioGraphNode { +// override val name: String = kind +// override val sources: List[ScioGraphNode] = List() +//} +// +//private[scio] case class FlatMap(source: ScioGraphNode) extends ScioGraphNode { +// val name: String = null +// override val sources: List[ScioGraphNode] = List(source) +//} diff --git a/scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala b/scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala deleted file mode 100644 index d637a3c6b4..0000000000 --- a/scio-core/src/main/scala/com/spotify/scio/graph/StepInfo.scala +++ /dev/null @@ -1,39 +0,0 @@ -package com.spotify.scio.graph - -private[scio] trait StepInfo { - val name: String - val sources: List[StepInfo] -} - -private[scio] case class TransformStep(name: String, sources: List[StepInfo]) extends StepInfo - -private[scio] object TransformStep { - def apply(name: String, source: StepInfo): TransformStep = TransformStep(name, List(source)) -} - -// preserve link to an original PTransform? -private[scio] case class CustomInput(name: String) extends StepInfo { - val sources: List[StepInfo] = List.empty -} - -private[scio] case class UnionAll(name: String, sources: List[StepInfo]) extends StepInfo - -private[scio] object Parallelize extends StepInfo { - override val name: String = "parallelize" - override val sources: List[StepInfo] = List.empty -} - -private[scio] case class ReadTextIO(filePattern: String) extends StepInfo { - override val name: String = null - override val sources: List[StepInfo] = List() -} - -private[scio] case class TestInput(kind: String) extends StepInfo { - override val name: String = kind - override val sources: List[StepInfo] = List() -} - -private[scio] case class FlatMap(source: StepInfo) extends StepInfo { - val name: String = null - override val sources: List[StepInfo] = List(source) -} diff --git a/scio-core/src/main/scala/com/spotify/scio/io/Tap.scala b/scio-core/src/main/scala/com/spotify/scio/io/Tap.scala index 2f0c17de8c..b16d802a9a 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/Tap.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/Tap.scala @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.{ByteArrayCoder, Coder => BCoder} import org.apache.beam.sdk.util.CoderUtils import java.io.{EOFException, InputStream} +import scala.reflect.ClassTag /** * Placeholder to an external data set that can either be load into memory as an iterator or opened @@ -88,7 +89,7 @@ final case class TextTap(path: String, params: TextIO.ReadParam) extends Tap[Str sc.read(TextIO(path))(params) } -final private[scio] class InMemoryTap[T: Coder] extends Tap[T] { +final private[scio] class InMemoryTap[T: Coder: ClassTag] extends Tap[T] { private[scio] val id: String = UUID.randomUUID().toString override def value: Iterator[T] = InMemorySink.get(id).iterator override def open(sc: ScioContext): SCollection[T] = diff --git a/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala b/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala index 3005e93c2b..0742d08ff6 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/TextIO.scala @@ -22,7 +22,7 @@ import java.nio.channels.Channels import java.util.Collections import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} -import com.spotify.scio.graph.ReadTextIO +import com.spotify.scio.graph.{NodeIO, ScioGraphNode} import com.spotify.scio.util.ScioUtil import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.SCollection @@ -50,9 +50,10 @@ final case class TextIO(path: String) extends ScioIO[String] { .read() .from(filePattern) .withCompression(params.compression) - .withEmptyMatchTreatment(params.emptyMatchTreatment) + .withEmptyMatchTreatment(params.emptyMatchTreatment).dis - sc.applyTransform(t, ReadTextIO(filePattern)) + sc.applyTransform(t, ScioGraphNode.read(null, NodeIO.Text, Map("filePattern" -> filePattern)) + ) .setCoder(coder) } diff --git a/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala b/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala index 90e4280f75..94e44b2820 100644 --- a/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala +++ b/scio-core/src/main/scala/com/spotify/scio/testing/TestDataManager.scala @@ -18,7 +18,7 @@ package com.spotify.scio.testing import com.spotify.scio.coders.Coder -import com.spotify.scio.graph.TestInput +import com.spotify.scio.graph.ScioGraphNode import com.spotify.scio.io.ScioIO import com.spotify.scio.values.SCollection import com.spotify.scio.{ScioContext, ScioResult} @@ -29,6 +29,7 @@ import org.apache.beam.sdk.testing.TestStream import scala.collection.concurrent.TrieMap import scala.collection.mutable.{Set => MSet} import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} /* Inputs are Scala Iterables to be parallelized for TestPipeline, or PTransforms to be applied */ @@ -37,7 +38,7 @@ sealed private[scio] trait JobInputSource[T] { val asIterable: Try[Iterable[T]] } -final private[scio] case class TestStreamInputSource[T]( +final private[scio] case class TestStreamInputSource[T: ClassTag]( stream: TestStream[T] ) extends JobInputSource[T] { override val asIterable: Try[Iterable[T]] = Failure( @@ -47,12 +48,12 @@ final private[scio] case class TestStreamInputSource[T]( ) override def toSCollection(sc: ScioContext): SCollection[T] = - sc.applyTransform(stream, TestInput("TestStream")) + sc.applyTransform(stream, ScioGraphNode.read[T]("TestStream", "TestInput")) override def toString: String = s"TestStream(${stream.getEvents})" } -final private[scio] case class IterableInputSource[T: Coder]( +final private[scio] case class IterableInputSource[T: Coder: ClassTag]( iterable: Iterable[T] ) extends JobInputSource[T] { override val asIterable: Success[Iterable[T]] = Success(iterable) diff --git a/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala b/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala index 76bb4173b3..7e51fee893 100644 --- a/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala +++ b/scio-core/src/main/scala/com/spotify/scio/transforms/syntax/SCollectionSafeSyntax.scala @@ -18,6 +18,7 @@ package com.spotify.scio.transforms.syntax import com.spotify.scio.values.SCollection import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.graph.{NodeType, ScioGraphNode} import com.spotify.scio.util.NamedDoFn import com.twitter.chill.ClosureCleaner import org.apache.beam.sdk.transforms.DoFn.{Element, MultiOutputReceiver, ProcessElement} @@ -25,6 +26,7 @@ import org.apache.beam.sdk.transforms.ParDo import org.apache.beam.sdk.values.{TupleTag, TupleTagList} import scala.collection.compat._ +import scala.reflect.ClassTag trait SCollectionSafeSyntax { @@ -43,7 +45,7 @@ trait SCollectionSafeSyntax { * * @group transform */ - def safeFlatMap[U: Coder]( + def safeFlatMap[U: Coder: ClassTag]( f: T => TraversableOnce[U] ): (SCollection[U], SCollection[(T, Throwable)]) = { val (mainTag, errorTag) = (new TupleTag[U], new TupleTag[(T, Throwable)]) @@ -75,7 +77,11 @@ trait SCollectionSafeSyntax { tuple .get(errorTag) .setCoder(CoderMaterializer.beam(self.context, Coder[(T, Throwable)])) - (self.context.wrap(main, FlatMap()), self.context.wrap(errorPipe)) + ( + self.context.wrap(main, ScioGraphNode.node(null, NodeType.FlatMap, List(self))), + self.context + .wrap(errorPipe, ScioGraphNode.node(null, NodeType.FlatMap, List(self))) + ) } } } diff --git a/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala b/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala index dee871c127..101df09cd5 100644 --- a/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala +++ b/scio-core/src/main/scala/com/spotify/scio/util/ArtisanJoin.scala @@ -75,7 +75,7 @@ private[scio] object ArtisanJoin { type DF = DoFn[KV[KEY, CoGbkResult], (KEY, (A1, B1))] a.context - .wrap(keyed) + .wrap(keyed, ) .withName(name) .applyTransform(ParDo.of(new DF { @ProcessElement diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala b/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala index da22b7fbaa..875fed4278 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PCollectionWrapper.scala @@ -19,7 +19,7 @@ package com.spotify.scio.values import com.spotify.scio.ScioContext import com.spotify.scio.coders.{BeamCoders, Coder} -import com.spotify.scio.graph.StepInfo +import com.spotify.scio.graph.ScioGraphNode import org.apache.beam.sdk.transforms.PTransform import org.apache.beam.sdk.values.{PCollection, POutput} @@ -28,7 +28,7 @@ private[values] trait PCollectionWrapper[T] extends TransformNameable { /** The [[org.apache.beam.sdk.values.PCollection PCollection]] being wrapped internally. */ val internal: PCollection[T] - val step: StepInfo + val step: ScioGraphNode implicit def coder: Coder[T] = BeamCoders.getCoder(internal) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 27ceb4bb61..15bc213d90 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -27,7 +27,7 @@ import com.spotify.scio.estimators.{ ApproximateUniqueCounter, ApproximateUniqueCounterByError } -import com.spotify.scio.graph.StepInfo +import com.spotify.scio.graph.ScioGraphNode import com.spotify.scio.io._ import com.spotify.scio.schemas.{Schema, SchemaMaterializer} import com.spotify.scio.testing.TestDataManager @@ -1767,5 +1767,5 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { private[scio] class SCollectionImpl[T]( val internal: PCollection[T], val context: ScioContext, - val step: StepInfo + val step: ScioGraphNode ) extends SCollection[T] {} diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index 2f31724918..9aeab15c8c 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -37,6 +37,7 @@ import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider +import org.apache.beam.sdk.transforms.display.DisplayData import org.apache.beam.sdk.values.TypeDescriptor import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job @@ -50,6 +51,7 @@ import org.apache.parquet.filter2.predicate.FilterPredicate import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.parquet.hadoop.metadata.CompressionCodecName +import scala.jdk.CollectionConverters.SeqHasAsJava import scala.reflect.{classTag, ClassTag} final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[T] { @@ -106,6 +108,7 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[ job.getConfiguration, compression ) + val transform = WriteFiles.to(sink).withNumShards(numShards) if (!isWindowed) transform else transform.withWindowedWrites() }