Details
-
Bug
-
Status: Patch Available
-
Major
-
Resolution: Unresolved
-
1.2.1
-
None
-
None
Description
I have come across an issue with CombineFileInputFormat. Actually I ran a hive query on approx 1.2 GB data with CombineHiveInputFormat which internally uses CombineFileInputFormat. My cluster size is 9 datanodes and max.split.size is 256 MB
When I ran this query with replication factor 9, hive consistently creates all 6 rack-local tasks and with replication factor 3 it creates 5 rack-local and 1 data local tasks.
When replication factor is 9 (equal to cluster size), all the tasks should be data-local as each datanode contains all the replicas of the input data, but that is not happening i.e all the tasks are rack-local.
When I dug into CombineFileInputFormat.java code in getMoreSplits method, I found the issue with the following snippet (specially in case of higher replication factor)
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); iter.hasNext();) { Map.Entry<String, List<OneBlockInfo>> one = iter.next(); nodes.add(one.getKey()); List<OneBlockInfo> blocksInNode = one.getValue(); // for each block, copy it into validBlocks. Delete it from // blockToNodes so that the same block does not appear in // two different splits. for (OneBlockInfo oneblock : blocksInNode) { if (blockToNodes.containsKey(oneblock)) { validBlocks.add(oneblock); blockToNodes.remove(oneblock); curSplitSize += oneblock.length; // if the accumulated split size exceeds the maximum, then // create this split. if (maxSize != 0 && curSplitSize >= maxSize) { // create an input split and add it to the splits array addCreatedSplit(splits, nodes, validBlocks); curSplitSize = 0; validBlocks.clear(); } } }
First node in the map nodeToBlocks has all the replicas of input file, so the above code creates 6 splits all with only one location. Now if JT doesn't schedule these tasks on that node, all the tasks will be rack-local, even though all the other datanodes have all the other replicas.