HADOOP-1027

Fix the RAM FileSystem/Merge problems (reported in HADOOP-1014)

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.0
    • Component/s: mapred
    • Labels: None

      Description

      1) The merge algorithm implementation does not delete empty segments (sequence files with no key/value data) in cases where single-level merges don't happen on those segments (because the check "numberOfSegmentsRemaining <= factor" returns true). This affected the in-mem merge in a subtle way: for the in-mem merge, the merge-spill file is given the same name as the 0th entry file in the ramfs. If that file was an empty file, it would not get deleted from the ramfs, and if a subsequent merge on the ramfs chose the same name for its merge-spill file, it would overwrite the previously created spill. This led to the inconsistent output sizes.

      2) The InMemoryFileSystem "close" method is not protected; it is the only method that modifies the pathToFileAttribs map without first locking the InMemoryFileSystem instance, and that quite likely leads to a ConcurrentModificationException if some thread calls InMemoryFileSystem.close (due to some exception) while some other thread is in the middle of InMemoryFileSystem.getFiles(). However, this problem does not affect the correctness of the merge process (the task is going to fail anyway); the more important point is that some other exception happened first (for example, insufficient disk space, so map outputs could not be written) which may not be related to the merge process at all. A minimal sketch of this race appears after this list.

      3) The number of outputs merged at once in RAM should be limited, to prevent OutOfMemory errors. Consider a case where there are tens of thousands of maps and all of them generate empty outputs. Given the default RAM FS size of 75 MB, we can accommodate lots of map outputs in RAM without doing any merge, but the various other data structures also explode in size. We have to make a trade-off here, especially because the in-mem merging is done in the TaskTracker process, which is already under a good amount of memory pressure.
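      A minimal sketch of the race in (2), using hypothetical stand-in names rather than the actual InMemoryFileSystem internals; only the locking pattern is the point:

          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          // Hypothetical stand-in for InMemoryFileSystem.
          class InMemFsSketch {
            private final Map<String, Object> pathToFileAttribs = new HashMap<String, Object>();

            // Buggy shape: clears the map without taking the instance lock, so a
            // thread iterating the map inside getFiles() can observe the mutation
            // and throw ConcurrentModificationException.
            public void closeUnsafe() {
              pathToFileAttribs.clear();
            }

            // Fixed shape: every access to pathToFileAttribs locks the instance.
            public synchronized void close() {
              pathToFileAttribs.clear();
            }

            public synchronized List<String> getFiles() {
              return new ArrayList<String>(pathToFileAttribs.keySet());
            }
          }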

      Attachments

      1. 1027-new4.patch (17 kB) Devaraj Das
      2. 1027-new3.patch (17 kB) Devaraj Das
      3. 1027-new2.patch (17 kB) Devaraj Das
      4. 1027-new.patch (15 kB) Devaraj Das

        Issue Links

        • is related to HADOOP-1014
        • incorporates HADOOP-1012

          Activity

          Devaraj Das created issue -
          Devaraj Das made changes -
          Link: This issue is related to HADOOP-1014 [ HADOOP-1014 ]
          Devaraj Das added a comment -

          The attached patch does the following:
          1) Modifies hadoop-default.xml; sets the value of fs.inmemory.size.mb to 75

          2) SequenceFile.Sorter.MergeQueue.merge is modified to clean up empty segments right at the beginning, where the segments are obtained from the sorted-map data structure. Segments are extracted from the sorted map until we have collected the desired number of segments to merge or have no more segments to look at. This optimizes cases where we have lots of empty segments to merge (see the sketch after this comment).

          3) The InMemoryFileSystem class has been modified to have synchronization blocks wherever the pathToFileAttribs map is touched. Some of them may be unnecessary but do no harm. It also incorporates what Owen pointed out in a comment on HADOOP-1014. renameRaw has also been modified to throw an exception if the dst already exists.

          4) The changes in ReduceTaskRunner are:
          4.1) MERGE_THRESHOLD, a limit on the maximum number of files we will accumulate before initiating an in-mem merge, has been introduced. It is set to 500.
          4.2) The check for whether to initiate an in-mem merge has been modified to take MERGE_THRESHOLD into account.
          4.3) The on-disk spill file is now created prior to invoking merge (to handle the case where no files are left in the ram fs to cloneFileAttributes from; this happens if all map outputs are empty).
          4.4) InMemFileSys.close is now called in just one place.
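          A sketch of the extraction loop described in (2), with a hypothetical Segment interface standing in for SequenceFile.Sorter's internal segment descriptors (illustrative names, not the actual Hadoop API):

              import java.util.ArrayList;
              import java.util.Iterator;
              import java.util.List;

              // Stand-in for a merge segment; the real code tracks more state.
              interface Segment {
                long getLength();   // zero for an empty segment
                void delete();      // remove the backing file
              }

              class ExtractSketch {
                // Pull segments out of the sorted order until we have `factor`
                // non-empty ones or run out, deleting empty segments on sight so
                // their names can never shadow a later merge spill.
                static List<Segment> extract(Iterator<Segment> sorted, int factor) {
                  List<Segment> toMerge = new ArrayList<Segment>();
                  while (toMerge.size() < factor && sorted.hasNext()) {
                    Segment s = sorted.next();
                    if (s.getLength() == 0) {
                      s.delete();
                    } else {
                      toMerge.add(s);
                    }
                  }
                  return toMerge;
                }
              }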

          Devaraj Das made changes -
          Attachment 1027.patch [ 12351580 ]
          Devaraj Das added a comment -

          This is the correct patch.

          Devaraj Das made changes -
          Attachment 1027-new.patch [ 12351584 ]
          Devaraj Das made changes -
          Attachment 1027.patch [ 12351580 ]
          Owen O'Malley added a comment -

          Ack! Jira ate my comment.

          Limiting the fan-in of the merge to 500 works, but means that the "spill size" may be small. With 200k maps, you are pretty much guaranteed to have 400 spills (200,000 / 500) and thus a two-level merge, which is unfortunate. If I remember right, the SequenceFile merge code has huge input buffers for the files, which is totally unnecessary for ramfs. Can we disable the buffers? Just the keys for the merge shouldn't be that big of a burden on the JVM.

          Devaraj Das made changes -
          Link: This issue incorporates HADOOP-1012 [ HADOOP-1012 ]
          Devaraj Das added a comment -

          Good point regarding the buffer size. However, the buffering cannot be completely disabled without, probably, major changes in the FileSystem part of the framework, because things like checksumming depend on the buffer size. For instance, the minimum buffer size must be at least io.bytes.per.checksum, otherwise the checksum algorithm won't work. So, although reducing the buffer size makes good sense for the ramfs, it doesn't guarantee we won't run into OutOfMemory errors when we increase the number of maps from, let's say, 9000 to 18000 (and all outputs can still fit in the ramfs). One more thing worth noting is that the ramfs, the priority queue, and the other merge data structures all grow in proportion to the number of files we merge at once.
          So I chose a middle ground. In the merge code, I used the minimum buffer size (equal to the value of io.bytes.per.checksum) for the case where the merge is opening a file in the ramfs. I also added a config value, mapred.inmem.merge.threshold; its default value of 0 means we DON'T do file-count-threshold-based merging. If we want it (because of OutOfMemory errors), we can configure an appropriate value (like 5000 or so). This should give optimal performance for the typical use cases.
          Makes sense?
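          For illustration, a job could opt in to the count-based merge like this (JobConf.setInt is the standard way to set an integer config property; 5000 is just the example figure from above):

              import org.apache.hadoop.mapred.JobConf;

              public class ThresholdConfig {
                public static void main(String[] args) {
                  JobConf conf = new JobConf();
                  // The default of 0 disables count-based in-mem merging; a positive
                  // value caps how many map outputs accumulate in the ramfs before
                  // a merge is triggered.
                  conf.setInt("mapred.inmem.merge.threshold", 5000);
                }
              }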

          Devaraj Das made changes -
          Attachment 1027-new2.patch [ 12351811 ]
          Devaraj Das added a comment -

          Forgot to mention that the patch 1027-new2.patch implements my comments.

          Devaraj Das added a comment -

          Attached is a patch that applies cleanly to the current trunk. It also sets the merge threshold count (for the in-mem merge) to 1000 files in hadoop-default.xml.
          I think it makes sense to also trigger merges based on the count of files accumulated in the ramfs, by default. We then have better control of memory usage: it keeps the various data structures from growing out of bounds (and thereby giving OutOfMemory errors) when there are very many maps (generating small outputs) and the ramfs is large enough to accommodate very many map outputs.
          Since we can accommodate so many map outputs in the ramfs in the first place, the outputs are quite likely small, and hence the on-disk spills are small too. So although there would be disk IO, I don't think it would be the major bottleneck.
          Thoughts?
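          A sketch of the combined trigger this describes, with hypothetical names; the ramfs fill fraction used here is an assumed figure for illustration, not the actual ReduceTaskRunner constant:

              class MergeTriggerSketch {
                // Merge when the ramfs is nearly full OR when the configured
                // file-count threshold (mapred.inmem.merge.threshold, if > 0)
                // has been reached.
                static boolean shouldMergeInMemory(long bytesInRamFs, long ramFsCapacity,
                                                   int filesInRamFs, int mergeThreshold) {
                  boolean sizeTrigger = bytesInRamFs >= (long) (0.66 * ramFsCapacity); // assumed fraction
                  boolean countTrigger = mergeThreshold > 0 && filesInRamFs >= mergeThreshold;
                  return sizeTrigger || countTrigger;
                }
              }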

          Devaraj Das made changes -
          Attachment 1027-new3.patch [ 12351895 ]
          David Bowen added a comment -

          I notice that this patch contains if statements without braces. Quoting from the coding guidelines at http://java.sun.com/docs/codeconv/html/CodeConventions.doc6.html#449:

          Note: if statements always use braces {}. Avoid the following error-prone form:

          if (condition) // AVOID! THIS OMITS THE BRACES {}!
              statement;
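          For reference, the braced form the guideline recommends:

          if (condition) { // braces make the body explicit
              statement;
          }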

          Devaraj Das added a comment -

          This hopefully addresses the coding convention related comment.

          Devaraj Das made changes -
          Attachment 1027-new4.patch [ 12351949 ]
          Devaraj Das made changes -
          Status: Open [ 1 ] → Patch Available [ 10002 ]
          Hadoop QA added a comment -

          +1, because http://issues.apache.org/jira/secure/attachment/12351949/1027-new4.patch applied and tested successfully against trunk revision http://svn.apache.org/repos/asf/lucene/hadoop/trunk/511100. Results are at http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch
          David Bowen added a comment -

          > This hopefully addresses the coding convention related comment.

          Indeed it does. Thank-you.

          Owen O'Malley added a comment -

          +1

          Doug Cutting added a comment -

          I just committed this. Thanks, Devaraj!

          Doug Cutting made changes -
          Fix Version/s: 0.12.0 [ 12312293 ]
          Resolution: Fixed [ 1 ]
          Status: Patch Available [ 10002 ] → Resolved [ 5 ]
          Doug Cutting made changes -
          Status: Resolved [ 5 ] → Closed [ 6 ]
          Owen O'Malley made changes -
          Component/s: mapred [ 12310690 ]

            People

            • Assignee: Devaraj Das
            • Reporter: Devaraj Das
            • Votes: 0
            • Watchers: 0
