From b68478788a763e6abb0b6e5e6e4961a9249e9e99 Mon Sep 17 00:00:00 2001 From: Alexandre Mazari Date: Fri, 14 Aug 2015 17:35:31 +0200 Subject: [PATCH 1/3] Add a PailSource ctor taking a PailSpec arg Exposing a PailSpec-taking ctor gives more flexibility in PailSource construction. For example, compression and/or sizing policy can be set. --- .../scalding/commons/source/PailSource.scala | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala index 7b60eedc77..7694552f5f 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala @@ -19,7 +19,7 @@ package com.twitter.scalding.commons.source import scala.reflect.ClassTag import com.backtype.cascading.tap.PailTap -import com.backtype.hadoop.pail.PailStructure +import com.backtype.hadoop.pail.{ PailStructure, PailSpec } import cascading.tap.Tap import com.twitter.bijection.Injection import com.twitter.scalding._ @@ -79,9 +79,16 @@ object PailSource { /** * Generic version of Pail sink accepts a PailStructure. + * Prefer the overload taking a PailSpec as it gives you finer control over pail configuration (compression for example) */ def sink[T](rootPath: String, structure: PailStructure[T]): PailSource[T] = - new PailSource(rootPath, structure) + sink(rootPath, PailTap.makeSpec(null, structure)) + + /** + * Generic version of Pail sink accepts a PailSpec. + */ + def sink[T](rootPath: String, spec: PailSpec): PailSource[T] = + new PailSource[T](rootPath, spec) /** * A Pail sink can also build its structure on the fly from a @@ -113,10 +120,19 @@ object PailSource { /** * Generic version of Pail source accepts a PailStructure. 
+ * Prefer the overload taking a PailSpec as it gives you finer control over pail configuration (compression for example) */ def source[T](rootPath: String, structure: PailStructure[T], subPaths: Array[List[String]]): PailSource[T] = { assert(subPaths != null && subPaths.size > 0) - new PailSource(rootPath, structure, subPaths) + new PailSource[T](rootPath, PailTap.makeSpec(null, structure), subPaths) + } + + /** + * Generic version of Pail source accepts a PailSpec. + */ + def source[T](rootPath: String, spec: PailSpec, subPaths: Array[List[String]]): PailSource[T] = { + assert(subPaths != null && subPaths.size > 0) + new PailSource[T](rootPath, spec, subPaths) } /** @@ -145,7 +161,7 @@ object PailSource { } } -class PailSource[T] private (rootPath: String, structure: PailStructure[T], subPaths: Array[List[String]] = null)(implicit conv: TupleConverter[T]) +class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[List[String]] = null)(implicit conv: TupleConverter[T]) extends Source with Mappable[T] { import Dsl._ @@ -153,7 +169,6 @@ class PailSource[T] private (rootPath: String, structure: PailStructure[T], subP val fieldName = "pailItem" lazy val getTap = { - val spec = PailTap.makeSpec(null, structure) val javaSubPath = if ((subPaths == null) || (subPaths.size == 0)) null else subPaths map { _.asJava } val opts = new PailTap.PailTapOptions(spec, fieldName, javaSubPath, null) new PailTap(rootPath, opts) From 66f8f00f79d297b1f7c6f247e9a1802cb5c3490d Mon Sep 17 00:00:00 2001 From: Alexandre Mazari Date: Tue, 18 Aug 2015 18:16:48 +0200 Subject: [PATCH 2/3] Use an empty array as default value for subpath Assertions in factory functions and a sane default value for the subPaths field of PailSource remove the need for null-checking the val when creating a Tap. 
--- .../com/twitter/scalding/commons/source/PailSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala index 7694552f5f..8251173830 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala @@ -161,7 +161,7 @@ object PailSource { } } -class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[List[String]] = null)(implicit conv: TupleConverter[T]) +class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[List[String]] = Array.empty)(implicit conv: TupleConverter[T]) extends Source with Mappable[T] { import Dsl._ @@ -169,7 +169,7 @@ class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[L val fieldName = "pailItem" lazy val getTap = { - val javaSubPath = if ((subPaths == null) || (subPaths.size == 0)) null else subPaths map { _.asJava } + val javaSubPath = subPaths map { _.asJava } val opts = new PailTap.PailTapOptions(spec, fieldName, javaSubPath, null) new PailTap(rootPath, opts) } From 2144b29e3b44e9c1f8bbf31a71fc6d8dd08897f6 Mon Sep 17 00:00:00 2001 From: Alexandre Mazari Date: Mon, 24 Aug 2015 13:35:29 +0200 Subject: [PATCH 3/3] Make PailSource a TypedSink --- .../com/twitter/scalding/commons/source/PailSource.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala index 8251173830..dcb0e2e49e 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala @@ 
-161,11 +161,12 @@ object PailSource { } } -class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[List[String]] = Array.empty)(implicit conv: TupleConverter[T]) - extends Source with Mappable[T] { +class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[List[String]] = Array.empty)(implicit conv: TupleConverter[T], tset: TupleSetter[T]) + extends Source with Mappable[T] with TypedSink[T] { import Dsl._ override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](conv) + override def setter[U <: T]: TupleSetter[U] = TupleSetter.asSubSetter(tset) val fieldName = "pailItem" lazy val getTap = { @@ -187,7 +188,6 @@ class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[L TestTapFactory(this, tap.getScheme).createTap(readOrWrite)(mode) } } - } /**