Hadoop Map/Reduce
  MAPREDUCE-5508

JobTracker memory leak caused by unreleased FileSystem objects in JobInProgress#cleanupJob

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1-win, 1.2.1
    • Fix Version/s: 1-win, 1.3.0
    • Component/s: jobtracker
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Target Version/s:

      Description

      MAPREDUCE-5351 fixed a memory leak but introduced another FileSystem object (see "tempDirFs") that is not properly released.

       JobInProgress#cleanupJob()

         void cleanupJob() {
       ...
               // This lookup (added by MAPREDUCE-5351) can create a cache entry that
               // nothing ever closes, so it leaks in FileSystem#Cache.
               tempDirFs = jobTempDirPath.getFileSystem(conf);
               CleanupQueue.getInstance().addToQueue(
                   new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
       ...
               // fs is closed only when it differs from tempDirFs; because tempDirFs
               // is obtained under a different UGI/Subject, the two are effectively
               // never equal.
               if (tempDirFs != fs) {
                 try {
                   fs.close();
                 } catch (IOException ie) {
       ...
         }
      
      1. MAPREDUCE-5508.patch
        2 kB
        Xi Fang
      2. MAPREDUCE-5508.3.patch
        4 kB
        Xi Fang
      3. MAPREDUCE-5508.2.patch
        4 kB
        Xi Fang
      4. MAPREDUCE-5508.1.patch
        3 kB
        Xi Fang
      5. JobInProgress.java
        126 kB
        viswanathan
      6. CleanupQueue.java
        6 kB
        viswanathan

        Issue Links

          Activity

          viswanathan added a comment -

          Thanks a lot Xi.

          Xi Fang added a comment -

          One way to confirm that is to set mapred.jobtracker.completeuserjobs.maximum = 0 and run some jobs. After all the jobs are done, wait for a while and check the number of FS objects in FileSystem#Cache.

          viswanathan added a comment -

          Hi Chris,

          Thanks for your detailed response.

          Thanks,

          Chris Nauroth added a comment -

          Hope this fix is committed in branch-1, please share the revision of the commit.

          http://svn.apache.org/viewvc?view=revision&revision=1497962
          http://svn.apache.org/viewvc?view=revision&revision=1499904
          http://svn.apache.org/viewvc?view=revision&revision=1525774

          I have noticed that heap size of Jobtracker is gradually increasing after the upgrade also.

          Just observing heap size wouldn't be sufficient to confirm or deny that the fix is in place. It's natural for the JVM to grow the heap as needed. Incremental garbage collection will clean that up gradually, and a full GC eventually would reclaim all unused space. All of this is too unpredictable to confirm or deny the memory leak.

          We confirmed this fix by running various MapReduce workloads in a controlled environment, running jmap on the JobTracker process to dump a memory map, and then viewing the dump with jhat. When the memory leak happens, you end up seeing DistributedFileSystem instances that are only referenced from the internal HashMap of the FileSystem#Cache. (With no other reference to the instance, it means that no one is ever going to close it, and therefore it will never get removed from the cache.) With all of these patches applied, we see all DistributedFileSystem instances are referenced from the FileSystem#Cache and also some other references.

          Do I need to update any other patches/fix?

          No, that's all of them.

          If there are any additional questions, I recommend moving to the user@hadoop.apache.org mailing list.

          viswanathan added a comment -

          Hi Chris/Xi,

          PFA the Java classes that were modified based on the patches for the JT OOME.

          Am I missing anything in the Java classes? I have modified them based on the patches.

          In Hadoop 1.2.1, the FileSystem.java class remains the same as in the patch, so I didn't modify it.

          Please review and send your feedback.

          Thanks,

          viswanathan added a comment -

          Hi Chris/Xi,

          Hope you are doing great.

          As mentioned, we have upgraded our HDFS cluster with the following 3 patches for the JT OOME.

          https://issues.apache.org/jira/secure/attachment/12590105/MAPREDUCE-5351-2.patch
          https://issues.apache.org/jira/secure/attachment/12590672/MAPREDUCE-5351-addendum-1.patch
          https://issues.apache.org/jira/secure/attachment/12604722/MAPREDUCE-5508.3.patch

          Of the above patches, I didn't apply the TestCleanupQueue change. Hope it's not required.

          I have noticed that heap size of Jobtracker is gradually increasing after the upgrade also.

          Currently the heap size is as follows:
          Cluster Summary (Heap Size is 869.19 MB/8.89 GB). Yesterday, after restarting the JT, the heap size was only around 200 MB.

          Will the heap size reduce automatically after some threshold limit?

          Do I need to update any other patches/fix?

          Please do the needful, as this is a production environment.

          Thanks,
          Viswa.J

          viswanathan added a comment -

          Hi Chris,

          Hope this fix is committed in branch-1, please share the revision of the commit.

          Thanks,

          Xi Fang added a comment -

          Thanks Chris and viswanathan. I think the three patches are what you need. They won't affect your production environment, because this is a very back-end change; I don't think users will notice any difference.

          viswanathan added a comment -

          Hi Chris,

          Appreciate your response.

          Thanks,

          viswanathan added a comment -

          Hi Chris,

          Hope these 3 patches are the only ones that need to be applied; if any other patches are missing, please let me know.

          https://issues.apache.org/jira/secure/attachment/12590105/MAPREDUCE-5351-2.patch
          https://issues.apache.org/jira/secure/attachment/12590672/MAPREDUCE-5351-addendum-1.patch
          https://issues.apache.org/jira/secure/attachment/12604722/MAPREDUCE-5508.3.patch

          Also, I have downloaded the latest hadoop-1.2.1 version; while building the jar I'm getting a snapshot version (i.e., hadoop-core-1.2.2-SNAPSHOT.jar). Is this because some other changes were made in that version?

          Hope this will not be a problem when updating the production environment.

          Thanks,

          viswanathan added a comment -

          Thanks a lot Chris for your valuable response.

          Chris Nauroth added a comment -

          Hello, viswanathan. I'm sorry to hear that this bug is causing trouble for you. It is not committed for 1.2.1. It would be included in the 1.3.0 release.

          I recommend against doing what you described: trying to obtain a compiled jar for a patched 1.2.1 from an individual dev. The problem with this approach is that you wouldn't know for sure what was included in that jar. This would open your deployment to security risk, because whoever compiled that jar could have snuck in a backdoor. Even if you decide to trust a particular Apache dev for this, there is still the risk of a man-in-the-middle interfering with your download.

          The Apache release process actually provides everything you need to obtain the official 1.2.1 source, apply additional patches, and then build from source yourself. Official Apache releases come with the signature of the release manager and a checksum that you can use to verify the contents. Patches are publicly posted in jira, Subversion, and mirrored to git. Commits to the repositories are authenticated, so you know that only Apache committers have committed the patches. Every committed patch requires agreement from at least two people: the contributor and at least one other committer.

          You can find a mirror for downloading a distro from here:

          http://www.apache.org/dyn/closer.cgi/hadoop/common/

          For example, here is a direct link to 1.2.1 at one of the mirrors:

          http://www.interior-dsgn.com/apache/hadoop/common/hadoop-1.2.1/

          After downloading the source, there are multiple patches related to this memory leak that you'll want to apply, from MAPREDUCE-5351 and MAPREDUCE-5508:

          https://issues.apache.org/jira/secure/attachment/12590105/MAPREDUCE-5351-2.patch

          https://issues.apache.org/jira/secure/attachment/12590672/MAPREDUCE-5351-addendum-1.patch

          https://issues.apache.org/jira/secure/attachment/12604722/MAPREDUCE-5508.3.patch

          Then, running "ant jar" ought to be sufficient to build and create the jar that you need. By following this process, you'll get a much more trustworthy jar than something compiled and provided by an individual Apache developer.

          viswanathan added a comment -

          Hi Chris,

          Is this fix committed in the hadoop-1.2.1 release? If not, could you please provide the compiled jar (for version 1.2.1) of this fix?

          Thanks,

          Xi Fang added a comment -

          Thanks Chris and Sandy!

          Chris Nauroth added a comment -

          I filed HADOOP-9993 for documenting the requirement that FileSystem#close implementations must be idempotent.

          Chris Nauroth added a comment -

          I have committed this to branch-1 and branch-1-win. Xi, thank you for providing a patch for this tricky issue. Sandy, thank you for help with code reviews.

          Xi Fang added a comment -

          Thanks Chris and Sandy. I just finished the large scale test. I didn't find a memory leak in my test. I removed the tabs and attached a new patch.

          So Chris, do you think we should file a new Jira for the idempotent implementation?

          Sandy Ryza added a comment -

          There are some tabs that should be converted to spaces. Other than that I am +1 for the patch.

          Chris Nauroth added a comment -

          +1 for the patch. Thanks again, Xi.

          Sandy Ryza, assuming that Xi's large scale tests come back showing no memory leaks, are you also +1 for this patch? If so, then I will commit to branch-1 and branch-1-win.

          Does this mean that ... could create either one or two FileSystems objects?

          If that code sample is the only thread running, then only one instance is created, and fs1 == fs2. With multiple threads running, it's non-deterministic, because the other threads could be running FileSystem#get and FileSystem#close on the same cached instances at just the right moment. It's possible to get 2 instances created, and fs1 != fs2.

          It's a good idea to document that FileSystem#close requires an idempotent implementation, in scope of a separate jira. In practice, DistributedFileSystem does guarantee idempotence via a synchronized close method and an isRunning flag inside the DFSClient.

          BTW, while researching some of these issues around the cache, I started to think that we ought to ref-count the instances to better guard against problems like this. Then, I found HADOOP-4655. Discussion in that issue made an intentional choice not to ref count in order to preserve backwards-compatibility with clients that don't call close.
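
          For readers following along, here is a minimal sketch of the "idempotent close" contract discussed above. It is illustrative only; it is not DistributedFileSystem's actual implementation (which, as noted, relies on a synchronized close method and DFSClient's isRunning flag), and the class name is made up.

            import java.io.Closeable;
            import java.io.IOException;
            import java.util.concurrent.atomic.AtomicBoolean;

            public class IdempotentCloseExample implements Closeable {
              private final AtomicBoolean closed = new AtomicBoolean(false);

              @Override
              public void close() throws IOException {
                // Only the first caller performs the real shutdown work; any later
                // call (e.g. a second user of a shared, cached instance) is a no-op
                // rather than a double release.
                if (!closed.compareAndSet(false, true)) {
                  return;
                }
                // ... release connections, flush buffers, remove from any cache ...
              }
            }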

          Sandy Ryza added a comment -

          Does this mean that

          FileSystem fs1 = FileSystem.get(conf);
          FileSystem fs2 = FileSystem.get(conf);
          

          could create either one or two FileSystems objects?

          If that's the case we should document that FileSystem#close implementations must be idempotent

          Xi Fang added a comment -

          Thanks Chris. I attached a new patch and will launch a large scale test tomorrow.

          Chris Nauroth added a comment -

          If we call Thread.currentThread().interrupt(), is that possible that fs won't be closed in JobInProgress#cleanupJob()?

          I think fs will be closed, and I don't think calling Thread.currentThread().interrupt() will harm that. This just sets the interrupt status of the thread (basically setting a flag to true on the thread). Nothing substantial happens with the interrupt status immediately. If there is a call to a blocking API (i.e. Object.wait or Thread.sleep), then a thread in interrupted status will cause a throw of InterruptedException. I don't see any calls to blocking APIs between the doAs and the close of fs, so I don't see an issue that could prevent close of fs.

          I think if the answer to my first question is "fs will be closed in JobInProgress#cleanupJob()", there will be no memory leak.

          Yes, I think you're right. Thanks! Please disregard this part of my feedback.

          I think the only thing left is to add the call to Thread.currentThread().interrupt().
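
          To make that concrete, here is a rough sketch of the catch block being discussed. It is not the committed patch; LOG, userUGI, jobTempDirPath and conf stand in for the corresponding JobInProgress members, and the surrounding code is elided.

            FileSystem tempDirFs = null;
            try {
              // Obtain the temp-dir FileSystem as the job's user so that the later
              // CleanupQueue lookup can hit the same cache entry.
              tempDirFs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws IOException {
                  return jobTempDirPath.getFileSystem(conf);
                }
              });
            } catch (IOException ioe) {
              LOG.warn("Error getting FileSystem for " + jobTempDirPath, ioe);
            } catch (InterruptedException ie) {
              // Restore the interrupt status instead of swallowing it. Leaving
              // tempDirFs null is safe here, because CleanupQueue#deletePath closes
              // whatever FileSystem it opens for the delete anyway.
              Thread.currentThread().interrupt();
              LOG.warn("Interrupted while getting FileSystem for " + jobTempDirPath, ie);
            }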

          Xi Fang added a comment -

          Chris Nauroth, thanks for your comments.

          Swallowing the InterruptedException is problematic if any upstream code depends on seeing the thread's interrupted status, so let's restore the interrupted status in the catch block by calling Thread.currentThread().interrupt().

          If we call Thread.currentThread().interrupt(), is that possible that fs won't be closed in JobInProgress#cleanupJob()?

          If there is an InterruptedException, then we currently would pass a null tempDirFs to the CleanupQueue, where we'd once again risk leaking memory. I suggest that if there is an InterruptedException, then we skip adding to the CleanupQueue and log a warning. This is consistent with the error-handling strategy in the rest of the method. (It logs warnings.)

          I think if the answer to my first question is "fs will be closed in JobInProgress#cleanupJob()", there will be no memory leak. This is because even if we pass null into CleanupQueue, the new fs created in CleanupQueue#deletePath() would be closed anyway.

          Thanks Chris.

          Chris Nauroth added a comment -

          Thanks for the new patch, Xi. This mostly looks good to me, and I'm glad to hear that it still fixes the memory leak. Here are a few comments:

          1. Can we remove the unused PathDeletionContext constructor? It would require a small change in TestCleanupQueue.
          2. Swallowing the InterruptedException is problematic if any upstream code depends on seeing the thread's interrupted status, so let's restore the interrupted status in the catch block by calling Thread.currentThread().interrupt().
          3. If there is an InterruptedException, then we currently would pass a null tempDirFs to the CleanupQueue, where we'd once again risk leaking memory. I suggest that if there is an InterruptedException, then we skip adding to the CleanupQueue and log a warning. This is consistent with the error-handling strategy in the rest of the method. (It logs warnings.)
          Xi Fang added a comment -

          I set both staging and system dirs to hdfs on my test cluster. I ran 35,000 job submissions and manually checked the number of DistributedFileSystem objects. No memory leak related to DistributedFileSystem was found.

          Xi Fang added a comment -

          Thanks Chris and Sandy. I made a draft patch for the proposal. I am thinking we still pass "tempDirFs" into PathDeletionContext instead of passing "fs", in order to deal with the case that fs is closed by someone. Although tempDirFs might be different from fs due to the different subject problem discussed above, in most of cases they would be the same (I used "userUGI" to get tempDirFs). So this is still an optimization. Let me know your comments. Thanks.
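
          For readers of the thread, the shape of that proposal looks roughly like the call below. The constructor signature shown is hypothetical; the actual change is in the attached patches.

            // Hand the FileSystem that cleanupJob already holds to the deletion context,
            // instead of letting CleanupQueue re-resolve one via Path#getFileSystem
            // (which implicitly creates a new Subject and a new cache entry).
            CleanupQueue.getInstance().addToQueue(
                new PathDeletionContext(tempDirFs, jobTempDirPath, conf, userUGI, jobId));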

          Sandy Ryza added a comment -

          Sorry for the delay in getting back. That makes sense about the UGI. +1 to passing the correct FileSystem with the PathDeletionContext.

          Chris Nauroth added a comment -

          I have committed HDFS-5211.

          Xi is going to work on an alternative patch that tries my second idea: pass the correct FileSystem into the PathDeletionContext so that CleanupQueue doesn't need to rely on the cache's implicit key creation. If that works, and if we confirm that the new patch also fixes the leak, then we can compare the two patches and decide which one to commit.

          Xi Fang added a comment -

          Thanks Chris for filing HDFS-5211. That sounds good to me.

          Chris Nauroth added a comment -

          ...race condition between DistributedFileSystem#close() and FileSystem#close()...

          I've been meaning to file an issue for this one, so I just created HDFS-5211. Xi, do you mind if we handle that issue separately and keep this issue focused on fixing the JobTracker memory leak? It's an HDFS bug that impacts any multi-threaded usage of DistributedFileSystem (not just this MapReduce code path), and I think we can fix the root cause there in HDFS.

          IOW, we don't need to let the presence of that bug influence the way we code a fix for this one, because I think we can fix both.

          Xi Fang added a comment -

          Thanks Sandy Ryza and Chris Nauroth. Actually, the above discussion gave me second thoughts on the attached patch. There is a race condition here. Suppose that Path#getFileSystem in CleanupQueue#deletePath retrieved the same instance of JobInProgress#fs from FileSystem#Cache as well. Because there is a race condition between DistributedFileSystem#close() and FileSystem#close(), it is possible that, at the moment just after JobInProgress#cleanupJob closed JobInProgress#fs's DFSClient, the processor switched to CleanupQueue#deletePath and called fs.delete(). Because this fs's DFSClient has been closed, an exception would be thrown and the staging directory would not be deleted.

          Chris Nauroth added a comment -

          If I understand correctly, in that case the same UGI instance (JobInProgress.userUGI) that was used to create the fs is used to close it, so having different subjects is not possible.

          The 2 UGI instances (one passed in to PathDeletionContext and one created implicitly by the call to Path#getFileSystem) do have the same value, but they have different identities. Even though they are logically equivalent, there are 2 different underlying Subject instances with different identity hash codes. Thus, the cache entry used during creation of the FileSystem will be different from the one used during close, which causes the leak.

          Another way of saying this is that even though the same Principal is used, Path#getFileSystem will create a different Subject, because CleanupQueue is running inside a different JAAS AccessControlContext. The Subject is a function of not only the Principal but also the AccessControlContext. (Again, both the AccessControlContext and Subject may be logically equivalent, but we're observing that they are not the same instances, so they have different identity hash codes.)

          If you really want to avoid the extra close, then the only other possible solution that I can think of would involve passing the FileSystem instance to use into the PathDeletionContext. This would make it explicit instead of relying on the implicit cache lookup of Path#getFileSystem. I haven't completely thought through if that approach would have other side effects though.
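
          A toy demonstration of why identity matters here (this is not Hadoop's FileSystem.Cache.Key class, just an illustration): two Subjects that are logically equal still have different identity hash codes, so a map keyed on identityHashCode can never find an entry that was created under the "other" Subject instance.

            import javax.security.auth.Subject;
            import java.util.HashMap;
            import java.util.Map;

            public class IdentityKeyDemo {
              public static void main(String[] args) {
                Subject s1 = new Subject();
                Subject s2 = new Subject();           // same (empty) contents, different instance

                System.out.println(s1.equals(s2));    // true: logically equivalent

                // Simplified stand-in for a cache keyed on the Subject's identity hash code.
                Map<Integer, String> cache = new HashMap<Integer, String>();
                cache.put(System.identityHashCode(s1), "FileSystem created under s1");

                // A lookup by the "same user" but a different Subject instance misses,
                // so the cached entry is never found again, and therefore never closed.
                System.out.println(cache.containsKey(System.identityHashCode(s2)));  // false in practice
              }
            }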

          Sandy Ryza added a comment -

          If I understand correctly, in that case the same UGI instance (JobInProgress.userUGI) that was used to create the fs is used to close it, so having different subjects is not possible.

          Xi Fang added a comment -

          Thanks Sandy for the information on HADOOP-6670. I think we may still need to close fs anyway, because p.getFileSystem(conf) in CleanupQueue#deletePath may not be able to find the FileSystem#Cache entry of JobInProgress#fs because of the different subject problem we discussed above. In this case, nothing will remove JobInProgress#fs from the FileSystem#Cache.

          Sandy Ryza added a comment -

          Ahh ok that makes total sense. Agreed that we shouldn't change the equals/hashCode - this behavior was intentional (HADOOP-6670).

          We still shouldn't need to close fs in every case though, right? We should be able to look at the scheme of the jobSubmitDir and only close it if it doesn't match the scheme of jobTempDir?
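
          A minimal sketch of that scheme comparison, with illustrative variable names (jobSubmitDirPath, jobTempDirPath), in case anyone wants to experiment with it:

            // Close fs only if the temp dir lives on a different FileSystem (scheme)
            // than the submit dir; illustrative only, not part of the committed patch.
            String submitScheme = jobSubmitDirPath.toUri().getScheme();
            String tempScheme = jobTempDirPath.toUri().getScheme();
            boolean sameScheme = (submitScheme == null)
                ? (tempScheme == null)
                : submitScheme.equalsIgnoreCase(tempScheme);
            if (!sameScheme) {
              fs.close();
            }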

          Xi Fang added a comment -

          Just found that Chris was also working on this thread. I agree with Chris. Changing the hash code may have a wide impact on existing code, which would be risky.

          Xi Fang added a comment -

          Sandy Ryza Thanks for your comments.

          Have you tested this fix?

          Yes. We have tested this fix on our test cluster (about 130,000 submission). After the workflow was done, we waited for a couple of minutes (jobs were retiring), then forced GC, and then dumped the memory. We manually checked the FileSystem#Cache. There was no memory leak.

          For your analysis

          1. I agree with "it doesn't appear that tempDirFs and fs are ever even ending up equal because tempDirFs is created with the wrong UGI."
          2. I think tempDir would be fine because 1) JobInProgress#cleanupJob won't introduce a file system instance for tempDir and 2) the fs in CleanupQueue#deletePath would be reused (i.e. only one instance would exist in FileSystem#Cache). My initial thought was this part has a memory leak. But a test shows that there is no problem here.
          3. The problem is actually

          tempDirFs = jobTempDirPath.getFileSystem(conf);
          

          The problem here is that this guy "MAY" (I will explain later) put a new entry in FileSystem#Cache. Note that this would eventually go into UserGroupInformation#getCurrentUser to get a UGI with the current AccessControlContext. CleanupQueue#deletePath won't close this entry because a different UGI (i.e. "userUGI" created in JobInProgress) is used there. Here is the tricky part, on which we had a long discussion with Chris Nauroth and Vinod Kumar Vavilapalli. The problem is that although we may only have one current user, the following code "MAY" return different subjects.

            static UserGroupInformation getCurrentUser() throws IOException {
              AccessControlContext context = AccessController.getContext();
              Subject subject = Subject.getSubject(context);   // <-- may return a different Subject instance
          

          Because the entry of FileSystem#Cache uses the identityHashCode of a subject to construct the key, a file system object created by "jobTempDirPath.getFileSystem(conf)" may not be found later when this code is executed again, although we may have the same principal (i.e. the current user). This eventually leads to an unbounded number of file system instances in FileSystem#Cache. Nothing is going to remove them from the cache.

          Please let me know if you have any questions.

          Chris Nauroth added a comment -

          I was +1 for Xi's patch, but I didn't want to commit it too hastily so that the participants on MAPREDUCE-5351 would get a chance to review. Sandy, thank you for responding so quickly.

          Have you tested this fix?

          Yes, Xi repeated his test run of ~200,000 jobs with this patch, and the heap dump no longer showed leaked instances of DistributedFileSystem. This is definitely the source of the leak.

          The deeper problem to me is that we are creating a new UGI, which can have a new subject, which can create a new entry in the FS cache, every time CleanupQueue#deletePath is called with a null UGI.

          Just to clarify, the problem we saw is that a new UserGroupInformation/new Subject gets created regardless of whether or not a null UGI was passed to CleanupQueue#deletePath. It's buried behind the last line of this code snippet:

              protected void deletePath() throws IOException, InterruptedException {
                final Path p = getPathForCleanup();
                (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
                    new PrivilegedExceptionAction<Object>() {
                      public Object run() throws IOException {
                        FileSystem fs = p.getFileSystem(conf);
          

          The last line eventually hits FileSystem#Cache#get, which calls UserGroupInformation#getCurrentUser while constructing the cache key, but that doesn't always result in the same underlying Subject instance as the one that initially created the FS cache entry.

          A better fix would be to avoid this, either by having CleanupQueue hold a UGI of the login user for use in these situations or to avoid the doAs entirely when the given UGI is null.

          We can tell from the heap dump that the leaked instances are associated with the user UGI and not the login UGI, so I don't think there is a way to use the login UGI to remove those cache entries. I don't see a way to avoid the doAs, because we're seeing the leak in the case of the user UGI, so we'd want the impersonation in place.

          Side note: the return value of UserGroupInformation#getLoginUser is cached for the lifetime of the process, so it's always going to have the same Subject instance. That makes it impossible to have a large, visible leak in the FS cache for entries associated to the login user.

          The only other potential solution I can think of is explicitly passing the FileSystem instance to use for the delete and close into PathDeletionContext. Then, it wouldn't need to infer the FS to use by calling Path#getFileSystem with its implicit creation of a new Subject. Do you think something like that would work?

          BTW, this problem also made me wonder if it's incorrect for UGI to use an identity hash code. I couldn't track down the rationale for that. Presumably it's related to performance. The code of Subject#hashCode combines data from a lot of internal data structures, and it all happens while holding the monitor. This made me think changing the hash code would be too risky.

          Sandy Ryza added a comment -

          Have you tested this fix? I took a deeper look into this and it doesn't appear that tempDirFs and fs are ever even ending up equal because tempDirFs is created with the wrong UGI.

          The deeper problem to me is that we are creating a new UGI, which can have a new subject, which can create a new entry in the FS cache, every time CleanupQueue#deletePath is called with a null UGI. This occurs here:

                  CleanupQueue.getInstance().addToQueue(
                      new PathDeletionContext(tempDir, conf));
          

          A better fix would be to avoid this, either by having CleanupQueue hold a UGI of the login user for use in these situations or to avoid the doAs entirely when the given UGI is null.

          Xi Fang added a comment -

          This bug was found in Microsoft's large scale test with about 200,000 job submissions. The memory usage was growing steadily.

          There is a long discussion between Hortonworks (thanks Chris Nauroth and Vinod Kumar Vavilapalli) and Microsoft on this issue. Here is the summary of the discussion.

          1. The heap dumps are showing DistributedFileSystem instances that are only referred to from the cache's HashMap entries. Since nothing else has a reference, nothing else can ever attempt to close it, and therefore it will never be removed from the cache.

          2. The special check for "tempDirFS" (see code in description) in the patch for MAPREDUCE-5351 is intended as an optimization so that CleanupQueue doesn't need to immediately reopen a FileSystem that was just closed. However, we observed that we're getting different identity hash code values on the subject in the key. The code is assuming that CleanupQueue will find the same Subject that was used inside JobInProgress. Unfortunately, this is not guaranteed, because we may have crossed into a different access control context at this point, via UserGroupInformation#doAs. Even though it's conceptually the same user, the Subject is a function of the current AccessControlContext:

            public synchronized
            static UserGroupInformation getCurrentUser() throws IOException {
              AccessControlContext context = AccessController.getContext();
              Subject subject = Subject.getSubject(context);
          

          Even if the contexts are logically equivalent between JobInProgress and CleanupQueue, we see no guarantee that Java will give you the same Subject instance, which is required for successful lookup in the FileSystem cache (because of the use of identity hash code).

          A fix is to abandon this optimization and close the FileSystem within the same AccessControlContext that opened it.


            People

            • Assignee:
              Xi Fang
              Reporter:
              Xi Fang
            • Votes:
              0
              Watchers:
              7

              Dates

              • Created:
                Updated:
                Resolved:
