diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index ef164a5..7d5a751 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -1329,7 +1329,8 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + System.currentTimeMillis()); AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); + new AggregatedLogFormat.LogWriter(); + writer.initialize(configuration, path, ugi); writer.writeApplicationOwner(ugi.getUserName()); Map appAcls = @@ -1349,7 +1350,8 @@ private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ug new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + System.currentTimeMillis()); AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); + new AggregatedLogFormat.LogWriter(); + writer.initialize(configuration, path, ugi); writer.writeApplicationOwner(ugi.getUserName()); Map appAcls = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 1b46007..7e68d27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -446,13 +446,21 @@ public boolean shouldRetainLog() { * The writer that writes out the aggregated logs. */ @Private - public static class LogWriter { + public static class LogWriter implements AutoCloseable { - private final FSDataOutputStream fsDataOStream; - private final TFile.Writer writer; + private FSDataOutputStream fsDataOStream; + private TFile.Writer writer; private FileContext fc; - public LogWriter(final Configuration conf, final Path remoteAppLogFile, + /** + * Initialize the LogWriter. + * Must be called just after the instance is created. + * @param conf Configuration + * @param remoteAppLogFile remote log file path + * @param userUgi Ugi of the user + * @throws IOException Failed to initialize + */ + public void initialize(final Configuration conf, final Path remoteAppLogFile, UserGroupInformation userUgi) throws IOException { try { this.fsDataOStream = @@ -530,11 +538,14 @@ public void append(LogKey logKey, LogValue logValue) throws IOException { } } + @Override public void close() { - try { - this.writer.close(); - } catch (IOException e) { - LOG.warn("Exception closing writer", e); + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.warn("Exception closing writer", e); + } } IOUtils.closeStream(fsDataOStream); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 8cbec10..60a6d39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -140,8 +140,8 @@ private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long l final int ch = filler; UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(new Configuration(), remoteAppLogFile, - ugi); + LogWriter logWriter = new LogWriter(); + logWriter.initialize(new Configuration(), remoteAppLogFile, ugi); LogKey logKey = new LogKey(testContainerId); LogValue logValue = @@ -216,7 +216,8 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception { writeSrcFile(srcFilePath, "stdout", numChars); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + LogWriter logWriter = new LogWriter(); + logWriter.initialize(conf, remoteAppLogFile, ugi); LogKey logKey = new LogKey(testContainerId); LogValue logValue = @@ -311,7 +312,8 @@ public void testContainerLogsFileAccess() throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + LogWriter logWriter = new LogWriter(); + logWriter.initialize(conf, remoteAppLogFile, ugi); LogKey logKey = new LogKey(testContainerId1); String randomUser = "randomUser"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 594f186..7454a22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -295,8 +295,8 @@ private void writeLog(Configuration configuration, String user) List rootLogDirs = Arrays.asList("target/logs/logs"); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter( - configuration, new Path(path), ugi); + AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter(); + writer.initialize(configuration, new Path(path), ugi); writer.writeApplicationOwner(ugi.getUserName()); Map appAcls = new HashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index c6841c9..ce3ae69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -111,7 +111,8 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); + new AggregatedLogFormat.LogWriter(); + writer.initialize(configuration, path, ugi); writer.writeApplicationOwner(ugi.getUserName()); writer.append(new AggregatedLogFormat.LogKey(containerId), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 065854e..f318a18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -295,18 +295,18 @@ private void uploadLogsForContainers(boolean appFinished) { } } - LogWriter writer = null; + if (pendingContainerInThisCycle.isEmpty()) { + sendLogAggregationReport(true, "", appFinished); + return; + } + + logAggregationTimes++; String diagnosticMessage = ""; boolean logAggregationSucceedInThisCycle = true; - try { - if (pendingContainerInThisCycle.isEmpty()) { - return; - } - - logAggregationTimes++; - + try (LogWriter writer = createLogWriter()) { try { - writer = createLogWriter(); + writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp, + this.userUgi); // Write ACLs once when the writer is created. writer.writeApplicationACLs(appAcls); writer.writeApplicationOwner(this.userUgi.getShortUserName()); @@ -351,11 +351,6 @@ private void uploadLogsForContainers(boolean appFinished) { cleanupOldLogTimes++; } - if (writer != null) { - writer.close(); - writer = null; - } - long currentTime = System.currentTimeMillis(); final Path renamedPath = this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp : new Path( @@ -396,34 +391,37 @@ public Object run() throws Exception { logAggregationSucceedInThisCycle = false; } } finally { - LogAggregationStatus logAggregationStatus = - logAggregationSucceedInThisCycle - ? LogAggregationStatus.RUNNING - : LogAggregationStatus.RUNNING_WITH_FAILURE; - sendLogAggregationReport(logAggregationStatus, diagnosticMessage); - if (appFinished) { - // If the app is finished, one extra final report with log aggregation - // status SUCCEEDED/FAILED will be sent to RM to inform the RM - // that the log aggregation in this NM is completed. - LogAggregationStatus finalLogAggregationStatus = - renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle - ? LogAggregationStatus.FAILED - : LogAggregationStatus.SUCCEEDED; - sendLogAggregationReport(finalLogAggregationStatus, ""); - } - - if (writer != null) { - writer.close(); - } + sendLogAggregationReport(logAggregationSucceedInThisCycle, + diagnosticMessage, appFinished); } } - protected LogWriter createLogWriter() throws IOException { - return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); + @VisibleForTesting + protected LogWriter createLogWriter() { + return new LogWriter(); } private void sendLogAggregationReport( + boolean logAggregationSucceedInThisCycle, String diagnosticMessage, + boolean appFinished) { + LogAggregationStatus logAggregationStatus = + logAggregationSucceedInThisCycle + ? LogAggregationStatus.RUNNING + : LogAggregationStatus.RUNNING_WITH_FAILURE; + sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage); + if (appFinished) { + // If the app is finished, one extra final report with log aggregation + // status SUCCEEDED/FAILED will be sent to RM to inform the RM + // that the log aggregation in this NM is completed. + LogAggregationStatus finalLogAggregationStatus = + renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle + ? LogAggregationStatus.FAILED + : LogAggregationStatus.SUCCEEDED; + sendLogAggregationReportInternal(finalLogAggregationStatus, ""); + } + } + + private void sendLogAggregationReportInternal( LogAggregationStatus logAggregationStatus, String diagnosticMessage) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index 17d527a..097146b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -416,8 +416,7 @@ public AppLogAggregatorInTest(Dispatcher dispatcher, logAggregationContext, context, lfs, -1, recoveredLogInitedTime); this.applicationId = appId; this.deletionService = deletionService; - - this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp); + this.logWriter = spy(new LogWriter()); this.logValue = ArgumentCaptor.forClass(LogValue.class); } @@ -425,10 +424,5 @@ public AppLogAggregatorInTest(Dispatcher dispatcher, protected LogWriter createLogWriter() { return this.logWriter; } - - private LogWriter getSpiedLogWriter(Configuration conf, - UserGroupInformation ugi, Path remoteAppLogFile) throws IOException { - return spy(new LogWriter(conf, remoteAppLogFile, ugi)); - } } }