diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index e46eeda..f1ddcb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -45,6 +45,17 @@
org.apache.hadoop
+ hadoop-hdfs
+ provided
+
+
+ org.apache.hadoop
+ hadoop-hdfs-client
+ provided
+
+
+
+ org.apache.hadoop
hadoop-yarn-api
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index a80f9d7..3b3e60a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -50,7 +49,8 @@
*/
@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
public class AggregatedLogDeletionService extends AbstractService {
- private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
+ private static final Log LOG = LogFactory.getLog(
+ AggregatedLogDeletionService.class);
private Timer timer = null;
private long checkIntervalMsecs;
@@ -63,7 +63,8 @@
private Path remoteRootLogDir = null;
private ApplicationClientProtocol rmClient = null;
- public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
+ LogDeletionTask(Configuration conf, long retentionSecs,
+ ApplicationClientProtocol rmClient) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
@@ -76,18 +77,45 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClient
@Override
public void run() {
long cutoffMillis = System.currentTimeMillis() - retentionMillis;
- LOG.info("aggregated log deletion started.");
+ LOG.info("aggregated log deletion started here.");
try {
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
+ LOG.info("UserDir="+userDir.getPath());
if(userDir.isDirectory()) {
Path userDirPath = new Path(userDir.getPath(), suffix);
- deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
+ LOG.info("USERDIR="+userDirPath);
+ for(FileStatus clusterTimestampDir : fs.listStatus(userDirPath)) {
+ LOG.info("CLUSTERTIMETSMA="+clusterTimestampDir.getPath());
+ if(clusterTimestampDir.isDirectory()) {
+ Path clusterTimestampDirPath = clusterTimestampDir.getPath();
+ for(FileStatus bucketDir : fs.listStatus(
+ clusterTimestampDir.getPath())) {
+ LOG.info("BUCKET="+bucketDir.getPath());
+ if(bucketDir.isDirectory()) {
+ Path bucketDirPath = bucketDir.getPath();
+ deleteOldLogDirsFrom(bucketDirPath, cutoffMillis,
+ fs, rmClient);
+
+ if(fs.listStatus(bucketDirPath).length == 0 &&
+ fs.getFileStatus(bucketDirPath).getModificationTime()
+ < cutoffMillis) {
+ fs.delete(bucketDirPath, false);
+ }
+ }
+ }
+ if(fs.listStatus(clusterTimestampDirPath).length == 0 &&
+ fs.getFileStatus(clusterTimestampDirPath).
+ getModificationTime() < cutoffMillis) {
+ fs.delete(clusterTimestampDirPath, false);
+ }
+ }
+ }
}
}
} catch (IOException e) {
logIOException("Error reading root log dir this deletion " +
- "attempt is being aborted", e);
+ "attempt is being aborted", e);
}
LOG.info("aggregated log deletion finished.");
}
@@ -121,7 +149,7 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
}
} catch(IOException e) {
logIOException(
- "Error reading the contents of " + appDir.getPath(), e);
+ "Error reading the contents of " + appDir.getPath(), e);
}
}
}
@@ -131,8 +159,8 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
}
}
- private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
- FileSystem fs) {
+ private static boolean shouldDeleteLogDir(FileStatus dir,
+ long cutoffMillis, FileSystem fs) {
boolean shouldDelete = true;
try {
for(FileStatus node: fs.listStatus(dir.getPath())) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index edf2cf3..7428466 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -46,12 +47,14 @@
* @param user the application owner
* @param nodeId the node id
* @param suffix the log dir suffix
+ * @param maxBucket the maximum directory items
* @return the remote log file.
*/
public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
- ApplicationId appId, String user, NodeId nodeId, String suffix) {
- return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
- getNodeString(nodeId));
+ ApplicationId appId, String user, NodeId nodeId, String suffix,
+ int maxBucket) {
+ return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user,
+ suffix, maxBucket), getNodeString(nodeId));
}
/**
@@ -60,12 +63,13 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
* @param appId the application id
* @param user the application owner
* @param suffix the log directory suffix
+ * @param maxBucket the maximum directory items
* @return the remote application specific log dir.
*/
public static Path getRemoteAppLogDir(Path remoteRootLogDir,
- ApplicationId appId, String user, String suffix) {
- return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix),
- appId.toString());
+ ApplicationId appId, String user, String suffix, int maxBucket) {
+ return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix,
+ appId, maxBucket), appId.toString());
}
/**
@@ -94,6 +98,48 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) {
return new Path(remoteRootLogDir, user);
}
+ /**
+ * Gets the remote log user's cluster timestamp dir.
+ * @param remoteRootLogDir the aggregated log remote root log dir
+ * @param user the application owner
+ * @param suffix the log dir suffix
+ * @param appId the application id
+ * @return the remote log per user per cluster timestamp dir.
+ */
+ public static Path getRemoteLogClusterTimestampDir(Path remoteRootLogDir,
+ String user, String suffix, ApplicationId appId) {
+ return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user,
+ suffix), appId.getClusterTimestamp()+"");
+ }
+
+ /**
+ * Gets the remote log user's cluster timestamp's bucket dir.
+ * @param remoteRootLogDir the aggregated log remote root log dir
+ * @param user the application owner
+ * @param suffix the log dir suffix
+ * @param appId the application id
+ * @param maxBucket the maximum directory items
+ * @return the remote log per user per cluster timestamp per bucket dir.
+ */
+ public static Path getRemoteBucketDir(Path remoteRootLogDir, String user,
+ String suffix, ApplicationId appId, int maxBucket) {
+ int id = appId.getId();
+ int bucket = id / maxBucket;
+ return new Path(getRemoteLogClusterTimestampDir(remoteRootLogDir,
+ user, suffix, appId), bucket+"");
+ }
+
+
+ /**
+ * Returns the bucket component of the log dir.
+ * @param conf the configuration
+ * @return the bucket which will be appended to the user log dir.
+ */
+ public static int getMaxBucket(Configuration conf) {
+ return conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+ }
+
/**
* Returns the suffix component of the log dir.
* @param conf the configuration
@@ -154,13 +200,14 @@ public static String getNodeString(String nodeId) {
org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
throws IOException {
org.apache.hadoop.fs.Path remoteAppDir = null;
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
if (appOwner == null) {
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
- .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
+ .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
throw new IOException("Can not find remote application directory for "
@@ -169,7 +216,7 @@ public static String getNodeString(String nodeId) {
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogDir, appId, appOwner, suffix);
+ remoteRootLogDir, appId, appOwner, suffix, maxBucket);
}
return remoteAppDir;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 887d92d..5d74f28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -86,8 +86,9 @@ public static String getOwnerForAppIdOrNull(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
- appId, bestGuess, suffix);
+ appId, bestGuess, suffix, maxBucket);
FileContext fc =
FileContext.getFileContext(remoteRootLogDir.toUri(), conf);
String pathAccess = fullPath.toString();
@@ -96,7 +97,7 @@ public static String getOwnerForAppIdOrNull(
return bestGuess;
}
Path toMatch = LogAggregationUtils.
- getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
+ getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix, maxBucket);
pathAccess = toMatch.toString();
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 5df900b..c0c657d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -110,6 +110,7 @@
protected Configuration conf;
protected Path remoteRootLogDir;
protected String remoteRootLogDirSuffix;
+ private int maxBucket;
protected int retentionSize;
protected String fileControllerName;
@@ -132,6 +133,7 @@ public void initialize(Configuration conf, String controllerName) {
this.retentionSize = configuredRentionSize;
}
this.fileControllerName = controllerName;
+ this.maxBucket = LogAggregationUtils.getMaxBucket(conf);
initInternal(conf);
}
@@ -329,31 +331,54 @@ public Object run() throws Exception {
// Only creating directories if they are missing to avoid
// unnecessary load on the filesystem from all of the nodes
Path appDir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
+ remoteRootLogDir, appId, user, remoteRootLogDirSuffix,
+ maxBucket);
appDir = appDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
- Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
- remoteRootLogDir, user, remoteRootLogDirSuffix);
- suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
-
- if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
- Path userDir = LogAggregationUtils.getRemoteLogUserDir(
- remoteRootLogDir, user);
- userDir = userDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
-
- if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
- createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+ Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+ remoteRootLogDir, user, remoteRootLogDirSuffix, appId,
+ maxBucket);
+ bucketDir = bucketDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if(!checkExists(remoteFS, bucketDir, APP_DIR_PERMISSIONS)) {
+ Path clusterTimestampDir =
+ LogAggregationUtils.getRemoteLogClusterTimestampDir(
+ remoteRootLogDir, user, remoteRootLogDirSuffix,
+ appId);
+ clusterTimestampDir = clusterTimestampDir.makeQualified(
+ remoteFS.getUri(), remoteFS.getWorkingDirectory());
+
+ if(!checkExists(remoteFS, clusterTimestampDir,
+ APP_DIR_PERMISSIONS)) {
+ Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
+ remoteRootLogDir, user, remoteRootLogDirSuffix);
+ suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
+ Path userDir = LogAggregationUtils.getRemoteLogUserDir(
+ remoteRootLogDir, user);
+ userDir = userDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
+ createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+ }
+
+ createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+ }
+
+ createDir(remoteFS, clusterTimestampDir, APP_DIR_PERMISSIONS);
}
- createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
- }
+ createDir(remoteFS, bucketDir, APP_DIR_PERMISSIONS);
+ }
- createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+ createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
}
} catch (IOException e) {
@@ -409,7 +434,7 @@ public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
NodeId nodeId) {
return LogAggregationUtils.getRemoteNodeLogFileForApp(
getRemoteRootLogDir(), appId, user, nodeId,
- getRemoteRootLogDirSuffix());
+ getRemoteRootLogDirSuffix(), this.maxBucket);
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index 026996e..e9978bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -66,7 +66,8 @@ public void testDeletion() throws Exception {
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
+
Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
@@ -74,7 +75,8 @@ public void testDeletion() throws Exception {
Path remoteRootLogPath = new Path(remoteRootLogDir);
Path userDir = new Path(remoteRootLogPath, "me");
- FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir);
+ FileStatus userDirStatus = new FileStatus(0,
+ true, 0, 0, toKeepTime, userDir);
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
@@ -82,71 +84,106 @@ public void testDeletion() throws Exception {
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
- Path app1Dir = new Path(userLogDir, appId1.toString());
- FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
-
+
+ Path clusterTimestampDir = LogAggregationUtils.
+ getRemoteLogClusterTimestampDir(remoteRootLogPath,
+ "me", suffix, appId1);
+ Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+ remoteRootLogPath, "me", suffix, appId1, maxBucket);
+ Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId1, "me", suffix, maxBucket);
+
+ FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0,
+ 0, toDeleteTime, clusterTimestampDir);
+
+ FileStatus bucketDirStatus = new FileStatus(0, true, 0,
+ 0, toDeleteTime, bucketDir);
+ FileStatus app1DirStatus = new FileStatus(0, true,
+ 0, 0, toDeleteTime, app1Dir);
+
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
- Path app2Dir = new Path(userLogDir, appId2.toString());
- FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
+ Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId2, "me", suffix, maxBucket);
+
+ FileStatus app2DirStatus = new FileStatus(0, true, 0,
+ 0, toDeleteTime, app2Dir);
ApplicationId appId3 =
ApplicationId.newInstance(System.currentTimeMillis(), 3);
- Path app3Dir = new Path(userLogDir, appId3.toString());
- FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
+ Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId3, "me", suffix, maxBucket);
+ FileStatus app3DirStatus = new FileStatus(0, true, 0,
+ 0, toDeleteTime, app3Dir);
ApplicationId appId4 =
ApplicationId.newInstance(System.currentTimeMillis(), 4);
- Path app4Dir = new Path(userLogDir, appId4.toString());
- FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
+ Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId4, "me", suffix, maxBucket);
+ FileStatus app4DirStatus = new FileStatus(0, true, 0,
+ 0, toDeleteTime, app4Dir);
ApplicationId appId5 =
ApplicationId.newInstance(System.currentTimeMillis(), 5);
- Path app5Dir = new Path(userLogDir, appId5.toString());
+ Path app5Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId5, "me", suffix, maxBucket);
FileStatus app5DirStatus =
new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
- new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
- app4DirStatus, app5DirStatus });
+ new FileStatus[] {clusterTimestampDirStatus });
+ when(mockFs.listStatus(clusterTimestampDir)).thenReturn(
+ new FileStatus[] {bucketDirStatus });
+ when(mockFs.listStatus(bucketDir)).thenReturn(
+ new FileStatus[] {app1DirStatus, app2DirStatus,
+ app3DirStatus, app4DirStatus, app5DirStatus });
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{});
Path app2Log1 = new Path(app2Dir, "host1");
- FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
+ FileStatus app2Log1Status = new FileStatus(10,
+ false, 1, 1, toDeleteTime, app2Log1);
Path app2Log2 = new Path(app2Dir, "host2");
- FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
+ FileStatus app2Log2Status = new FileStatus(10,
+ false, 1, 1, toKeepTime, app2Log2);
when(mockFs.listStatus(app2Dir)).thenReturn(
new FileStatus[]{app2Log1Status, app2Log2Status});
Path app3Log1 = new Path(app3Dir, "host1");
- FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
+ FileStatus app3Log1Status = new FileStatus(10,
+ false, 1, 1, toDeleteTime, app3Log1);
Path app3Log2 = new Path(app3Dir, "host2");
- FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
+ FileStatus app3Log2Status = new FileStatus(10,
+ false, 1, 1, toDeleteTime, app3Log2);
- when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
+ when(mockFs.delete(app3Dir, true)).thenThrow(new
+ AccessControlException("Injected Error\nStack Trace :("));
when(mockFs.listStatus(app3Dir)).thenReturn(
new FileStatus[]{app3Log1Status, app3Log2Status});
Path app4Log1 = new Path(app4Dir, "host1");
- FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
+ FileStatus app4Log1Status = new FileStatus(10,
+ false, 1, 1, toDeleteTime, app4Log1);
Path app4Log2 = new Path(app4Dir, "host2");
- FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2);
+ FileStatus app4Log2Status = new FileStatus(10,
+ false, 1, 1, toDeleteTime, app4Log2);
when(mockFs.listStatus(app4Dir)).thenReturn(
new FileStatus[]{app4Log1Status, app4Log2Status});
Path app5Log1 = new Path(app5Dir, "host1");
- FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1);
+ FileStatus app5Log1Status = new FileStatus(10,
+ false, 1, 1, toDeleteTime, app5Log1);
Path app5Log2 = new Path(app5Dir, "host2");
- FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2);
+ FileStatus app5Log2Status = new FileStatus(10,
+ false, 1, 1, toKeepTime, app5Log2);
when(mockFs.listStatus(app5Dir)).thenReturn(
new FileStatus[]{app5Log1Status, app5Log2Status});
@@ -206,6 +243,7 @@ public void testRefreshLogRetentionSettings() throws Exception {
"1");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf);
@@ -218,45 +256,69 @@ public void testRefreshLogRetentionSettings() throws Exception {
userDir);
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
- new FileStatus[] { userDirStatus });
+ new FileStatus[] {userDirStatus });
Path userLogDir = new Path(userDir, suffix);
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
//Set time last modified of app1Dir directory and its files to before2000Secs
- Path app1Dir = new Path(userLogDir, appId1.toString());
+ Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId1, "me", suffix, maxBucket);
+
+ Path clusterTimestampDir =
+ LogAggregationUtils.getRemoteLogClusterTimestampDir(
+ remoteRootLogPath, "me", suffix, appId1);
+ Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+ remoteRootLogPath, "me", suffix, appId1, maxBucket);
+
+ FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0,
+ 0, before50Secs, clusterTimestampDir);
+
+ FileStatus bucketDirStatus = new FileStatus(0, true, 0,
+ 0, before50Secs, bucketDir);
+
+ //Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
app1Dir);
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
//Set time last modified of app1Dir directory and its files to before50Secs
- Path app2Dir = new Path(userLogDir, appId2.toString());
+
+ Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId2, "me", suffix, maxBucket);
+
+ //Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
app2Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
- new FileStatus[] { app1DirStatus, app2DirStatus });
+ new FileStatus[] {clusterTimestampDirStatus });
+ when(mockFs.listStatus(clusterTimestampDir)).thenReturn(
+ new FileStatus[] {bucketDirStatus });
+ when(mockFs.listStatus(bucketDir)).thenReturn(
+ new FileStatus[] {app1DirStatus, app2DirStatus });
Path app1Log1 = new Path(app1Dir, "host1");
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
app1Log1);
when(mockFs.listStatus(app1Dir)).thenReturn(
- new FileStatus[] { app1Log1Status });
+ new FileStatus[] {app1Log1Status });
Path app2Log1 = new Path(app2Dir, "host1");
FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
app2Log1);
when(mockFs.listStatus(app2Dir)).thenReturn(
- new FileStatus[] { app2Log1Status });
+ new FileStatus[] {app2Log1Status });
final List finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
- AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
+ AggregatedLogDeletionService deletionSvc =
+ new AggregatedLogDeletionService() {
@Override
protected Configuration createConf() {
return conf;
@@ -317,6 +379,7 @@ public void testCheckInterval() throws Exception {
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
// prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll();
@@ -335,11 +398,31 @@ public void testCheckInterval() throws Exception {
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
- Path app1Dir = new Path(userLogDir, appId1.toString());
+
+ Path clusterTimestampDir =
+ LogAggregationUtils.getRemoteLogClusterTimestampDir(
+ remoteRootLogPath, "me", suffix, appId1);
+ Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
+ remoteRootLogPath, "me", suffix, appId1, maxBucket);
+
+ Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
+ remoteRootLogPath, appId1, "me", suffix, maxBucket);
+
+ FileStatus clusterTimestampDirStatus = new FileStatus(0, true, 0,
+ 0, now, clusterTimestampDir);
+
+ FileStatus bucketDirStatus = new FileStatus(0, true, 0,
+ 0, now, bucketDir);
+
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
- new FileStatus[]{app1DirStatus});
+ new FileStatus[] {clusterTimestampDirStatus });
+ when(mockFs.listStatus(clusterTimestampDir)).thenReturn(
+ new FileStatus[] {bucketDirStatus });
+ when(mockFs.listStatus(bucketDir)).thenReturn(
+ new FileStatus[] {app1DirStatus });
+
Path app1Log1 = new Path(app1Dir, "host1");
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
@@ -373,10 +456,19 @@ protected void stopRMClient() {
verify(mockFs, never()).delete(app1Dir, true);
// modify the timestamp of the logs and verify it's picked up quickly
+
+ clusterTimestampDirStatus = new FileStatus(0,
+ true, 0, 0, toDeleteTime, clusterTimestampDir);
+ bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
+
when(mockFs.listStatus(userLogDir)).thenReturn(
- new FileStatus[]{app1DirStatus});
+ new FileStatus[] {clusterTimestampDirStatus });
+ when(mockFs.listStatus(clusterTimestampDir)).thenReturn(
+ new FileStatus[] {bucketDirStatus });
+ when(mockFs.listStatus(bucketDir)).thenReturn(
+ new FileStatus[] {app1DirStatus });
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{app1Log1Status});
@@ -424,7 +516,7 @@ private static ApplicationClientProtocol createMockRMClient(
createApplicationReportWithRunningApplication() {
ApplicationReport report = mock(ApplicationReport.class);
when(report.getYarnApplicationState()).thenReturn(
- YarnApplicationState.RUNNING);
+ YarnApplicationState.RUNNING);
GetApplicationReportResponse response =
mock(GetApplicationReportResponse.class);
when(response.getApplicationReport()).thenReturn(report);
@@ -435,7 +527,7 @@ private static ApplicationClientProtocol createMockRMClient(
createApplicationReportWithFinishedApplication() {
ApplicationReport report = mock(ApplicationReport.class);
when(report.getYarnApplicationState()).thenReturn(
- YarnApplicationState.FINISHED);
+ YarnApplicationState.FINISHED);
GetApplicationReportResponse response =
mock(GetApplicationReportResponse.class);
when(response.getApplicationReport()).thenReturn(report);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index bee34e0..19ae167 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -195,8 +195,9 @@ public void testAggregatedLogsBlockHar() throws Exception {
URL harUrl = ClassLoader.getSystemClassLoader()
.getResource("application_1440536969523_0001.har");
assertNotNull(harUrl);
- String path = "target/logs/admin/logs/application_1440536969523_0001" +
- "/application_1440536969523_0001.har";
+ String path =
+ "target/logs/admin/logs/1440536969523/0/application_1440536969523_0001"
+ + "/application_1440536969523_0001.har";
FileUtils.copyDirectory(new File(harUrl.getPath()), new File(path));
ByteArrayOutputStream data = new ByteArrayOutputStream();
@@ -239,7 +240,8 @@ public void testNoLogs() throws Exception {
FileUtil.fullyDelete(new File("target/logs"));
Configuration configuration = getConfiguration();
- File f = new File("target/logs/logs/application_0_0001/container_0_0001_01_000001");
+ File f = new
+ File("target/logs/logs/application_0_0001/container_0_0001_01_000001");
if (!f.exists()) {
assertTrue(f.mkdirs());
}
@@ -321,10 +323,10 @@ private void writeLog(Configuration configuration, String user)
ContainerId containerId = ContainerIdPBImpl.newContainerId(appAttemptId, 1);
String path = "target/logs/" + user
- + "/logs/application_0_0001/localhost_1234";
+ + "/logs/0/0/application_0_0001/localhost_1234";
File f = new File(path);
if (!f.getParentFile().exists()) {
- assertTrue(f.getParentFile().mkdirs());
+ assertTrue(f.getParentFile().mkdirs());
}
List rootLogDirs = Arrays.asList("target/logs/logs");
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index a12e2a1..db77143 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -87,8 +87,11 @@ public static void createContainerLogFileInRemoteFS(Configuration conf,
createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName,
content);
// upload container logs to remote log dir
- Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR),
- user + "/logs/" + appId.toString());
+ int maxBucket = LogAggregationUtils.getMaxBucket(conf);
+ Path path = LogAggregationUtils.getRemoteAppLogDir(
+ new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)),
+ appId, user, "logs", maxBucket);
+
if (fs.exists(path) && deleteRemoteLogDir) {
fs.delete(path, true);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 05cdd49..57e7027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -685,6 +686,99 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner()
}
@Test
+ public void testBucketDirCreation() throws Exception{
+ final String logSuffix = "logs";
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+ localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
+
+ int maxBucket = this.conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+ this.conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
+
+ InlineDispatcher dispatcher1 = new InlineDispatcher();
+ dispatcher1.init(this.conf);
+ dispatcher1.start();
+
+ FileSystem fs = FileSystem.get(this.conf);
+ final FileSystem spyFs = spy(FileSystem.get(this.conf));
+
+ final LogAggregationTFileController spyFileFormat
+ = new LogAggregationTFileController() {
+ @Override
+ public FileSystem getFileSystem(Configuration conf)
+ throws IOException {
+ return spyFs;
+ }
+ };
+ spyFileFormat.initialize(conf, "TFile");
+ LogAggregationService aggSvc = new LogAggregationService(dispatcher1,
+ this.context, this.delSrvc, super.dirsHandler) {
+ @Override
+ public LogAggregationFileController getLogAggregationFileController(
+ Configuration conf) {
+ return spyFileFormat;
+ }
+ };
+ aggSvc.init(this.conf);
+ aggSvc.start();
+
+ int maxBucket1 = LogAggregationUtils.getMaxBucket(this.conf);
+ LogAggregationContext contextWithAllContainers =
+ Records.newRecord(LogAggregationContext.class);
+ contextWithAllContainers.setLogAggregationPolicyClassName(
+ AllContainerLogAggregationPolicy.class.getName());
+
+
+ // start an application and verify bucket 0 is created
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ this.user, logSuffix, appId, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class));
+
+ // Verify bucket 1 is created
+ ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
+ Path bucketDir2 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ this.user, logSuffix, appId2, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir2), isA(FsPermission.class));
+
+ // Verify bucket 1 is used
+ ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
+ Path bucketDir3 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ this.user, logSuffix, appId3, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir3), isA(FsPermission.class));
+
+ // Verify bucket 2 is created
+ ApplicationId appId4 = BuilderUtils.newApplicationId(1, 4);
+ Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ this.user, logSuffix, appId4, maxBucket1));
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null,
+ this.acls, contextWithAllContainers));
+ verify(spyFs).mkdirs(eq(bucketDir4), isA(FsPermission.class));
+
+ this.conf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, maxBucket);
+ aggSvc.stop();
+ aggSvc.close();
+ dispatcher1.stop();
+
+ }
+
+
+ @Test
public void testAppLogDirCreation() throws Exception {
final String logSuffix = "logs";
this.conf.set(YarnConfiguration.NM_LOG_DIRS,
@@ -725,7 +819,17 @@ public LogAggregationFileController getLogAggregationFileController(
Path userDir = fs.makeQualified(new Path(
remoteRootLogDir.getAbsolutePath(), this.user));
Path suffixDir = new Path(userDir, logSuffix);
- Path appDir = new Path(suffixDir, appId.toString());
+ int maxBucket = LogAggregationUtils.getMaxBucket(this.conf);
+ Path clusterTimestampDir = fs.makeQualified(
+ LogAggregationUtils.getRemoteLogClusterTimestampDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ this.user, logSuffix, appId));
+ Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ this.user, logSuffix, appId, maxBucket));
+ Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+ new Path(remoteRootLogDir.getAbsolutePath()), appId,
+ this.user, logSuffix, maxBucket));
LogAggregationContext contextWithAllContainers =
Records.newRecord(LogAggregationContext.class);
contextWithAllContainers.setLogAggregationPolicyClassName(
@@ -734,11 +838,15 @@ public LogAggregationFileController getLogAggregationFileController(
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
+ verify(spyFs).mkdirs(eq(clusterTimestampDir), isA(FsPermission.class));
+ verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
// start another application and verify only app dir created
ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
- Path appDir2 = new Path(suffixDir, appId2.toString());
+ Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ appId2, this.user, logSuffix, maxBucket));
aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
@@ -746,7 +854,9 @@ public LogAggregationFileController getLogAggregationFileController(
// start another application with the app dir already created and verify
// we do not try to create it again
ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
- Path appDir3 = new Path(suffixDir, appId3.toString());
+ Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
+ new Path(remoteRootLogDir.getAbsolutePath()),
+ appId3, this.user, logSuffix, maxBucket));
new File(appDir3.toUri().getPath()).mkdir();
aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
this.acls, contextWithAllContainers));