diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 82a1ad4..b342bfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -312,6 +312,10 @@ private void uploadLogsForContainers(boolean appFinished) { LogWriter writer = null; try { + if (pendingContainerInThisCycle.isEmpty()) { + return; + } + try { writer = new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, @@ -377,12 +381,10 @@ private void uploadLogsForContainers(boolean appFinished) { @Override public Object run() throws Exception { FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf); - if (remoteFS.exists(remoteNodeTmpLogFileForApp)) { - if (rename) { - remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); - } else { - remoteFS.delete(remoteNodeTmpLogFileForApp, false); - } + if (rename) { + remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); + } else { + remoteFS.delete(remoteNodeTmpLogFileForApp, false); } return null; } @@ -414,6 +416,7 @@ public Object run() throws Exception { ? LogAggregationStatus.RUNNING : LogAggregationStatus.RUNNING_WITH_FAILURE); this.context.getLogAggregationStatusForApps().add(report); + } finally { if (appFinished) { // If the app is finished, one extra final report with log aggregation // status SUCCEEDED/FAILED will be sent to RM to inform the RM @@ -425,7 +428,7 @@ public Object run() throws Exception { ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED); this.context.getLogAggregationStatusForApps().add(finalReport); } - } finally { + if (writer != null) { writer.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 101fef0..e55c880 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -2127,6 +2127,49 @@ public Boolean get() { logAggregationService.stop(); } + @Test (timeout = 20000) + public void testSkipUnnecessaryNNOperations() throws Exception { + this.conf.setLong( + YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, + 3600); + LogAggregationService logAggregationService = new LogAggregationService( + dispatcher, this.context, this.delSrvc, super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + + LogAggregationContext contextWithAMOnly = + Records.newRecord(LogAggregationContext.class); + contextWithAMOnly.setLogAggregationPolicyClassName( + AMOnlyLogAggregationPolicy.class.getName()); + contextWithAMOnly.setRolledLogsIncludePattern("sys*"); + contextWithAMOnly.setRolledLogsExcludePattern("std_final"); + ApplicationId appId = createApplication(); + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, + null, this.acls, contextWithAMOnly)); + + // Container finishes, aggregator will upload logs, and add a + // LogAggregationReport to logAggregationReportForApps in context. + String[] logFiles = new String[] { "stdout" }; + finishContainer(appId, logAggregationService, + ContainerType.APPLICATION_MASTER, 1, 0, logFiles); + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(appId); + aggregator.doLogAggregationOutOfBand(); + + // No need to upload logs, and do not need add LogAggregationReport to + // logAggregationReportForApps. + Thread.sleep(200); + aggregator.doLogAggregationOutOfBand(); + + // App finishes. Add a final LogAggregationReport to + // logAggregationReportForApps. + logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); + logAggregationService.stop(); + + assertEquals(2, this.context.getLogAggregationStatusForApps().size()); + } + private int numOfLogsAvailable(LogAggregationService logAggregationService, ApplicationId appId, boolean sizeLimited, String lastLogFile) throws IOException {