Apache Tez / TEZ-2505

PipelinedSorter uses Comparator objects concurrently from multiple threads

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7.0
    • Fix Version/s: 0.7.1
    • Component/s: None
    • Labels: None
    • Environment: Scalding 0.13.1+PR1220 ; Cascading-3.0.0-wip-118 ; scala 2.11.6 ; java openjdk 1.8.0_45-internal ; Debian linux 8 (stable); Intel(R) Core(TM) i7-3770 (amd64)

    Description

      When attempting to run the same multi-DAG application that worked fine under the same environment (except with Cascading-3.0.0-wip-115 and Tez 0.6.1), one of the earliest and simplest DAGs crashes in the PipelinedSorter.

      The stack at the crash site looks like:

      2015-05-28 11:52:47,120 ERROR [TezChild] element.TrapHandler: caught Throwable, no trap available, rethrowing
      cascading.CascadingException: unable to compare stream elements in position: 0
              at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:164)
              at cascading.tuple.hadoop.util.TupleComparator.compare(TupleComparator.java:38)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compareKeys(PipelinedSorter.java:669)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compare(PipelinedSorter.java:684)
              at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:99)
              at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.sort(PipelinedSorter.java:631)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.sort(PipelinedSorter.java:230)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.collect(PipelinedSorter.java:311)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.write(PipelinedSorter.java:272)
              at org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput$1.write(OrderedPartitionedKVOutput.java:164)
              at cascading.flow.tez.stream.element.OldOutputCollector.collect(OldOutputCollector.java:51)
              at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
              at cascading.flow.tez.stream.element.TezCoGroupGate.wrapGroupingAndCollect(TezCoGroupGate.java:193)
              at cascading.flow.hadoop.stream.HadoopGroupGate.receive(HadoopGroupGate.java:103)
              at cascading.flow.hadoop.stream.HadoopGroupGate.receive(HadoopGroupGate.java:45)
              at cascading.flow.stream.element.FunctionEachStage$1.collect(FunctionEachStage.java:81)
              at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
              at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
              at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:49)
              at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
              at scala.collection.Iterator$class.foreach(Iterator.scala:750)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
              at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:47)
              at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:100)
              at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:40)
              at cascading.flow.stream.element.FunctionEachStage$1.collect(FunctionEachStage.java:81)
              at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
              at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
              at com.twitter.scalding.MapFunction.operate(Operations.scala:60)
              at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:100)
              at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:40)
              at cascading.flow.stream.element.FunctionEachStage$1.collect(FunctionEachStage.java:81)
              at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
              at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
              at cascading.operation.Identity$1.operate(Identity.java:124)
              at cascading.operation.Identity.operate(Identity.java:150)
              at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:100)
              at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:40)
              at cascading.flow.stream.element.SourceStage.map(SourceStage.java:110)
              at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
              at cascading.flow.tez.stream.element.TezSourceStage.run(TezSourceStage.java:95)
              at cascading.flow.tez.FlowProcessor.run(FlowProcessor.java:165)
              at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:337)
              at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:179)
              at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:171)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
              at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:171)
              at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:167)
              at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: cascading.CascadingException: unable to read element from underlying stream
              at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:82)
              at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:33)
              at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:160)
              ... 55 more
      Caused by: java.io.EOFException
              at java.io.DataInputStream.readFully(DataInputStream.java:197)
              at java.io.DataInputStream.readFully(DataInputStream.java:169)
              at org.apache.hadoop.io.WritableUtils.readString(WritableUtils.java:125)
              at cascading.tuple.hadoop.io.HadoopTupleInputStream.readString(HadoopTupleInputStream.java:75)
              at cascading.tuple.hadoop.io.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:85)
              at cascading.tuple.hadoop.io.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
              at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:77)
              ... 57 more
      

      An apparently random variation at the top of the stack is:

      2015-05-28 13:10:13,459 ERROR [TezChild] element.TrapHandler: caught Throwable, no trap available, rethrowing
      cascading.CascadingException: java.io.EOFException
              at cascading.tuple.hadoop.util.TupleComparator.compare(TupleComparator.java:42)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compareKeys(PipelinedSorter.java:669)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compare(PipelinedSorter.java:684)
              at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:99)
              at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
              at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.sort(PipelinedSorter.java:631)
      

      All running TezChild processes fail with the same stacks (either variant) at the same time on the same node. The failing stage reads straight off an HDFS-backed CSV file.

      The cascading.tuple.hadoop.util.TupleComparator#compare code at the top of the stack has been in use in a MapReduce context for over 2.5 years; a first analysis with cwensel (who successfully reproduced the issue without Scalding) points towards an issue on the Tez side.

      As a workaround, it is possible to run with

      "tez.runtime.sorter.class" -> "LEGACY"

      but this is impractical in the long run.
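      The issue title attributes the failure to Comparator objects being used concurrently from multiple threads. The following is a minimal sketch of that failure class and of one common way to avoid it. It is not Tez or Cascading code and not the patch attached to this issue. StatefulKeyComparator and PerThreadSortDemo are hypothetical names, and the assumption that the comparator keeps per-call state in instance fields is an illustration only. The sketch gives each sorting thread its own comparator instance through a ThreadLocal.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical comparator that keeps per-call state in instance fields. */
final class StatefulKeyComparator implements Comparator<byte[]> {
    // Reused on every call. If two threads shared one instance, one thread
    // could repoint these buffers between the other's wrap and read.
    private ByteBuffer left;
    private ByteBuffer right;

    @Override
    public int compare(byte[] a, byte[] b) {
        left = ByteBuffer.wrap(a);
        right = ByteBuffer.wrap(b);
        // Keys in this sketch are 4-byte big-endian signed integers.
        return Integer.compare(left.getInt(), right.getInt());
    }
}

/** Hypothetical driver: every sorting thread gets its own comparator. */
final class PerThreadSortDemo {
    // One comparator instance per thread, never shared between threads.
    private static final ThreadLocal<StatefulKeyComparator> COMPARATOR =
            ThreadLocal.withInitial(StatefulKeyComparator::new);

    static byte[] key(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Serializes the values, sorts the serialized keys, decodes them. */
    static int[] sortKeys(int[] values) {
        byte[][] keys = new byte[values.length][];
        for (int i = 0; i < values.length; i++) {
            keys[i] = key(values[i]);
        }
        Arrays.sort(keys, COMPARATOR.get());
        int[] sorted = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            sorted[i] = ByteBuffer.wrap(keys[i]).getInt();
        }
        return sorted;
    }

    /** Runs the same sort on several threads and collects each result. */
    static List<int[]> sortInParallel(int[] values, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<int[]>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                Callable<int[]> task = () -> sortKeys(values);
                futures.add(pool.submit(task));
            }
            List<int[]> results = new ArrayList<>();
            for (Future<int[]> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

      Calling PerThreadSortDemo.sortInParallel(values, 4) sorts the same input on four threads, each with a private comparator. Replacing the ThreadLocal with a single shared StatefulKeyComparator would reintroduce the race sketched in the field comment.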

      Attachments

        1. log-7142-x.log
          77 kB
          Cyrille Chépélov
        2. TEZ-2505.1.patch
          3 kB
          Rajesh Balamohan
        3. TEZ-2505.2.patch
          2 kB
          Rajesh Balamohan
        4. TEZ-2505.branch-0.7.0.patch
          3 kB
          Rajesh Balamohan
        5. TEZ-2506.1.patch
          2 kB
          Rajesh Balamohan
        6. verbose-MBTSE.patch
          2 kB
          Cyrille Chépélov

          People

            Assignee: Rajesh Balamohan (rajesh.balamohan)
            Reporter: Cyrille Chépélov (cchepelov)