diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index e46eeda..6da01ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -172,6 +172,10 @@ log4j + org.apache.hadoop + hadoop-hdfs + + com.fasterxml.jackson.core jackson-core diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index a80f9d7..48ef144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -50,7 +49,8 @@ */ @InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"}) public class AggregatedLogDeletionService extends AbstractService { - private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class); + private static final Log LOG = LogFactory.getLog( + AggregatedLogDeletionService.class); private Timer timer = null; private long checkIntervalMsecs; @@ -63,7 +63,8 @@ private Path remoteRootLogDir = null; private ApplicationClientProtocol rmClient = null; - public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) { + LogDeletionTask(Configuration conf, long retentionSecs, + ApplicationClientProtocol rmClient) { this.conf = conf; this.retentionMillis = retentionSecs * 1000; this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); @@ -80,14 +81,40 @@ public void run() { try { FileSystem fs = remoteRootLogDir.getFileSystem(conf); for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { + if(userDir.isDirectory()) { Path userDirPath = new Path(userDir.getPath(), suffix); - deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient); + for(FileStatus clusterTimestampDir : fs.listStatus(userDirPath)) { + + if(clusterTimestampDir.isDirectory()) { + Path clusterTimestampDirPath = clusterTimestampDir.getPath(); + for(FileStatus bucketDir : fs.listStatus( + clusterTimestampDir.getPath())) { + + if(bucketDir.isDirectory()) { + Path bucketDirPath = bucketDir.getPath(); + deleteOldLogDirsFrom(bucketDirPath, cutoffMillis, + fs, rmClient); + + if(fs.listStatus(bucketDirPath).length == 0 && + fs.getFileStatus(bucketDirPath).getModificationTime() + < cutoffMillis) { + fs.delete(bucketDirPath, false); + } + } + } + if(fs.listStatus(clusterTimestampDirPath).length == 0 && + fs.getFileStatus(clusterTimestampDirPath). + getModificationTime() < cutoffMillis) { + fs.delete(clusterTimestampDirPath, false); + } + } + } } } } catch (IOException e) { logIOException("Error reading root log dir this deletion " + - "attempt is being aborted", e); + "attempt is being aborted", e); } LOG.info("aggregated log deletion finished."); } @@ -121,7 +148,7 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, } } catch(IOException e) { logIOException( - "Error reading the contents of " + appDir.getPath(), e); + "Error reading the contents of " + appDir.getPath(), e); } } } @@ -131,8 +158,8 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, } } - private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis, - FileSystem fs) { + private static boolean shouldDeleteLogDir(FileStatus dir, + long cutoffMillis, FileSystem fs) { boolean shouldDelete = true; try { for(FileStatus node: fs.listStatus(dir.getPath())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index edf2cf3..7428466 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -46,12 +47,14 @@ * @param user the application owner * @param nodeId the node id * @param suffix the log dir suffix + * @param maxBucket the maximum directory items * @return the remote log file. */ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, - ApplicationId appId, String user, NodeId nodeId, String suffix) { - return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix), - getNodeString(nodeId)); + ApplicationId appId, String user, NodeId nodeId, String suffix, + int maxBucket) { + return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, + suffix, maxBucket), getNodeString(nodeId)); } /** @@ -60,12 +63,13 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, * @param appId the application id * @param user the application owner * @param suffix the log directory suffix + * @param maxBucket the maximum directory items * @return the remote application specific log dir. */ public static Path getRemoteAppLogDir(Path remoteRootLogDir, - ApplicationId appId, String user, String suffix) { - return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix), - appId.toString()); + ApplicationId appId, String user, String suffix, int maxBucket) { + return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix, + appId, maxBucket), appId.toString()); } /** @@ -94,6 +98,48 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) { return new Path(remoteRootLogDir, user); } + /** + * Gets the remote log user's cluster timestamp dir. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @return the remote log per user per cluster timestamp dir. + */ + public static Path getRemoteLogClusterTimestampDir(Path remoteRootLogDir, + String user, String suffix, ApplicationId appId) { + return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, + suffix), appId.getClusterTimestamp()+""); + } + + /** + * Gets the remote log user's cluster timestamp's bucket dir. + * @param remoteRootLogDir the aggregated log remote root log dir + * @param user the application owner + * @param suffix the log dir suffix + * @param appId the application id + * @param maxBucket the maximum directory items + * @return the remote log per user per cluster timestamp per bucket dir. + */ + public static Path getRemoteBucketDir(Path remoteRootLogDir, String user, + String suffix, ApplicationId appId, int maxBucket) { + int id = appId.getId(); + int bucket = id / maxBucket; + return new Path(getRemoteLogClusterTimestampDir(remoteRootLogDir, + user, suffix, appId), bucket+""); + } + + + /** + * Returns the bucket component of the log dir. + * @param conf the configuration + * @return the bucket which will be appended to the user log dir. + */ + public static int getMaxBucket(Configuration conf) { + return conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, + DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT); + } + /** * Returns the suffix component of the log dir. * @param conf the configuration @@ -154,13 +200,14 @@ public static String getNodeString(String nodeId) { org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) throws IOException { org.apache.hadoop.fs.Path remoteAppDir = null; + int maxBucket = LogAggregationUtils.getMaxBucket(conf); if (appOwner == null) { org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir = FileContext.getFileContext(conf).makeQualified(remoteRootLogDir); FileContext fc = FileContext.getFileContext( qualifiedRemoteRootLogDir.toUri(), conf); org.apache.hadoop.fs.Path toMatch = LogAggregationUtils - .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket); FileStatus[] matching = fc.util().globStatus(toMatch); if (matching == null || matching.length != 1) { throw new IOException("Can not find remote application directory for " @@ -169,7 +216,7 @@ public static String getNodeString(String nodeId) { remoteAppDir = matching[0].getPath(); } else { remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, appOwner, suffix); + remoteRootLogDir, appId, appOwner, suffix, maxBucket); } return remoteAppDir; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 887d92d..5d74f28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -86,8 +86,9 @@ public static String getOwnerForAppIdOrNull( YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + int maxBucket = LogAggregationUtils.getMaxBucket(conf); Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, - appId, bestGuess, suffix); + appId, bestGuess, suffix, maxBucket); FileContext fc = FileContext.getFileContext(remoteRootLogDir.toUri(), conf); String pathAccess = fullPath.toString(); @@ -96,7 +97,7 @@ public static String getOwnerForAppIdOrNull( return bestGuess; } Path toMatch = LogAggregationUtils. - getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket); pathAccess = toMatch.toString(); FileStatus[] matching = fc.util().globStatus(toMatch); if (matching == null || matching.length != 1) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 5df900b..c0c657d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -110,6 +110,7 @@ protected Configuration conf; protected Path remoteRootLogDir; protected String remoteRootLogDirSuffix; + private int maxBucket; protected int retentionSize; protected String fileControllerName; @@ -132,6 +133,7 @@ public void initialize(Configuration conf, String controllerName) { this.retentionSize = configuredRentionSize; } this.fileControllerName = controllerName; + this.maxBucket = LogAggregationUtils.getMaxBucket(conf); initInternal(conf); } @@ -329,31 +331,54 @@ public Object run() throws Exception { // Only creating directories if they are missing to avoid // unnecessary load on the filesystem from all of the nodes Path appDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, user, remoteRootLogDirSuffix); + remoteRootLogDir, appId, user, remoteRootLogDirSuffix, + maxBucket); appDir = appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - remoteRootLogDir, user, remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogDir, user, remoteRootLogDirSuffix, appId, + maxBucket); + bucketDir = bucketDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if(!checkExists(remoteFS, bucketDir, APP_DIR_PERMISSIONS)) { + Path clusterTimestampDir = + LogAggregationUtils.getRemoteLogClusterTimestampDir( + remoteRootLogDir, user, remoteRootLogDirSuffix, + appId); + clusterTimestampDir = clusterTimestampDir.makeQualified( + remoteFS.getUri(), remoteFS.getWorkingDirectory()); + + if(!checkExists(remoteFS, clusterTimestampDir, + APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( + remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, clusterTimestampDir, APP_DIR_PERMISSIONS); } - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); - } + createDir(remoteFS, bucketDir, APP_DIR_PERMISSIONS); + } - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); } } catch (IOException e) { @@ -409,7 +434,7 @@ public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user, NodeId nodeId) { return LogAggregationUtils.getRemoteNodeLogFileForApp( getRemoteRootLogDir(), appId, user, nodeId, - getRemoteRootLogDirSuffix()); + getRemoteRootLogDirSuffix(), this.maxBucket); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 05cdd49..5896238 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -70,6 +70,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -684,6 +685,93 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() logAggregationService.stop(); } + @Test + public void testBucketDirCreation() throws Exception{ + final String logSuffix = "logs"; + this.conf.set(YarnConfiguration.NM_LOG_DIRS, + localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix); + + int maxBucket = this.conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, + DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT); + this.conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,2); + + InlineDispatcher dispatcher = new InlineDispatcher(); + dispatcher.init(this.conf); + dispatcher.start(); + + FileSystem fs = FileSystem.get(this.conf); + final FileSystem spyFs = spy(FileSystem.get(this.conf)); + + final LogAggregationTFileController spyFileFormat + = new LogAggregationTFileController() { + @Override + public FileSystem getFileSystem(Configuration conf) + throws IOException { + return spyFs; + } + }; + spyFileFormat.initialize(conf, "TFile"); + LogAggregationService aggSvc = new LogAggregationService(dispatcher, + this.context, this.delSrvc, super.dirsHandler) { + @Override + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyFileFormat; + } + }; + aggSvc.init(this.conf); + aggSvc.start(); + + int maxBucket1 = LogAggregationUtils.getMaxBucket(this.conf); + LogAggregationContext contextWithAllContainers = + Records.newRecord(LogAggregationContext.class); + contextWithAllContainers.setLogAggregationPolicyClassName( + AllContainerLogAggregationPolicy.class.getName()); + + + // start an application and verify bucket 0 is created + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); + + // Verify bucket 1 is created + ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); + Path bucketDir2 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId2, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir2), isA(FsPermission.class)); + + // Verify bucket 1 is used + ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); + Path bucketDir3 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId3, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir3), isA(FsPermission.class)); + + // Verify bucket 2 is created + ApplicationId appId4 = BuilderUtils.newApplicationId(1, 4); + Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId4, maxBucket1)); + aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, + this.acls, contextWithAllContainers)); + verify(spyFs).mkdirs(eq(bucketDir4), isA(FsPermission.class)); + + this.conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,maxBucket); + aggSvc.stop(); + aggSvc.close(); + dispatcher.stop(); + + } + + @Test public void testAppLogDirCreation() throws Exception { final String logSuffix = "logs"; @@ -725,7 +813,13 @@ public LogAggregationFileController getLogAggregationFileController( Path userDir = fs.makeQualified(new Path( remoteRootLogDir.getAbsolutePath(), this.user)); Path suffixDir = new Path(userDir, logSuffix); - Path appDir = new Path(suffixDir, appId.toString()); + int maxBucket = LogAggregationUtils.getMaxBucket(this.conf); + Path clusterTimestampDir = fs.makeQualified(LogAggregationUtils.getRemoteLogClusterTimestampDir( + new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId)); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId, maxBucket)); + Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), appId, this.user, logSuffix, maxBucket)); LogAggregationContext contextWithAllContainers = Records.newRecord(LogAggregationContext.class); contextWithAllContainers.setLogAggregationPolicyClassName( @@ -734,11 +828,14 @@ public LogAggregationFileController getLogAggregationFileController( this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(clusterTimestampDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); // start another application and verify only app dir created ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); - Path appDir2 = new Path(suffixDir, appId2.toString()); + Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), appId2, this.user, logSuffix, maxBucket)); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAllContainers)); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); @@ -746,7 +843,8 @@ public LogAggregationFileController getLogAggregationFileController( // start another application with the app dir already created and verify // we do not try to create it again ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); - Path appDir3 = new Path(suffixDir, appId3.toString()); + Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(remoteRootLogDir.getAbsolutePath()), appId3, this.user, logSuffix, maxBucket)); new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, this.acls, contextWithAllContainers));