Hadoop Map/Reduce
MAPREDUCE-5611

CombineFileInputFormat only requests a single location per split when more could be optimal


Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.2.1
    • Fix Version/s: None
    • Component/s: None

    Description

I have come across an issue with CombineFileInputFormat. I ran a Hive query over approximately 1.2 GB of data with CombineHiveInputFormat, which internally uses CombineFileInputFormat. My cluster has 9 datanodes and max.split.size is set to 256 MB.
When I run this query with replication factor 9, Hive consistently creates 6 tasks, all of them rack-local; with replication factor 3 it creates 5 rack-local tasks and 1 data-local task.
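
For context, the 256 MB cap corresponds to a job configuration along these lines (a minimal sketch; the property name mapred.max.split.size is my assumption for the 1.2.1 line):

    import org.apache.hadoop.conf.Configuration;

    public class SplitSizeSetup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // 256 MB cap per combined split (property name assumed for the
        // Hadoop 1.x line; newer releases use
        // mapreduce.input.fileinputformat.split.maxsize)
        conf.setLong("mapred.max.split.size", 256L * 1024 * 1024);
        // ~1.2 GB of input at a 256 MB cap comes to roughly 5-6 combined
        // splits, matching the 6 tasks observed above.
      }
    }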

When the replication factor is 9 (equal to the cluster size), every task should be data-local, since each datanode holds a replica of every input block; instead, all the tasks are rack-local.

When I dug into the CombineFileInputFormat.java code, in the getMoreSplits method, I traced the issue to the following snippet (especially in the case of a higher replication factor):

CombineFileInputFormat.java

    for (Iterator<Map.Entry<String,
             List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
         iter.hasNext();) {
      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
      nodes.add(one.getKey());
      List<OneBlockInfo> blocksInNode = one.getValue();

      // for each block, copy it into validBlocks. Delete it from
      // blockToNodes so that the same block does not appear in
      // two different splits.
      for (OneBlockInfo oneblock : blocksInNode) {
        if (blockToNodes.containsKey(oneblock)) {
          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split. Note that 'nodes' still contains only
          // the single node taken from the iterator, so every split
          // created here advertises exactly one location.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, nodes, validBlocks);
            curSplitSize = 0;
            validBlocks.clear();
          }
        }
      }
      // ... (remainder of the loop elided)

The first node in the nodeToBlocks map holds replicas of every block of the input file, so the code above creates all 6 splits with only that single node as their location. If the JobTracker doesn't schedule these tasks on that node, they all end up rack-local, even though every other datanode holds replicas of the same blocks.
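
One possible direction for a fix (a sketch of the idea only, not necessarily what the attached patch does) is to label each combined split with every node that hosts all of the split's blocks, rather than just the current node. Assuming OneBlockInfo exposes its hosts array as it does in trunk:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Sketch only: compute the set of nodes that host *all* blocks in the
    // split, so the scheduler has more than one data-local choice.
    // Assumes validBlocks is non-empty.
    static String[] splitLocations(List<OneBlockInfo> validBlocks) {
      Set<String> common =
          new HashSet<String>(Arrays.asList(validBlocks.get(0).hosts));
      for (OneBlockInfo block : validBlocks) {
        common.retainAll(Arrays.asList(block.hosts));
      }
      // With replication factor 9 on 9 datanodes this is the whole cluster;
      // with a lower factor it can shrink or even be empty, in which case a
      // real fix should fall back to the current single-node behaviour.
      return common.toArray(new String[common.size()]);
    }

addCreatedSplit could then be passed these locations in place of the single-entry nodes list.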

Attachments

    1. CombineFileInputFormat-trunk.patch (0.5 kB, Chandra Prakash Bhagtani)


People

    Assignee: Chandra Prakash Bhagtani (cpbhagtani)
    Reporter: Chandra Prakash Bhagtani (cpbhagtani)
    Votes: 0
    Watchers: 8
