diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index 537807227d4..b98964816e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -40,6 +40,10 @@ hadoop-common provided + + org.apache.hadoop + hadoop-hdfs-client + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index af3066ebd15..81d50534979 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.io.Writable; @@ -547,7 +548,7 @@ public void append(LogKey logKey, LogValue logValue) throws IOException { } @Override - public void close() { + public void close() throws DSQuotaExceededException { try { if (writer != null) { writer.close(); @@ -555,7 +556,16 @@ public void close() { } catch (Exception e) { LOG.warn("Exception closing writer", e); } finally { - IOUtils.cleanupWithLogger(LOG, this.fsDataOStream); + try { + this.fsDataOStream.close(); + } catch (DSQuotaExceededException e) { + LOG.error("Exception in closing {}", + this.fsDataOStream.getClass(), e); + throw e; + } catch (Throwable e) { + LOG.error("Exception in closing {}", + this.fsDataOStream.getClass(), e); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index a4f50d2ebda..072c69df209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; @@ -97,8 +99,13 @@ public void initializeWriter(LogAggregationFileControllerContext context) @Override public void closeWriter() { if (this.writer != null) { - this.writer.close(); - this.writer = null; + try { + this.writer.close(); + } catch (DSQuotaExceededException e) { + throw new LogAggregationDFSException(e); + } finally { + this.writer = null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index c7e06ff73b7..1e29111bf02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; @@ -67,11 +69,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -301,6 +305,7 @@ private void uploadLogsForContainers(boolean appFinished) { logAggregationTimes++; String diagnosticMessage = ""; boolean logAggregationSucceedInThisCycle = true; + DeletionTask deletionTask = null; try { try { logAggregationFileController.initializeWriter(logControllerContext); @@ -327,10 +332,9 @@ private void uploadLogsForContainers(boolean appFinished) { uploadedLogsInThisCycle = true; List uploadedFilePathsInThisCycleList = new ArrayList<>(); uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle); - DeletionTask deletionTask = new FileDeletionTask(delService, + deletionTask = new FileDeletionTask(delService, this.userUgi.getShortUserName(), null, uploadedFilePathsInThisCycleList); - delService.delete(deletionTask); } // This container is finished, and all its logs have been uploaded, @@ -356,9 +360,23 @@ private void uploadLogsForContainers(boolean appFinished) { logAggregationSucceedInThisCycle = false; } } finally { + LogAggregationDFSException exc = null; + try { + this.logAggregationFileController.closeWriter(); + } catch (LogAggregationDFSException e) { + diagnosticMessage = e.getMessage(); + renameTemporaryLogFileFailed = true; + logAggregationSucceedInThisCycle = false; + exc = e; + } + if (logAggregationSucceedInThisCycle && deletionTask != null) { + delService.delete(deletionTask); + } sendLogAggregationReport(logAggregationSucceedInThisCycle, diagnosticMessage, appFinished); - logAggregationFileController.closeWriter(); + if (exc != null) { + throw exc; + } } } @@ -418,8 +436,14 @@ private void sendLogAggregationReportInternal( public void run() { try { doAppLogAggregation(); + } catch (LogAggregationDFSException e) { + // if the log aggregation could not be performed due to DFS issues + // let's not clean up the log files, since that can result in + // loss of logs + LOG.error("Error occurred while aggregating the log for the application " + + appId, e); } catch (Exception e) { - // do post clean up of log directories on any exception + // do post clean up of log directories on any other exception LOG.error("Error occurred while aggregating the log for the application " + appId, e); doAppLogAggregationPostCleanUp(); @@ -452,6 +476,9 @@ private void doAppLogAggregation() { } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); this.appFinishing.set(true); + } catch (LogAggregationDFSException e) { + LOG.error("Error during log aggregation", e); + this.appFinishing.set(true); } } } @@ -460,10 +487,14 @@ private void doAppLogAggregation() { return; } - // App is finished, upload the container logs. - uploadLogsForContainers(true); + try { + // App is finished, upload the container logs. + uploadLogsForContainers(true); - doAppLogAggregationPostCleanUp(); + doAppLogAggregationPostCleanUp(); + } catch (LogAggregationDFSException e) { + LOG.error("Error during log aggregation", e); + } this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index e13c805f1e3..bb29a1caaf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.server.api.ContainerLogContext; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -42,7 +43,9 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -52,12 +55,14 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -228,10 +233,15 @@ public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload( config.setLong( YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs); + LogAggregationTFileController format = spy( + new LogAggregationTFileController()); + format.initialize(config, "TFile"); + + Context context = createContext(config); final AppLogAggregatorInTest appLogAggregator = createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), - config, recoveredLogInitedTimeMillis, - deletionServiceWithExpectedFiles); + config, context, recoveredLogInitedTimeMillis, + deletionServiceWithExpectedFiles, format); appLogAggregator.startContainerLogAggregation( new ContainerLogContext(containerId, ContainerType.TASK, 0)); // set app finished flag first @@ -269,8 +279,9 @@ private static void verifyFilesUploaded(Set filesUploaded, private static AppLogAggregatorInTest createAppLogAggregator( ApplicationId applicationId, String rootLogDir, - YarnConfiguration config, long recoveredLogInitedTimeMillis, - DeletionService deletionServiceWithFilesToExpect) + YarnConfiguration config, Context context, long recoveredLogInitedTimeMillis, + DeletionService deletionServiceWithFilesToExpect, + LogAggregationTFileController tFileController) throws IOException { final Dispatcher dispatcher = createNullDispatcher(); @@ -284,16 +295,12 @@ private static AppLogAggregatorInTest createAppLogAggregator( final LogAggregationContext logAggregationContext = null; final Map appAcls = new HashMap<>(); - final Context context = createContext(config); final FileContext fakeLfs = mock(FileContext.class); final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); - LogAggregationTFileController format = spy( - new LogAggregationTFileController()); - format.initialize(config, "TFile"); return new AppLogAggregatorInTest(dispatcher, deletionService, config, applicationId, ugi, nodeId, dirsService, remoteLogDirForApp, appAcls, logAggregationContext, - context, fakeLfs, recoveredLogInitedTimeMillis, format); + context, fakeLfs, recoveredLogInitedTimeMillis, tFileController); } /** @@ -423,4 +430,53 @@ public AppLogAggregatorInTest(Dispatcher dispatcher, this.logValue = ArgumentCaptor.forClass(LogValue.class); } } + + @Test + public void testDFSQuotaExceeded() throws IOException { + + // the expectation is that no log files are deleted if the quota has + // been exceeded, since that would result in loss of logs + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(Collections.emptySet()); + + final YarnConfiguration config = new YarnConfiguration(); + + ApplicationId appId = ApplicationId.newInstance(1357543L, 1); + + // we need a LogAggregationTFileController that throws a + // LogAggregationDFSException + LogAggregationTFileController format = + Mockito.mock(LogAggregationTFileController.class); + Mockito.doThrow(new LogAggregationDFSException()) + .when(format).closeWriter(); + + NodeManager.NMContext context = (NMContext) createContext(config); + context.setNMLogAggregationStatusTracker( + Mockito.mock(NMLogAggregationStatusTracker.class)); + + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), + config, context, 1000L, deletionServiceWithExpectedFiles, format); + + appLogAggregator.startContainerLogAggregation( + new ContainerLogContext( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance(appId, 0), 0), + ContainerType.TASK, 0)); + // set app finished flag first + appLogAggregator.finishLogAggregation(); + appLogAggregator.run(); + + // verify that no files have been uploaded + ArgumentCaptor logValCaptor = + ArgumentCaptor.forClass(LogValue.class); + verify(appLogAggregator.getLogAggregationFileController()).write( + any(LogKey.class), logValCaptor.capture()); + Set filesUploaded = new HashSet<>(); + LogValue logValue = logValCaptor.getValue(); + for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + filesUploaded.add(file.getAbsolutePath()); + } + verifyFilesUploaded(filesUploaded , Collections.emptySet()); + } }