Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
0.13.1
-
None
-
None
-
Standard Hadoop installation, any cluster with more than 10 nodes
Description
Steps to Reproduce:
1) On the above cluster, run any job with 8000 or more partitions/reducers.
2) Observe CPU utilization on any mapper.
Observations:
The output.collect(Key, Value) call takes a huge amount of CPU, causing the job to hang.
This is a result of two issues:
1) With more than 7500 partitions, every call to output.collect results in a call to sortAndSpillToDisk().
2) Each call to sortAndSpillToDisk() creates a writer object, which eventually calls:
MessageDigest digester = MessageDigest.getInstance("MD5");
digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
sync = digester.digest();
(This code block is at SequenceFile.java(652).)
Issue #1:
Further investigation reveals the following stack trace whenever the task is suspended.
[1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
[2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
[3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
[4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
[5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
[6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
[7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
[8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
[9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
[10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
[11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
/My code/
[12] mypackage.MyClass$Map.map (MyClass.java:283)
--------------
[13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
[14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
[15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1,771)
The code causing the problem is at MapTask.java:355:
----------------------------------------------------------
// Each sorter reports ~16K even when empty:
// startOffsets.length (1024, see the variable dump below) * BUFFERED_KEY_VAL_OVERHEAD (16),
// computed at BasicTypeSorterBase.java(88).
long totalMem = 0;
for (int i = 0; i < partitions; i++)
  totalMem += sortImpl[i].getMemoryUtilized();
// This condition is always true if partitions > 7500.
if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
  sortAndSpillToDisk();
  keyValBuffer.reset();
  for (int i = 0; i < partitions; i++)
    ; // loop body was not included in this report
}
----------------------------------------------------------
Looking at the variable values in org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
sortImpl[0] = {
org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer(id=1159)
org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160) <--1K * 16 (previously explained) == 16K
org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator(id=1164)
org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2(id=1165)
}
Computation
maxBufferSize == 120M
Theoretical max number of partitions, assuming keyValBuffer.getLength() is 0: 120M / 16K = 7500 partitions
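The computation above can be checked with a small sketch. SpillThreshold and its methods are hypothetical names, not Hadoop code; the constants come from the variable dump, and reading "120M" as 120 * 1024 * 1024 bytes is an assumption.

```java
// Hypothetical helper (not part of Hadoop) that reproduces the arithmetic above.
final class SpillThreshold {
    // Values read from the debugger dump in this report.
    static final int START_OFFSETS_LENGTH = 1024;
    static final int BUFFERED_KEY_VAL_OVERHEAD = 16;

    /** Memory a single empty sorter reports: 1024 * 16 = 16384 bytes. */
    static long perPartitionOverhead() {
        return (long) START_OFFSETS_LENGTH * BUFFERED_KEY_VAL_OVERHEAD;
    }

    /** Smallest partition count whose fixed overhead alone reaches maxBufferSize. */
    static long minPartitionsThatAlwaysSpill(long maxBufferSize) {
        long overhead = perPartitionOverhead();
        return (maxBufferSize + overhead - 1) / overhead; // ceiling division
    }

    /** True if the spill condition already holds with an empty keyValBuffer. */
    static boolean spillsOnEveryCollect(int partitions, long maxBufferSize) {
        return partitions * perPartitionOverhead() >= maxBufferSize;
    }

    public static void main(String[] args) {
        long maxBufferSize = 120L * 1024 * 1024; // assumption: "120M" means 120 MiB
        System.out.println(minPartitionsThatAlwaysSpill(maxBufferSize));
        System.out.println(spillsOnEveryCollect(8000, maxBufferSize));
        System.out.println(spillsOnEveryCollect(7000, maxBufferSize));
    }
}
```

Under the binary reading the exact crossover is 7680 partitions; the 7500 figure corresponds to dividing 120M by 16K as round numbers. Either way, a job with 8000+ partitions is past the threshold before a single record is buffered.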
Issue #2:
digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
[1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
[2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
[3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
The InetAddress.getLocalHost() call does not cache its result, so every call triggers a lookup in the hosts file and possibly DNS(???), which pushes CPU usage even higher (observed).
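One way to avoid the repeated lookup is to resolve the local host once per JVM and reuse it when building the sync marker. The sketch below is an illustration only, not the actual fix: CachedLocalHost, get() and newSyncMarker() are hypothetical names, and falling back to the loopback address when resolution fails is an assumption.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper (not part of Hadoop): resolves the local host once per JVM.
final class CachedLocalHost {
    // Resolved during class initialization; later calls never hit the resolver.
    private static final InetAddress LOCAL_HOST = resolve();

    private static InetAddress resolve() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            // Assumption: loopback is an acceptable fallback for a sync marker.
            return InetAddress.getLoopbackAddress();
        }
    }

    static InetAddress get() {
        return LOCAL_HOST;
    }

    /** Same digest as in SequenceFile, but using the cached address. */
    static byte[] newSyncMarker() {
        try {
            MessageDigest digester = MessageDigest.getInstance("MD5");
            digester.update((new UID() + "@" + get()).getBytes(StandardCharsets.UTF_8));
            return digester.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

The new UID() part still makes each marker unique, so caching the address does not change what the marker is used for.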
This is a BLOCKER issue and needs immediate attention.
Notes:
1) output.collect should not take a hit from the framework. Use a separate thread to handle the spill buffer?
2) The result of InetAddress.getLocalHost() should be cached in a static variable?
3) Spilling logic should not depend on the number of partitions. Needs redesign?
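To illustrate note 3, the sketch below contrasts the accounting described in this report with one possible alternative that charges only for records actually buffered. SpillPolicy and both method names are hypothetical, and the alternative is an assumption about what a redesign could look like, not a description of any Hadoop release.

```java
// Hypothetical comparison of two spill checks (not Hadoop code).
final class SpillPolicy {
    static final int BUFFERED_KEY_VAL_OVERHEAD = 16;

    /** Check as described in the report: every sorter is charged for its preallocated arrays. */
    static boolean spillByCapacity(int partitions, int arrayLength,
                                   long keyValBytes, long maxBufferSize) {
        long totalMem = (long) partitions * arrayLength * BUFFERED_KEY_VAL_OVERHEAD;
        return keyValBytes + totalMem >= maxBufferSize;
    }

    /** Assumed alternative: charge overhead only for records currently buffered. */
    static boolean spillByUsage(long bufferedRecords, long keyValBytes, long maxBufferSize) {
        long totalMem = bufferedRecords * BUFFERED_KEY_VAL_OVERHEAD;
        return keyValBytes + totalMem >= maxBufferSize;
    }

    public static void main(String[] args) {
        long maxBufferSize = 120L * 1024 * 1024; // assumption: "120M" means 120 MiB
        // Empty buffer, 8000 partitions: the capacity-based check already wants to spill.
        System.out.println(spillByCapacity(8000, 1024, 0, maxBufferSize));
        // Empty buffer under usage-based accounting: nothing to spill yet.
        System.out.println(spillByUsage(0, 0, maxBufferSize));
    }
}
```

With usage-based accounting the check no longer grows with the partition count, so an empty buffer never triggers a spill regardless of how many reducers the job has.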