diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
index 46c1809..607ac12 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
@@ -32,11 +32,20 @@
*
* - includePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files
- * will be uploaded.
+ * will be uploaded when the application finishes.
* - excludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files
- * will not be uploaded. If the log file name matches both the
- * include and the exclude pattern, this file will be excluded eventually
+ * will not be uploaded when application finishes. If the log file
+ * name matches both the include and the exclude pattern, this file
+ * will be excluded eventually
+ * - includePattern_Rolling. It uses Java Regex to filter the log files
+ * which match the defined include pattern and those log files
+ * will be aggregated in a rolling fashion.
+ * - excludePattern_Rolling. It uses Java Regex to filter the log files
+ * which match the defined exclude pattern and those log files
+ * will not be aggregated in a rolling fashion. If the log file
+ * name matches both the include and the exclude pattern, this file
+ * will be excluded eventually
*
*
*
@@ -57,6 +66,22 @@ public static LogAggregationContext newInstance(String includePattern,
return context;
}
+ @Public
+ @Unstable
+ public static LogAggregationContext newInstance(String includePattern,
+ String excludePattern, String includePatternRollingAggregation,
+ String excludePatternRollingAggregation) {
+ LogAggregationContext context =
+ Records.newRecord(LogAggregationContext.class);
+ context.setIncludePattern(includePattern);
+ context.setExcludePattern(excludePattern);
+ context
+ .setIncludePatternInRollingAggregation(includePatternRollingAggregation);
+ context
+ .setExcludePatternInRollingAggregation(excludePatternRollingAggregation);
+ return context;
+ }
+
/**
* Get include pattern
*
@@ -92,4 +117,42 @@ public static LogAggregationContext newInstance(String includePattern,
@Public
@Unstable
public abstract void setExcludePattern(String excludePattern);
+
+ /**
+ * Get include pattern in a rolling fashion.
+ *
+ * @return include pattern
+ */
+ @Public
+ @Unstable
+ public abstract String getIncludePatternInRollingAggregation();
+
+ /**
+ * Set include pattern in a rolling fashion.
+ *
+ * @param includePatternInRollingAggregation
+ */
+ @Public
+ @Unstable
+ public abstract void setIncludePatternInRollingAggregation(
+ String includePatternInRollingAggregation);
+
+ /**
+ * Get exclude pattern for aggregation in a rolling fashion.
+ *
+ * @return exclude pattern
+ */
+ @Public
+ @Unstable
+ public abstract String getExcludePatternInRollingAggregation();
+
+ /**
+ * Set exclude pattern for in a rolling fashion.
+ *
+ * @param excludePatternInRollingAggregation
+ */
+ @Public
+ @Unstable
+ public abstract void setExcludePatternInRollingAggregation(
+ String excludePatternInRollingAggregation);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 4e29d2f..456daa4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -313,6 +313,8 @@ message ApplicationSubmissionContextProto {
message LogAggregationContextProto {
optional string include_pattern = 1 [default = ".*"];
optional string exclude_pattern = 2 [default = ""];
+ optional string include_pattern_rolling_aggregation = 3 [default = ""];
+ optional string exclude_pattern_rolling_aggregation = 4 [default = ".*"];
}
enum ApplicationAccessTypeProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
index dc7a21d..4782b29 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
+
import com.google.protobuf.TextFormat;
public class LogAggregationContextPBImpl extends LogAggregationContext{
@@ -116,4 +117,46 @@ public void setExcludePattern(String excludePattern) {
}
builder.setExcludePattern(excludePattern);
}
+
+ @Override
+ public String getIncludePatternInRollingAggregation() {
+ LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (! p.hasIncludePatternRollingAggregation()) {
+ return null;
+ }
+ return p.getIncludePatternRollingAggregation();
+ }
+
+ @Override
+ public void setIncludePatternInRollingAggregation(
+ String includePatternInRollingAggregation) {
+ maybeInitBuilder();
+ if (includePatternInRollingAggregation == null) {
+ builder.clearIncludePatternRollingAggregation();
+ return;
+ }
+ builder
+ .setIncludePatternRollingAggregation(includePatternInRollingAggregation);
+ }
+
+ @Override
+ public String getExcludePatternInRollingAggregation() {
+ LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (! p.hasExcludePatternRollingAggregation()) {
+ return null;
+ }
+ return p.getExcludePatternRollingAggregation();
+ }
+
+ @Override
+ public void setExcludePatternInRollingAggregation(
+ String excludePatternInRollingAggregation) {
+ maybeInitBuilder();
+ if (excludePatternInRollingAggregation == null) {
+ builder.clearExcludePatternRollingAggregation();
+ return;
+ }
+ builder
+ .setExcludePatternRollingAggregation(excludePatternInRollingAggregation);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index b669332..a3f1e09 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -167,17 +167,18 @@ public String toString() {
private Set uploadedFiles = new HashSet();
private final Set alreadyUploadedLogFiles;
private Set allExistingFileMeta = new HashSet();
+ private final boolean appFinished;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
public LogValue(List rootLogDirs, ContainerId containerId,
String user) {
- this(rootLogDirs, containerId, user, null, new HashSet());
+ this(rootLogDirs, containerId, user, null, new HashSet(), true);
}
public LogValue(List rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext,
- Set alreadyUploadedLogFiles) {
+ Set alreadyUploadedLogFiles, boolean appFinished) {
this.rootLogDirs = new ArrayList(rootLogDirs);
this.containerId = containerId;
this.user = user;
@@ -186,6 +187,7 @@ public LogValue(List rootLogDirs, ContainerId containerId,
Collections.sort(this.rootLogDirs);
this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
+ this.appFinished = appFinished;
}
private Set getPendingLogFilesToUploadForThisContainer() {
@@ -296,17 +298,17 @@ public String getUser() {
}
if (this.logAggregationContext != null && candidates.size() > 0) {
- if (this.logAggregationContext.getIncludePattern() != null
- && !this.logAggregationContext.getIncludePattern().isEmpty()) {
- filterFiles(this.logAggregationContext.getIncludePattern(),
- candidates, false);
- }
-
- if (this.logAggregationContext.getExcludePattern() != null
- && !this.logAggregationContext.getExcludePattern().isEmpty()) {
- filterFiles(this.logAggregationContext.getExcludePattern(),
- candidates, true);
- }
+ filterFiles(
+ this.appFinished ? this.logAggregationContext.getIncludePattern()
+ : this.logAggregationContext
+ .getIncludePatternInRollingAggregation(),
+ candidates, false);
+
+ filterFiles(
+ this.appFinished ? this.logAggregationContext.getExcludePattern()
+ : this.logAggregationContext
+ .getExcludePatternInRollingAggregation(),
+ candidates, true);
Iterable mask =
Iterables.filter(candidates, new Predicate() {
@@ -323,14 +325,15 @@ public boolean apply(File next) {
private void filterFiles(String pattern, Set candidates,
boolean exclusion) {
- Pattern filterPattern =
- Pattern.compile(pattern);
- for (Iterator candidatesItr = candidates.iterator(); candidatesItr
+ if (pattern != null && !pattern.isEmpty()) {
+ Pattern filterPattern = Pattern.compile(pattern);
+ for (Iterator candidatesItr = candidates.iterator(); candidatesItr
.hasNext();) {
- File candidate = candidatesItr.next();
- boolean match = filterPattern.matcher(candidate.getName()).find();
- if ((!match && !exclusion) || (match && exclusion)) {
- candidatesItr.remove();
+ File candidate = candidatesItr.next();
+ boolean match = filterPattern.matcher(candidate.getName()).find();
+ if ((!match && !exclusion) || (match && exclusion)) {
+ candidatesItr.remove();
+ }
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 787422b..98a65d4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -44,7 +44,6 @@
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -116,6 +115,7 @@
private final Context context;
private final int retentionSize;
private final long rollingMonitorInterval;
+ private final boolean logAggregationInRolling;
private final NodeId nodeId;
// This variable is only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
@@ -193,9 +193,15 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
}
this.rollingMonitorInterval = configuredRollingMonitorInterval;
}
+ this.logAggregationInRolling =
+ this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
+ || this.logAggregationContext
+ .getIncludePatternInRollingAggregation() == null
+ || this.logAggregationContext
+ .getIncludePatternInRollingAggregation().isEmpty() ? false : true;
}
- private void uploadLogsForContainers() {
+ private void uploadLogsForContainers(boolean appFinished) {
if (this.logAggregationDisabled) {
return;
}
@@ -262,7 +268,7 @@ private void uploadLogsForContainers() {
containerLogAggregators.put(container, aggregator);
}
Set uploadedFilePathsInThisCycle =
- aggregator.doContainerLogAggregation(writer);
+ aggregator.doContainerLogAggregation(writer, appFinished);
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
}
@@ -394,12 +400,12 @@ private void doAppLogAggregation() {
synchronized(this) {
try {
waiting.set(true);
- if (this.rollingMonitorInterval > 0) {
+ if (logAggregationInRolling) {
wait(this.rollingMonitorInterval * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
- uploadLogsForContainers();
+ uploadLogsForContainers(false);
} else {
wait(THREAD_SLEEP_TIME);
}
@@ -415,7 +421,7 @@ private void doAppLogAggregation() {
}
// App is finished, upload the container logs.
- uploadLogsForContainers();
+ uploadLogsForContainers(true);
// Remove the local app-log-dirs
List localAppLogDirs = new ArrayList();
@@ -536,7 +542,8 @@ public ContainerLogAggregator(ContainerId containerId) {
this.containerId = containerId;
}
- public Set doContainerLogAggregation(LogWriter writer) {
+ public Set doContainerLogAggregation(LogWriter writer,
+ boolean appFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
@@ -544,7 +551,7 @@ public ContainerLogAggregator(ContainerId containerId) {
final LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
userUgi.getShortUserName(), logAggregationContext,
- this.uploadedFileMeta);
+ this.uploadedFileMeta, appFinished);
try {
writer.append(logKey, logValue);
} catch (Exception e) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index a73d583..fd0e90f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -130,8 +130,10 @@ public void testApplicationRecovery() throws Exception {
containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext =
- LogAggregationContext.newInstance("includePattern", "excludePattern");
- StartContainersResponse startResponse = startContainer(context, cm, cid,
+ LogAggregationContext.newInstance("includePattern", "excludePattern",
+ "includePatternInRollingAggregation",
+ "excludePatternInRollingAggregation");
+ StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
@@ -171,6 +173,10 @@ public void testApplicationRecovery() throws Exception {
recovered.getIncludePattern());
assertEquals(logAggregationContext.getExcludePattern(),
recovered.getExcludePattern());
+ assertEquals(logAggregationContext.getIncludePatternInRollingAggregation(),
+ recovered.getIncludePatternInRollingAggregation());
+ assertEquals(logAggregationContext.getExcludePatternInRollingAggregation(),
+ recovered.getExcludePatternInRollingAggregation());
waitForAppState(app, ApplicationState.INITING);
assertTrue(context.getApplicationACLsManager().checkAccess(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 901e45a..ecc597cde 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -1289,6 +1289,14 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
throws Exception {
LogAggregationContext logAggregationContextWithInterval =
Records.newRecord(LogAggregationContext.class);
+ // set IncludePattern/excludePattern in rolling fashion
+ // we expect all the logs except std_final will be uploaded
+ // when app is running. The std_final will be uploaded when
+ // the app finishes.
+ logAggregationContextWithInterval
+ .setIncludePatternInRollingAggregation(".*");
+ logAggregationContextWithInterval
+ .setExcludePatternInRollingAggregation("std_final");
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
@@ -1339,8 +1347,12 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
logAggregationContextWithInterval));
// Simulate log-file creation
- String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
- writeContainerLogs(appLogDir, container, logFiles1);
+ // create std_final in log directory which will not be aggregated
+ // until the app finishes.
+ String[] logFiles1WithFinalLog =
+ new String[] { "stdout", "stderr", "syslog", "std_final" };
+ String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
+ writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
// Do log aggregation
AppLogAggregatorImpl aggregator =
@@ -1407,8 +1419,12 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 3, false, null));
}
+
+ // the app is finished. The log "std_final" should be aggregated this time.
+ String[] logFiles3WithFinalLog =
+ new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
verifyContainerLogs(logAggregationService, application,
- new ContainerId[] { container }, logFiles3, 3, true);
+ new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 169517d..e03f46d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -230,12 +230,18 @@ public void testLogAggregationContextPassedIntoContainerToken()
// create a not-null LogAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance(
- "includePattern", "excludePattern");
+ "includePattern", "excludePattern",
+ "includePatternInRollingAggregation",
+ "excludePatternInRollingAggregation");
LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern());
+ Assert.assertEquals("includePatternInRollingAggregation",
+ returned.getIncludePatternInRollingAggregation());
+ Assert.assertEquals("excludePatternInRollingAggregation",
+ returned.getExcludePatternInRollingAggregation());
rm1.stop();
}