diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index 537807227d4..b98964816e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -40,6 +40,10 @@
hadoop-common
provided
+
+ org.apache.hadoop
+ hadoop-hdfs-client
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index af3066ebd15..81d50534979 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
@@ -547,7 +548,7 @@ public void append(LogKey logKey, LogValue logValue) throws IOException {
}
@Override
- public void close() {
+ public void close() throws DSQuotaExceededException {
try {
if (writer != null) {
writer.close();
@@ -555,7 +556,16 @@ public void close() {
} catch (Exception e) {
LOG.warn("Exception closing writer", e);
} finally {
- IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
+ try {
+ this.fsDataOStream.close();
+ } catch (DSQuotaExceededException e) {
+ LOG.error("Exception in closing {}",
+ this.fsDataOStream.getClass(), e);
+ throw e;
+ } catch (Throwable e) {
+ LOG.error("Exception in closing {}",
+ this.fsDataOStream.getClass(), e);
+ }
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index aeef5748596..0f58c514468 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -167,8 +167,10 @@ public abstract void initializeWriter(
/**
* Close the writer.
+ * @throws LogAggregationDFSException if the closing of the writer fails
+ * (for example due to HDFS quota being exceeded)
*/
- public abstract void closeWriter();
+ public abstract void closeWriter() throws LogAggregationDFSException;
/**
* Write the log content.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
index a4f50d2ebda..e87af7f358a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -46,6 +47,7 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
@@ -95,10 +97,15 @@ public void initializeWriter(LogAggregationFileControllerContext context)
}
@Override
- public void closeWriter() {
+ public void closeWriter() throws LogAggregationDFSException {
if (this.writer != null) {
- this.writer.close();
- this.writer = null;
+ try {
+ this.writer.close();
+ } catch (DSQuotaExceededException e) {
+ throw new LogAggregationDFSException(e);
+ } finally {
+ this.writer = null;
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index a12e2a152b0..47672822aca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -65,7 +65,7 @@ private TestContainerLogsUtils() {}
public static void createContainerLogFileInRemoteFS(Configuration conf,
FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId,
String fileName, String user, String content,
- boolean deleteRemoteLogDir) throws IOException {
+ boolean deleteRemoteLogDir) throws Exception {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
//prepare the logs for remote directory
ApplicationId appId = containerId.getApplicationAttemptId()
@@ -113,7 +113,7 @@ private static void createContainerLogInLocalDir(Path appLogsDir,
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List rootLogDirs, NodeId nodeId,
- ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
+ ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
LogAggregationFileControllerFactory factory
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index c7e06ff73b7..59568230e47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
@@ -263,7 +264,8 @@ private String getLogAggPolicyParameters(Configuration conf) {
return params;
}
- private void uploadLogsForContainers(boolean appFinished) {
+ private void uploadLogsForContainers(boolean appFinished)
+ throws LogAggregationDFSException {
if (this.logAggregationDisabled) {
return;
}
@@ -301,6 +303,7 @@ private void uploadLogsForContainers(boolean appFinished) {
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
+ DeletionTask deletionTask = null;
try {
try {
logAggregationFileController.initializeWriter(logControllerContext);
@@ -327,10 +330,9 @@ private void uploadLogsForContainers(boolean appFinished) {
uploadedLogsInThisCycle = true;
List uploadedFilePathsInThisCycleList = new ArrayList<>();
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
- DeletionTask deletionTask = new FileDeletionTask(delService,
+ deletionTask = new FileDeletionTask(delService,
this.userUgi.getShortUserName(), null,
uploadedFilePathsInThisCycleList);
- delService.delete(deletionTask);
}
// This container is finished, and all its logs have been uploaded,
@@ -356,9 +358,23 @@ private void uploadLogsForContainers(boolean appFinished) {
logAggregationSucceedInThisCycle = false;
}
} finally {
+ LogAggregationDFSException exc = null;
+ try {
+ this.logAggregationFileController.closeWriter();
+ } catch (LogAggregationDFSException e) {
+ diagnosticMessage = e.getMessage();
+ renameTemporaryLogFileFailed = true;
+ logAggregationSucceedInThisCycle = false;
+ exc = e;
+ }
+ if (logAggregationSucceedInThisCycle && deletionTask != null) {
+ delService.delete(deletionTask);
+ }
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
- logAggregationFileController.closeWriter();
+ if (exc != null) {
+ throw exc;
+ }
}
}
@@ -413,13 +429,18 @@ private void sendLogAggregationReportInternal(
diagnosticMessage, finalized);
}
- @SuppressWarnings("unchecked")
@Override
public void run() {
try {
doAppLogAggregation();
+ } catch (LogAggregationDFSException e) {
+ // if the log aggregation could not be performed due to DFS issues
+ // let's not clean up the log files, since that can result in
+ // loss of logs
+ LOG.error("Error occurred while aggregating the log for the application "
+ + appId, e);
} catch (Exception e) {
- // do post clean up of log directories on any exception
+ // do post clean up of log directories on any other exception
LOG.error("Error occurred while aggregating the log for the application "
+ appId, e);
doAppLogAggregationPostCleanUp();
@@ -434,8 +455,7 @@ public void run() {
}
}
- @SuppressWarnings("unchecked")
- private void doAppLogAggregation() {
+ private void doAppLogAggregation() throws LogAggregationDFSException {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
@@ -452,6 +472,9 @@ private void doAppLogAggregation() {
} catch (InterruptedException e) {
LOG.warn("PendingContainers queue is interrupted");
this.appFinishing.set(true);
+ } catch (LogAggregationDFSException e) {
+ this.appFinishing.set(true);
+ throw e;
}
}
}
@@ -460,10 +483,14 @@ private void doAppLogAggregation() {
return;
}
- // App is finished, upload the container logs.
- uploadLogsForContainers(true);
+ try {
+ // App is finished, upload the container logs.
+ uploadLogsForContainers(true);
- doAppLogAggregationPostCleanUp();
+ doAppLogAggregationPostCleanUp();
+ } catch (LogAggregationDFSException e) {
+ LOG.error("Error during log aggregation", e);
+ }
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index e13c805f1e3..95f4c320cbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -42,7 +43,9 @@
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -52,12 +55,14 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -228,10 +233,15 @@ public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
config.setLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
+ LogAggregationTFileController format = spy(
+ new LogAggregationTFileController());
+ format.initialize(config, "TFile");
+
+ Context context = createContext(config);
final AppLogAggregatorInTest appLogAggregator =
createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
- config, recoveredLogInitedTimeMillis,
- deletionServiceWithExpectedFiles);
+ config, context, recoveredLogInitedTimeMillis,
+ deletionServiceWithExpectedFiles, format);
appLogAggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, ContainerType.TASK, 0));
// set app finished flag first
@@ -269,8 +279,10 @@ private static void verifyFilesUploaded(Set filesUploaded,
private static AppLogAggregatorInTest createAppLogAggregator(
ApplicationId applicationId, String rootLogDir,
- YarnConfiguration config, long recoveredLogInitedTimeMillis,
- DeletionService deletionServiceWithFilesToExpect)
+ YarnConfiguration config, Context context,
+ long recoveredLogInitedTimeMillis,
+ DeletionService deletionServiceWithFilesToExpect,
+ LogAggregationTFileController tFileController)
throws IOException {
final Dispatcher dispatcher = createNullDispatcher();
@@ -284,16 +296,12 @@ private static AppLogAggregatorInTest createAppLogAggregator(
final LogAggregationContext logAggregationContext = null;
final Map appAcls = new HashMap<>();
- final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
- LogAggregationTFileController format = spy(
- new LogAggregationTFileController());
- format.initialize(config, "TFile");
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
- context, fakeLfs, recoveredLogInitedTimeMillis, format);
+ context, fakeLfs, recoveredLogInitedTimeMillis, tFileController);
}
/**
@@ -423,4 +431,53 @@ public AppLogAggregatorInTest(Dispatcher dispatcher,
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
}
+
+ @Test
+ public void testDFSQuotaExceeded() throws Exception {
+
+ // the expectation is that no log files are deleted if the quota has
+ // been exceeded, since that would result in loss of logs
+ DeletionService deletionServiceWithExpectedFiles =
+ createDeletionServiceWithExpectedFile2Delete(Collections.emptySet());
+
+ final YarnConfiguration config = new YarnConfiguration();
+
+ ApplicationId appId = ApplicationId.newInstance(1357543L, 1);
+
+ // we need a LogAggregationTFileController that throws a
+ // LogAggregationDFSException
+ LogAggregationTFileController format =
+ Mockito.mock(LogAggregationTFileController.class);
+ Mockito.doThrow(new LogAggregationDFSException())
+ .when(format).closeWriter();
+
+ NodeManager.NMContext context = (NMContext) createContext(config);
+ context.setNMLogAggregationStatusTracker(
+ Mockito.mock(NMLogAggregationStatusTracker.class));
+
+ final AppLogAggregatorInTest appLogAggregator =
+ createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
+ config, context, 1000L, deletionServiceWithExpectedFiles, format);
+
+ appLogAggregator.startContainerLogAggregation(
+ new ContainerLogContext(
+ ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(appId, 0), 0),
+ ContainerType.TASK, 0));
+ // set app finished flag first
+ appLogAggregator.finishLogAggregation();
+ appLogAggregator.run();
+
+ // verify that no files have been uploaded
+ ArgumentCaptor logValCaptor =
+ ArgumentCaptor.forClass(LogValue.class);
+ verify(appLogAggregator.getLogAggregationFileController()).write(
+ any(LogKey.class), logValCaptor.capture());
+ Set filesUploaded = new HashSet<>();
+ LogValue logValue = logValCaptor.getValue();
+ for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
+ filesUploaded.add(file.getAbsolutePath());
+ }
+ verifyFilesUploaded(filesUploaded, Collections.emptySet());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 39e403da5bf..42ba602bc8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -356,7 +356,7 @@ public void testSingleNodesXML() throws JSONException, Exception {
}
@Test (timeout = 5000)
- public void testContainerLogsWithNewAPI() throws IOException, JSONException{
+ public void testContainerLogsWithNewAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containers")
@@ -365,7 +365,7 @@ public void testContainerLogsWithNewAPI() throws IOException, JSONException{
}
@Test (timeout = 5000)
- public void testContainerLogsWithOldAPI() throws IOException, JSONException{
+ public void testContainerLogsWithOldAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containerlogs")
@@ -538,7 +538,7 @@ public void testGetYarnGpuResourceInfo()
}
private void testContainerLogs(WebResource r, ContainerId containerId)
- throws IOException {
+ throws Exception {
final String containerIdStr = containerId.toString();
final ApplicationAttemptId appAttemptId = containerId
.getApplicationAttemptId();