diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index e46eeda..6da01ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -172,6 +172,10 @@
log4j
+ org.apache.hadoop
+ hadoop-hdfs
+
+
com.fasterxml.jackson.core
jackson-core
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index a80f9d7..48ef144 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -50,7 +49,8 @@
*/
@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
public class AggregatedLogDeletionService extends AbstractService {
- private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
+ private static final Log LOG = LogFactory.getLog(
+ AggregatedLogDeletionService.class);
private Timer timer = null;
private long checkIntervalMsecs;
@@ -63,7 +63,8 @@
private Path remoteRootLogDir = null;
private ApplicationClientProtocol rmClient = null;
- public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
+ LogDeletionTask(Configuration conf, long retentionSecs,
+ ApplicationClientProtocol rmClient) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
@@ -80,14 +81,40 @@ public void run() {
try {
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
+
if(userDir.isDirectory()) {
Path userDirPath = new Path(userDir.getPath(), suffix);
- deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
+ for(FileStatus clusterTimestampDir : fs.listStatus(userDirPath)) {
+
+ if(clusterTimestampDir.isDirectory()) {
+ Path clusterTimestampDirPath = clusterTimestampDir.getPath();
+ for(FileStatus bucketDir : fs.listStatus(
+ clusterTimestampDir.getPath())) {
+
+ if(bucketDir.isDirectory()) {
+ Path bucketDirPath = bucketDir.getPath();
+ deleteOldLogDirsFrom(bucketDirPath, cutoffMillis,
+ fs, rmClient);
+
+ if(fs.listStatus(bucketDirPath).length == 0 &&
+ fs.getFileStatus(bucketDirPath).getModificationTime()
+ < cutoffMillis) {
+ fs.delete(bucketDirPath, false);
+ }
+ }
+ }
+ if(fs.listStatus(clusterTimestampDirPath).length == 0 &&
+ fs.getFileStatus(clusterTimestampDirPath).
+ getModificationTime() < cutoffMillis) {
+ fs.delete(clusterTimestampDirPath, false);
+ }
+ }
+ }
}
}
} catch (IOException e) {
logIOException("Error reading root log dir this deletion " +
- "attempt is being aborted", e);
+ "attempt is being aborted", e);
}
LOG.info("aggregated log deletion finished.");
}
@@ -121,7 +148,7 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
}
} catch(IOException e) {
logIOException(
- "Error reading the contents of " + appDir.getPath(), e);
+ "Error reading the contents of " + appDir.getPath(), e);
}
}
}
@@ -131,8 +158,8 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
}
}
- private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
- FileSystem fs) {
+ private static boolean shouldDeleteLogDir(FileStatus dir,
+ long cutoffMillis, FileSystem fs) {
boolean shouldDelete = true;
try {
for(FileStatus node: fs.listStatus(dir.getPath())) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index edf2cf3..7428466 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -46,12 +47,14 @@
* @param user the application owner
* @param nodeId the node id
* @param suffix the log dir suffix
+ * @param maxBucket the maximum directory items
* @return the remote log file.
*/
public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
- ApplicationId appId, String user, NodeId nodeId, String suffix) {
- return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
- getNodeString(nodeId));
+ ApplicationId appId, String user, NodeId nodeId, String suffix,
+ int maxBucket) {
+ return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user,
+ suffix, maxBucket), getNodeString(nodeId));
}
/**
@@ -60,12 +63,13 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
* @param appId the application id
* @param user the application owner
* @param suffix the log directory suffix
+ * @param maxBucket the maximum directory items
* @return the remote application specific log dir.
*/
public static Path getRemoteAppLogDir(Path remoteRootLogDir,
- ApplicationId appId, String user, String suffix) {
- return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix),
- appId.toString());
+ ApplicationId appId, String user, String suffix, int maxBucket) {
+ return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix,
+ appId, maxBucket), appId.toString());
}
/**
@@ -94,6 +98,48 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) {
return new Path(remoteRootLogDir, user);
}
+ /**
+ * Gets the remote log user's cluster timestamp dir.
+ * @param remoteRootLogDir the aggregated log remote root log dir
+ * @param user the application owner
+ * @param suffix the log dir suffix
+ * @param appId the application id
+ * @return the remote log per user per cluster timestamp dir.
+ */
+ public static Path getRemoteLogClusterTimestampDir(Path remoteRootLogDir,
+ String user, String suffix, ApplicationId appId) {
+ return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user,
+ suffix), appId.getClusterTimestamp()+"");
+ }
+
+ /**
+ * Gets the remote log user's cluster timestamp's bucket dir.
+ * @param remoteRootLogDir the aggregated log remote root log dir
+ * @param user the application owner
+ * @param suffix the log dir suffix
+ * @param appId the application id
+ * @param maxBucket the maximum directory items
+ * @return the remote log per user per cluster timestamp per bucket dir.
+ */
+ public static Path getRemoteBucketDir(Path remoteRootLogDir, String user,
+ String suffix, ApplicationId appId, int maxBucket) {
+ int id = appId.getId();
+ int bucket = id / maxBucket;
+ return new Path(getRemoteLogClusterTimestampDir(remoteRootLogDir,
+ user, suffix, appId), bucket+"");
+ }
+
+
+ /**
+ * Returns the bucket component of the log dir.
+ * @param conf the configuration
+ * @return the bucket which will be appended to the user log dir.
+ */
+ public static int getMaxBucket(Configuration conf) {
+ return conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+ }
+
/**
* Returns the suffix component of the log dir.
* @param conf the configuration
@@ -154,13 +200,14 @@ public static String getNodeString(String nodeId) {
org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
throws IOException {
org.apache.hadoop.fs.Path remoteAppDir = null;
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
if (appOwner == null) {
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
- .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
+ .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
throw new IOException("Can not find remote application directory for "
@@ -169,7 +216,7 @@ public static String getNodeString(String nodeId) {
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogDir, appId, appOwner, suffix);
+ remoteRootLogDir, appId, appOwner, suffix, maxBucket);
}
return remoteAppDir;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 887d92d..5d74f28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -86,8 +86,9 @@ public static String getOwnerForAppIdOrNull(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
- appId, bestGuess, suffix);
+ appId, bestGuess, suffix, maxBucket);
FileContext fc =
FileContext.getFileContext(remoteRootLogDir.toUri(), conf);
String pathAccess = fullPath.toString();
@@ -96,7 +97,7 @@ public static String getOwnerForAppIdOrNull(
return bestGuess;
}
Path toMatch = LogAggregationUtils.
- getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
+ getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket);
pathAccess = toMatch.toString();
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 5df900b..c0c657d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -110,6 +110,7 @@
protected Configuration conf;
protected Path remoteRootLogDir;
protected String remoteRootLogDirSuffix;
+ private int maxBucket;
protected int retentionSize;
protected String fileControllerName;
@@ -132,6 +133,7 @@ public void initialize(Configuration conf, String controllerName) {
this.retentionSize = configuredRentionSize;
}
this.fileControllerName = controllerName;
+ this.maxBucket = LogAggregationUtils.getMaxBucket(conf);
initInternal(conf);
}
@@ -329,31 +331,54 @@ public Object run() throws Exception {
// Only creating directories if they are missing to avoid
// unnecessary load on the filesystem from all of the nodes
Path appDir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
+ remoteRootLogDir, appId, user, remoteRootLogDirSuffix,
+ maxBucket);
appDir = appDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
- Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
- remoteRootLogDir, user, remoteRootLogDirSuffix);
- suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
-
- if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
- Path userDir = LogAggregationUtils.getRemoteLogUserDir(
- remoteRootLogDir, user);
- userDir = userDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
-
- if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
- createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+ Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+ remoteRootLogDir, user, remoteRootLogDirSuffix, appId,
+ maxBucket);
+ bucketDir = bucketDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if(!checkExists(remoteFS, bucketDir, APP_DIR_PERMISSIONS)) {
+ Path clusterTimestampDir =
+ LogAggregationUtils.getRemoteLogClusterTimestampDir(
+ remoteRootLogDir, user, remoteRootLogDirSuffix,
+ appId);
+ clusterTimestampDir = clusterTimestampDir.makeQualified(
+ remoteFS.getUri(), remoteFS.getWorkingDirectory());
+
+ if(!checkExists(remoteFS, clusterTimestampDir,
+ APP_DIR_PERMISSIONS)) {
+ Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
+ remoteRootLogDir, user, remoteRootLogDirSuffix);
+ suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
+ Path userDir = LogAggregationUtils.getRemoteLogUserDir(
+ remoteRootLogDir, user);
+ userDir = userDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
+ createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+ }
+
+ createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+ }
+
+ createDir(remoteFS, clusterTimestampDir, APP_DIR_PERMISSIONS);
}
- createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
- }
+ createDir(remoteFS, bucketDir, APP_DIR_PERMISSIONS);
+ }
- createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+ createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
}
} catch (IOException e) {
@@ -409,7 +434,7 @@ public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
NodeId nodeId) {
return LogAggregationUtils.getRemoteNodeLogFileForApp(
getRemoteRootLogDir(), appId, user, nodeId,
- getRemoteRootLogDirSuffix());
+ getRemoteRootLogDirSuffix(), this.maxBucket);
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 05cdd49..5896238 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -684,6 +685,93 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner()
logAggregationService.stop();
}
+ @Test
+ public void testBucketDirCreation() throws Exception{
+ final String logSuffix = "logs";
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+ localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
+
+ int maxBucket = this.conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+ this.conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,2);
+
+ InlineDispatcher dispatcher = new InlineDispatcher();
+ dispatcher.init(this.conf);
+ dispatcher.start();
+
+ FileSystem fs = FileSystem.get(this.conf);
+ final FileSystem spyFs = spy(FileSystem.get(this.conf));
+
+ final LogAggregationTFileController spyFileFormat
+ = new LogAggregationTFileController() {
+ @Override
+ public FileSystem getFileSystem(Configuration conf)
+ throws IOException {
+ return spyFs;
+ }
+ };
+ spyFileFormat.initialize(conf, "TFile");
+ LogAggregationService aggSvc = new LogAggregationService(dispatcher,
+ this.context, this.delSrvc, super.dirsHandler) {
+ @Override
+ public LogAggregationFileController getLogAggregationFileController(
+ Configuration conf) {
+ return spyFileFormat;
+ }
+ };
+ aggSvc.init(this.conf);
+ aggSvc.start();
+
+ int maxBucket1 = LogAggregationUtils.getMaxBucket(this.conf);
+ LogAggregationContext contextWithAllContainers =
+ Records.newRecord(LogAggregationContext.class);
+ contextWithAllContainers.setLogAggregationPolicyClassName(
+ AllContainerLogAggregationPolicy.class.getName());
+
+
+ // start an application and verify bucket 0 is created
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class));
+
+ // Verify bucket 1 is created
+ ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
+ Path bucketDir2 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId2, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir2), isA(FsPermission.class));
+
+ // Verify bucket 1 is used
+ ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
+ Path bucketDir3 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId3, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir3), isA(FsPermission.class));
+
+ // Verify bucket 2 is created
+ ApplicationId appId4 = BuilderUtils.newApplicationId(1, 4);
+ Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId4, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir4), isA(FsPermission.class));
+
+ this.conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,maxBucket);
+ aggSvc.stop();
+ aggSvc.close();
+ dispatcher.stop();
+
+ }
+
+
@Test
public void testAppLogDirCreation() throws Exception {
final String logSuffix = "logs";
@@ -725,7 +813,13 @@ public LogAggregationFileController getLogAggregationFileController(
Path userDir = fs.makeQualified(new Path(
remoteRootLogDir.getAbsolutePath(), this.user));
Path suffixDir = new Path(userDir, logSuffix);
- Path appDir = new Path(suffixDir, appId.toString());
+ int maxBucket = LogAggregationUtils.getMaxBucket(this.conf);
+ Path clusterTimestampDir = fs.makeQualified(LogAggregationUtils.getRemoteLogClusterTimestampDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId));
+ Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), this.user, logSuffix, appId, maxBucket));
+ Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), appId, this.user, logSuffix, maxBucket));
LogAggregationContext contextWithAllContainers =
Records.newRecord(LogAggregationContext.class);
contextWithAllContainers.setLogAggregationPolicyClassName(
@@ -734,11 +828,14 @@ public LogAggregationFileController getLogAggregationFileController(
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
+ verify(spyFs).mkdirs(eq(clusterTimestampDir), isA(FsPermission.class));
+ verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
// start another application and verify only app dir created
ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
- Path appDir2 = new Path(suffixDir, appId2.toString());
+ Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), appId2, this.user, logSuffix, maxBucket));
aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
@@ -746,7 +843,8 @@ public LogAggregationFileController getLogAggregationFileController(
// start another application with the app dir already created and verify
// we do not try to create it again
ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
- Path appDir3 = new Path(suffixDir, appId3.toString());
+ Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), appId3, this.user, logSuffix, maxBucket));
new File(appDir3.toUri().getPath()).mkdir();
aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
this.acls, contextWithAllContainers));