diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 549d8e7..7f3cb62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1324,7 +1324,7 @@ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del, RemoteIterator userDirStatus = lfs.listStatus(userDirPath); FileDeletionTask dependentDeletionTask = del.createFileDeletionTask(null, userDirPath, new Path[] {}); - if (userDirStatus != null) { + if (userDirStatus.hasNext()) { List deletionTasks = new ArrayList(); while (userDirStatus.hasNext()) { FileStatus status = userDirStatus.next(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index e69170e..e9aea0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -35,7 +35,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; @@ -190,17 +192,41 @@ public Void run() throws YarnException, IOException { ResourceLocalizationService.NM_PRIVATE_DIR) > 0); // restart the NodeManager + restartNM(MAX_TRIES); + checkNumOfLocalDirs(); + + verify(delService, times(1)).delete( + (String) isNull(), + argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR + + "_DEL_"))); + verify(delService, times(1)).delete((String) isNull(), + argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); + verify(delService, times(1)).scheduleFileDeletionTask( + argThat(new FileDeletionInclude(user, null, + new String[] { destinationFile }))); + verify(delService, times(1)).scheduleFileDeletionTask( + argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE + + "_DEL_", new String[] {}))); + + // restart the NodeManager again + // this time usercache directory should be empty + restartNM(MAX_TRIES); + checkNumOfLocalDirs(); + + } + + private void restartNM(int maxTries) { nm.stop(); nm = new MyNodeManager(); nm.start(); - numTries = 0; + int numTries = 0; while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer.USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs( nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0) - && numTries < MAX_TRIES) { + && numTries < maxTries) { try { Thread.sleep(500); } catch (InterruptedException ex) { @@ -208,7 +234,9 @@ public Void run() throws YarnException, IOException { } numTries++; } - + } + + private void checkNumOfLocalDirs() throws IOException { Assert .assertTrue( "After NM reboots, all local files should be deleted", @@ -218,20 +246,13 @@ public Void run() throws YarnException, IOException { ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) == 0); - verify(delService, times(1)).delete( - (String) isNull(), - argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR - + "_DEL_"))); - verify(delService, times(1)).delete((String) isNull(), - argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); - verify(delService, times(1)).scheduleFileDeletionTask( - argThat(new FileDeletionInclude(user, null, - new String[] { destinationFile }))); - verify(delService, times(1)).scheduleFileDeletionTask( - argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE - + "_DEL_", new String[] {}))); + + Assert + .assertTrue( + "After NM reboots, usercache_DEL_* directory should be deleted", + numOfUsercacheDELDirs(nmLocalDir.getAbsolutePath()) == 0); } - + private int numOfLocalDirs(String localDir, String localSubDir) { File[] listOfFiles = new File(localDir, localSubDir).listFiles(); if (listOfFiles == null) { @@ -240,6 +261,19 @@ private int numOfLocalDirs(String localDir, String localSubDir) { return listOfFiles.length; } } + + private int numOfUsercacheDELDirs(String localDir) throws IOException { + int count = 0; + RemoteIterator fileStatus = localFS.listStatus(new Path(localDir)); + while (fileStatus.hasNext()) { + FileStatus status = fileStatus.next(); + if (status.getPath().getName().matches(".*" + + ContainerLocalizer.USERCACHE + "_DEL_.*")) { + count++; + } + } + return count; + } private void createFiles(String dir, String subDir, int numOfFiles) { for (int i = 0; i < numOfFiles; i++) {