diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index de18dc6..a9ce9a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -118,8 +118,7 @@ private void createDir(Path path, FsPermission perm) throws IOException { createStatusCacheLoader(final Configuration conf) { return new CacheLoader>() { public Future load(Path path) { - try { - FileSystem fs = path.getFileSystem(conf); + try (FileSystem fs = path.getFileSystem(conf)) { return Futures.immediateFuture(fs.getFileStatus(path)); } catch (Throwable th) { // report failures so it can be memoized @@ -248,25 +247,28 @@ private static FileStatus getFileStatus(final FileSystem fs, final Path path, } private Path copy(Path sCopy, Path dstdir) throws IOException { - FileSystem sourceFs = sCopy.getFileSystem(conf); - Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName()); - FileStatus sStat = sourceFs.getFileStatus(sCopy); - if (sStat.getModificationTime() != resource.getTimestamp()) { - throw new IOException("Resource " + sCopy + - " changed on src filesystem (expected " + resource.getTimestamp() + - ", was " + sStat.getModificationTime()); - } - if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { - if (!isPublic(sourceFs, sCopy, sStat, statCache)) { + try ( + FileSystem sourceFs = sCopy.getFileSystem(conf); + FileSystem localFs = FileSystem.getLocal(conf);) { + Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName()); + FileStatus sStat = sourceFs.getFileStatus(sCopy); + if (sStat.getModificationTime() != resource.getTimestamp()) { throw new IOException("Resource " + sCopy + - " is not publicly accessable and as such cannot be part of the" + - " public cache."); + " changed on src filesystem (expected " + resource.getTimestamp() + + ", was " + sStat.getModificationTime()); + } + if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { + if (!isPublic(sourceFs, sCopy, sStat, statCache)) { + throw new IOException("Resource " + sCopy + + " is not publicly accessable and as such cannot be part of the" + + " public cache."); + } } - } - FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false, - true, conf); - return dCopy; + FileUtil.copy(sourceFs, sStat, localFs, dCopy, false, + true, conf); + return dCopy; + } } private long unpack(File localrsrc, File dst) throws IOException { @@ -345,6 +347,7 @@ private long unpack(File localrsrc, File dst) throws IOException { @Override public Path call() throws Exception { final Path sCopy; + FileSystem fs = null; try { sCopy = resource.getResource().toPath(); } catch (URISyntaxException e) { @@ -362,7 +365,8 @@ public Path run() throws Exception { }; }); unpack(new File(dTmp.toUri()), new File(dFinal.toUri())); - changePermissions(dFinal.getFileSystem(conf), dFinal); + fs = dFinal.getFileSystem(conf); + changePermissions(fs, dFinal); files.rename(dst_work, destDirPath, Rename.OVERWRITE); } catch (Exception e) { try { @@ -377,6 +381,9 @@ public Path run() throws Exception { } conf = null; resource = null; + if (fs != null) { + fs.close(); + } } return files.makeQualified(new Path(destDirPath, sCopy.getName())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 877dd08..64ad28f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.EnumSet; @@ -55,7 +56,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.URL; import org.junit.Assert; - +import org.junit.Before; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.commons.logging.Log; @@ -67,6 +68,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; @@ -87,10 +89,24 @@ private static final Log LOG = LogFactory.getLog(TestFSDownload.class); private static AtomicLong uniqueNumberGenerator = new AtomicLong(System.currentTimeMillis()); + private static Configuration conf = null; private enum TEST_FILE_TYPE { TAR, JAR, ZIP, TGZ }; - + + @Before + public void initConf(){ + conf = new Configuration(); + conf.setClass("fs.file.impl", + CloseAuditLocalFileSystem.class, + LocalFileSystem.class); + // Disable the cache, so file system will not be automatically closed. + conf.setBoolean("fs.file.impl.disable.cache", true); + // Reset the counter for each test case. + // Verify the counter number at the end of each test. + FileHandlerCounter.reset(); + } + @AfterClass public static void deleteTestDir() throws IOException { FileContext fs = FileContext.getLocalFSFileContext(); @@ -256,7 +272,6 @@ static LocalResource createZipFile(FileContext files, Path p, int len, @Test (timeout=10000) public void testDownloadBadPublic() throws IOException, URISyntaxException, InterruptedException { - Configuration conf = new Configuration(); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", @@ -302,12 +317,13 @@ public void testDownloadBadPublic() throws IOException, URISyntaxException, } catch (ExecutionException e) { Assert.assertTrue(e.getCause() instanceof IOException); } + + verifyFileSystemClosed(); } @Test (timeout=60000) public void testDownloadPublicWithStatCache() throws IOException, URISyntaxException, InterruptedException, ExecutionException { - final Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf); Path basedir = files.makeQualified(new Path("target", TestFSDownload.class.getSimpleName())); @@ -315,6 +331,7 @@ public void testDownloadPublicWithStatCache() throws IOException, // if test directory doesn't have ancestor permission, skip this test FileSystem f = basedir.getFileSystem(conf); assumeTrue(FSDownload.ancestorsHaveExecutePermissions(f, basedir, null)); + f.close(); files.mkdir(basedir, null, true); conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); @@ -358,7 +375,9 @@ public void testDownloadPublicWithStatCache() throws IOException, final FileStatus sStat = fs.getFileStatus(path); tasks.add(new Callable() { public Boolean call() throws IOException { - return FSDownload.isPublic(fs, path, sStat, statCache); + try (FileSystem fileSystem = fs ) { + return FSDownload.isPublic(fileSystem, path, sStat, statCache); + } } }); } @@ -377,12 +396,13 @@ public Boolean call() throws IOException { } finally { exec.shutdown(); } + + verifyFileSystemClosed(); } @Test (timeout=10000) public void testDownload() throws IOException, URISyntaxException, InterruptedException { - Configuration conf = new Configuration(); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", @@ -451,11 +471,12 @@ public void testDownload() throws IOException, URISyntaxException, } catch (ExecutionException e) { throw new IOException("Failed exec", e); } + + verifyFileSystemClosed(); } - private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, + private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, URISyntaxException, InterruptedException{ - Configuration conf = new Configuration(); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", @@ -534,18 +555,21 @@ private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, public void testDownloadArchive() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.TAR); + verifyFileSystemClosed(); } @Test (timeout=10000) public void testDownloadPatternJar() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.JAR); + verifyFileSystemClosed(); } - @Test (timeout=10000) + @Test (timeout=10000) public void testDownloadArchiveZip() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.ZIP); + verifyFileSystemClosed(); } /* @@ -561,12 +585,14 @@ public void testDownloadArchiveZipWithTurkishLocale() throws IOException, downloadWithFileType(TEST_FILE_TYPE.ZIP); // Set the locale back to original default locale Locale.setDefault(defaultLocale); + verifyFileSystemClosed(); } @Test (timeout=10000) public void testDownloadArchiveTgz() throws IOException, URISyntaxException, InterruptedException { downloadWithFileType(TEST_FILE_TYPE.TGZ); + verifyFileSystemClosed(); } private void verifyPermsRecursively(FileSystem fs, @@ -598,12 +624,11 @@ private void verifyPermsRecursively(FileSystem fs, Assert.assertTrue(status.getPermission().toShort() == FSDownload.PRIVATE_FILE_PERMS.toShort()); } - } + } } @Test (timeout=10000) public void testDirDownload() throws IOException, InterruptedException { - Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", TestFSDownload.class.getSimpleName())); @@ -657,18 +682,22 @@ public void testDirDownload() throws IOException, InterruptedException { System.out.println("Testing path " + localized); assert(status.isDirectory()); assert(rsrcVis.containsKey(p.getKey())); - - verifyPermsRecursively(localized.getFileSystem(conf), + FileSystem fs = localized.getFileSystem(conf); + verifyPermsRecursively(fs, files, localized, rsrcVis.get(p.getKey())); + fs.close(); } } catch (ExecutionException e) { throw new IOException("Failed exec", e); } + // Compromise this check to allow 1 open + // because LocalDirAllocator leaks FileSystem + Assert.assertEquals("FileSystem was not closed", + 1, FileHandlerCounter.get()); } @Test (timeout=10000) public void testUniqueDestinationPath() throws Exception { - Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf); final Path basedir = files.makeQualified(new Path("target", TestFSDownload.class.getSimpleName())); @@ -699,5 +728,57 @@ public void testUniqueDestinationPath() throws Exception { // resource. Therefore the final localizedPath for the resource should be // destination directory (passed as an argument) + file name. Assert.assertEquals(destPath, rPath.get().getParent()); + verifyFileSystemClosed(); + } + + private void verifyFileSystemClosed() { + System.out.println("Number file handlers in" + + "CloseAuditLocalFileSystem is " + FileHandlerCounter.get()); + Assert.assertEquals("FileSystem was not closed", + 0, FileHandlerCounter.get()); + } + + /** + * A simple counter. + * Count the number of fake file handlers number for file system class. + * The state is shared, be sure to call {@code FileHandlerCounter.reset()} + * at the beginning of the test scope. + */ + private static class FileHandlerCounter { + private static AtomicInteger numOfHandlers = + new AtomicInteger(0); + static void delete() { + if(numOfHandlers.intValue() > 0) { + numOfHandlers.decrementAndGet(); + } + } + static void reset() { + numOfHandlers.set(0);; + } + static void add() { + numOfHandlers.incrementAndGet(); + } + static int get() { + return numOfHandlers.intValue(); + } + } + + /** + * A wrapper over {@link LocalFileSystem}. + * Each instance of this class automatically updates the counter in + * {@link FileHandlerCounter} when it gets created and closed. + */ + static class CloseAuditLocalFileSystem extends LocalFileSystem { + @Override + public void initialize(URI name, Configuration conf) + throws IOException { + super.initialize(name, conf); + FileHandlerCounter.add(); + } + @Override + public void close() throws IOException { + super.close(); + FileHandlerCounter.delete(); + } } }