Hadoop Map/Reduce
MAPREDUCE-646

distcp should place the file distcp_src_files in distributed cache

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.21.0
    • Component/s: distcp
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Patch increases the replication factor of _distcp_src_files to sqrt(min(maxMapsOnCluster, totalMapsInThisJob)) so that many maps won't access the same replica of the file _distcp_src_files at the same time.
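      As a rough sketch of that formula (illustrative only, not the committed patch; the variable names maxMapsOnCluster and totalMapsInThisJob are assumed here):

          import java.io.IOException;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          class SrcFileListReplication {
            // Formula from the release note: sqrt(min(maxMapsOnCluster, totalMapsInThisJob)).
            // (A real implementation would presumably also keep a sensible lower bound.)
            static void bump(FileSystem fs, Path srcFileList,
                             int maxMapsOnCluster, int totalMapsInThisJob) throws IOException {
              int maps = Math.min(maxMapsOnCluster, totalMapsInThisJob);
              short replication = (short) Math.ceil(Math.sqrt(maps));
              fs.setReplication(srcFileList, replication);  // namenode re-replicates in the background
            }
          }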

      Description

      When a large number of files is being copied by distcp, accessing distcp_src_files seems to be an issue, as all map tasks would be accessing this file. The error message seen is:

      09/06/16 10:13:16 INFO mapred.JobClient: Task Id : attempt_200906040559_0110_m_003348_0, Status : FAILED
      java.io.IOException: Could not obtain block: blk_-4229860619941366534_1500174
      file=/mapredsystem/hadoop/mapredsystem/distcp_7fiyvq/_distcp_src_files
      at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.chooseDataNode(DFSClient.java:1757)
      at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.blockSeekTo(DFSClient.java:1585)
      at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.read(DFSClient.java:1712)
      at java.io.DataInputStream.readFully(DataInputStream.java:178)
      at java.io.DataInputStream.readFully(DataInputStream.java:152)
      at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1450)
      at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1428)
      at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1417)
      at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1412)
      at org.apache.hadoop.mapred.SequenceFileRecordReader.<init>(SequenceFileRecordReader.java:43)
      at org.apache.hadoop.tools.DistCp$CopyInputFormat.getRecordReader(DistCp.java:299)
      at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:336)
      at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
      at org.apache.hadoop.mapred.Child.main(Child.java:170)

      This could be because of HADOOP-6038 and/or HADOOP-4681.

      If distcp places this special file distcp_src_files in distributed cache, that could solve the problem.

      1. d_replica_srcfilelist.patch
        2 kB
        Ravi Gummadi
      2. d_replica_srcfilelist_v1.patch
        2 kB
        Ravi Gummadi
      3. d_replica_srcfilelist_v2.patch
        3 kB
        Ravi Gummadi

        Activity

        Hudson added a comment -

        Integrated in Hadoop-Mapreduce-trunk #15 (See http://hudson.zones.apache.org/hudson/job/Hadoop-Mapreduce-trunk/15/)

        Tsz Wo Nicholas Sze added a comment -

        I have committed this. Thanks, Ravi!

        Ravi Gummadi added a comment -

        ant test-patch gave

        [exec] -1 overall.
        [exec]
        [exec] +1 @author. The patch does not contain any @author tags.
        [exec]
        [exec] -1 tests included. The patch doesn't appear to include any new or modified tests.
        [exec] Please justify why no tests are needed for this patch.
        [exec]
        [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
        [exec]
        [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
        [exec]
        [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
        [exec]
        [exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
        [exec]
        [exec] +1 release audit. The applied patch does not increase the total number of release audit warnings.

        Unit tests passed on my linux machine.

        Tsz Wo Nicholas Sze added a comment -

        I also think that it is not easy to create new unit tests for this. Manual tests are good enough. Could you run test-patch and the unit tests, and then post the results?

        Ravi Gummadi added a comment -

        It looks difficult to check the replication of this file from a test case, since we need to check the replication while the distcp job is running.
        Manually tested, with a log message that displays the replication factor of this file after setReplication() is done, for different values, by changing the code of testMapCount() in TestCopyFiles.

        Tsz Wo Nicholas Sze added a comment -

        +1 patch looks good. Not sure if it is easy to add a unit test.

        Ravi Gummadi added a comment -

        Thanks Nicholas for pointing it out.

        Attached new patch with the suggested change.

        Tsz Wo Nicholas Sze added a comment -

        The replication number should also depend on the number of maps (see DistCp.setMapCount(..)). It does not make sense to set replication to 89 if there are only 10 maps.

        Ravi Gummadi added a comment -

        Attaching patch with suggested changes.

        Doug Cutting added a comment -

        Some comments on the patch:

        • FSShell should not be used to increase replication, rather just call FileSystem#setReplication().
        • The replication should be set immediately after the file is closed, so that it has a chance to get replicated while duplicates are checked before the job is submitted.
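        A minimal sketch of the change suggested above (illustrative names, not the committed patch): close the source file list and immediately raise its replication through the FileSystem API rather than via FsShell.

            import java.io.IOException;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.SequenceFile;

            class SrcFileListSetup {
              // Illustrative sketch: write the source file list with default replication,
              // close it, then raise the replication right away so HDFS can create the
              // extra replicas while duplicate checking and job submission continue.
              static void finish(FileSystem fs, Path srcFileList, SequenceFile.Writer writer,
                                 short targetReplication) throws IOException {
                writer.close();                                     // file now exists with default replication
                fs.setReplication(srcFileList, targetReplication);  // asynchronous re-replication
              }
            }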
        Doug Cutting added a comment -

        > Would it be better if distcp sets dfs.client.max.block.acquire.failures to sqrt(maxMapsOnCluster)

        The hope is that by increasing the replication there won't be many failures: it shouldn't have to try every replica. So 3 should probably still be fine, I think.

        Ravi Gummadi added a comment -

        Doug, would it be better if distcp sets dfs.client.max.block.acquire.failures to sqrt(maxMapsOnCluster), since DFSClient.DFSInputStream.chooseDataNode() compares the number of failures with this config property (default value of 3)? But we need that only for this file, _distcp_src_files.

        Ravi Gummadi added a comment -

        Attaching patch for increasing the replication of _distcp_src_files to sqrt(maxMapsOnCluster).

        Please review and provide your comments.

        Doug Cutting added a comment -

        > Is that still OK for namenode's perf?

        This should not be a problem for the namenode. It would be best to write the file first with normal replication, then increase its replication, to avoid an overly-long HDFS write pipeline.

        The rationale for sqrt is that a two-stage fanout is done: first from the original to the replicas, then from the replicas to the maps. Sqrt(maps) uses approximately the same fanout factor at each stage, minimizing the number of datanode clients (the presumed bottleneck here).
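        To make the fanout arithmetic concrete, using the 8000-slot example from the discussion: sqrt(8000) ≈ 89, so one writer pushes the file to roughly 89 replicas, and each replica then serves roughly 8000 / 89 ≈ 90 map tasks; both stages end up with a fanout of about 90.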

        Ravi Gummadi added a comment -

        In general, I think the size of this file distcp_src_files would not consume much HDFS block space.

        With thousands of nodes in the cluster (say 4000), even the sqrt of getMaxMapTasks() would be 89 (i.e. sqrt(8000)), which is a big number for replication. Is that still OK for the namenode's perf with many distcp jobs running in parallel, each creating this file with this many replicas?

        Doug Cutting added a comment -

        Distributed cache would give every mapper a full copy of the file, but each only reads a portion of the file, so I think increased replication is more appropriate than the distributed cache.

        We can find the number of map slots with JobClient#getClusterStatus().getMaxMapTasks(). We might set the replication to the square root of that. This should not overload the namenode worse than any other job.
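        A sketch of that approach (illustrative only, not the committed patch): query the cluster's map-slot count through JobClient and take its square root as the replication factor.

            import java.io.IOException;
            import org.apache.hadoop.mapred.JobClient;
            import org.apache.hadoop.mapred.JobConf;

            class ReplicationFromClusterSlots {
              // Derive the replication factor from the cluster's total map slots.
              static short compute(JobConf conf) throws IOException {
                int maxMapSlots = new JobClient(conf).getClusterStatus().getMaxMapTasks();
                return (short) Math.ceil(Math.sqrt(maxMapSlots));
              }
            }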

        Ravi Gummadi added a comment -

        Yes. Increasing replication is another solution. But since we don't know the number of parallel maps that can run, setting replication to a fixed value like 10 may not be enough in some cases and can also become an overhead on the namenode. So could distributed cache be better?

        Doug Cutting added a comment -

        Another option might be to increase its replication, no?


          People

          • Assignee:
            Ravi Gummadi
          • Reporter:
            Ravi Gummadi
          • Votes:
            0
          • Watchers:
            2
