Description
We were running into issues with very large jobs where files distributed via the Crunch DistCache would overload all DataNodes serving the files. The serving DataNodes will run out of Xceiver threads causing BlockMissingExceptions and the job will fail after some HDFS retries. This can be fixed by increasing the replication factor for files distributed via DistCache hence spreading the load across more DataNodes.
I suggest adding a config option for setting a different replication factor but defaulting to the current behavior of using the default replication factor.
2016-01-19 18:24:45,269 WARN [main] org.apache.hadoop.hdfs.DFSClient: DFS Read org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-133877431-10.255.1.10-1340216259506:blk_5327751941_1104340730962 file=/tmp/crunch-1412104163/p17/COMBINE at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:889) at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:568) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848) at java.io.DataInputStream.read(DataInputStream.java:149) at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310) at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323) at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794) at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) at org.apache.crunch.util.DistCache.read(DistCache.java:72) at org.apache.crunch.impl.mr.run.CrunchTaskContext.<init>(CrunchTaskContext.java:46) at org.apache.crunch.impl.mr.run.CrunchReducer.setup(CrunchReducer.java:40) at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:168) at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1651) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1630) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:720) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:790) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)