diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java index 46c1809..607ac12 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java @@ -32,11 +32,20 @@ * *

* @@ -57,6 +66,22 @@ public static LogAggregationContext newInstance(String includePattern, return context; } + @Public + @Unstable + public static LogAggregationContext newInstance(String includePattern, + String excludePattern, String includePatternRollingAggregation, + String excludePatternRollingAggregation) { + LogAggregationContext context = + Records.newRecord(LogAggregationContext.class); + context.setIncludePattern(includePattern); + context.setExcludePattern(excludePattern); + context + .setIncludePatternInRollingAggregation(includePatternRollingAggregation); + context + .setExcludePatternInRollingAggregation(excludePatternRollingAggregation); + return context; + } + /** * Get include pattern * @@ -92,4 +117,42 @@ public static LogAggregationContext newInstance(String includePattern, @Public @Unstable public abstract void setExcludePattern(String excludePattern); + + /** + * Get include pattern for aggregation in a rolling fashion. + * + * @return include pattern + */ + @Public + @Unstable + public abstract String getIncludePatternInRollingAggregation(); + + /** + * Set include pattern for aggregation in a rolling fashion. + * + * @param includePatternInRollingAggregation + */ + @Public + @Unstable + public abstract void setIncludePatternInRollingAggregation( + String includePatternInRollingAggregation); + + /** + * Get exclude pattern for aggregation in a rolling fashion. + * + * @return exclude pattern + */ + @Public + @Unstable + public abstract String getExcludePatternInRollingAggregation(); + + /** + * Set exclude pattern for aggregation in a rolling fashion.
+ * + * @param excludePatternInRollingAggregation + */ + @Public + @Unstable + public abstract void setExcludePatternInRollingAggregation( + String excludePatternInRollingAggregation); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 4e29d2f..456daa4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -313,6 +313,8 @@ message ApplicationSubmissionContextProto { message LogAggregationContextProto { optional string include_pattern = 1 [default = ".*"]; optional string exclude_pattern = 2 [default = ""]; + optional string include_pattern_rolling_aggregation = 3 [default = ""]; + optional string exclude_pattern_rolling_aggregation = 4 [default = ".*"]; } enum ApplicationAccessTypeProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java index dc7a21d..4782b29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder; + import com.google.protobuf.TextFormat; public class LogAggregationContextPBImpl extends LogAggregationContext{ @@ -116,4 +117,46 @@ public void setExcludePattern(String excludePattern) { } 
builder.setExcludePattern(excludePattern); } + + @Override + public String getIncludePatternInRollingAggregation() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasIncludePatternRollingAggregation()) { + return null; + } + return p.getIncludePatternRollingAggregation(); + } + + @Override + public void setIncludePatternInRollingAggregation( + String includePatternInRollingAggregation) { + maybeInitBuilder(); + if (includePatternInRollingAggregation == null) { + builder.clearIncludePatternRollingAggregation(); + return; + } + builder + .setIncludePatternRollingAggregation(includePatternInRollingAggregation); + } + + @Override + public String getExcludePatternInRollingAggregation() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasExcludePatternRollingAggregation()) { + return null; + } + return p.getExcludePatternRollingAggregation(); + } + + @Override + public void setExcludePatternInRollingAggregation( + String excludePatternInRollingAggregation) { + maybeInitBuilder(); + if (excludePatternInRollingAggregation == null) { + builder.clearExcludePatternRollingAggregation(); + return; + } + builder + .setExcludePatternRollingAggregation(excludePatternInRollingAggregation); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index b669332..a3f1e09 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -167,17 +167,18 @@ public String toString() { private Set uploadedFiles = new HashSet(); private final Set alreadyUploadedLogFiles; private Set allExistingFileMeta = new 
HashSet(); + private final boolean appFinished; // TODO Maybe add a version string here. Instead of changing the version of // the entire k-v format public LogValue(List rootLogDirs, ContainerId containerId, String user) { - this(rootLogDirs, containerId, user, null, new HashSet()); + this(rootLogDirs, containerId, user, null, new HashSet(), true); } public LogValue(List rootLogDirs, ContainerId containerId, String user, LogAggregationContext logAggregationContext, - Set alreadyUploadedLogFiles) { + Set alreadyUploadedLogFiles, boolean appFinished) { this.rootLogDirs = new ArrayList(rootLogDirs); this.containerId = containerId; this.user = user; @@ -186,6 +187,7 @@ public LogValue(List rootLogDirs, ContainerId containerId, Collections.sort(this.rootLogDirs); this.logAggregationContext = logAggregationContext; this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; + this.appFinished = appFinished; } private Set getPendingLogFilesToUploadForThisContainer() { @@ -296,17 +298,17 @@ public String getUser() { } if (this.logAggregationContext != null && candidates.size() > 0) { - if (this.logAggregationContext.getIncludePattern() != null - && !this.logAggregationContext.getIncludePattern().isEmpty()) { - filterFiles(this.logAggregationContext.getIncludePattern(), - candidates, false); - } - - if (this.logAggregationContext.getExcludePattern() != null - && !this.logAggregationContext.getExcludePattern().isEmpty()) { - filterFiles(this.logAggregationContext.getExcludePattern(), - candidates, true); - } + filterFiles( + this.appFinished ? this.logAggregationContext.getIncludePattern() + : this.logAggregationContext + .getIncludePatternInRollingAggregation(), + candidates, false); + + filterFiles( + this.appFinished ? 
this.logAggregationContext.getExcludePattern() + : this.logAggregationContext + .getExcludePatternInRollingAggregation(), + candidates, true); Iterable mask = Iterables.filter(candidates, new Predicate() { @@ -323,14 +325,15 @@ public boolean apply(File next) { private void filterFiles(String pattern, Set candidates, boolean exclusion) { - Pattern filterPattern = - Pattern.compile(pattern); - for (Iterator candidatesItr = candidates.iterator(); candidatesItr + if (pattern != null && !pattern.isEmpty()) { + Pattern filterPattern = Pattern.compile(pattern); + for (Iterator candidatesItr = candidates.iterator(); candidatesItr .hasNext();) { - File candidate = candidatesItr.next(); - boolean match = filterPattern.matcher(candidate.getName()).find(); - if ((!match && !exclusion) || (match && exclusion)) { - candidatesItr.remove(); + File candidate = candidatesItr.next(); + boolean match = filterPattern.matcher(candidate.getName()).find(); + if ((!match && !exclusion) || (match && exclusion)) { + candidatesItr.remove(); + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 787422b..98a65d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.Credentials; 
import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -116,6 +115,7 @@ private final Context context; private final int retentionSize; private final long rollingMonitorInterval; + private final boolean logAggregationInRolling; private final NodeId nodeId; // This variable is only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); @@ -193,9 +193,15 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, } this.rollingMonitorInterval = configuredRollingMonitorInterval; } + this.logAggregationInRolling = + this.rollingMonitorInterval <= 0 || this.logAggregationContext == null + || this.logAggregationContext + .getIncludePatternInRollingAggregation() == null + || this.logAggregationContext + .getIncludePatternInRollingAggregation().isEmpty() ? false : true; } - private void uploadLogsForContainers() { + private void uploadLogsForContainers(boolean appFinished) { if (this.logAggregationDisabled) { return; } @@ -262,7 +268,7 @@ private void uploadLogsForContainers() { containerLogAggregators.put(container, aggregator); } Set uploadedFilePathsInThisCycle = - aggregator.doContainerLogAggregation(writer); + aggregator.doContainerLogAggregation(writer, appFinished); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; } @@ -394,12 +400,12 @@ private void doAppLogAggregation() { synchronized(this) { try { waiting.set(true); - if (this.rollingMonitorInterval > 0) { + if (logAggregationInRolling) { wait(this.rollingMonitorInterval * 1000); if (this.appFinishing.get() || this.aborted.get()) { break; } - uploadLogsForContainers(); + uploadLogsForContainers(false); } else { wait(THREAD_SLEEP_TIME); } @@ -415,7 +421,7 @@ private void doAppLogAggregation() { } // App is finished, upload the container logs. 
- uploadLogsForContainers(); + uploadLogsForContainers(true); // Remove the local app-log-dirs List localAppLogDirs = new ArrayList(); @@ -536,7 +542,8 @@ public ContainerLogAggregator(ContainerId containerId) { this.containerId = containerId; } - public Set doContainerLogAggregation(LogWriter writer) { + public Set doContainerLogAggregation(LogWriter writer, + boolean appFinished) { LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " + StringUtils.join(",", dirsHandler.getLogDirs())); @@ -544,7 +551,7 @@ public ContainerLogAggregator(ContainerId containerId) { final LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId, userUgi.getShortUserName(), logAggregationContext, - this.uploadedFileMeta); + this.uploadedFileMeta, appFinished); try { writer.append(logKey, logValue); } catch (Exception e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index a73d583..fd0e90f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -130,8 +130,10 @@ public void testApplicationRecovery() throws Exception { containerTokens, acls); // create the logAggregationContext LogAggregationContext logAggregationContext = - LogAggregationContext.newInstance("includePattern", "excludePattern"); - StartContainersResponse startResponse = startContainer(context, cm, cid, + 
LogAggregationContext.newInstance("includePattern", "excludePattern", + "includePatternInRollingAggregation", + "excludePatternInRollingAggregation"); + StartContainersResponse startResponse = startContainer(context, cm, cid, clc, logAggregationContext); assertTrue(startResponse.getFailedRequests().isEmpty()); assertEquals(1, context.getApplications().size()); @@ -171,6 +173,10 @@ public void testApplicationRecovery() throws Exception { recovered.getIncludePattern()); assertEquals(logAggregationContext.getExcludePattern(), recovered.getExcludePattern()); + assertEquals(logAggregationContext.getIncludePatternInRollingAggregation(), + recovered.getIncludePatternInRollingAggregation()); + assertEquals(logAggregationContext.getExcludePatternInRollingAggregation(), + recovered.getExcludePatternInRollingAggregation()); waitForAppState(app, ApplicationState.INITING); assertTrue(context.getApplicationACLsManager().checkAccess( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 901e45a..ecc597cde 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -1289,6 +1289,14 @@ private void testLogAggregationService(boolean retentionSizeLimitation) throws Exception { LogAggregationContext logAggregationContextWithInterval = 
Records.newRecord(LogAggregationContext.class); + // set IncludePattern/excludePattern in rolling fashion + // we expect all the logs except std_final will be uploaded + // when app is running. The std_final will be uploaded when + // the app finishes. + logAggregationContextWithInterval + .setIncludePatternInRollingAggregation(".*"); + logAggregationContextWithInterval + .setExcludePatternInRollingAggregation("std_final"); this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); @@ -1339,8 +1347,12 @@ private void testLogAggregationService(boolean retentionSizeLimitation) logAggregationContextWithInterval)); // Simulate log-file creation - String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" }; - writeContainerLogs(appLogDir, container, logFiles1); + // create std_final in log directory which will not be aggregated + // until the app finishes. + String[] logFiles1WithFinalLog = + new String[] { "stdout", "stderr", "syslog", "std_final" }; + String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"}; + writeContainerLogs(appLogDir, container, logFiles1WithFinalLog); // Do log aggregation AppLogAggregatorImpl aggregator = @@ -1407,8 +1419,12 @@ private void testLogAggregationService(boolean retentionSizeLimitation) Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, 50, 3, false, null)); } + + // the app is finished. The log "std_final" should be aggregated this time. 
+ String[] logFiles3WithFinalLog = + new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" }; verifyContainerLogs(logAggregationService, application, - new ContainerId[] { container }, logFiles3, 3, true); + new ContainerId[] { container }, logFiles3WithFinalLog, 4, true); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 169517d..e03f46d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -230,12 +230,18 @@ public void testLogAggregationContextPassedIntoContainerToken() // create a not-null LogAggregationContext LogAggregationContext logAggregationContext = LogAggregationContext.newInstance( - "includePattern", "excludePattern"); + "includePattern", "excludePattern", + "includePatternInRollingAggregation", + "excludePatternInRollingAggregation"); LogAggregationContext returned = getLogAggregationContextFromContainerToken(rm1, nm2, logAggregationContext); Assert.assertEquals("includePattern", returned.getIncludePattern()); Assert.assertEquals("excludePattern", returned.getExcludePattern()); + Assert.assertEquals("includePatternInRollingAggregation", + returned.getIncludePatternInRollingAggregation()); + Assert.assertEquals("excludePatternInRollingAggregation", + 
returned.getExcludePatternInRollingAggregation()); rm1.stop(); }