diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index ba7836a..20b25ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -90,12 +90,6 @@ = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app"; private static final int DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30; - - // This configuration is for debug and test purpose. By setting - // this configuration as true. We can break the lower bound of - // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS. - private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED - = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; private final LocalDirsHandlerService dirsHandler; private final Dispatcher dispatcher; @@ -279,10 +273,17 @@ private void uploadLogsForContainers(boolean appFinished) { // b) some set of running containers: For all the Running containers, // we use exitCode of 0 to find those which satisfy the // ContainerLogAggregationPolicy. - Set pendingContainerInThisCycle = new HashSet(); - this.pendingContainers.drainTo(pendingContainerInThisCycle); - Set finishedContainers = - new HashSet(pendingContainerInThisCycle); + + // only make the copy of pendingContainers. + // We do not remove the finished container. + // For the application, it is very possible that we have set + // includePattern/excludePattern which will not be checked during + // the log aggregation in rollingMonitorInterval. + // If we remove the finished container here, we would miss the logs when + // the app finishes. + Set pendingContainerInThisCycle = new HashSet( + pendingContainers); + if (this.context.getApplications().get(this.appId) != null) { for (Container container : this.context.getApplications() .get(this.appId).getContainers().values()) { @@ -335,12 +336,6 @@ private void uploadLogsForContainers(boolean appFinished) { uploadedFilePathsInThisCycle .toArray(new Path[uploadedFilePathsInThisCycle.size()])); } - - // This container is finished, and all its logs have been uploaded, - // remove it from containerLogAggregators. - if (finishedContainers.contains(container)) { - containerLogAggregators.remove(container); - } } // Before upload logs, make sure the number of existing logs diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 3961e1a..ba1b68d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -1575,6 +1575,100 @@ public void testLogAggregationServiceWithPatterns() throws Exception { "getApplicationID"); } + @SuppressWarnings("resource") + @Test (timeout = 50000) + public void testLogAggregationServiceWithPatternsAndIntervals() + throws Exception { + LogAggregationContext logAggregationContext = + Records.newRecord(LogAggregationContext.class); + // set IncludePattern and RolledLogsIncludePattern. + // When the app is running, we only aggregate the log with + // the name stdout. After the app finishes, we only aggregate + // the log with the name std_final. + logAggregationContext.setRolledLogsIncludePattern("stdout"); + logAggregationContext.setIncludePattern("std_final"); + this.conf.set( + YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + //configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to + //have fully qualified path + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.toURI().toString()); + this.conf.setLong( + YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, + 3600); + + this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + + ApplicationId application = + BuilderUtils.newApplicationId(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(application, 1); + ContainerId container = createContainer(appAttemptId, 1, + ContainerType.APPLICATION_MASTER); + + ConcurrentMap maps = + this.context.getApplications(); + Application app = mock(Application.class); + maps.put(application, app); + when(app.getContainers()).thenReturn(this.context.getContainers()); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, context, this.delSrvc, + super.dirsHandler); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + // AppLogDir should be created + File appLogDir = + new File(localLogDir, ConverterUtils.toString(application)); + appLogDir.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application, + this.user, null, this.acls, logAggregationContext)); + + // Simulate log-file creation + // create std_final in log directory which will not be aggregated + // until the app finishes. + String[] logFilesWithFinalLog = + new String[] { "stdout", "std_final" }; + writeContainerLogs(appLogDir, container, logFilesWithFinalLog); + + // Do log aggregation + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(application); + + aggregator.doLogAggregationOutOfBand(); + + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 1, false, null)); + + String[] logFiles = new String[] { "stdout" }; + verifyContainerLogs(logAggregationService, application, + new ContainerId[] {container}, logFiles, 1, true); + + logAggregationService.handle( + new LogHandlerContainerFinishedEvent(container, 0)); + + dispatcher.await(); + + // Do the log aggregation after ContainerFinishedEvent but before + // AppFinishedEvent. We need to make sure that the std_final would + // eventually been aggregated. + aggregator.doLogAggregationOutOfBand(); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); + + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 2, false, null)); + + // the app is finished. The log "std_final" should be aggregated this time. + String[] logFinalLog = new String[] {"std_final"}; + verifyContainerLogs(logAggregationService, application, + new ContainerId[] { container }, logFinalLog, 1, true); + logAggregationService.stop(); + } + @Test (timeout = 50000) @SuppressWarnings("unchecked") public void testNoneContainerPolicy() throws Exception { @@ -1590,7 +1684,7 @@ public void testNoneContainerPolicy() throws Exception { finishApplication(appId, logAggregationService); verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1 }, logFiles, 0, false); + new ContainerId[] { container1 }, logFiles, 0, true); verifyLogAggFinishEvent(appId); } @@ -2210,6 +2304,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) new LogHandlerContainerFinishedEvent(container, 0)); dispatcher.await(); + logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); if (retentionSizeLimitation) { Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, @@ -2303,7 +2398,7 @@ public void testSkipUnnecessaryNNOperationsForService() throws Exception { AMOnlyLogAggregationPolicy.class.getName()); contextWithAMOnly.setRolledLogsIncludePattern("sys*"); contextWithAMOnly.setRolledLogsExcludePattern("std_final"); - verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4, 1); + verifySkipUnnecessaryNNOperations(contextWithAMOnly, 3, 4, 1); } private void verifySkipUnnecessaryNNOperations( @@ -2326,13 +2421,17 @@ private void verifySkipUnnecessaryNNOperations( AppLogAggregatorImpl aggregator = (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() .get(appId); + + // Try to aggregate the logs for the container at the first time aggregator.doLogAggregationOutOfBand(); Thread.sleep(2000); + // We did not remove the finished containers and the application is still + // running, try to aggregate the logs for the container at second time. aggregator.doLogAggregationOutOfBand(); Thread.sleep(2000); - // App finishes. + // App finishes. Try to aggregate this container for the third time. logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); logAggregationService.stop();