From c34b845ea00638b8044e3d1ac0c7e32afd0e4bec Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Mon, 21 Jan 2019 11:17:56 +0530 Subject: [PATCH] YARN-6929 --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 16 +- .../AggregatedLogDeletionService.java | 37 +++- .../yarn/logaggregation/LogAggregationUtils.java | 208 ++++++++++++++++++--- .../hadoop/yarn/logaggregation/LogCLIHelpers.java | 5 +- .../LogAggregationFileController.java | 64 +++++-- .../LogAggregationFileControllerFactory.java | 28 ++- .../ifile/LogAggregationIndexedFileController.java | 7 + .../src/main/resources/yarn-default.xml | 16 ++ .../TestAggregatedLogDeletionService.java | 199 ++++++++++++++------ .../logaggregation/TestContainerLogsUtils.java | 6 +- .../logaggregation/TestLogAggregationService.java | 115 +++++++++++- 11 files changed, 576 insertions(+), 125 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e1980c3..084a960 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1412,13 +1412,25 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; /** - * The remote log dir will be created at - * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} + * The remote log dir will be created at below location. 
+ * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX + * /${clusterTsDir}/${bucketDir}/${appId} */ public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = NM_PREFIX + "remote-app-log-dir-suffix"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs"; + /** Maximum Number of buckets to be created under NM_REMOTE_APP_LOG_DIR. */ + public static final String NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS = + NM_PREFIX + "remote-app-log-dir-maxbuckets"; + public static final int DEFAULT_NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS=1048576; + + /** Specifies whether Older Application Log Directory is included. */ + public static final String NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER = + NM_PREFIX + "remote-app-log-dir-include-older"; + public static final boolean DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER = + true; + public static final String YARN_LOG_SERVER_URL = YARN_PREFIX + "log.server.url"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index 841b870..e2eb459 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -66,7 +66,7 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) { this.conf = conf; this.retentionMillis = retentionSecs * 1000; - this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + this.suffix = LogAggregationUtils.getBucketSuffix(); this.remoteRootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); @@ -81,8 +81,39 @@ public void run() { FileSystem 
fs = remoteRootLogDir.getFileSystem(conf); for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { if(userDir.isDirectory()) { - Path userDirPath = new Path(userDir.getPath(), suffix); - deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient); + for(FileStatus suffixDir : fs.listStatus(userDir.getPath())) { + Path suffixDirPath = suffixDir.getPath(); + if(suffixDir.isDirectory() && suffixDirPath.getName(). + startsWith(suffix)) { + for(FileStatus clusterTimestampDir : fs.listStatus( + suffixDirPath)) { + if(clusterTimestampDir.isDirectory()) { + Path clusterTimestampDirPath = clusterTimestampDir. + getPath(); + FileStatus[] bucketDirs = fs.listStatus( + clusterTimestampDirPath); + if(bucketDirs.length != 0) { + for(FileStatus bucketDir : bucketDirs) { + if(bucketDir.isDirectory()) { + Path bucketDirPath = bucketDir.getPath(); + FileStatus[] appDirs = fs.listStatus(bucketDirPath); + if(appDirs.length != 0) { + deleteOldLogDirsFrom(bucketDirPath, cutoffMillis, + fs, rmClient); + } else if(fs.getFileStatus(bucketDirPath). + getModificationTime() < cutoffMillis) { + fs.delete(bucketDirPath, false); + } + } + } + } else if(fs.getFileStatus(clusterTimestampDirPath). 
+ getModificationTime() < cutoffMillis) { + fs.delete(clusterTimestampDirPath, false); + } + } + } + } + } } } } catch (Throwable t) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index edf2cf3..0f7d6f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -38,6 +38,7 @@ public class LogAggregationUtils { public static final String TMP_FILE_SUFFIX = ".tmp"; + private static final String BUCKET_SUFFIX = "bucket"; /** * Constructs the full filename for an application's log file per node. @@ -46,12 +47,14 @@ * @param user the application owner * @param nodeId the node id * @param suffix the log dir suffix + * @param maxBucket the maximum directory items * @return the remote log file. */ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, - ApplicationId appId, String user, NodeId nodeId, String suffix) { - return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix), - getNodeString(nodeId)); + ApplicationId appId, String user, NodeId nodeId, String suffix, + int maxBucket) { + return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, + suffix, maxBucket), getNodeString(nodeId)); } /** @@ -60,12 +63,27 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, * @param appId the application id * @param user the application owner * @param suffix the log directory suffix + * @param maxBucket the maximum directory items * @return the remote application specific log dir. 
*/ public static Path getRemoteAppLogDir(Path remoteRootLogDir, - ApplicationId appId, String user, String suffix) { - return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix), - appId.toString()); + ApplicationId appId, String user, String suffix, int maxBucket) { + return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix, + appId, maxBucket), appId.toString()); + } + + /** + * Gets the older remote app log dir. + * @param appId the application id + * @param user the application owner + * @param remoteRootLogDir the aggregated log remote root log dir + * @param suffix the log directory suffix + * @return the remote application specific log dir. + */ + public static Path getOlderRemoteAppLogDir(ApplicationId appId, + String user, Path remoteRootLogDir, String suffix) { + return new Path(getOlderRemoteLogSuffixedDir(remoteRootLogDir, user, + suffix), appId.toString()); } /** @@ -77,6 +95,20 @@ public static Path getRemoteAppLogDir(Path remoteRootLogDir, */ public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir, String user, String suffix) { + suffix = getBucketSuffix() + suffix; + // TODO Maybe support suffix to be more than a single file. + return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix); + } + + /** + * Gets the older remote suffixed log dir for the user. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @return the older remote suffixed log dir. + */ + public static Path getOlderRemoteLogSuffixedDir(Path remoteRootLogDir, + String user, String suffix) { if (suffix == null || suffix.isEmpty()) { return getRemoteLogUserDir(remoteRootLogDir, user); } @@ -95,6 +127,57 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) { } /** + * Gets the remote log user's cluster timestamp dir. 
+ * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @return the remote log per user per cluster timestamp dir. + */ + public static Path getRemoteLogClusterTimestampDir(Path remoteRootLogDir, + String user, String suffix, ApplicationId appId) { + return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, + suffix), String.valueOf(appId.getClusterTimestamp())); + } + + /** + * Gets the remote log user's cluster timestamp's bucket dir. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @param maxBucket the maximum directory items + * @return the remote log per user per cluster timestamp per bucket dir. + */ + public static Path getRemoteBucketDir(Path remoteRootLogDir, String user, + String suffix, ApplicationId appId, int maxBucket) { + int bucket = appId.getId() / maxBucket; + return new Path(getRemoteLogClusterTimestampDir(remoteRootLogDir, + user, suffix, appId), String.valueOf(bucket)); + } + + /** + * Returns the configured max bucket value for the remote app log dir. + * @param conf the configuration + * @return the value an application id is divided by to pick its bucket dir. + */ + public static int getMaxBucket(Configuration conf) { + return conf.getInt(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS); + } + + /** + * Checks whether the older Application Log Directory has to be included. + * @param conf the configuration + * @return true if the older app log dir is enabled, false otherwise. + */ + public static boolean isOlderPathEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration. + NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER); + } + + /** * Returns the suffix component of the log dir.
* @param conf the configuration * @return the suffix which will be appended to the user log dir. @@ -104,6 +187,14 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) { YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); } + /** + * Returns the bucket component that prefixes the log dir suffix. + * @return the bucket string which is prepended to the log dir suffix + */ + public static String getBucketSuffix() { + return BUCKET_SUFFIX; + } + /** * Converts a nodeId to a form used in the app log file name. @@ -154,13 +245,14 @@ public static String getNodeString(String nodeId) { org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { org.apache.hadoop.fs.Path remoteAppDir = null; + int maxBucket = LogAggregationUtils.getMaxBucket(conf); if (appOwner == null) { org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir = FileContext.getFileContext(conf).makeQualified(remoteRootLogDir); FileContext fc = FileContext.getFileContext( qualifiedRemoteRootLogDir.toUri(), conf); org.apache.hadoop.fs.Path toMatch = LogAggregationUtils - .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket); FileStatus[] matching = fc.util().globStatus(toMatch); if (matching == null || matching.length != 1) { throw new IOException("Can not find remote application directory for " @@ -169,7 +261,7 @@ public static String getNodeString(String nodeId) { remoteAppDir = matching[0].getPath(); } else { remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, appOwner, suffix); + remoteRootLogDir, appId, appOwner, suffix, maxBucket); } return remoteAppDir; } @@ -177,6 +269,24 @@ public static String getNodeString(String nodeId) { /** * Get all available log files under remote app log directory.
* @param conf the configuration + * @param remoteAppLogDir the application log directory + * @param appId the applicationId + * @param appOwner the application owner + * @return the iterator of available log files + * @throws IOException if there is no log file directory + */ + public static RemoteIterator getNodeFiles(Configuration conf, + Path remoteAppLogDir, ApplicationId appId, String appOwner) + throws IOException { + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); + return FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir); + } + + /** + * Get all available log files under remote app log directory. + * @param conf the configuration * @param appId the applicationId * @param appOwner the application owner * @param remoteRootLogDir the remote root log directory @@ -188,14 +298,47 @@ public static String getNodeString(String nodeId) { Configuration conf, ApplicationId appId, String appOwner, org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { + StringBuilder diagnosis = new StringBuilder(); Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); - RemoteIterator nodeFiles = null; - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), - conf).listStatus(remoteAppLogDir); - return nodeFiles; + RemoteIterator nodeFilesCur = null; + try { + nodeFilesCur = getNodeFiles(conf, remoteAppLogDir, appId, appOwner); + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + if(isOlderPathEnabled(conf)) { + remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner, + remoteRootLogDir, suffix); + try { + RemoteIterator nodeFilesPrev = getNodeFiles(conf, + remoteAppLogDir, appId, appOwner); + if(nodeFilesCur == null) { + return nodeFilesPrev; + } else if(nodeFilesPrev != null) { + RemoteIterator curDir = 
nodeFilesCur; + RemoteIterator prevDir = nodeFilesPrev; + RemoteIterator nodeFilesCombined = new + RemoteIterator() { + @Override + public boolean hasNext() throws IOException { + return prevDir.hasNext() || curDir.hasNext(); + } + @Override + public FileStatus next() throws IOException { + return prevDir.hasNext() ? prevDir.next() : curDir.next(); + } + }; + return nodeFilesCombined; + } + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + } + if(nodeFilesCur == null) { + throw new IOException(diagnosis.toString()); + } + return nodeFilesCur; } /** @@ -212,13 +355,33 @@ public static String getNodeString(String nodeId) { Configuration conf, ApplicationId appId, String appOwner, org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { + StringBuilder diagnosis = new StringBuilder(); Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); List nodeFiles = new ArrayList<>(); Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( - qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + try { + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + if(isOlderPathEnabled(conf)) { + remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner, + remoteRootLogDir, suffix); + qualifiedLogDir = FileContext.getFileContext(conf). 
+ makeQualified(remoteAppLogDir); + try { + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + } catch(IOException ex) { + diagnosis.append(ex.getMessage() + "\n"); + } + } + if(nodeFiles.isEmpty()) { + throw new IOException(diagnosis.toString()); + } return nodeFiles; } @@ -233,12 +396,11 @@ public static String getNodeString(String nodeId) { public static RemoteIterator getRemoteNodeFileDir( Configuration conf, ApplicationId appId, String appOwner) throws IOException { - Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner); - RemoteIterator nodeFiles = null; - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); - nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), - conf).listStatus(remoteAppLogDir); - return nodeFiles; + String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + Path remoteRootLogDir = new Path(conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + return getRemoteNodeFileDir(conf, appId, appOwner, + remoteRootLogDir, suffix); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 9dae7b9..a7df733 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -86,8 +86,9 @@ public static String getOwnerForAppIdOrNull( YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); 
Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, - appId, bestGuess, suffix); + appId, bestGuess, suffix, maxBucket); FileContext fc = FileContext.getFileContext(remoteRootLogDir.toUri(), conf); String pathAccess = fullPath.toString(); @@ -96,7 +97,7 @@ public static String getOwnerForAppIdOrNull( return bestGuess; } Path toMatch = LogAggregationUtils. - getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket); pathAccess = toMatch.toString(); FileStatus[] matching = fc.util().globStatus(toMatch); if (matching == null || matching.length != 1) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index e37308d..de2829e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -106,6 +106,7 @@ protected Configuration conf; protected Path remoteRootLogDir; protected String remoteRootLogDirSuffix; + private int maxBucket; protected int retentionSize; protected String fileControllerName; @@ -132,6 +133,7 @@ public void initialize(Configuration conf, String controllerName) { this.retentionSize = configuredRetentionSize; } this.fileControllerName = controllerName; + this.maxBucket = LogAggregationUtils.getMaxBucket(conf); initInternal(conf); } @@ -360,33 +362,46 @@ public Object run() throws Exception { // Only creating directories if they are missing to avoid // unnecessary load on the filesystem from all of the nodes Path appDir = 
LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, user, remoteRootLogDirSuffix); + remoteRootLogDir, appId, user, remoteRootLogDirSuffix, + maxBucket); appDir = appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - remoteRootLogDir, user, remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogDir, user, remoteRootLogDirSuffix, appId, + maxBucket); + bucketDir = bucketDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + if (!checkExists(remoteFS, bucketDir, APP_DIR_PERMISSIONS)) { + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogDir, user, remoteRootLogDirSuffix, appId); + clusterTimestampDir = clusterTimestampDir.makeQualified( + remoteFS.getUri(), remoteFS.getWorkingDirectory()); + if (!checkExists(remoteFS, clusterTimestampDir, + APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( + remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, 
APP_DIR_PERMISSIONS); + } + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + } + createDir(remoteFS, clusterTimestampDir, APP_DIR_PERMISSIONS); } - - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + createDir(remoteFS, bucketDir, APP_DIR_PERMISSIONS); } - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); } - } catch (IOException e) { LOG.error("Failed to setup application log directory for " + appId, e); @@ -451,7 +466,7 @@ public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user, NodeId nodeId) { return LogAggregationUtils.getRemoteNodeLogFileForApp( getRemoteRootLogDir(), appId, user, nodeId, - getRemoteRootLogDirSuffix()); + getRemoteRootLogDirSuffix(), this.maxBucket); } /** @@ -467,6 +482,19 @@ public Path getRemoteAppLogDir(ApplicationId appId, String appOwner) this.remoteRootLogDir, this.remoteRootLogDirSuffix); } + /** + * Get the older remote application directory for log aggregation. + * @param appId the Application ID + * @param appOwner the Application Owner + * @return the older remote application directory + * @throws IOException if can not find the remote application directory + */ + public Path getOlderRemoteAppLogDir(ApplicationId appId, String appOwner) + throws IOException { + return LogAggregationUtils.getOlderRemoteAppLogDir(appId, appOwner, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + protected void cleanOldLogs(Path remoteNodeLogFileForApp, final NodeId nodeId, UserGroupInformation userUgi) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java index cf40209..ac60caf 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java @@ -34,13 +34,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; /** * Use {@code LogAggregationFileControllerFactory} to get the correct @@ -160,15 +158,27 @@ public LogAggregationFileController getFileControllerForWrite() { public LogAggregationFileController getFileControllerForRead( ApplicationId appId, String appOwner) throws IOException { StringBuilder diagnosis = new StringBuilder(); - for(LogAggregationFileController fileController : controllers) { + if (LogAggregationUtils.isOlderPathEnabled(conf)) { + for (LogAggregationFileController fileController : controllers) { + try { + Path remoteAppLogDir = fileController.getOlderRemoteAppLogDir(appId, + appOwner); + if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, appId, + appOwner).hasNext()) { + return fileController; + } + } catch (Exception ex) { + diagnosis.append(ex.getMessage() + "\n"); + continue; + } + } + } + for (LogAggregationFileController fileController : controllers) { try { Path remoteAppLogDir = fileController.getRemoteAppLogDir( appId, appOwner); - Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified( - remoteAppLogDir); - 
RemoteIterator nodeFiles = FileContext.getFileContext( - qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir); - if (nodeFiles.hasNext()) { + if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, + appId, appOwner).hasNext()) { return fileController; } } catch (Exception ex) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 78b0c13..d7e86ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -819,6 +819,13 @@ public Path getRemoteAppLogDir(ApplicationId appId, String user) this.remoteRootLogDir, this.remoteRootLogDirSuffix); } + @Override + public Path getOlderRemoteAppLogDir(ApplicationId appId, String user) + throws IOException { + return LogAggregationUtils.getOlderRemoteAppLogDir(appId, user, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + @Private public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end, ApplicationId appId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index fea635b..d217b8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1337,6 +1337,22 @@ + The Maximum number of buckets to be created + under remote app 
log dir {yarn.nodemanager.remote-app-log-dir}. + + yarn.nodemanager.remote-app-log-dir-maxbuckets + 1048576 + + + + If set to true, the older application log directory + will be considered while fetching application logs. + + yarn.nodemanager.remote-app-log-dir-include-older + true + + + Generate additional logs about container launches. Currently, this creates a copy of the launch script and lists the directory contents of the container work dir. When listing directory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index f36ebf4..f4f5717 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -60,6 +60,7 @@ public void testDeletion() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = "bucketlogs"; final Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); @@ -67,6 +68,7 @@ public void testDeletion() throws Exception { conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); @@ -80,39 +82,59 @@ public void testDeletion() throws Exception { new FileStatus[]{userDirStatus}); ApplicationId 
appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); + ApplicationId.newInstance(now, 1); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixDirStatus = new FileStatus(0, true, + 0, 0, toDeleteTime, suffixDir); + Path clusterTimestampDir = LogAggregationUtils. + getRemoteLogClusterTimestampDir(remoteRootLogPath, + "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1, maxBucket); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix, maxBucket); + FileStatus clusterTimestampDirStatus = new FileStatus(0, + true, 0, 0, toDeleteTime, clusterTimestampDir); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, bucketDir); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app1Dir); ApplicationId appId2 = - ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(userLogDir, appId2.toString()); - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir); + ApplicationId.newInstance(now, 2); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix, maxBucket); + FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app2Dir); ApplicationId appId3 = - ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(userLogDir, appId3.toString()); - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir); + ApplicationId.newInstance(now, 3); + Path app3Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId3, "me", suffix, maxBucket); + FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, + toDeleteTime, app3Dir); ApplicationId 
appId4 = - ApplicationId.newInstance(System.currentTimeMillis(), 4); - Path app4Dir = new Path(userLogDir, appId4.toString()); - FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); - - ApplicationId appId5 = - ApplicationId.newInstance(System.currentTimeMillis(), 5); - Path app5Dir = new Path(userLogDir, appId5.toString()); - FileStatus app5DirStatus = - new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir); - - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus, - app4DirStatus, app5DirStatus }); + ApplicationId.newInstance(now, 4); + Path app4Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId4, "me", suffix, maxBucket); + FileStatus app4DirStatus = + new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); + + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus, + app3DirStatus, app4DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{}); - + + Path app2Log1 = new Path(app2Dir, "host1"); FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1); @@ -137,25 +159,15 @@ public void testDeletion() throws Exception { FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1); Path app4Log2 = new Path(app4Dir, "host2"); - FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2); - + FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app4Log2); + when(mockFs.listStatus(app4Dir)).thenReturn( new FileStatus[]{app4Log1Status, app4Log2Status}); - Path app5Log1 = new Path(app5Dir, "host1"); - FileStatus app5Log1Status = new 
FileStatus(10, false, 1, 1, toDeleteTime, app5Log1); - - Path app5Log2 = new Path(app5Dir, "host2"); - FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2); - - when(mockFs.listStatus(app5Dir)).thenReturn( - new FileStatus[]{app5Log1Status, app5Log2Status}); - final List finishedApplications = - Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3, - appId4)); + Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3)); final List runningApplications = - Collections.unmodifiableList(Arrays.asList(appId5)); + Collections.unmodifiableList(Arrays.asList(appId4)); AggregatedLogDeletionService deletionService = new AggregatedLogDeletionService() { @@ -180,10 +192,9 @@ protected void stopRMClient() { verify(mockFs, timeout(2000)).delete(app1Dir, true); verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true); verify(mockFs, timeout(2000)).delete(app3Dir, true); - verify(mockFs, timeout(2000)).delete(app4Dir, true); - verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true); - verify(mockFs, timeout(2000)).delete(app5Log1, true); - verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true); + verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true); + verify(mockFs, timeout(2000)).delete(app4Log1, true); + verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true); deletionService.stop(); } @@ -198,6 +209,7 @@ public void testRefreshLogRetentionSettings() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = root + "tmp/logs"; String suffix = "logs"; + String newSuffix = "bucketlogs"; final Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); @@ -206,6 +218,7 @@ public void testRefreshLogRetentionSettings() throws Exception { "1"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, 
suffix); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); @@ -218,26 +231,45 @@ public void testRefreshLogRetentionSettings() throws Exception { userDir); when(mockFs.listStatus(remoteRootLogPath)).thenReturn( - new FileStatus[] { userDirStatus }); + new FileStatus[] {userDirStatus}); - Path userLogDir = new Path(userDir, suffix); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs, + suffixDir); ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); //Set time last modified of app1Dir directory and its files to before2000Secs - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix, maxBucket); + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogPath, "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1, maxBucket); + FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0, + 0, before50Secs, clusterTimestampDir); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, before50Secs, bucketDir); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, app1Dir); ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); //Set time last modified of app1Dir directory and its files to before50Secs - Path app2Dir = new Path(userLogDir, appId2.toString()); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix, maxBucket); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, app2Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus }); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] 
{suffixStatus }); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus }); Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs, @@ -310,6 +342,7 @@ public void testCheckInterval() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; + String newSuffix = "bucketlogs"; Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); @@ -317,6 +350,7 @@ public void testCheckInterval() throws Exception { conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); @@ -334,12 +368,31 @@ public void testCheckInterval() throws Exception { ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path suffixDir = new Path(userDir, newSuffix); + FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now, + suffixDir); + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogPath, "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1, maxBucket); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix, maxBucket); 
+ FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0, + 0, now, clusterTimestampDir); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, now, bucketDir); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus}); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus}); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus}); Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1); @@ -373,10 +426,19 @@ protected void stopRMClient() { verify(mockFs, never()).delete(app1Dir, true); // modify the timestamp of the logs and verify it's picked up quickly + clusterTimestampDirStatus = new FileStatus(0, + true, 0, 0, toDeleteTime, clusterTimestampDir); + bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir); app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1); - when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{app1Log1Status}); @@ -392,6 +454,7 @@ public void testRobustLogDeletion() throws Exception { String root = "mockfs://foo/"; String remoteRootLogDir = 
root+"tmp/logs"; String suffix = "logs"; + String newSuffix = "bucketlogs"; Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); @@ -402,6 +465,7 @@ public void testRobustLogDeletion() throws Exception { conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + long clusterTs = System.currentTimeMillis(); // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); Path rootPath = new Path(root); @@ -411,26 +475,37 @@ public void testRobustLogDeletion() throws Exception { Path remoteRootLogPath = new Path(remoteRootLogDir); Path userDir = new Path(remoteRootLogPath, "me"); + Path suffixDir = new Path(userDir, newSuffix); FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir); + FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir); + Path clusterTsDir = new Path(suffixDir, String.valueOf(clusterTs)); + FileStatus clusterTsStaus = new FileStatus(0, true, 0, 0, 0, clusterTsDir); + Path bucketDir = new Path(clusterTsDir, String.valueOf(0)); + FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir); when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[]{userDirStatus}); + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[]{suffixStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[]{clusterTsStaus}); + when(mockFs.listStatus(clusterTsDir)).thenReturn( + new FileStatus[]{bucketDirStatus}); - Path userLogDir = new Path(userDir, suffix); ApplicationId appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path app1Dir = new Path(userLogDir, appId1.toString()); + ApplicationId.newInstance(clusterTs, 1); + Path app1Dir = new Path(bucketDir, appId1.toString()); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir); ApplicationId appId2 = - 
ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(userLogDir, "application_a"); + ApplicationId.newInstance(clusterTs, 2); + Path app2Dir = new Path(bucketDir, "application_a"); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir); ApplicationId appId3 = - ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(userLogDir, appId3.toString()); + ApplicationId.newInstance(clusterTs, 3); + Path app3Dir = new Path(bucketDir, appId3.toString()); FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir); - when(mockFs.listStatus(userLogDir)).thenReturn( + when(mockFs.listStatus(bucketDir)).thenReturn( new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus}); when(mockFs.listStatus(app1Dir)).thenThrow( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index 4767282..508d760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -87,8 +87,10 @@ public static void createContainerLogFileInRemoteFS(Configuration conf, createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, content); // upload container logs to remote log dir - Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR), - user + "/logs/" + appId.toString()); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); + Path path = LogAggregationUtils.getRemoteAppLogDir( + new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)), + appId, user, "logs", maxBucket); if (fs.exists(path) && deleteRemoteLogDir) { fs.delete(path, true); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 8b2e3cc..33b39e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -687,7 +687,7 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() } @Test - public void testAppLogDirCreation() throws Exception { + public void testBucketDirCreation() throws Exception { final String logSuffix = "logs"; this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); @@ -695,6 +695,97 @@ public void testAppLogDirCreation() throws Exception { this.remoteRootLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix); + int maxBucket = this.conf.getInt( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS); + this.conf.setInt(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS, 2); + + InlineDispatcher dispatcher1 = new InlineDispatcher(); + dispatcher1.init(this.conf); + dispatcher1.start(); + + FileSystem fs = FileSystem.get(this.conf); + final FileSystem spyFs = spy(FileSystem.get(this.conf)); + + final LogAggregationTFileController spyFileFormat + = new LogAggregationTFileController() { + @Override + public FileSystem getFileSystem(Configuration conf) + throws IOException { + 
return spyFs; + } + }; + spyFileFormat.initialize(conf, "TFile"); + LogAggregationService aggSvc = new LogAggregationService(dispatcher1, + this.context, this.delSrvc, super.dirsHandler) { + @Override + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyFileFormat; + } + }; + aggSvc.init(this.conf); + aggSvc.start(); + + int maxBucket1 = LogAggregationUtils.getMaxBucket(this.conf); + LogAggregationContext contextWithAllContainers = + Records.newRecord(LogAggregationContext.class); + contextWithAllContainers.setLogAggregationPolicyClassName( + AllContainerLogAggregationPolicy.class.getName()); + + // start an application and verify bucket 0 is created + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); + + // Verify bucket 1 is created + ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); + Path bucketDir2 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId2, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir2), isA(FsPermission.class)); + + // Verify bucket 1 is used + ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); + Path bucketDir3 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId3, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir3), 
isA(FsPermission.class)); + + // Verify bucket 2 is created + ApplicationId appId4 = BuilderUtils.newApplicationId(1, 4); + Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId4, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir4), isA(FsPermission.class)); + + this.conf.setInt( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR_MAX_BUCKETS, maxBucket); + aggSvc.stop(); + aggSvc.close(); + dispatcher1.stop(); + } + + @Test + public void testAppLogDirCreation() throws Exception { + final String logSuffix = "bucketlogs"; + final String inputSuffix = "logs"; + this.conf.set(YarnConfiguration.NM_LOG_DIRS, + localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, inputSuffix); + InlineDispatcher dispatcher = new InlineDispatcher(); dispatcher.init(this.conf); dispatcher.start(); @@ -727,7 +818,17 @@ public LogAggregationFileController getLogAggregationFileController( Path userDir = fs.makeQualified(new Path( remoteRootLogDir.getAbsolutePath(), this.user)); Path suffixDir = new Path(userDir, logSuffix); - Path appDir = new Path(suffixDir, appId.toString()); + int maxBucket = LogAggregationUtils.getMaxBucket(this.conf); + Path clusterTimestampDir = fs.makeQualified( + LogAggregationUtils.getRemoteLogClusterTimestampDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, inputSuffix, appId)); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, inputSuffix, appId, maxBucket)); + Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), appId, + this.user, inputSuffix, maxBucket)); 
LogAggregationContext contextWithAllContainers = Records.newRecord(LogAggregationContext.class); contextWithAllContainers.setLogAggregationPolicyClassName( @@ -736,11 +837,15 @@ public LogAggregationFileController getLogAggregationFileController( this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(clusterTimestampDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); // start another application and verify only app dir created ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); - Path appDir2 = new Path(suffixDir, appId2.toString()); + Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId2, this.user, inputSuffix, maxBucket)); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); @@ -748,7 +853,9 @@ public LogAggregationFileController getLogAggregationFileController( // start another application with the app dir already created and verify // we do not try to create it again ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); - Path appDir3 = new Path(suffixDir, appId3.toString()); + Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId3, this.user, inputSuffix, maxBucket)); new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, this.acls, contextWithAllContainers)); -- 2.7.4 (Apple Git-66)