Details

    • Type: Sub-task Sub-task
    • Status: Closed
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.14.0
    • Component/s: tez
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      This is somewhat expected since we copy lots of object in POShuffleLoadTez for accumulator UDF. With large data, it consistently fails with OOM. We need to re-implement it.

      Here is an example stack trace-

      2014-08-02 02:59:15,801 ERROR [TezChild] org.apache.tez.runtime.task.TezTaskRunner: Exception of type Error. Exiting now
      java.lang.OutOfMemoryError: GC overhead limit exceeded
          at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
          at java.lang.StringCoding.decode(StringCoding.java:193)
          at java.lang.String.<init>(String.java:416)
          at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compareBinInterSedesDatum(BinInterSedes.java:964)
          at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compareBinSedesTuple(BinInterSedes.java:770)
          at org.apache.pig.data.BinInterSedes$BinInterSedesTupleRawComparator.compare(BinInterSedes.java:728)
          at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator.compare(PigTupleSortComparator.java:100)
          at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.lessThan(TezMerger.java:539)
          at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:144)
          at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:108)
          at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.adjustPriorityQueue(TezMerger.java:486)
          at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.next(TezMerger.java:503)
          at org.apache.tez.runtime.library.common.ValuesIterator.readNextKey(ValuesIterator.java:179)
          at org.apache.tez.runtime.library.common.ValuesIterator.access$300(ValuesIterator.java:45)
          at org.apache.tez.runtime.library.common.ValuesIterator$1$1.next(ValuesIterator.java:138)
          at org.apache.pig.backend.hadoop.executionengine.tez.POShuffleTezLoad.getNextTuple(POShuffleTezLoad.java:176)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:301)
          at org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez.getNextTuple(POStoreTez.java:113)
          at org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor.runPipeline(PigProcessor.java:313)
          at org.apache.pig.backend.hadoop.executionengine.tez.PigProcessor.run(PigProcessor.java:196)
          at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
          at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:180)
          at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
          at java.security.AccessController.doPrivileged(Native Method) 
          at javax.security.auth.Subject.doAs(Subject.java:415)
          at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
      
      1. PIG-4104-2.patch
        19 kB
        Rohini Palaniswamy

        Issue Links

          Activity

          Hide
          Rohini Palaniswamy added a comment -

          Committed to trunk. Thanks for reviewing and trying it out Cheolsoo.

          Show
          Rohini Palaniswamy added a comment - Committed to trunk. Thanks for reviewing and trying it out Cheolsoo.
          Hide
          Cheolsoo Park added a comment -

          +1. My test passes. Thank you Rohini!

          Show
          Cheolsoo Park added a comment - +1. My test passes. Thank you Rohini!
          Hide
          Cheolsoo Park added a comment -

          Rohini Palaniswamy, yes sure. I am running my job now.

          Your patch is smart. Moving the iteration from POShuffleLoadTez to AccumulativeTupleBuffer should work. I don't know why I couldn't think of this!

          Show
          Cheolsoo Park added a comment - Rohini Palaniswamy , yes sure. I am running my job now. Your patch is smart. Moving the iteration from POShuffleLoadTez to AccumulativeTupleBuffer should work. I don't know why I couldn't think of this!
          Hide
          Rohini Palaniswamy added a comment -

          Cheolsoo Park,
          Could you try out the patch if possible?

          Show
          Rohini Palaniswamy added a comment - Cheolsoo Park , Could you try out the patch if possible?
          Hide
          Rohini Palaniswamy added a comment -

          Can't we call the accumulative function on the current key and set of values instead of adding to the buffer and iterating till all keys are done and calling pkgr.getNext(); in the end? The he definition of Accumulator UDF is that the UDF will be called with a set of values for a key which is not supposed to be the full list of the values. What I am thinking of is something like having AccumulatorEvalFunc.accumulate() from POUserFunc invoked for every iteration of while loop in POShuffleTezLoad with that key and value iterator. But have not looked at the accumulator implementation till now. So not sure if this is possible.

          Show
          Rohini Palaniswamy added a comment - Can't we call the accumulative function on the current key and set of values instead of adding to the buffer and iterating till all keys are done and calling pkgr.getNext(); in the end? The he definition of Accumulator UDF is that the UDF will be called with a set of values for a key which is not supposed to be the full list of the values. What I am thinking of is something like having AccumulatorEvalFunc.accumulate() from POUserFunc invoked for every iteration of while loop in POShuffleTezLoad with that key and value iterator. But have not looked at the accumulator implementation till now. So not sure if this is possible.

            People

            • Assignee:
              Rohini Palaniswamy
              Reporter:
              Cheolsoo Park
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development