diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
index 7e5a1a9507..0e9c8cdd9f 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
@@ -476,7 +476,34 @@ package com.twitter.scalding {
     }
   }
 
+  private[scalding] object SkewMonitorCounters {
+    // Each counter is reported with its group name equal to its counter name.
+    // The original author observed that when the two names differ the counter reads zero.
+    // NOTE(review): presumably JobTest.counter takes (counter, group) while
+    // FlowProcess.increment takes (group, counter) -- confirm before splitting the names.
+    val KeyCount = "scalding.debug.reduce.key.count"
+    val ValuesCountSum = "scalding.debug.reduce.value.count.sum"
+    val ValuesCountSquareSum = "scalding.debug.reduce.value.count.sum.square"
+  }
+
+  /**
+   * Iterator wrapper that counts how many elements have been pulled from `wraps`.
+   * Only successful calls to `next()` are counted.
+   */
+  class CountingIterator[T](wraps: Iterator[T]) extends Iterator[T] {
+    private[this] var nextCalls = 0L
+    def hasNext: Boolean = wraps.hasNext
+    def next(): T = {
+      // Fetch first so that a throwing `wraps.next()` is not counted.
+      val item = wraps.next()
+      nextCalls += 1
+      item
+    }
+    /** Number of elements returned by `next()` so far. */
+    def seen: Long = nextCalls
+  }
+
   /** In the typed API every reduce operation is handled by this Buffer */
   class TypedBufferOp[K, V, U](
     conv: TupleConverter[K],
     @transient reduceFn: (K, Iterator[V]) => Iterator[U],
@@ -487,9 +514,9 @@ package com.twitter.scalding {
     def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) {
       val oc = call.getOutputCollector
       val key = conv(call.getGroup)
-      val values = call.getArgumentsIterator
+      val values = new CountingIterator(call.getArgumentsIterator
         .asScala
-        .map(_.getObject(0).asInstanceOf[V])
+        .map(_.getObject(0).asInstanceOf[V]))
 
       // Avoiding a lambda here
       val resIter = reduceFnSer.get(key, values)
@@ -498,6 +525,13 @@ package com.twitter.scalding {
         tup.set(0, resIter.next)
         oc.add(tup)
       }
+
+      // Skew statistics for this key. `seen` only counts the values the reduce
+      // function actually pulled, so a reducer that stops early reports fewer.
+      val numValuesPerKey = values.seen
+      flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L)
+      flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey)
+      flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey)
     }
   }
 }
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala
index 666128a04e..f78868f693 100644
--- a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala
+++ b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala
@@ -1817,6 +1817,44 @@ class CounterJobTest extends WordSpec with Matchers {
   }
 }
 
+class ReduceValueCountJob(args: Args) extends Job(args) {
+  TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3))))
+    .group
+    .forceToReducers
+    .sum
+    .toTypedPipe
+    .map {
+      case (a: Int, (b: Int, c: Int)) =>
+        (a, b, c)
+    }
+    .write(TypedTsv[(Int, Int, Int)](args("output")))
+}
+
+class ReduceValueCounterTest extends WordSpec with Matchers {
+  "Reduce Values Count" should {
+    JobTest(new com.twitter.scalding.ReduceValueCountJob(_))
+      .arg("output", "output0")
+      .counter(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount) { x =>
+        // the input has three distinct keys: 1, 2 and 3
+        x should be(3)
+      }
+      .counter(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum) { x =>
+        // key 1 has two values (2^2 = 4); keys 2 and 3 have one value each
+        x should be(4 + 1 + 1)
+      }
+      .counter(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum) { x =>
+        // total values = 2 (key 1) + 1 (key 2) + 1 (key 3)
+        x should be(2 + 1 + 1)
+      }
+      .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("output0")) { tuples =>
+        // the pair values are summed component-wise per key
+        tuples.toSet should be(Set((1, 4, 4), (2, 2, 2), (3, 3, 3)))
+      }
+      .runHadoop
+      .finish
+  }
+}
+
 object DailySuffixTsvJob {
   val strd1 = "2014-05-01"
   val strd2 = "2014-05-02"