Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (revision af920f138b1f8dcd2417f6af77ab96672f2bc4bc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (date 1641283331000) @@ -30,6 +30,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.RejectedExecutionException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,6 +182,10 @@ ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); break; } + + // Delete empty log dirs + deleteEmptyLogDirs(appId, user); + LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId); long deletionTimestamp = System.currentTimeMillis() + this.deleteDelaySeconds * 1000; @@ -206,6 +212,64 @@ } } + /** + * Delete empty log dirs + * @param appId application id + * @param user username + */ + private void deleteEmptyLogDirs(ApplicationId appId, String user) { + if (null == appId || user == null) { + return; + } + + FileContext lfs = getLocalFileContext(getConfig()); + Map willDeleteAppLogDirs = new HashMap<>(); + + for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { + Path appLogDir = new Path(rootLogDir, appId.toString()); + willDeleteAppLogDirs.put(appLogDir.toString(), appLogDir); + try { + List localContainerLogDirs = new ArrayList<>(); + RemoteIterator appLogStatus = lfs.listStatus(appLogDir); + while(appLogStatus.hasNext()){ + Path containerPath = appLogStatus.next().getPath(); + RemoteIterator containerLogStatus = lfs.listStatus(containerPath); + // If any container have log files we can not delete the application dir + if (containerLogStatus.hasNext()) { + willDeleteAppLogDirs.remove(appLogDir.toString()); + } else { + // Empty container dir + localContainerLogDirs.add(containerPath); + } + } + + // Delete empty container log dirs + deletePath(localContainerLogDirs, user); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Unsupported file system used for log dir " + appLogDir, ue); + } catch (IOException ie) { + LOG.warn("Check for directory errors. Path: " + appLogDir, ie); + } + + // Delete empty app log dirs + deletePath(new ArrayList<>(willDeleteAppLogDirs.values()), user); + } + } + + /** + * Delete paths + * @param paths will delete path + * @param user the user associated with the delete. + */ + private void deletePath(List paths, String user) { + if (null != paths && paths.size() > 0) { + FileDeletionTask deletionTask = new FileDeletionTask( + NonAggregatingLogHandler.this.delService, user, null, + paths); + NonAggregatingLogHandler.this.delService.delete(deletionTask); + } + } + @Override public Set getInvalidTokenApps() { return Collections.emptySet();