diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index 537807227d4..b98964816e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -40,6 +40,10 @@ hadoop-common provided + + org.apache.hadoop + hadoop-hdfs-client + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index af3066ebd15..81d50534979 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.io.Writable; @@ -547,7 +548,7 @@ public void append(LogKey logKey, LogValue logValue) throws IOException { } @Override - public void close() { + public void close() throws DSQuotaExceededException { try { if (writer != null) { writer.close(); @@ -555,7 +556,16 @@ public void close() { } catch (Exception e) { LOG.warn("Exception closing writer", e); } finally { - IOUtils.cleanupWithLogger(LOG, this.fsDataOStream); + try { + this.fsDataOStream.close(); + } catch (DSQuotaExceededException e) { + LOG.error("Exception in closing {}", + this.fsDataOStream.getClass(), e); + throw e; + } catch (Throwable e) { + LOG.error("Exception in closing {}", + this.fsDataOStream.getClass(), e); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index aeef5748596..0f58c514468 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -167,8 +167,10 @@ public abstract void initializeWriter( /** * Close the writer. + * @throws LogAggregationDFSException if the closing of the writer fails + * (for example due to HDFS quota being exceeded) */ - public abstract void closeWriter(); + public abstract void closeWriter() throws LogAggregationDFSException; /** * Write the log content. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index a4f50d2ebda..e87af7f358a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; @@ -95,10 +97,15 @@ public void initializeWriter(LogAggregationFileControllerContext context) } @Override - public void closeWriter() { + public void closeWriter() throws LogAggregationDFSException { if (this.writer != null) { - this.writer.close(); - this.writer = null; + try { + this.writer.close(); + } catch (DSQuotaExceededException e) { + throw new LogAggregationDFSException(e); + } finally { + this.writer = null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index a12e2a152b0..47672822aca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -65,7 +65,7 @@ private TestContainerLogsUtils() {} public static void createContainerLogFileInRemoteFS(Configuration conf, FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId, String fileName, String user, String content, - boolean deleteRemoteLogDir) throws IOException { + boolean deleteRemoteLogDir) throws Exception { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); //prepare the logs for remote directory ApplicationId appId = containerId.getApplicationAttemptId() @@ -113,7 +113,7 @@ private static void createContainerLogInLocalDir(Path appLogsDir, private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, Configuration configuration, List rootLogDirs, NodeId nodeId, - ContainerId containerId, Path appDir, FileSystem fs) throws IOException { + ContainerId containerId, Path appDir, FileSystem fs) throws Exception { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); LogAggregationFileControllerFactory factory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index c7e06ff73b7..59568230e47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; @@ -263,7 +264,8 @@ private String getLogAggPolicyParameters(Configuration conf) { return params; } - private void uploadLogsForContainers(boolean appFinished) { + private void uploadLogsForContainers(boolean appFinished) + throws LogAggregationDFSException { if (this.logAggregationDisabled) { return; } @@ -301,6 +303,7 @@ private void uploadLogsForContainers(boolean appFinished) { logAggregationTimes++; String diagnosticMessage = ""; boolean logAggregationSucceedInThisCycle = true; + DeletionTask deletionTask = null; try { try { logAggregationFileController.initializeWriter(logControllerContext); @@ -327,10 +330,9 @@ private void uploadLogsForContainers(boolean appFinished) { uploadedLogsInThisCycle = true; List uploadedFilePathsInThisCycleList = new ArrayList<>(); uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle); - DeletionTask deletionTask = new FileDeletionTask(delService, + deletionTask = new FileDeletionTask(delService, this.userUgi.getShortUserName(), null, uploadedFilePathsInThisCycleList); - delService.delete(deletionTask); } // This container is finished, and all its logs have been uploaded, @@ -356,9 +358,23 @@ private void uploadLogsForContainers(boolean appFinished) { logAggregationSucceedInThisCycle = false; } } finally { + LogAggregationDFSException exc = null; + try { + this.logAggregationFileController.closeWriter(); + } catch (LogAggregationDFSException e) { + diagnosticMessage = e.getMessage(); + renameTemporaryLogFileFailed = true; + logAggregationSucceedInThisCycle = false; + exc = e; + } + if (logAggregationSucceedInThisCycle && deletionTask != null) { + delService.delete(deletionTask); + } sendLogAggregationReport(logAggregationSucceedInThisCycle, diagnosticMessage, appFinished); - logAggregationFileController.closeWriter(); + if (exc != null) { + throw exc; + } } } @@ -413,13 +429,18 @@ private void sendLogAggregationReportInternal( diagnosticMessage, finalized); } - @SuppressWarnings("unchecked") @Override public void run() { try { doAppLogAggregation(); + } catch (LogAggregationDFSException e) { + // if the log aggregation could not be performed due to DFS issues + // let's not clean up the log files, since that can result in + // loss of logs + LOG.error("Error occurred while aggregating the log for the application " + + appId, e); } catch (Exception e) { - // do post clean up of log directories on any exception + // do post clean up of log directories on any other exception LOG.error("Error occurred while aggregating the log for the application " + appId, e); doAppLogAggregationPostCleanUp(); @@ -434,8 +455,7 @@ public void run() { } } - @SuppressWarnings("unchecked") - private void doAppLogAggregation() { + private void doAppLogAggregation() throws LogAggregationDFSException { while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { @@ -452,6 +472,9 @@ private void doAppLogAggregation() { } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); this.appFinishing.set(true); + } catch (LogAggregationDFSException e) { + this.appFinishing.set(true); + throw e; } } } @@ -460,10 +483,14 @@ private void doAppLogAggregation() { return; } - // App is finished, upload the container logs. - uploadLogsForContainers(true); + try { + // App is finished, upload the container logs. + uploadLogsForContainers(true); - doAppLogAggregationPostCleanUp(); + doAppLogAggregationPostCleanUp(); + } catch (LogAggregationDFSException e) { + LOG.error("Error during log aggregation", e); + } this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index e13c805f1e3..95f4c320cbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.server.api.ContainerLogContext; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -42,7 +43,9 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -52,12 +55,14 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -228,10 +233,15 @@ public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload( config.setLong( YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs); + LogAggregationTFileController format = spy( + new LogAggregationTFileController()); + format.initialize(config, "TFile"); + + Context context = createContext(config); final AppLogAggregatorInTest appLogAggregator = createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), - config, recoveredLogInitedTimeMillis, - deletionServiceWithExpectedFiles); + config, context, recoveredLogInitedTimeMillis, + deletionServiceWithExpectedFiles, format); appLogAggregator.startContainerLogAggregation( new ContainerLogContext(containerId, ContainerType.TASK, 0)); // set app finished flag first @@ -269,8 +279,10 @@ private static void verifyFilesUploaded(Set filesUploaded, private static AppLogAggregatorInTest createAppLogAggregator( ApplicationId applicationId, String rootLogDir, - YarnConfiguration config, long recoveredLogInitedTimeMillis, - DeletionService deletionServiceWithFilesToExpect) + YarnConfiguration config, Context context, + long recoveredLogInitedTimeMillis, + DeletionService deletionServiceWithFilesToExpect, + LogAggregationTFileController tFileController) throws IOException { final Dispatcher dispatcher = createNullDispatcher(); @@ -284,16 +296,12 @@ private static AppLogAggregatorInTest createAppLogAggregator( final LogAggregationContext logAggregationContext = null; final Map appAcls = new HashMap<>(); - final Context context = createContext(config); final FileContext fakeLfs = mock(FileContext.class); final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); - LogAggregationTFileController format = spy( - new LogAggregationTFileController()); - format.initialize(config, "TFile"); return new AppLogAggregatorInTest(dispatcher, deletionService, config, applicationId, ugi, nodeId, dirsService, remoteLogDirForApp, appAcls, logAggregationContext, - context, fakeLfs, recoveredLogInitedTimeMillis, format); + context, fakeLfs, recoveredLogInitedTimeMillis, tFileController); } /** @@ -423,4 +431,53 @@ public AppLogAggregatorInTest(Dispatcher dispatcher, this.logValue = ArgumentCaptor.forClass(LogValue.class); } } + + @Test + public void testDFSQuotaExceeded() throws Exception { + + // the expectation is that no log files are deleted if the quota has + // been exceeded, since that would result in loss of logs + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(Collections.emptySet()); + + final YarnConfiguration config = new YarnConfiguration(); + + ApplicationId appId = ApplicationId.newInstance(1357543L, 1); + + // we need a LogAggregationTFileController that throws a + // LogAggregationDFSException + LogAggregationTFileController format = + Mockito.mock(LogAggregationTFileController.class); + Mockito.doThrow(new LogAggregationDFSException()) + .when(format).closeWriter(); + + NodeManager.NMContext context = (NMContext) createContext(config); + context.setNMLogAggregationStatusTracker( + Mockito.mock(NMLogAggregationStatusTracker.class)); + + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), + config, context, 1000L, deletionServiceWithExpectedFiles, format); + + appLogAggregator.startContainerLogAggregation( + new ContainerLogContext( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance(appId, 0), 0), + ContainerType.TASK, 0)); + // set app finished flag first + appLogAggregator.finishLogAggregation(); + appLogAggregator.run(); + + // verify that no files have been uploaded + ArgumentCaptor logValCaptor = + ArgumentCaptor.forClass(LogValue.class); + verify(appLogAggregator.getLogAggregationFileController()).write( + any(LogKey.class), logValCaptor.capture()); + Set filesUploaded = new HashSet<>(); + LogValue logValue = logValCaptor.getValue(); + for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + filesUploaded.add(file.getAbsolutePath()); + } + verifyFilesUploaded(filesUploaded, Collections.emptySet()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 39e403da5bf..42ba602bc8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -356,7 +356,7 @@ public void testSingleNodesXML() throws JSONException, Exception { } @Test (timeout = 5000) - public void testContainerLogsWithNewAPI() throws IOException, JSONException{ + public void testContainerLogsWithNewAPI() throws Exception { final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0); WebResource r = resource(); r = r.path("ws").path("v1").path("node").path("containers") @@ -365,7 +365,7 @@ public void testContainerLogsWithNewAPI() throws IOException, JSONException{ } @Test (timeout = 5000) - public void testContainerLogsWithOldAPI() throws IOException, JSONException{ + public void testContainerLogsWithOldAPI() throws Exception { final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1); WebResource r = resource(); r = r.path("ws").path("v1").path("node").path("containerlogs") @@ -538,7 +538,7 @@ public void testGetYarnGpuResourceInfo() } private void testContainerLogs(WebResource r, ContainerId containerId) - throws IOException { + throws Exception { final String containerIdStr = containerId.toString(); final ApplicationAttemptId appAttemptId = containerId .getApplicationAttemptId();