diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index e46eeda..f1ddcb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -45,6 +45,17 @@ org.apache.hadoop + hadoop-hdfs + provided + + + org.apache.hadoop + hadoop-hdfs-client + provided + + + + org.apache.hadoop hadoop-yarn-api diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index a80f9d7..3b3e60a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -50,7 +49,8 @@ */ @InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"}) public class AggregatedLogDeletionService extends AbstractService { - private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class); + private static final Log LOG = LogFactory.getLog( + AggregatedLogDeletionService.class); private Timer timer = null; private long checkIntervalMsecs; @@ -63,7 +63,8 @@ private Path remoteRootLogDir = null; private ApplicationClientProtocol rmClient = null; - public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) { + LogDeletionTask(Configuration conf, 
long retentionSecs, + ApplicationClientProtocol rmClient) { this.conf = conf; this.retentionMillis = retentionSecs * 1000; this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); @@ -76,18 +77,45 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClient @Override public void run() { long cutoffMillis = System.currentTimeMillis() - retentionMillis; LOG.info("aggregated log deletion started."); try { FileSystem fs = remoteRootLogDir.getFileSystem(conf); for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { + LOG.debug("UserDir="+userDir.getPath()); if(userDir.isDirectory()) { Path userDirPath = new Path(userDir.getPath(), suffix); - deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient); + LOG.debug("USERDIR="+userDirPath); + for(FileStatus clusterTimestampDir : fs.listStatus(userDirPath)) { + LOG.debug("CLUSTERTIMETSMA="+clusterTimestampDir.getPath()); + if(clusterTimestampDir.isDirectory()) { + Path clusterTimestampDirPath = clusterTimestampDir.getPath(); + for(FileStatus bucketDir : fs.listStatus( + clusterTimestampDir.getPath())) { + LOG.debug("BUCKET="+bucketDir.getPath()); + if(bucketDir.isDirectory()) { + Path bucketDirPath = bucketDir.getPath(); + deleteOldLogDirsFrom(bucketDirPath, cutoffMillis, + fs, rmClient); + + if(fs.listStatus(bucketDirPath).length == 0 && + fs.getFileStatus(bucketDirPath).getModificationTime() + < cutoffMillis) { + fs.delete(bucketDirPath, false); + } + } + } + if(fs.listStatus(clusterTimestampDirPath).length == 0 && + fs.getFileStatus(clusterTimestampDirPath). 
+ getModificationTime() < cutoffMillis) { + fs.delete(clusterTimestampDirPath, false); + } + } + } } } } catch (IOException e) { logIOException("Error reading root log dir this deletion " + - "attempt is being aborted", e); + "attempt is being aborted", e); } LOG.info("aggregated log deletion finished."); } @@ -121,7 +149,7 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, } } catch(IOException e) { logIOException( - "Error reading the contents of " + appDir.getPath(), e); + "Error reading the contents of " + appDir.getPath(), e); } } } @@ -131,8 +159,8 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, } } - private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis, - FileSystem fs) { + private static boolean shouldDeleteLogDir(FileStatus dir, + long cutoffMillis, FileSystem fs) { boolean shouldDelete = true; try { for(FileStatus node: fs.listStatus(dir.getPath())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index edf2cf3..7428466 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -46,12 +47,14 @@ * @param user the application owner * @param nodeId the node id * @param suffix the log dir suffix + * @param maxBucket the maximum 
directory items * @return the remote log file. */ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, - ApplicationId appId, String user, NodeId nodeId, String suffix) { - return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix), - getNodeString(nodeId)); + ApplicationId appId, String user, NodeId nodeId, String suffix, + int maxBucket) { + return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, + suffix, maxBucket), getNodeString(nodeId)); } /** @@ -60,12 +63,13 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, * @param appId the application id * @param user the application owner * @param suffix the log directory suffix + * @param maxBucket the maximum directory items * @return the remote application specific log dir. */ public static Path getRemoteAppLogDir(Path remoteRootLogDir, - ApplicationId appId, String user, String suffix) { - return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix), - appId.toString()); + ApplicationId appId, String user, String suffix, int maxBucket) { + return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix, + appId, maxBucket), appId.toString()); } /** @@ -94,6 +98,48 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) { return new Path(remoteRootLogDir, user); } + /** + * Gets the remote log user's cluster timestamp dir. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @return the remote log per user per cluster timestamp dir. + */ + public static Path getRemoteLogClusterTimestampDir(Path remoteRootLogDir, + String user, String suffix, ApplicationId appId) { + return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, + suffix), appId.getClusterTimestamp()+""); + } + + /** + * Gets the remote log user's cluster timestamp's bucket dir. 
+ * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @param maxBucket app ids per bucket dir; values below 1 act as 1 + * @return the remote log per user per cluster timestamp per bucket dir. + */ + public static Path getRemoteBucketDir(Path remoteRootLogDir, String user, + String suffix, ApplicationId appId, int maxBucket) { + int id = appId.getId(); + int bucket = id / Math.max(1, maxBucket); + return new Path(getRemoteLogClusterTimestampDir(remoteRootLogDir, + user, suffix, appId), bucket+""); + } + + + /** + * Returns the bucket size taken from the max-directory-items setting. + * @param conf the configuration + * @return the divisor used to map an application id to its bucket. + */ + public static int getMaxBucket(Configuration conf) { + return conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, + DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT); + } + /** * Returns the suffix component of the log dir. 
* @param conf the configuration @@ -154,13 +200,14 @@ public static String getNodeString(String nodeId) { org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { org.apache.hadoop.fs.Path remoteAppDir = null; + int maxBucket = LogAggregationUtils.getMaxBucket(conf); if (appOwner == null) { org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir = FileContext.getFileContext(conf).makeQualified(remoteRootLogDir); FileContext fc = FileContext.getFileContext( qualifiedRemoteRootLogDir.toUri(), conf); org.apache.hadoop.fs.Path toMatch = LogAggregationUtils - .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket); FileStatus[] matching = fc.util().globStatus(toMatch); if (matching == null || matching.length != 1) { throw new IOException("Can not find remote application directory for " @@ -169,7 +216,7 @@ public static String getNodeString(String nodeId) { remoteAppDir = matching[0].getPath(); } else { remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, appOwner, suffix); + remoteRootLogDir, appId, appOwner, suffix, maxBucket); } return remoteAppDir; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 887d92d..5d74f28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -86,8 +86,9 @@ public static String getOwnerForAppIdOrNull( YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + int maxBucket = 
LogAggregationUtils.getMaxBucket(conf); Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, - appId, bestGuess, suffix); + appId, bestGuess, suffix, maxBucket); FileContext fc = FileContext.getFileContext(remoteRootLogDir.toUri(), conf); String pathAccess = fullPath.toString(); @@ -96,7 +97,7 @@ public static String getOwnerForAppIdOrNull( return bestGuess; } Path toMatch = LogAggregationUtils. - getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket); pathAccess = toMatch.toString(); FileStatus[] matching = fc.util().globStatus(toMatch); if (matching == null || matching.length != 1) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 5df900b..c0c657d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -110,6 +110,7 @@ protected Configuration conf; protected Path remoteRootLogDir; protected String remoteRootLogDirSuffix; + private int maxBucket; protected int retentionSize; protected String fileControllerName; @@ -132,6 +133,7 @@ public void initialize(Configuration conf, String controllerName) { this.retentionSize = configuredRentionSize; } this.fileControllerName = controllerName; + this.maxBucket = LogAggregationUtils.getMaxBucket(conf); initInternal(conf); } @@ -329,31 +331,54 @@ public Object run() throws Exception { // Only creating directories if they are missing to avoid // unnecessary load on the filesystem from all of the 
nodes Path appDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, user, remoteRootLogDirSuffix); + remoteRootLogDir, appId, user, remoteRootLogDirSuffix, + maxBucket); appDir = appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - remoteRootLogDir, user, remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogDir, user, remoteRootLogDirSuffix, appId, + maxBucket); + bucketDir = bucketDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if(!checkExists(remoteFS, bucketDir, APP_DIR_PERMISSIONS)) { + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogDir, user, remoteRootLogDirSuffix, + appId); + clusterTimestampDir = clusterTimestampDir.makeQualified( + remoteFS.getUri(), remoteFS.getWorkingDirectory()); + + if(!checkExists(remoteFS, clusterTimestampDir, + APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( + remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, userDir, 
APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, clusterTimestampDir, APP_DIR_PERMISSIONS); } - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); - } + createDir(remoteFS, bucketDir, APP_DIR_PERMISSIONS); + } - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); } } catch (IOException e) { @@ -409,7 +434,7 @@ public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user, NodeId nodeId) { return LogAggregationUtils.getRemoteNodeLogFileForApp( getRemoteRootLogDir(), appId, user, nodeId, - getRemoteRootLogDirSuffix()); + getRemoteRootLogDirSuffix(), this.maxBucket); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index 026996e..e9978bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -66,7 +66,8 @@ public void testDeletion() throws Exception { conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - + int maxBucket = LogAggregationUtils.getMaxBucket(conf); + Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); @@ -74,7 +75,8 @@ public void testDeletion() throws Exception { Path remoteRootLogPath = new Path(remoteRootLogDir); Path userDir = new 
Path(remoteRootLogPath, "me"); - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir); + FileStatus userDirStatus = new FileStatus(0, + true, 0, 0, toKeepTime, userDir); when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[]{userDirStatus}); @@ -82,71 +84,106 @@ public void testDeletion() throws Exception { ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); - + + Path clusterTimestampDir = LogAggregationUtils. + getRemoteLogClusterTimestampDir(remoteRootLogPath, + "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1, maxBucket); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix, maxBucket); + + FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, clusterTimestampDir); + + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, bucketDir); + FileStatus app1DirStatus = new FileStatus(0, true, + 0, 0, toDeleteTime, app1Dir); + ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(userLogDir, appId2.toString()); - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix, maxBucket); + + FileStatus app2DirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, app2Dir); ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(userLogDir, appId3.toString()); - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir); + Path app3Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId3, "me", 
suffix, maxBucket); + FileStatus app3DirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, app3Dir); ApplicationId appId4 = ApplicationId.newInstance(System.currentTimeMillis(), 4); - Path app4Dir = new Path(userLogDir, appId4.toString()); - FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); + Path app4Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId4, "me", suffix, maxBucket); + FileStatus app4DirStatus = new FileStatus(0, true, 0, + 0, toDeleteTime, app4Dir); ApplicationId appId5 = ApplicationId.newInstance(System.currentTimeMillis(), 5); - Path app5Dir = new Path(userLogDir, appId5.toString()); + Path app5Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId5, "me", suffix, maxBucket); FileStatus app5DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir); when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus, - app4DirStatus, app5DirStatus }); + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus, + app3DirStatus, app4DirStatus, app5DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{}); Path app2Log1 = new Path(app2Dir, "host1"); - FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1); + FileStatus app2Log1Status = new FileStatus(10, + false, 1, 1, toDeleteTime, app2Log1); Path app2Log2 = new Path(app2Dir, "host2"); - FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2); + FileStatus app2Log2Status = new FileStatus(10, + false, 1, 1, toKeepTime, app2Log2); when(mockFs.listStatus(app2Dir)).thenReturn( new FileStatus[]{app2Log1Status, app2Log2Status}); Path app3Log1 = new Path(app3Dir, "host1"); - FileStatus app3Log1Status = new FileStatus(10, 
false, 1, 1, toDeleteTime, app3Log1); + FileStatus app3Log1Status = new FileStatus(10, + false, 1, 1, toDeleteTime, app3Log1); Path app3Log2 = new Path(app3Dir, "host2"); - FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2); + FileStatus app3Log2Status = new FileStatus(10, + false, 1, 1, toDeleteTime, app3Log2); - when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :(")); + when(mockFs.delete(app3Dir, true)).thenThrow(new + AccessControlException("Injected Error\nStack Trace :(")); when(mockFs.listStatus(app3Dir)).thenReturn( new FileStatus[]{app3Log1Status, app3Log2Status}); Path app4Log1 = new Path(app4Dir, "host1"); - FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1); + FileStatus app4Log1Status = new FileStatus(10, + false, 1, 1, toDeleteTime, app4Log1); Path app4Log2 = new Path(app4Dir, "host2"); - FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2); + FileStatus app4Log2Status = new FileStatus(10, + false, 1, 1, toDeleteTime, app4Log2); when(mockFs.listStatus(app4Dir)).thenReturn( new FileStatus[]{app4Log1Status, app4Log2Status}); Path app5Log1 = new Path(app5Dir, "host1"); - FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1); + FileStatus app5Log1Status = new FileStatus(10, + false, 1, 1, toDeleteTime, app5Log1); Path app5Log2 = new Path(app5Dir, "host2"); - FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2); + FileStatus app5Log2Status = new FileStatus(10, + false, 1, 1, toKeepTime, app5Log2); when(mockFs.listStatus(app5Dir)).thenReturn( new FileStatus[]{app5Log1Status, app5Log2Status}); @@ -206,6 +243,7 @@ public void testRefreshLogRetentionSettings() throws Exception { "1"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + int maxBucket = 
LogAggregationUtils.getMaxBucket(conf); Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); @@ -218,45 +256,69 @@ public void testRefreshLogRetentionSettings() throws Exception { userDir); when(mockFs.listStatus(remoteRootLogPath)).thenReturn( - new FileStatus[] { userDirStatus }); + new FileStatus[] {userDirStatus }); Path userLogDir = new Path(userDir, suffix); ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); //Set time last modified of app1Dir directory and its files to before2000Secs - Path app1Dir = new Path(userLogDir, appId1.toString()); + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", suffix, maxBucket); + + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogPath, "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1, maxBucket); + + FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0, + 0, before50Secs, clusterTimestampDir); + + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, before50Secs, bucketDir); + + //Path app1Dir = new Path(userLogDir, appId1.toString()); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, app1Dir); ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); //Set time last modified of app1Dir directory and its files to before50Secs - Path app2Dir = new Path(userLogDir, appId2.toString()); + + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", suffix, maxBucket); + + //Path app2Dir = new Path(userLogDir, appId2.toString()); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, app2Dir); when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[] { app1DirStatus, app2DirStatus }); + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( 
+ new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus }); Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs, app1Log1); when(mockFs.listStatus(app1Dir)).thenReturn( - new FileStatus[] { app1Log1Status }); + new FileStatus[] {app1Log1Status }); Path app2Log1 = new Path(app2Dir, "host1"); FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs, app2Log1); when(mockFs.listStatus(app2Dir)).thenReturn( - new FileStatus[] { app2Log1Status }); + new FileStatus[] {app2Log1Status }); final List finishedApplications = Collections.unmodifiableList(Arrays.asList(appId1, appId2)); - AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() { + AggregatedLogDeletionService deletionSvc = + new AggregatedLogDeletionService() { @Override protected Configuration createConf() { return conf; @@ -317,6 +379,7 @@ public void testCheckInterval() throws Exception { conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); @@ -335,11 +398,31 @@ public void testCheckInterval() throws Exception { ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, appId1.toString()); + + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogPath, "me", suffix, appId1); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", suffix, appId1, maxBucket); + + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, 
appId1, "me", suffix, maxBucket); + + FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0, + 0, now, clusterTimestampDir); + + FileStatus bucketDirStatus = new FileStatus(0, true, 0, + 0, now, bucketDir); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir); when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus }); + Path app1Log1 = new Path(app1Dir, "host1"); FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1); @@ -373,10 +456,19 @@ protected void stopRMClient() { verify(mockFs, never()).delete(app1Dir, true); // modify the timestamp of the logs and verify it's picked up quickly + + clusterTimestampDirStatus = new FileStatus(0, + true, 0, 0, toDeleteTime, clusterTimestampDir); + bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir); app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1); + when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus}); + new FileStatus[] {clusterTimestampDirStatus }); + when(mockFs.listStatus(clusterTimestampDir)).thenReturn( + new FileStatus[] {bucketDirStatus }); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{app1Log1Status}); @@ -424,7 +516,7 @@ private static ApplicationClientProtocol createMockRMClient( createApplicationReportWithRunningApplication() { ApplicationReport report = mock(ApplicationReport.class); when(report.getYarnApplicationState()).thenReturn( - YarnApplicationState.RUNNING); + YarnApplicationState.RUNNING); GetApplicationReportResponse response = 
mock(GetApplicationReportResponse.class); when(response.getApplicationReport()).thenReturn(report); @@ -435,7 +527,7 @@ private static ApplicationClientProtocol createMockRMClient( createApplicationReportWithFinishedApplication() { ApplicationReport report = mock(ApplicationReport.class); when(report.getYarnApplicationState()).thenReturn( - YarnApplicationState.FINISHED); + YarnApplicationState.FINISHED); GetApplicationReportResponse response = mock(GetApplicationReportResponse.class); when(response.getApplicationReport()).thenReturn(report); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index bee34e0..19ae167 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -195,8 +195,9 @@ public void testAggregatedLogsBlockHar() throws Exception { URL harUrl = ClassLoader.getSystemClassLoader() .getResource("application_1440536969523_0001.har"); assertNotNull(harUrl); - String path = "target/logs/admin/logs/application_1440536969523_0001" + - "/application_1440536969523_0001.har"; + String path = + "target/logs/admin/logs/1440536969523/0/application_1440536969523_0001" + + "/application_1440536969523_0001.har"; FileUtils.copyDirectory(new File(harUrl.getPath()), new File(path)); ByteArrayOutputStream data = new ByteArrayOutputStream(); @@ -239,7 +240,8 @@ public void testNoLogs() throws Exception { FileUtil.fullyDelete(new File("target/logs")); Configuration configuration = getConfiguration(); - File f = new File("target/logs/logs/application_0_0001/container_0_0001_01_000001"); + File f = new + 
File("target/logs/logs/application_0_0001/container_0_0001_01_000001"); if (!f.exists()) { assertTrue(f.mkdirs()); } @@ -321,10 +323,10 @@ private void writeLog(Configuration configuration, String user) ContainerId containerId = ContainerIdPBImpl.newContainerId(appAttemptId, 1); String path = "target/logs/" + user - + "/logs/application_0_0001/localhost_1234"; + + "/logs/0/0/application_0_0001/localhost_1234"; File f = new File(path); if (!f.getParentFile().exists()) { - assertTrue(f.getParentFile().mkdirs()); + assertTrue(f.getParentFile().mkdirs()); } List rootLogDirs = Arrays.asList("target/logs/logs"); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index a12e2a1..db77143 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -87,8 +87,11 @@ public static void createContainerLogFileInRemoteFS(Configuration conf, createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, content); // upload container logs to remote log dir - Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR), - user + "/logs/" + appId.toString()); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); + Path path = LogAggregationUtils.getRemoteAppLogDir( + new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)), + appId, user, "logs", maxBucket); + if (fs.exists(path) && deleteRemoteLogDir) { fs.delete(path, true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 70b29f4..8fab2e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -140,6 +140,16 @@ test + org.apache.hadoop + hadoop-hdfs + test + + + org.apache.hadoop + hadoop-hdfs-client + test + + com.google.inject guice diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 05cdd49..57e7027 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -70,6 +70,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -685,6 +686,99 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() } @Test + public void testBucketDirCreation() throws Exception { + // Checks that starting apps triggers mkdirs on the expected bucket dirs. + final String logSuffix = "logs"; + this.conf.set(YarnConfiguration.NM_LOG_DIRS, + localLogDir.getAbsolutePath()); +
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix); + + // Set the directory item limit to 2; the old value is restored in finally. + final int originalMaxDirItems = this.conf.getInt( + DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, + DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT); + this.conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2); + InlineDispatcher dispatcher1 = new InlineDispatcher(); + dispatcher1.init(this.conf); + dispatcher1.start(); + LogAggregationService aggSvc = null; + try { + final FileSystem fs = FileSystem.get(this.conf); + final FileSystem spyFs = spy(fs); + final LogAggregationTFileController spyFileFormat + = new LogAggregationTFileController() { + @Override + public FileSystem getFileSystem(Configuration conf) + throws IOException { + return spyFs; + } + }; + spyFileFormat.initialize(conf, "TFile"); + aggSvc = new LogAggregationService(dispatcher1, + this.context, this.delSrvc, super.dirsHandler) { + @Override + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyFileFormat; + } + }; + aggSvc.init(this.conf); + aggSvc.start(); + int maxBucket1 = LogAggregationUtils.getMaxBucket(this.conf); + LogAggregationContext contextWithAllContainers = + Records.newRecord(LogAggregationContext.class); + contextWithAllContainers.setLogAggregationPolicyClassName( + AllContainerLogAggregationPolicy.class.getName()); + + // start an application and verify bucket 0 is created + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); + + // Verify bucket 1 is created + ApplicationId appId2 =
BuilderUtils.newApplicationId(1, 2); + Path bucketDir2 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId2, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir2), isA(FsPermission.class)); + + // Verify bucket 1 is used + ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); + Path bucketDir3 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId3, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir3), isA(FsPermission.class)); + + // Verify bucket 2 is created + ApplicationId appId4 = BuilderUtils.newApplicationId(1, 4); + Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId4, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir4), isA(FsPermission.class)); + } finally { + // Restore conf and stop services even if a verification above fails. + this.conf.setInt( + DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, originalMaxDirItems); + if (aggSvc != null) { + aggSvc.stop(); + } + dispatcher1.stop(); + } + } + + @Test public void testAppLogDirCreation() throws Exception { final String logSuffix = "logs"; this.conf.set(YarnConfiguration.NM_LOG_DIRS, @@ -725,7 +819,17 @@ public LogAggregationFileController getLogAggregationFileController( Path userDir = fs.makeQualified(new Path( remoteRootLogDir.getAbsolutePath(), this.user)); Path suffixDir = new Path(userDir, logSuffix); - Path appDir = new Path(suffixDir, appId.toString()); + int maxBucket = LogAggregationUtils.getMaxBucket(this.conf); + Path clusterTimestampDir = fs.makeQualified( +
LogAggregationUtils.getRemoteLogClusterTimestampDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId)); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), + this.user, logSuffix, appId, maxBucket)); + Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), appId, + this.user, logSuffix, maxBucket)); LogAggregationContext contextWithAllContainers = Records.newRecord(LogAggregationContext.class); contextWithAllContainers.setLogAggregationPolicyClassName( @@ -734,11 +838,15 @@ public LogAggregationFileController getLogAggregationFileController( this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(clusterTimestampDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); // start another application and verify only app dir created ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); - Path appDir2 = new Path(suffixDir, appId2.toString()); + Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), + appId2, this.user, logSuffix, maxBucket)); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); @@ -746,7 +854,9 @@ public LogAggregationFileController getLogAggregationFileController( // start another application with the app dir already created and verify // we do not try to create it again ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); - Path appDir3 = new Path(suffixDir, appId3.toString()); + Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new 
Path(remoteRootLogDir.getAbsolutePath()), + appId3, this.user, logSuffix, maxBucket)); new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, this.acls, contextWithAllContainers));