From d4dfcf6bc9f0ea301935021ae4f9d51bba3052ae Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Sun, 7 Apr 2019 18:15:41 +0530 Subject: [PATCH] YARN-6929 --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 11 +- .../AggregatedLogDeletionService.java | 63 +++++-- .../yarn/logaggregation/LogAggregationUtils.java | 183 +++++++++++++++++-- .../LogAggregationFileController.java | 48 ++--- .../LogAggregationFileControllerFactory.java | 28 ++- .../ifile/LogAggregationIndexedFileController.java | 7 + .../src/main/resources/yarn-default.xml | 8 + .../TestAggregatedLogDeletionService.java | 201 ++++++++++++++------- .../logaggregation/TestContainerLogsUtils.java | 5 +- .../logaggregation/TestLogAggregationService.java | 104 ++++++++++- 10 files changed, 521 insertions(+), 137 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 34f1e93..b2dc93d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1423,13 +1423,20 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; /** - * The remote log dir will be created at - * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} + * The remote log dir will be created at below location. + * NM_REMOTE_APP_LOG_DIR/${user}/bucket_{NM_REMOTE_APP_LOG_DIR_SUFFIX} + * /${clusterTsDir}/${bucketDir1}/${bucketDir2}/${appId} */ public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = NM_PREFIX + "remote-app-log-dir-suffix"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs"; + /** Specifies whether Older Application Log Directory is included. */ + public static final String NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER = + NM_PREFIX + "remote-app-log-dir-include-older"; + public static final boolean DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER = + true; + public static final String YARN_LOG_SERVER_URL = YARN_PREFIX + "log.server.url"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index 90395aa..86f9a7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -67,12 +67,24 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) { this.conf = conf; this.retentionMillis = retentionSecs * 1000; - this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + this.suffix = LogAggregationUtils.getBucketSuffix(); this.remoteRootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); this.rmClient = rmClient; } + + private static boolean isValidApplicationDir(FileStatus stat) { + if (stat.isDirectory()) { + try { + ApplicationId.fromString(stat.getPath().getName()); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + return false; + } @Override public void run() { @@ -80,10 +92,19 @@ public void run() { LOG.info("aggregated log deletion started."); try { FileSystem fs = remoteRootLogDir.getFileSystem(conf); - for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { - if(userDir.isDirectory()) { - Path userDirPath = new Path(userDir.getPath(), suffix); - deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient); + for (FileStatus userDir : fs.listStatus(remoteRootLogDir)) { + if (userDir.isDirectory()) { + for (FileStatus suffixDir : fs.listStatus(userDir.getPath())) { + Path suffixDirPath = suffixDir.getPath(); + if (suffixDir.isDirectory() && suffixDirPath.getName(). + startsWith(suffix)) { + for (FileStatus clusterTimeStampDir : fs.listStatus( + suffixDirPath)) { + deleteOldLogDirsFrom(clusterTimeStampDir, cutoffMillis, + fs, rmClient); + } + } + } } } } catch (Throwable t) { @@ -92,18 +113,26 @@ public void run() { } LOG.info("aggregated log deletion finished."); } - - private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, - FileSystem fs, ApplicationClientProtocol rmClient) { - FileStatus[] appDirs; - try { - appDirs = fs.listStatus(dir); - } catch (IOException e) { - logException("Could not read the contents of " + dir, e); - return; - } - for (FileStatus appDir : appDirs) { - deleteAppDirLogs(cutoffMillis, fs, rmClient, appDir); + + private static void deleteOldLogDirsFrom(FileStatus fileStatus, + long cutoffMillis, FileSystem fs, ApplicationClientProtocol rmClient) + throws IOException { + Path path = fileStatus.getPath(); + if (fileStatus.isDirectory()) { + FileStatus[] listStat = fs.listStatus(path); + if (listStat.length != 0) { + for (FileStatus stat : listStat) { + if (isValidApplicationDir(stat)) { + deleteAppDirLogs(cutoffMillis, fs, rmClient, stat); + } else { + deleteOldLogDirsFrom(stat, cutoffMillis, fs, rmClient); + } + } + } else if(fileStatus.getModificationTime() < cutoffMillis) { + fs.delete(path, false); + } + } else { + LOG.warn("Unexpected file {} found", path); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index edf2cf3..4e7f86e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -38,6 +38,7 @@ public class LogAggregationUtils { public static final String TMP_FILE_SUFFIX = ".tmp"; + private static final String BUCKET_SUFFIX = "bucket_"; /** * Constructs the full filename for an application's log file per node. @@ -64,8 +65,22 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, */ public static Path getRemoteAppLogDir(Path remoteRootLogDir, ApplicationId appId, String user, String suffix) { - return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix), - appId.toString()); + return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix, + appId), appId.toString()); + } + + /** + * Gets the older remote app log dir. + * @param appId the application id + * @param user the application owner + * @param remoteRootLogDir the aggregated log remote root log dir + * @param suffix the log directory suffix + * @return the remote application specific log dir. + */ + public static Path getOlderRemoteAppLogDir(ApplicationId appId, + String user, Path remoteRootLogDir, String suffix) { + return new Path(getOlderRemoteLogSuffixedDir(remoteRootLogDir, user, + suffix), appId.toString()); } /** @@ -77,6 +92,19 @@ public static Path getRemoteAppLogDir(Path remoteRootLogDir, */ public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir, String user, String suffix) { + suffix = getBucketSuffix() + suffix; + return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix); + } + + /** + * Gets the older remote suffixed log dir for the user. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @return the older remote suffixed log dir. + */ + public static Path getOlderRemoteLogSuffixedDir(Path remoteRootLogDir, + String user, String suffix) { if (suffix == null || suffix.isEmpty()) { return getRemoteLogUserDir(remoteRootLogDir, user); } @@ -95,6 +123,49 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) { } /** + * Gets the remote log user's cluster timestamp dir. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @return the remote log per user per cluster timestamp dir. + */ + public static Path getRemoteLogClusterTimestampDir(Path remoteRootLogDir, + String user, String suffix, ApplicationId appId) { + return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, + suffix), String.valueOf(appId.getClusterTimestamp())); + } + + /** + * Gets the remote log user's cluster timestamp's bucket dir. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @return the remote log per user per cluster timestamp per bucket dir. + */ + public static Path getRemoteBucketDir(Path remoteRootLogDir, String user, + String suffix, ApplicationId appId) { + int bucket1 = appId.getId() % 10000; + int bucket2 = (int) (appId.getClusterTimestamp() % 10000); + String bucketFormat = "%04d" + Path.SEPARATOR + "%04d" + Path.SEPARATOR; + String bucketDir = String.format(bucketFormat, bucket1, bucket2); + return new Path(getRemoteLogClusterTimestampDir(remoteRootLogDir, + user, suffix, appId), bucketDir); + } + + /** + * Check if older Application Log Directory has to be included. + * @param conf the configuration + * @return Is Older App Log Dir enabled? + */ + public static boolean isOlderPathEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration. + NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER); + } + + /** * Returns the suffix component of the log dir. * @param conf the configuration * @return the suffix which will be appended to the user log dir. @@ -104,6 +175,14 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) { YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); } + /** + * Returns the bucket suffix component of the log dir. + * @return the bucket suffix which appended to user log dir + */ + public static String getBucketSuffix() { + return BUCKET_SUFFIX; + } + /** * Converts a nodeId to a form used in the app log file name. @@ -177,6 +256,24 @@ public static String getNodeString(String nodeId) { /** * Get all available log files under remote app log directory. * @param conf the configuration + * @param remoteAppLogDir the application log directory + * @param appId the applicationId + * @param appOwner the application owner + * @return the iterator of available log files + * @throws IOException if there is no log file directory + */ + public static RemoteIterator getNodeFiles(Configuration conf, + Path remoteAppLogDir, ApplicationId appId, String appOwner) + throws IOException { + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); + return FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir); + } + + /** + * Get all available log files under remote app log directory. + * @param conf the configuration * @param appId the applicationId * @param appOwner the application owner * @param remoteRootLogDir the remote root log directory @@ -188,14 +285,47 @@ public static String getNodeString(String nodeId) { Configuration conf, ApplicationId appId, String appOwner, org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { + StringBuilder diagnosis = new StringBuilder(); Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); - RemoteIterator nodeFiles = null; - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), - conf).listStatus(remoteAppLogDir); - return nodeFiles; + RemoteIterator nodeFilesCur = null; + try { + nodeFilesCur = getNodeFiles(conf, remoteAppLogDir, appId, appOwner); + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + if(isOlderPathEnabled(conf)) { + remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner, + remoteRootLogDir, suffix); + try { + RemoteIterator nodeFilesPrev = getNodeFiles(conf, + remoteAppLogDir, appId, appOwner); + if(nodeFilesCur == null) { + return nodeFilesPrev; + } else if(nodeFilesPrev != null) { + RemoteIterator curDir = nodeFilesCur; + RemoteIterator prevDir = nodeFilesPrev; + RemoteIterator nodeFilesCombined = new + RemoteIterator() { + @Override + public boolean hasNext() throws IOException { + return prevDir.hasNext() || curDir.hasNext(); + } + @Override + public FileStatus next() throws IOException { + return prevDir.hasNext() ? prevDir.next() : curDir.next(); + } + }; + return nodeFilesCombined; + } + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + } + if(nodeFilesCur == null) { + throw new IOException(diagnosis.toString()); + } + return nodeFilesCur; } /** @@ -212,13 +342,33 @@ public static String getNodeString(String nodeId) { Configuration conf, ApplicationId appId, String appOwner, org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { + StringBuilder diagnosis = new StringBuilder(); Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); List nodeFiles = new ArrayList<>(); Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( - qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + try { + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + if(isOlderPathEnabled(conf)) { + remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner, + remoteRootLogDir, suffix); + qualifiedLogDir = FileContext.getFileContext(conf). + makeQualified(remoteAppLogDir); + try { + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + } + if(nodeFiles.isEmpty()) { + throw new IOException(diagnosis.toString()); + } return nodeFiles; } @@ -233,12 +383,11 @@ public static String getNodeString(String nodeId) { public static RemoteIterator getRemoteNodeFileDir( Configuration conf, ApplicationId appId, String appOwner) throws IOException { - Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner); - RemoteIterator nodeFiles = null; - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), - conf).listStatus(remoteAppLogDir); - return nodeFiles; + String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + Path remoteRootLogDir = new Path(conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + return getRemoteNodeFileDir(conf, appId, appOwner, + remoteRootLogDir, suffix); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 17a8aad..b5dbcc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -31,9 +31,11 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -362,31 +364,23 @@ public Object run() throws Exception { Path appDir = LogAggregationUtils.getRemoteAppLogDir( remoteRootLogDir, appId, user, remoteRootLogDirSuffix); - appDir = appDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); + Path curDir = appDir; + LinkedList pathsToCreate = new LinkedList<>(); - if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - remoteRootLogDir, user, remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + while (curDir != remoteRootLogDir) { + curDir = curDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); - } - - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + if (!checkExists(remoteFS, curDir, APP_DIR_PERMISSIONS)) { + pathsToCreate.addFirst(curDir); + curDir = curDir.getParent(); + } else { + break; } - - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); } + for (Path path : pathsToCreate) { + createDir(remoteFS, path, APP_DIR_PERMISSIONS); + } } catch (IOException e) { LOG.error("Failed to setup application log directory for " + appId, e); @@ -411,7 +405,6 @@ protected FileSystem getFileSystem(Configuration conf) throws IOException { protected void createDir(FileSystem fs, Path path, FsPermission fsPerm) throws IOException { - if (fsSupportsChmod) { FsPermission dirPerm = new FsPermission(fsPerm); fs.mkdirs(path, dirPerm); @@ -467,6 +460,19 @@ public Path getRemoteAppLogDir(ApplicationId appId, String appOwner) this.remoteRootLogDir, this.remoteRootLogDirSuffix); } + /** + * Get the older remote application directory for log aggregation. + * @param appId the Application ID + * @param appOwner the Application Owner + * @return the older remote application directory + * @throws IOException if can not find the remote application directory + */ + public Path getOlderRemoteAppLogDir(ApplicationId appId, String appOwner) + throws IOException { + return LogAggregationUtils.getOlderRemoteAppLogDir(appId, appOwner, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + protected void cleanOldLogs(Path remoteNodeLogFileForApp, final NodeId nodeId, UserGroupInformation userUgi) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java index 8339c1a..173ae11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java @@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; /** * Use {@code LogAggregationFileControllerFactory} to get the correct @@ -160,15 +158,27 @@ public LogAggregationFileController getFileControllerForWrite() { public LogAggregationFileController getFileControllerForRead( ApplicationId appId, String appOwner) throws IOException { StringBuilder diagnosis = new StringBuilder(); - for(LogAggregationFileController fileController : controllers) { + if (LogAggregationUtils.isOlderPathEnabled(conf)) { + for (LogAggregationFileController fileController : controllers) { + try { + Path remoteAppLogDir = fileController.getOlderRemoteAppLogDir(appId, + appOwner); + if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, appId, + appOwner).hasNext()) { + return fileController; + } + } catch (Exception ex) { + diagnosis.append(ex.getMessage() + "\n"); + continue; + } + } + } + for (LogAggregationFileController fileController : controllers) { try { Path remoteAppLogDir = fileController.getRemoteAppLogDir( appId, appOwner); - Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified( - remoteAppLogDir); - RemoteIterator nodeFiles = FileContext.getFileContext( - qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir); - if (nodeFiles.hasNext()) { + if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, + appId, appOwner).hasNext()) { return fileController; } } catch (Exception ex) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 9ab3e37..9bb4f9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -819,6 +819,13 @@ public Path getRemoteAppLogDir(ApplicationId appId, String user) this.remoteRootLogDir, this.remoteRootLogDirSuffix); } + @Override + public Path getOlderRemoteAppLogDir(ApplicationId appId, String user) + throws IOException { + return LogAggregationUtils.getOlderRemoteAppLogDir(appId, user, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + @Private public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end, ApplicationId appId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 004af7c..b3375c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1345,6 +1345,14 @@ + If set to true, the older application log directory + will be considered while fetching application logs. + + yarn.nodemanager.remote-app-log-dir-include-older + true + + + Generate additional logs about container launches. Currently, this creates a copy of the launch script and lists the directory contents of the container work dir. When listing directory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index f36ebf4..9dcdb37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -60,13 +60,14 @@ public void testDeletion() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; final Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - + Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); @@ -80,39 +81,59 @@ public void testDeletion() throws Exception { new FileStatus[]{userDirStatus}); ApplicationId appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); + ApplicationId.newInstance(now, 1); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixDirStatus = new FileStatus(0, true, + 0, 0, toDeleteTime, suffixDir); + Path clusterTimestampDir = LogAggregationUtils. + getRemoteLogClusterTimestampDir(remoteRootLogPath, + "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix); + FileStatus clusterTimestampDirStatus = new FileStatus(0, + true, 0, 0, toDeleteTime, clusterTimestampDir); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, bucketDir); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app1Dir); ApplicationId appId2 = - ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(userLogDir, appId2.toString()); - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir); + ApplicationId.newInstance(now, 2); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix); + FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app2Dir); ApplicationId appId3 = - ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(userLogDir, appId3.toString()); - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir); + ApplicationId.newInstance(now, 3); + Path app3Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId3, "me", suffix); + FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app3Dir); ApplicationId appId4 = - ApplicationId.newInstance(System.currentTimeMillis(), 4); - Path app4Dir = new Path(userLogDir, appId4.toString()); - FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); - - ApplicationId appId5 = - ApplicationId.newInstance(System.currentTimeMillis(), 5); - Path app5Dir = new Path(userLogDir, appId5.toString()); - FileStatus app5DirStatus = - new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir); - - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus, - app4DirStatus, app5DirStatus }); + ApplicationId.newInstance(now, 4); + Path app4Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId4, "me", suffix); + FileStatus app4DirStatus = + new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); + + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus, + app3DirStatus, app4DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{}); - + + Path app2Log1 = new Path(app2Dir, "host1"); FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1); @@ -137,25 +158,16 @@ public void testDeletion() throws Exception { FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1); Path app4Log2 = new Path(app4Dir, "host2"); - FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2); - + FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, + toKeepTime, app4Log2); + when(mockFs.listStatus(app4Dir)).thenReturn( new FileStatus[]{app4Log1Status, app4Log2Status}); - Path app5Log1 = new Path(app5Dir, "host1"); - FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1); - - Path app5Log2 = new Path(app5Dir, "host2"); - FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2); - - when(mockFs.listStatus(app5Dir)).thenReturn( - new FileStatus[]{app5Log1Status, app5Log2Status}); - final List finishedApplications = - Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3, - appId4)); + Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3)); final List runningApplications = - Collections.unmodifiableList(Arrays.asList(appId5)); + Collections.unmodifiableList(Arrays.asList(appId4)); AggregatedLogDeletionService deletionService = new AggregatedLogDeletionService() { @@ -180,10 +192,9 @@ protected void stopRMClient() { verify(mockFs, timeout(2000)).delete(app1Dir, true); verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true); verify(mockFs, timeout(2000)).delete(app3Dir, true); - verify(mockFs, timeout(2000)).delete(app4Dir, true); - verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true); - verify(mockFs, timeout(2000)).delete(app5Log1, true); - verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true); + verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true); + verify(mockFs, timeout(2000)).delete(app4Log1, true); + verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true); deletionService.stop(); } @@ -198,6 +209,7 @@ public void testRefreshLogRetentionSettings() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = root + "tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; final Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); @@ -218,26 +230,45 @@ public void testRefreshLogRetentionSettings() throws Exception { userDir); when(mockFs.listStatus(remoteRootLogPath)).thenReturn( - new FileStatus[] { userDirStatus }); + new FileStatus[] {userDirStatus}); - Path userLogDir = new Path(userDir, suffix); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs, + suffixDir); ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); //Set time last modified of app1Dir directory and its files to before2000Secs - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix); + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogPath, "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1); + FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0, + 0, before50Secs, clusterTimestampDir); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, before50Secs, bucketDir); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, app1Dir); ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); //Set time last modified of app1Dir directory and its files to before50Secs - Path app2Dir = new Path(userLogDir, appId2.toString()); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, app2Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus }); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixStatus }); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus }); Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs, @@ -310,6 +341,7 @@ public void testCheckInterval() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); @@ -334,12 +366,31 @@ public void testCheckInterval() throws Exception { ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now, + suffixDir); + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogPath, "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix); + FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0, + 0, now, clusterTimestampDir); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, now, bucketDir); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus}); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus}); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus}); Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1); @@ -373,10 +424,19 @@ protected void stopRMClient() { verify(mockFs, never()).delete(app1Dir, true); // modify the timestamp of the logs and verify it's picked up quickly + clusterTimestampDirStatus = new FileStatus(0, + true, 0, 0, toDeleteTime, clusterTimestampDir); + bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir); app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{app1Log1Status}); @@ -392,6 +452,7 @@ public void testRobustLogDeletion() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); @@ -402,6 +463,7 @@ public void testRobustLogDeletion() throws Exception { conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + long clusterTs = System.currentTimeMillis(); // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); Path rootPath = new Path(root); @@ -411,27 +473,40 @@ public void testRobustLogDeletion() throws Exception { Path remoteRootLogPath = new Path(remoteRootLogDir); Path userDir = new Path(remoteRootLogPath, "me"); + Path suffixDir = new Path(userDir, newSuffix); FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir); + FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir); + Path clusterTsDir = new Path(suffixDir, String.valueOf(clusterTs)); + FileStatus clusterTsStaus = new FileStatus(0, true, 0, 0, 0, clusterTsDir); + Path bucketDir = new Path(clusterTsDir, String.valueOf(0)); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir); when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[]{userDirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[]{suffixStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[]{clusterTsStaus}); + when(mockFs.listStatus(clusterTsDir)).thenReturn( + new FileStatus[]{bucketDirStatus}); - Path userLogDir = new Path(userDir, suffix); ApplicationId appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path app1Dir = new Path(userLogDir, appId1.toString()); + ApplicationId.newInstance(clusterTs, 1); + Path app1Dir = new Path(bucketDir, appId1.toString()); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir); ApplicationId appId2 = - ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(userLogDir, "application_a"); + ApplicationId.newInstance(clusterTs, 2); + Path app2Dir = new Path(bucketDir, "application_a"); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir); ApplicationId appId3 = - ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(userLogDir, appId3.toString()); + ApplicationId.newInstance(clusterTs, 3); + Path app3Dir = new Path(bucketDir, appId3.toString()); FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( + when(mockFs.listStatus(bucketDir)).thenReturn( new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus}); + when(mockFs.listStatus(app2Dir)).thenReturn( + new FileStatus[]{}); when(mockFs.listStatus(app1Dir)).thenThrow( new RuntimeException("Should Be Caught and Logged")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index 4767282..231e0e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -87,8 +87,9 @@ public static void createContainerLogFileInRemoteFS(Configuration conf, createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, content); // upload container logs to remote log dir - Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR), - user + "/logs/" + appId.toString()); + Path path = LogAggregationUtils.getRemoteAppLogDir( + new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)), + appId, user, "logs"); if (fs.exists(path) && deleteRemoteLogDir) { fs.delete(path, true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 000b73b..a363538 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -692,7 +692,7 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() } @Test - public void testAppLogDirCreation() throws Exception { + public void testBucketDirCreation() throws Exception { final String logSuffix = "logs"; this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); @@ -700,6 +700,83 @@ public void testAppLogDirCreation() throws Exception { this.remoteRootLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix); + InlineDispatcher dispatcher1 = new InlineDispatcher(); + dispatcher1.init(this.conf); + dispatcher1.start(); + + FileSystem fs = FileSystem.get(this.conf); + final FileSystem spyFs = spy(FileSystem.get(this.conf)); + + final LogAggregationTFileController spyFileFormat + = new LogAggregationTFileController() { + @Override + public FileSystem getFileSystem(Configuration conf) + throws IOException { + return spyFs; + } + }; + spyFileFormat.initialize(conf, "TFile"); + LogAggregationService aggSvc = new LogAggregationService(dispatcher1, + this.context, this.delSrvc, super.dirsHandler) { + @Override + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyFileFormat; + } + }; + aggSvc.init(this.conf); + aggSvc.start(); + + LogAggregationContext contextWithAllContainers = + Records.newRecord(LogAggregationContext.class); + contextWithAllContainers.setLogAggregationPolicyClassName( + AllContainerLogAggregationPolicy.class.getName()); + + // start an application and verify bucket 8888/9999 is created + ApplicationId appId = BuilderUtils.newApplicationId(99999, 88888); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); + + // Verify bucket 0002/0001 is created + ApplicationId appId2 = BuilderUtils.newApplicationId(10001, 10002); + Path bucketDir2 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId2)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir2), isA(FsPermission.class)); + + // Verify we do not try to create bucket 0002/0001 again + ApplicationId appId3 = BuilderUtils.newApplicationId(10001, 2); + Path bucketDir3 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId3)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, + this.acls, contextWithAllContainers)); + Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId3, this.user, logSuffix)); + verify(spyFs).mkdirs(eq(appDir3), isA(FsPermission.class)); + + aggSvc.stop(); + aggSvc.close(); + dispatcher1.stop(); + } + + @Test + public void testAppLogDirCreation() throws Exception { + final String logSuffix = "bucket_logs"; + final String inputSuffix = "logs"; + this.conf.set(YarnConfiguration.NM_LOG_DIRS, + localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, inputSuffix); + InlineDispatcher dispatcher = new InlineDispatcher(); dispatcher.init(this.conf); dispatcher.start(); @@ -732,7 +809,16 @@ public LogAggregationFileController getLogAggregationFileController( Path userDir = fs.makeQualified(new Path( remoteRootLogDir.getAbsolutePath(), this.user)); Path suffixDir = new Path(userDir, logSuffix); - Path appDir = new Path(suffixDir, appId.toString()); + Path clusterTimestampDir = fs.makeQualified( + LogAggregationUtils.getRemoteLogClusterTimestampDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, inputSuffix, appId)); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, inputSuffix, appId)); + Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), appId, + this.user, inputSuffix)); LogAggregationContext contextWithAllContainers = Records.newRecord(LogAggregationContext.class); contextWithAllContainers.setLogAggregationPolicyClassName( @@ -741,19 +827,25 @@ public LogAggregationFileController getLogAggregationFileController( this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(clusterTimestampDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); // start another application and verify only app dir created - ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); - Path appDir2 = new Path(suffixDir, appId2.toString()); + ApplicationId appId2 = BuilderUtils.newApplicationId(2, 2); + Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId2, this.user, inputSuffix)); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); // start another application with the app dir already created and verify // we do not try to create it again - ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); - Path appDir3 = new Path(suffixDir, appId3.toString()); + ApplicationId appId3 = BuilderUtils.newApplicationId(2, 10002); + Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId3, this.user, inputSuffix)); new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, this.acls, contextWithAllContainers)); -- 2.7.4 (Apple Git-66)