Skip to content

Commit

Permalink
beamschematype -> rowtype
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Aug 30, 2024
1 parent 84cfc84 commit fd3896e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ import scala.collection.concurrent
import scala.jdk.CollectionConverters.*

// https://beam.apache.org/documentation/programming-guide/#schema-definition
sealed trait BeamSchemaType[T] extends Converter[T, Row, Row] {
sealed trait RowType[T] extends Converter[T, Row, Row] {
def schema: Schema
def apply(r: Row): T = from(r)
def apply(t: T): Row = to(t)
}

object BeamSchemaType {
implicit def apply[T: BeamSchemaField]: BeamSchemaType[T] =
BeamSchemaType[T](CaseMapper.identity)
object RowType {
implicit def apply[T: BeamSchemaField]: RowType[T] =
RowType[T](CaseMapper.identity)

def apply[T](cm: CaseMapper)(implicit f: BeamSchemaField[T]): BeamSchemaType[T] = {
def apply[T](cm: CaseMapper)(implicit f: BeamSchemaField[T]): RowType[T] = {
f match {
case r: BeamSchemaField.Record[_] =>
val mappedSchema = r.schema(cm) // fail fast on bad annotations
new BeamSchemaType[T] {
new RowType[T] {
private val caseMapper: CaseMapper = cm
override lazy val schema: Schema = mappedSchema

Expand All @@ -55,7 +55,7 @@ object BeamSchemaType {
}
case _ =>
throw new IllegalArgumentException(
s"BeamSchemaType can only be created from Record. Got $f"
s"RowType can only be created from Record. Got $f"
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ import java.util.UUID
import scala.reflect.ClassTag
import scala.jdk.CollectionConverters.*

class BeamSchemaTypeSuite extends MagnolifySuite {
class RowTypeSuite extends MagnolifySuite {
private def test[T: Arbitrary: ClassTag](implicit
bst: BeamSchemaType[T],
bst: RowType[T],
eq: Eq[T]
): Unit = testNamed[T](className[T])

private def testNamed[T: Arbitrary](name: String)(implicit
bst: BeamSchemaType[T],
bst: RowType[T],
eq: Eq[T]
): Unit = {
// Ensure serializable even after evaluation of `schema`
Expand Down Expand Up @@ -117,8 +117,8 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
}

{
implicit val bst: BeamSchemaType[LowerCamel] =
BeamSchemaType[LowerCamel](CaseMapper(_.toUpperCase))
implicit val bst: RowType[LowerCamel] =
RowType[LowerCamel](CaseMapper(_.toUpperCase))
test[LowerCamel]

{
Expand All @@ -135,10 +135,10 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
{
// value classes should act only as fields
intercept[IllegalArgumentException] {
BeamSchemaType[ValueClass]
RowType[ValueClass]
}

implicit val bst: BeamSchemaType[HasValueClass] = BeamSchemaType[HasValueClass]
implicit val bst: RowType[HasValueClass] = RowType[HasValueClass]
test[HasValueClass]

assert(bst.schema.getField("vc").getType == Schema.FieldType.STRING)
Expand Down
16 changes: 8 additions & 8 deletions docs/beam.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Beam

`BeamSchemaType[T]` provides conversion between Scala type `T` and a [Beam Schema](https://beam.apache.org/documentation/programming-guide/#schema-definition). Custom support for type `T` can be added with an implicit intsance of `BeamSchemaField[T]`.
`RowType[T]` provides conversion between Scala type `T` and a [Beam Schema](https://beam.apache.org/documentation/programming-guide/#schema-definition). Custom support for type `T` can be added with an implicit intsance of `BeamSchemaField[T]`.

```scala mdoc:compile-only
import java.net.URI
Expand All @@ -13,12 +13,12 @@ import magnolify.beam.*
// Encode custom type URI as String
implicit val uriField: BeamSchemaField[URI] = BeamSchemaField.from[String](URI.create)(_.toString)

val beamSchemaType = BeamSchemaType[Outer]
val row = beamSchemaType.to(record)
val copy: Outer = beamSchemaType.from(row)
val rowType = RowType[Outer]
val row = rowType.to(record)
val copy: Outer = rowType.from(row)

// Beam Schema
val schema = beamSchemaType.schema
val schema = rowType.schema
```

## Enums
Expand Down Expand Up @@ -47,7 +47,7 @@ SQL-compatible logical types are supported via `import magnolify.beam.logical.sq

## Case mapping

To use a different field case format in target records, add an optional `CaseMapper` argument to `BeamSchemaType`:
To use a different field case format in target records, add an optional `CaseMapper` argument to `RowType`:

```scala mdoc:compile-only
import magnolify.beam.*
Expand All @@ -57,6 +57,6 @@ import com.google.common.base.CaseFormat
case class LowerCamel(firstName: String, lastName: String)

val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _
val beamSchemaType = BeamSchemaType[LowerCamel](CaseMapper(toSnakeCase))
beamSchemaType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe)
val rowType = RowType[LowerCamel](CaseMapper(toSnakeCase))
rowType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe)
```

0 comments on commit fd3896e

Please sign in to comment.