Hadoop Common / HADOOP-1698

7500+ reducers/partitions causes job to hang


Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.13.1
    • Fix Version/s: 0.14.0
    • Component/s: None
    • Labels: None
    • Environment: Standard Hadoop installation, any number of nodes > 10

    Description

      Steps to Reproduce:
      On the cluster described above, run any job with #partitions/reducers = 8000+.
      Observe the CPU utilization on any mapper.

      Observations:
      The output.collect(Key, Value) call consumes a huge amount of CPU, causing the job to hang.

      This is the result of two issues:
      1) With more than ~7500 partitions, every call to output.collect() results in a call to sortAndSpillToDisk().
      2) Each call to sortAndSpillToDisk() creates a writer object, which eventually executes the following code block (SequenceFile.java:652):
      MessageDigest digester = MessageDigest.getInstance("MD5");
      digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
      sync = digester.digest();
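
      For reference, a minimal standalone version of that block (reconstructed here for illustration; not the full SequenceFile.Writer constructor). It shows the work done on every single writer construction: an MD5 digest plus a fresh InetAddress.getLocalHost() lookup.
      ----------------------------------------------------------
      import java.net.InetAddress;
      import java.rmi.server.UID;
      import java.security.MessageDigest;

      // Reproduces the sync-marker generation quoted above.
      public class SyncMarkerRepro {
        public static void main(String[] args) throws Exception {
          MessageDigest digester = MessageDigest.getInstance("MD5");
          // getLocalHost() is called here on every writer construction
          digester.update((new UID() + "@" + InetAddress.getLocalHost()).getBytes());
          byte[] sync = digester.digest();
          System.out.println("sync marker: " + sync.length + " bytes");
        }
      }
      ----------------------------------------------------------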

      Issue #1:
      Further investigation reveals the following stack trace whenever the task is suspended.
      [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
      [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
      [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1183)
      [4] java.net.InetAddress.getLocalHost (InetAddress.java:1312)
      [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
      [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
      [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
      [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
      [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
      [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
      [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
      /My code/
      [12] mypackage.MyClass$Map.map (MyClass.java:283)
      --------------
      [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
      [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
      [15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1771)

      The piece of code causing the problem is at MapTask.java:355:
      ----------------------------------------------------------
      long totalMem = 0;
      for (int i = 0; i < partitions; i++)
        // per partition: startOffsets.length (1024, see the dump below)
        // * BUFFERED_KEY_VAL_OVERHEAD (16) == 16 KB (BasicTypeSorterBase.java:88)
        totalMem += sortImpl[i].getMemoryUtilized();

      // with > ~7500 partitions this condition is always true,
      // even when keyValBuffer is completely empty
      if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
        sortAndSpillToDisk();
        keyValBuffer.reset();
        for (int i = 0; i < partitions; i++) {
          sortImpl[i].close();
        }
      }
      ----------------------------------------------------------

      Looking at the variable values in org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
      sortImpl[0] = {
      org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer(id=1159)
      org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160) <-- 1024 entries * 16 bytes == 16 KB per partition
      org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
      org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
      org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
      org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator(id=1164)
      org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
      org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
      org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2(id=1165)
      }
      Computation:
      maxBufferSize == 120 MB
      theoretical max # of partitions, assuming keyValBuffer.getLength() == 0: 120 MB / 16 KB ≈ 7500 partitions
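
      For a quick sanity check of the arithmetic (constants taken from the debugger dump above; maxBufferSize assumed to be 120 MB):
      ----------------------------------------------------------
      // Back-of-the-envelope check of the computation above.
      public class SpillMath {
        public static void main(String[] args) {
          final int slots = 1024;               // startOffsets.length per sorter
          final int overhead = 16;              // BUFFERED_KEY_VAL_OVERHEAD, bytes per slot
          final long perPartition = (long) slots * overhead;   // 16 KB
          final long maxBufferSize = 120L * 1024 * 1024;       // 120 MB
          System.out.println("per-partition overhead: " + perPartition + " bytes");
          // prints 7680: with that many partitions the sorter bookkeeping alone
          // fills the buffer, matching the ~7500 threshold reported above
          System.out.println("partitions that fill the buffer: "
              + (maxBufferSize / perPartition));
        }
      }
      ----------------------------------------------------------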

      Issue #2:
      digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
      [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
      [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
      [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1183)
      The InetAddress.getLocalHost() call does not cache its result, so every call performs a lookup against the hosts file and possibly DNS, driving CPU usage even higher (observed).
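
      A minimal sketch of the obvious mitigation, caching the result once per JVM (see also note 2 below); the class and method names are illustrative only, not an actual patch:
      ----------------------------------------------------------
      import java.net.InetAddress;
      import java.net.UnknownHostException;

      // Illustrative only: resolve the local host once per JVM and reuse it,
      // instead of calling InetAddress.getLocalHost() on every writer construction.
      class CachedLocalHost {
        private static volatile String cached;

        static String get() throws UnknownHostException {
          String h = cached;
          if (h == null) {
            // Benign race: at worst the lookup runs a few times, then sticks.
            h = InetAddress.getLocalHost().toString();
            cached = h;
          }
          return h;
        }
      }
      ----------------------------------------------------------
      The digester line would then read: digester.update((new UID()+"@"+CachedLocalHost.get()).getBytes());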

      This is a BLOCKER issue and needs immediate attention.

      Notes:
      1) output.collect() should not take a hit from the framework; perhaps a separate thread should handle the spill buffer (see the sketch after this list)?
      2) The InetAddress.getLocalHost() result should be cached in a static variable (as sketched under Issue #2 above)?
      3) The spilling logic should not depend on the number of partitions; it needs a redesign?
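
      To make note 1 concrete, here is a rough double-buffering sketch (all names are made up; this is only one possible shape, not a proposed implementation):
      ----------------------------------------------------------
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.BlockingQueue;

      // Double-buffering sketch: the map thread hands full buffers to a daemon
      // thread, so output.collect() never pays for the spill itself.
      public class AsyncSpillSketch {
        // Small queue: collect() blocks only if the spiller falls two buffers behind.
        private static final BlockingQueue<byte[]> fullBuffers =
            new ArrayBlockingQueue<byte[]>(2);

        public static void main(String[] args) throws InterruptedException {
          Thread spiller = new Thread(new Runnable() {
            public void run() {
              try {
                while (true) {
                  byte[] buf = fullBuffers.take();
                  // Stand-in for sortAndSpillToDisk(): the sort and the write
                  // would happen here, off the map thread.
                  System.out.println("spilled " + buf.length + " bytes");
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            }
          });
          spiller.setDaemon(true);
          spiller.start();

          // The "map thread": keeps collecting into fresh buffers.
          for (int i = 0; i < 3; i++) {
            fullBuffers.put(new byte[64 * 1024]);
          }
          Thread.sleep(200); // give the daemon spiller time to drain before exit
        }
      }
      ----------------------------------------------------------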

      Attachments

        1. 1698.1.patch
          6 kB
          Devaraj Das
        2. 1698.patch
          6 kB
          Devaraj Das
        3. 1698.patch
          6 kB
          Devaraj Das
        4. 1698.patch
          6 kB
          Devaraj Das


      People

        Assignee: Devaraj Das (ddas)
        Reporter: Srikanth Kakani (srikantk)
        Votes: 0
        Watchers: 3
