diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index d203f65b93d..ae56e5141ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -122,8 +122,7 @@ private void createDir(Path path, FsPermission perm) throws IOException { createStatusCacheLoader(final Configuration conf) { return new CacheLoader>() { public Future load(Path path) { - try { - FileSystem fs = path.getFileSystem(conf); + try (FileSystem fs = path.getFileSystem(conf)) { return Futures.immediateFuture(fs.getFileStatus(path)); } catch (Throwable th) { // report failures so it can be memoized @@ -265,21 +264,21 @@ private void verifyAndCopy(Path destination) } catch (URISyntaxException e) { throw new IOException("Invalid resource", e); } - FileSystem sourceFs = sCopy.getFileSystem(conf); - FileStatus sStat = sourceFs.getFileStatus(sCopy); - if (sStat.getModificationTime() != resource.getTimestamp()) { - throw new IOException("Resource " + sCopy + - " changed on src filesystem (expected " + resource.getTimestamp() + - ", was " + sStat.getModificationTime()); - } - if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { - if (!isPublic(sourceFs, sCopy, sStat, statCache)) { + try (FileSystem sourceFs = sCopy.getFileSystem(conf)){ + FileStatus sStat = sourceFs.getFileStatus(sCopy); + if (sStat.getModificationTime() != resource.getTimestamp()) { throw new IOException("Resource " + sCopy + - " is not publicly accessible and as such cannot be part of the" + - " public cache."); + " changed on src filesystem (expected " + resource.getTimestamp() + + ", was " + sStat.getModificationTime()); + } + if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { + if (!isPublic(sourceFs, sCopy, sStat, statCache)) { + throw new IOException("Resource " + sCopy + + " is not publicly accessible and as such cannot be part of the" + + " public cache."); + } } } - downloadAndUnpack(sCopy, destination); } @@ -291,9 +290,8 @@ private void verifyAndCopy(Path destination) */ private void downloadAndUnpack(Path source, Path destination) throws YarnException { - try { - FileSystem sourceFileSystem = source.getFileSystem(conf); - FileSystem destinationFileSystem = destination.getFileSystem(conf); + try (FileSystem sourceFileSystem = source.getFileSystem(conf); + FileSystem destinationFileSystem = destination.getFileSystem(conf)) { if (sourceFileSystem.getFileStatus(source).isDirectory()) { FileUtil.copy( sourceFileSystem, source, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 08d6189289d..ecaa9deacf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.EnumSet; @@ -55,7 +56,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.URL; import org.junit.Assert; - +import org.junit.Before; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.commons.logging.Log; @@ -67,6 +68,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; @@ -317,6 +319,8 @@ public void testDownloadBadPublic() throws IOException, URISyntaxException, } catch (ExecutionException e) { Assert.assertTrue(e.getCause() instanceof IOException); } + + verifyFileSystemClosed(); } @Test (timeout=60000) @@ -329,6 +333,7 @@ public void testDownloadPublicWithStatCache() throws IOException, // if test directory doesn't have ancestor permission, skip this test FileSystem f = basedir.getFileSystem(conf); assumeTrue(FSDownload.ancestorsHaveExecutePermissions(f, basedir, null)); + f.close(); files.mkdir(basedir, null, true); conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); @@ -372,7 +377,9 @@ public void testDownloadPublicWithStatCache() throws IOException, final FileStatus sStat = fs.getFileStatus(path); tasks.add(new Callable() { public Boolean call() throws IOException { - return FSDownload.isPublic(fs, path, sStat, statCache); + try (FileSystem fileSystem = fs) { + return FSDownload.isPublic(fileSystem, path, sStat, statCache); + } } }); } @@ -391,6 +398,8 @@ public Boolean call() throws IOException { } finally { exec.shutdown(); } + + verifyFileSystemClosed(); } @Test (timeout=10000) @@ -464,9 +473,11 @@ public void testDownload() throws IOException, URISyntaxException, } catch (ExecutionException e) { throw new IOException("Failed exec", e); } + + verifyFileSystemClosed(); } - private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, + private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, URISyntaxException, InterruptedException{ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); FileContext files = FileContext.getLocalFSFileContext(conf); @@ -546,18 +557,21 @@ private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, public void testDownloadArchive() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.TAR); + verifyFileSystemClosed(); } @Test (timeout=10000) public void testDownloadPatternJar() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.JAR); + verifyFileSystemClosed(); } @Test (timeout=10000) public void testDownloadArchiveZip() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.ZIP); + verifyFileSystemClosed(); } /* @@ -573,12 +587,14 @@ public void testDownloadArchiveZipWithTurkishLocale() throws IOException, downloadWithFileType(TEST_FILE_TYPE.ZIP); // Set the locale back to original default locale Locale.setDefault(defaultLocale); + verifyFileSystemClosed(); } @Test (timeout=10000) public void testDownloadArchiveTgz() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.TGZ); + verifyFileSystemClosed(); } private void verifyPermsRecursively(FileSystem fs, @@ -610,7 +626,7 @@ private void verifyPermsRecursively(FileSystem fs, Assert.assertTrue(status.getPermission().toShort() == FSDownload.PRIVATE_FILE_PERMS.toShort()); } - } + } } @Test (timeout=10000) @@ -668,13 +684,15 @@ public void testDirDownload() throws IOException, InterruptedException { System.out.println("Testing path " + localized); assert(status.isDirectory()); assert(rsrcVis.containsKey(p.getKey())); - - verifyPermsRecursively(localized.getFileSystem(conf), + FileSystem fs = localized.getFileSystem(conf); + verifyPermsRecursively(fs, files, localized, rsrcVis.get(p.getKey())); + fs.close(); } } catch (ExecutionException e) { throw new IOException("Failed exec", e); } + verifyFileSystemClosed(); } @Test (timeout=10000) @@ -709,5 +727,57 @@ public void testUniqueDestinationPath() throws Exception { // resource. Therefore the final localizedPath for the resource should be // destination directory (passed as an argument) + file name. Assert.assertEquals(destPath, rPath.get().getParent()); + verifyFileSystemClosed(); + } + + private void verifyFileSystemClosed() { + System.out.println("Number file handlers in" + + "CloseAuditLocalFileSystem is " + FileHandlerCounter.get()); + Assert.assertEquals("FileSystem was not closed", + 0, FileHandlerCounter.get()); + } + + /** + * A simple counter. + * Count the number of fake file handlers number for file system class. + * The state is shared, be sure to call {@code FileHandlerCounter.reset()} + * at the beginning of the test scope. + */ + private static class FileHandlerCounter { + private static AtomicInteger numOfHandlers = + new AtomicInteger(0); + static void delete() { + if(numOfHandlers.intValue() > 0) { + numOfHandlers.decrementAndGet(); + } + } + static void reset() { + numOfHandlers.set(0); + } + static void add() { + numOfHandlers.incrementAndGet(); + } + static int get() { + return numOfHandlers.intValue(); + } + } + + /** + * A wrapper over {@link LocalFileSystem}. + * Each instance of this class automatically updates the counter in + * {@link FileHandlerCounter} when it gets created and closed. + */ + static class CloseAuditLocalFileSystem extends LocalFileSystem { + @Override + public void initialize(URI name, Configuration config) + throws IOException { + super.initialize(name, config); + FileHandlerCounter.add(); + } + @Override + public void close() throws IOException { + super.close(); + FileHandlerCounter.delete(); + } } }