diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1a2aa1d..a26bf83 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -725,6 +725,13 @@
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs";
/**
+ * How often, in seconds, the log aggregation service rolls and uploads
+ * container logs while an application is still running. A non-positive
+ * value (the default is -1) disables rolling, so logs are aggregated only
+ * after the application finishes.
+ */
+ public static final String NM_LOG_ROLLING_INTERVAL_SECONDS =
+ NM_PREFIX + "log.rolling-interval-seconds";
+ public static final long DEFAULT_NM_LOG_ROLLING_INTERVAL_SECONDS = -1;
+
+ /**
* The remote log dir will be created at
* NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}
*/
@@ -1098,7 +1105,7 @@
* OS environment expansion syntax.
*
*
- * Note: Use {@link DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for
+ * Note: Use {@link #DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for
* cross-platform practice i.e. submit an application from a Windows client to
* a Linux/Unix server or vice versa.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index 590cfe2..e2a057d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -29,9 +29,21 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A service that periodically deletes aggregated logs.
@@ -42,8 +54,9 @@
private Timer timer = null;
private long checkIntervalMsecs;
+ private ApplicationClientProtocol rmClient;
- static class LogDeletionTask extends TimerTask {
+ private class LogDeletionTask extends TimerTask {
private Configuration conf;
private long retentionMillis;
private String suffix = null;
@@ -64,9 +77,8 @@ public void run() {
LOG.info("aggregated log deletion started.");
try {
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
-
- for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
- if(userDir.isDirectory()) {
+ for (FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
+ if (userDir.isDirectory()) {
Path userDirPath = new Path(userDir.getPath(), suffix);
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs);
}
@@ -78,19 +90,37 @@ public void run() {
LOG.info("aggregated log deletion finished.");
}
- private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
+ private void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
FileSystem fs) {
try {
for(FileStatus appDir : fs.listStatus(dir)) {
if(appDir.isDirectory() &&
appDir.getModificationTime() < cutoffMillis) {
- if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
+ boolean appTerminated =
+ isApplicationTerminated(ConverterUtils.toApplicationId(appDir
+ .getPath().getName()));
+ if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
try {
LOG.info("Deleting aggregated logs in "+appDir.getPath());
fs.delete(appDir.getPath(), true);
} catch (IOException e) {
logIOException("Could not delete "+appDir.getPath(), e);
}
+          } else if (!appTerminated) {
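+            // The application is still running: keep its log directory but
+            // delete the individual aggregated log files that are older
+            // than the retention cutoff.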
+ try {
+ for(FileStatus node: fs.listStatus(appDir.getPath())) {
+ if(node.getModificationTime() < cutoffMillis) {
+ try {
+ fs.delete(node.getPath(), true);
+ } catch (IOException ex) {
+                    logIOException("Could not delete "+node.getPath(), ex);
+ }
+ }
+ }
+ } catch(IOException e) {
+ logIOException(
+ "Error reading the contents of " + appDir.getPath(), e);
+ }
}
}
}
@@ -99,7 +129,7 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
}
}
- private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
+ private boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
FileSystem fs) {
boolean shouldDelete = true;
try {
@@ -117,7 +147,7 @@ private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
}
}
- private static void logIOException(String comment, IOException e) {
+ private void logIOException(String comment, IOException e) {
if(e instanceof AccessControlException) {
String message = e.getMessage();
//TODO fix this after HADOOP-8661
@@ -134,12 +164,14 @@ public AggregatedLogDeletionService() {
@Override
protected void serviceStart() throws Exception {
+    rmClient = createRMClient();
scheduleLogDeletionTask();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
+ stopRMClient();
stopTimer();
super.serviceStop();
}
@@ -201,4 +233,36 @@ public long getCheckIntervalMsecs() {
protected Configuration createConf() {
return new Configuration();
}
+
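+  // An application is treated as terminated when the RM reports it as
+  // FINISHED, FAILED, or KILLED, or no longer knows about it at all.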
+ private boolean isApplicationTerminated(ApplicationId appId)
+ throws IOException {
+ ApplicationReport appReport = null;
+ try {
+ appReport =
+ rmClient.getApplicationReport(
+ GetApplicationReportRequest.newInstance(appId))
+ .getApplicationReport();
+ } catch (ApplicationNotFoundException e) {
+ return true;
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ YarnApplicationState currentState = appReport.getYarnApplicationState();
+ return currentState == YarnApplicationState.FAILED
+ || currentState == YarnApplicationState.KILLED
+ || currentState == YarnApplicationState.FINISHED;
+ }
+
+ @VisibleForTesting
+  protected ApplicationClientProtocol createRMClient() throws IOException {
+ return ClientRMProxy.createRMProxy(getConfig(),
+ ApplicationClientProtocol.class);
+ }
+
+ @VisibleForTesting
+ protected void stopRMClient() {
+ if (rmClient != null) {
+ RPC.stopProxy(rmClient);
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index 05c7e71..30e9bf7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +30,12 @@
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +60,7 @@ public void testDeletion() throws Exception {
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
- Configuration conf = new Configuration();
+ final Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
@@ -69,22 +78,37 @@ public void testDeletion() throws Exception {
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
-
+
+ ApplicationId appId1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
- Path app1Dir = new Path(userLogDir, "application_1_1");
+ Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
- Path app2Dir = new Path(userLogDir, "application_1_2");
+ ApplicationId appId2 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 2);
+ Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
- Path app3Dir = new Path(userLogDir, "application_1_3");
+ ApplicationId appId3 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 3);
+ Path app3Dir = new Path(userLogDir, appId3.toString());
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
- Path app4Dir = new Path(userLogDir, "application_1_4");
+ ApplicationId appId4 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 4);
+ Path app4Dir = new Path(userLogDir, appId4.toString());
FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
+ ApplicationId appId5 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 5);
+ Path app5Dir = new Path(userLogDir, appId5.toString());
+ FileStatus app5DirStatus =
+ new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
+
when(mockFs.listStatus(userLogDir)).thenReturn(
- new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus});
+ new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
+ app4DirStatus, app5DirStatus });
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{});
@@ -117,20 +141,55 @@ public void testDeletion() throws Exception {
when(mockFs.listStatus(app4Dir)).thenReturn(
new FileStatus[]{app4Log1Status, app4Log2Status});
+
+ Path app5Log1 = new Path(app5Dir, "host1");
+ FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1);
- AggregatedLogDeletionService.LogDeletionTask task =
- new AggregatedLogDeletionService.LogDeletionTask(conf, 1800);
-
- task.run();
-
- verify(mockFs).delete(app1Dir, true);
- verify(mockFs, times(0)).delete(app2Dir, true);
- verify(mockFs).delete(app3Dir, true);
- verify(mockFs).delete(app4Dir, true);
+ Path app5Log2 = new Path(app5Dir, "host2");
+ FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2);
+
+ when(mockFs.listStatus(app5Dir)).thenReturn(
+ new FileStatus[]{app5Log1Status, app5Log2Status});
+
+    final List<ApplicationId> finishedApplications =
+ Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
+ appId4));
+    final List<ApplicationId> runningApplications =
+ Collections.unmodifiableList(Arrays.asList(appId5));
+
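+    // The mock RM reports appId1 through appId4 as finished, so their log
+    // directories go through the normal retention check, while appId5 is
+    // still running and only its out-of-date file (app5Log1) should be
+    // deleted.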
+ AggregatedLogDeletionService deletionService =
+ new AggregatedLogDeletionService() {
+ @Override
+          protected ApplicationClientProtocol createRMClient()
+ throws IOException {
+ try {
+ return createMockRMClient(finishedApplications,
+ runningApplications);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ protected void stopRMClient() {
+ // DO NOTHING
+ }
+ };
+ deletionService.init(conf);
+ deletionService.start();
+
+ verify(mockFs, timeout(2000)).delete(app1Dir, true);
+ verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
+ verify(mockFs, timeout(2000)).delete(app3Dir, true);
+ verify(mockFs, timeout(2000)).delete(app4Dir, true);
+ verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
+ verify(mockFs, timeout(2000)).delete(app5Log1, true);
+ verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
+
+ deletionService.stop();
}
@Test
- public void testRefreshLogRetentionSettings() throws IOException {
+ public void testRefreshLogRetentionSettings() throws Exception {
long now = System.currentTimeMillis();
//time before 2000 sec
long before2000Secs = now - (2000 * 1000);
@@ -163,13 +222,17 @@ public void testRefreshLogRetentionSettings() throws IOException {
Path userLogDir = new Path(userDir, suffix);
+ ApplicationId appId1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
//Set time last modified of app1Dir directory and its files to before2000Secs
- Path app1Dir = new Path(userLogDir, "application_1_1");
+ Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
app1Dir);
+ ApplicationId appId2 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 2);
//Set time last modified of app1Dir directory and its files to before50Secs
- Path app2Dir = new Path(userLogDir, "application_1_2");
+ Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
app2Dir);
@@ -190,11 +253,27 @@ public void testRefreshLogRetentionSettings() throws IOException {
when(mockFs.listStatus(app2Dir)).thenReturn(
new FileStatus[] { app2Log1Status });
+    final List<ApplicationId> finishedApplications =
+ Collections.unmodifiableList(Arrays.asList(appId1, appId2));
+
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
@Override
protected Configuration createConf() {
return conf;
}
+ @Override
+      protected ApplicationClientProtocol createRMClient()
+ throws IOException {
+ try {
+ return createMockRMClient(finishedApplications, null);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ protected void stopRMClient() {
+ // DO NOTHING
+ }
};
deletionSvc.init(conf);
@@ -253,8 +332,10 @@ public void testCheckInterval() throws Exception {
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
+ ApplicationId appId1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
- Path app1Dir = new Path(userLogDir, "application_1_1");
+ Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
@@ -266,8 +347,25 @@ public void testCheckInterval() throws Exception {
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{app1Log1Status});
+    final List<ApplicationId> finishedApplications =
+ Collections.unmodifiableList(Arrays.asList(appId1));
+
AggregatedLogDeletionService deletionSvc =
- new AggregatedLogDeletionService();
+ new AggregatedLogDeletionService() {
+ @Override
+        protected ApplicationClientProtocol createRMClient()
+ throws IOException {
+ try {
+ return createMockRMClient(finishedApplications, null);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ protected void stopRMClient() {
+ // DO NOTHING
+ }
+ };
deletionSvc.init(conf);
deletionSvc.start();
@@ -286,11 +384,61 @@ public void testCheckInterval() throws Exception {
deletionSvc.stop();
}
-
+
static class MockFileSystem extends FilterFileSystem {
MockFileSystem() {
super(mock(FileSystem.class));
}
public void initialize(URI name, Configuration conf) throws IOException {}
}
+
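+  // Builds a mock ApplicationClientProtocol whose getApplicationReport
+  // answers FINISHED for the given finished applications and RUNNING for
+  // the running ones.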
+ private ApplicationClientProtocol createMockRMClient(
+      List<ApplicationId> finishedApplications,
+      List<ApplicationId> runningApplications) throws Exception {
+ final ApplicationClientProtocol mockProtocol =
+ mock(ApplicationClientProtocol.class);
+    if (finishedApplications != null && !finishedApplications.isEmpty()) {
+      for (ApplicationId appId : finishedApplications) {
+ GetApplicationReportRequest request =
+ GetApplicationReportRequest.newInstance(appId);
+ GetApplicationReportResponse response =
+ createApplicationReportWithFinishedApplication();
+ when(mockProtocol.getApplicationReport(request))
+ .thenReturn(response);
+ }
+ }
+ if (runningApplications != null && !runningApplications.isEmpty()) {
+ for (ApplicationId appId : runningApplications) {
+ GetApplicationReportRequest request =
+ GetApplicationReportRequest.newInstance(appId);
+ GetApplicationReportResponse response =
+ createApplicationReportWithRunningApplication();
+ when(mockProtocol.getApplicationReport(request))
+ .thenReturn(response);
+ }
+ }
+ return mockProtocol;
+ }
+
+ private GetApplicationReportResponse
+ createApplicationReportWithRunningApplication() {
+ ApplicationReport report = mock(ApplicationReport.class);
+ when(report.getYarnApplicationState()).thenReturn(
+ YarnApplicationState.RUNNING);
+ GetApplicationReportResponse response =
+ mock(GetApplicationReportResponse.class);
+ when(response.getApplicationReport()).thenReturn(report);
+ return response;
+ }
+
+ private GetApplicationReportResponse
+ createApplicationReportWithFinishedApplication() {
+ ApplicationReport report = mock(ApplicationReport.class);
+ when(report.getYarnApplicationState()).thenReturn(
+ YarnApplicationState.FINISHED);
+ GetApplicationReportResponse response =
+ mock(GetApplicationReportResponse.class);
+ when(response.getApplicationReport()).thenReturn(report);
+ return response;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 318caf2..c09611e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -20,6 +20,10 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -33,6 +37,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@@ -41,6 +46,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@@ -65,6 +72,10 @@
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
+  // This is a temporary solution. The configuration will be removed once
+  // we find a more scalable method.
+ private static final String NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP
+ = YarnConfiguration.NM_PREFIX + "history-log.retention-size.per-app";
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
@@ -85,13 +96,16 @@
private final Map<ApplicationAccessType, String> appAcls;
private final LogAggregationContext logAggregationContext;
private final Context context;
+ private final int retentionSize;
+ private final long rollingInterval;
+ private final NodeId nodeId;
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
    new HashMap<ContainerId, ContainerLogAggregator>();
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
- ApplicationId appId, UserGroupInformation userUgi,
+ ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
@@ -111,6 +125,34 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
this.context = context;
+ this.nodeId = nodeId;
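+    // Cap how many rolled-up log files are kept per application on this
+    // node; fall back to the default of 30 if the configured value is not
+    // positive.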
+    int configuredRetentionSize =
+        conf.getInt(NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP, 30);
+    if (configuredRetentionSize <= 0) {
+      this.retentionSize = 30;
+    } else {
+      this.retentionSize = configuredRetentionSize;
+ }
+ long configuredInterval =
+ conf.getLong(YarnConfiguration.NM_LOG_ROLLING_INTERVAL_SECONDS,
+ YarnConfiguration.DEFAULT_NM_LOG_ROLLING_INTERVAL_SECONDS);
+ if (configuredInterval > 0 && configuredInterval < 3600) {
+      LOG.warn("Log rolling interval should be at least 3600 seconds. "
+          + "Using 3600 seconds instead.");
+ this.rollingInterval = 3600;
+ } else {
+ if (configuredInterval <= 0) {
+        LOG.warn(YarnConfiguration.NM_LOG_ROLLING_INTERVAL_SECONDS
+            + " is set to " + configuredInterval + ". Log rolling is "
+            + "disabled; the logs will be aggregated only after this "
+            + "application is finished.");
+      } else {
+        LOG.info(YarnConfiguration.NM_LOG_ROLLING_INTERVAL_SECONDS
+            + " is set to " + configuredInterval + ". The logs will be "
+            + "aggregated every " + configuredInterval + " seconds.");
+ }
+ this.rollingInterval = configuredInterval;
+ }
}
private void uploadLogsForContainers() {
@@ -181,12 +223,15 @@ private void uploadLogsForContainers() {
}
}
+ if (uploadedLogsInThisCycle) {
+ cleanOldLogs();
+ }
+
if (writer != null) {
writer.close();
}
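+    // When rolling is enabled, each upload cycle gets its own file whose
+    // name carries a per-cycle suffix; otherwise the single per-application
+    // log file name is kept.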
- final Path renamedPath = logAggregationContext == null ||
- logAggregationContext.getRollingIntervalSeconds() <= 0
+ final Path renamedPath = this.rollingInterval <= 0
? remoteNodeLogFileForApp : new Path(
remoteNodeLogFileForApp.getParent(),
remoteNodeLogFileForApp.getName() + "_"
@@ -198,9 +243,12 @@ private void uploadLogsForContainers() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = FileSystem.get(conf);
- if (remoteFS.exists(remoteNodeTmpLogFileForApp)
- && rename) {
- remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+ if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
+ if (rename) {
+ remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+ } else {
+ remoteFS.delete(remoteNodeTmpLogFileForApp, false);
+ }
}
return null;
}
@@ -218,6 +266,56 @@ public Object run() throws Exception {
}
}
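+
+  // Deletes this node's oldest rolled-up log files for the application so
+  // that at most retentionSize aggregated files remain once the file from
+  // the current cycle is renamed into place.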
+ private void cleanOldLogs() {
+ try {
+ final FileSystem remoteFS =
+ this.remoteNodeLogFileForApp.getFileSystem(conf);
+ Path appDir =
+ this.remoteNodeLogFileForApp.getParent().makeQualified(
+ remoteFS.getUri(), remoteFS.getWorkingDirectory());
+      Set<FileStatus> status =
+          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
+
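+      // Only consider this node's aggregated log files, ignoring in-progress
+      // .tmp files from the current upload.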
+      Iterable<FileStatus> mask =
+          Iterables.filter(status, new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus next) {
+ return next.getPath().getName()
+ .contains(LogAggregationUtils.getNodeString(nodeId))
+ && !next.getPath().getName().contains(
+ LogAggregationUtils.TMP_FILE_SUFFIX);
+ }
+ });
+ status = Sets.newHashSet(mask);
+ if (status.size() >= this.retentionSize) {
+ // sort by the lastModificationTime ascending
+        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
+        Collections.sort(statusList, new Comparator<FileStatus>() {
+ public int compare(FileStatus s1, FileStatus s2) {
+ return s1.getModificationTime() < s2.getModificationTime() ? -1
+ : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
+ }
+ });
+ for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
+ final FileStatus remove = statusList.get(i);
+ try {
+            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ remoteFS.delete(remove.getPath(), false);
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Failed to delete " + remove.getPath(), e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to clean old logs", e);
+ }
+ }
+
@Override
public void run() {
try {
@@ -235,9 +333,8 @@ private void doAppLogAggregation() {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
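+          // When rolling is enabled, wake up every rollingInterval seconds
+          // and upload the logs written so far; otherwise wait until the
+          // application finishes or aggregation is aborted.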
- if (this.logAggregationContext != null && this.logAggregationContext
- .getRollingIntervalSeconds() > 0) {
- wait(this.logAggregationContext.getRollingIntervalSeconds() * 1000);
+ if (this.rollingInterval > 0) {
+ wait(this.rollingInterval * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 772f3f1..1d6a9e1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -342,7 +342,7 @@ protected void initAppAggregator(final ApplicationId appId, String user,
// New application
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
- getConfig(), appId, userUgi, dirsHandler,
+ getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
appAcls, logAggregationContext, this.context);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 36c54dc..f17cbd5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -699,7 +699,7 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId,
}
}
- private void verifyContainerLogs(LogAggregationService logAggregationService,
+ private String verifyContainerLogs(LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
throws IOException {
@@ -811,6 +811,7 @@ private void verifyContainerLogs(LogAggregationService logAggregationService,
Assert.assertEquals(0, thisContainerMap.size());
}
Assert.assertEquals(0, logMap.size());
+ return targetNodeFile.getPath().getName();
} finally {
reader.close();
}
@@ -1219,17 +1220,32 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
dispatcher.stop();
}
- @SuppressWarnings("unchecked")
@Test (timeout = 50000)
public void testLogAggregationServiceWithInterval() throws Exception {
- final int maxAttempts = 50;
+ testLogAggregationService(false);
+ }
+
+ @Test (timeout = 50000)
+ public void testLogAggregationServiceWithRetention() throws Exception {
+ testLogAggregationService(true);
+ }
+
+ private void testLogAggregationService(boolean retentionSizeLimitation)
+ throws Exception {
LogAggregationContext logAggregationContextWithInterval =
Records.newRecord(LogAggregationContext.class);
- logAggregationContextWithInterval.setRollingIntervalSeconds(5000);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
+ this.conf.setLong(YarnConfiguration.NM_LOG_ROLLING_INTERVAL_SECONDS, 3600);
+ if (retentionSizeLimitation) {
+      // Set the retention size to 1 so that at most one aggregated log
+      // file per application is kept on this NM.
+ this.conf.setInt(YarnConfiguration.NM_PREFIX
+ + "history-log.retention-size.per-app", 1);
+ }
+
// by setting this configuration, the log files will not be deleted immediately after
// they are aggregated to remote directory.
// We could use it to test whether the previous aggregated log files will be aggregated
@@ -1280,23 +1296,29 @@ public void testLogAggregationServiceWithInterval() throws Exception {
.get(application);
aggregator.doLogAggregationOutOfBand();
- int count = 0;
- while (numOfLogsAvailable(logAggregationService, application) != 1
- && count <= maxAttempts) {
- Thread.sleep(100);
- count++;
+ if (retentionSizeLimitation) {
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 1, true, null));
+ } else {
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 1, false, null));
}
+ String logFileInLastCycle = null;
// Container logs should be uploaded
- verifyContainerLogs(logAggregationService, application,
+ logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles1, 3, true);
+ Thread.sleep(2000);
+
// There is no log generated at this time. Do the log aggregation again.
aggregator.doLogAggregationOutOfBand();
// Same logs will not be aggregated again.
// Only one aggregated log file in Remote file directory.
- Assert.assertEquals(numOfLogsAvailable(logAggregationService, application),
- 1);
+ Assert.assertEquals(numOfLogsAvailable(logAggregationService,
+ application, true, null), 1);
+
+ Thread.sleep(2000);
// Do log aggregation
String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
@@ -1304,16 +1326,19 @@ public void testLogAggregationServiceWithInterval() throws Exception {
aggregator.doLogAggregationOutOfBand();
- count = 0;
- while (numOfLogsAvailable(logAggregationService, application) != 2
- && count <= maxAttempts) {
- Thread.sleep(100);
- count ++;
+ if (retentionSizeLimitation) {
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 1, true, logFileInLastCycle));
+ } else {
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 2, false, null));
}
// Container logs should be uploaded
- verifyContainerLogs(logAggregationService, application,
+ logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles2, 3, true);
+ Thread.sleep(2000);
+
// create another logs
String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
writeContainerLogs(appLogDir, container, logFiles3);
@@ -1323,13 +1348,13 @@ public void testLogAggregationServiceWithInterval() throws Exception {
dispatcher.await();
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
- count = 0;
- while (numOfLogsAvailable(logAggregationService, application) != 3
- && count <= maxAttempts) {
- Thread.sleep(100);
- count ++;
+ if (retentionSizeLimitation) {
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 1, true, logFileInLastCycle));
+ } else {
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 3, false, null));
}
-
verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles3, 3, true);
logAggregationService.stop();
@@ -1338,7 +1363,8 @@ public void testLogAggregationServiceWithInterval() throws Exception {
}
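+  // Returns the number of aggregated log files for the application, or -1
+  // if a temporary upload file is still present or, when sizeLimited, the
+  // previous cycle's file has not yet been cleaned up.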
private int numOfLogsAvailable(LogAggregationService logAggregationService,
- ApplicationId appId) throws IOException {
+ ApplicationId appId, boolean sizeLimited, String lastLogFile)
+ throws IOException {
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
@@ -1354,7 +1380,9 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
while (nodeFiles.hasNext()) {
FileStatus status = nodeFiles.next();
String filename = status.getPath().getName();
- if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+ if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)
+ || (lastLogFile != null && filename.contains(lastLogFile)
+ && sizeLimited)) {
return -1;
}
if (filename.contains(LogAggregationUtils
@@ -1364,4 +1392,18 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService,
}
return count;
}
+
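+  // Polls numOfLogsAvailable every 500 ms, up to maxAttempts times, and
+  // returns whether the expected number of aggregated log files shows up.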
+ private boolean waitAndCheckLogNum(
+ LogAggregationService logAggregationService, ApplicationId application,
+ int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile)
+ throws IOException, InterruptedException {
+ int count = 0;
+ while (numOfLogsAvailable(logAggregationService, application, sizeLimited,
+ lastLogFile) != expectNum && count <= maxAttempts) {
+ Thread.sleep(500);
+ count++;
+ }
+ return numOfLogsAvailable(logAggregationService, application, sizeLimited,
+ lastLogFile) == expectNum;
+ }
}