diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 42434f1..914efef 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -3287,4 +3287,19 @@ void printStatistics() throws IOException { ": " + pair.getValue()); } } + + /** Get the number of entries in the filesystem cache + * @return the number of entries in the filesystem cache + */ + @VisibleForTesting + public static int getCacheSize() { + return CACHE.map.size(); + } + + /** This is used to test close functionality + */ + @VisibleForTesting + public void closeForTesting() throws IOException { + CACHE.remove(this.key, this); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index dfcceb8..1dfa020 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -606,6 +606,7 @@ private void requestNewHdfsDelegationToken(ApplicationId applicationId, rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); } + @VisibleForTesting protected Token[] obtainSystemTokensForUser(String user, final Credentials credentials) throws IOException, InterruptedException { // Get new hdfs tokens on behalf of this user @@ -616,8 +617,16 @@ private void requestNewHdfsDelegationToken(ApplicationId applicationId, proxyUser.doAs(new PrivilegedExceptionAction[]>() { @Override public Token[] run() throws Exception { - return FileSystem.get(getConfig()).addDelegationTokens( - UserGroupInformation.getLoginUser().getUserName(), credentials); + FileSystem fs = FileSystem.get(getConfig()); + try { + return fs.addDelegationTokens( + UserGroupInformation.getLoginUser().getUserName(), + credentials); + } finally { + // Close the FileSystem created by the new proxy user, + // So that we don't leave an entry in the FileSystem cache + fs.close(); + } } }); return newTokens; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 5d31404..20a1745 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -289,7 +289,9 @@ public String toString() { static class MyFS extends DistributedFileSystem { public MyFS() {} - public void close() {} + public void close() throws IOException { + closeForTesting(); + } @Override public void initialize(URI uri, Configuration conf) throws IOException {} @@ -299,6 +301,11 @@ public MyToken getDelegationToken(String renewer) throws IOException { LOG.info("Called MYDFS.getdelegationtoken " + result); return result; } + + public Token[] addDelegationTokens( + final String renewer, Credentials credentials) throws IOException { + return new Token[0]; + } } /** @@ -1022,4 +1029,16 @@ public void testAppSubmissionWithPreviousToken() throws Exception{ // app2 completes, app1 is still running, check the token is not cancelled Assert.assertFalse(Renewer.cancelled); } + + // Test FileSystem memory leak in obtainSystemTokensForUser. + @Test + public void testFSLeakInObtainSystemTokensForUser() throws Exception{ + Credentials credentials = new Credentials(); + String user = "test"; + int oldSize = FileSystem.getCacheSize(); + delegationTokenRenewer.obtainSystemTokensForUser(user, credentials); + delegationTokenRenewer.obtainSystemTokensForUser(user, credentials); + delegationTokenRenewer.obtainSystemTokensForUser(user, credentials); + Assert.assertEquals(oldSize, FileSystem.getCacheSize()); + } }