diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index acc4a05..a625606 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -666,7 +666,7 @@ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. @@ -676,6 +676,13 @@ public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; /** + * How many logs NM could save per application. + */ + public static final String NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP + = NM_PREFIX + "history-log.retention-size.per-app"; + public static final int DEFAULT_NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP + = 30; + /** * How long to wait between aggregated log retention checks. If set to * a value <= 0 then the value is computed as one-tenth of the log retention * setting. Be careful set this too small and you will spam the name node. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 45af2ba..58d0a8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -20,6 +20,10 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -32,6 +36,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; @@ -40,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; @@ -76,6 +83,7 @@ private final Path remoteNodeLogFileForApp; private final Path remoteNodeTmpLogFileForApp; private final ContainerLogsRetentionPolicy retentionPolicy; + private final NodeId nodeId; private final BlockingQueue pendingContainers; private final AtomicBoolean appFinishing = new AtomicBoolean(); @@ -84,13 +92,14 @@ private final Map appAcls; private final LogAggregationContext logAggregationContext; private final Context context; + private final int retentionSize; private Set currentUploadedFiles = new HashSet(); private Set alreadyUploadedLogs = new HashSet(); private Set currentExistingLogFiles = new HashSet(); public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation userUgi, + ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, Map appAcls, @@ -102,6 +111,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.appId = appId; this.applicationId = ConverterUtils.toString(appId); this.userUgi = userUgi; + this.nodeId = nodeId; this.dirsHandler = dirsHandler; this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); @@ -110,6 +120,11 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.appAcls = appAcls; this.logAggregationContext = logAggregationContext; this.context = context; + this.retentionSize = + conf.getInt(YarnConfiguration + .NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP, + YarnConfiguration + .DEFAULT_NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP); } private void uploadLogsForContainer(ContainerId containerId, LogWriter writer) { @@ -194,6 +209,7 @@ public Object run() throws Exception { return null; } }); + cleanOldLogs(); } catch (Exception e) { LOG.error( "Failed to move temporary log file to final location: [" @@ -207,6 +223,52 @@ public Object run() throws Exception { } } + private void cleanOldLogs() { + try { + final FileSystem remoteFS = FileSystem.get(conf); + Path appDir = + this.remoteNodeLogFileForApp.getParent().makeQualified( + remoteFS.getUri(), remoteFS.getWorkingDirectory()); + Set status = + new HashSet(Arrays.asList(remoteFS.listStatus(appDir))); + + Iterable mask = + Iterables.filter(status, new Predicate() { + @Override + public boolean apply(FileStatus next) { + return next.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + && !next.getPath().getName().contains( + LogAggregationUtils.TMP_FILE_SUFFIX); + } + }); + status = Sets.newHashSet(mask); + if (status.size() > this.retentionSize) { + List statusList = new ArrayList(status); + Collections.sort(statusList, new Comparator() { + public int compare(FileStatus s1, FileStatus s2) { + return s1.getModificationTime() < s2.getModificationTime() ? -1 + : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0; + } + }); + final FileStatus remove = statusList.get(0); + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + remoteFS.delete(remove.getPath(), false); + return null; + } + }); + } catch (Exception e) { + LOG.error("Failed to delete " + remove, e); + } + } + } catch (Exception e) { + LOG.error("Failed to clean old logs", e); + } + } + @Override public void run() { try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 772f3f1..1d6a9e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -342,7 +342,7 @@ protected void initAppAggregator(final ApplicationId appId, String user, // New application final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, - getConfig(), appId, userUgi, dirsHandler, + getConfig(), appId, userUgi, this.nodeId, dirsHandler, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls, logAggregationContext, this.context); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index dbe72bf..9d12743 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -1143,6 +1143,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { @SuppressWarnings("unchecked") @Test (timeout = 50000) public void testLogAggregationServiceWithInterval() throws Exception { + final int retentionSize = 1; LogAggregationContext logAggregationContextWithInterval = Records.newRecord(LogAggregationContext.class); logAggregationContextWithInterval.setRollingIntervalSeconds(5000); @@ -1151,6 +1152,10 @@ public void testLogAggregationServiceWithInterval() throws Exception { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + this.conf + .setInt(YarnConfiguration + .NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP, + retentionSize); DrainDispatcher dispatcher = createDispatcher(); EventHandler appEventHandler = mock(EventHandler.class); @@ -1199,6 +1204,8 @@ public void testLogAggregationServiceWithInterval() throws Exception { // Make sure the log is uploaded Thread.sleep(2000); + Assert.assertTrue(numOfLogsAvailable( + logAggregationService, application) == retentionSize); // Container logs should be uploaded verifyContainerLogs(logAggregationService, application, @@ -1208,6 +1215,8 @@ public void testLogAggregationServiceWithInterval() throws Exception { aggregator.doLogAggregationOutOfBand(); // Make sure there is no new log uploaded Thread.sleep(2000); + Assert.assertTrue(numOfLogsAvailable( + logAggregationService, application) == retentionSize); // Do log aggregation String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" }; @@ -1218,6 +1227,8 @@ public void testLogAggregationServiceWithInterval() throws Exception { // Make sure the log is uploaded Thread.sleep(2000); + Assert.assertTrue(numOfLogsAvailable( + logAggregationService, application) == retentionSize); // Container logs should be uploaded verifyContainerLogs(logAggregationService, application, @@ -1236,6 +1247,8 @@ public void testLogAggregationServiceWithInterval() throws Exception { // Make sure the log is uploaded Thread.sleep(2000); + Assert.assertTrue(numOfLogsAvailable( + logAggregationService, application) == retentionSize); verifyContainerLogs(logAggregationService, application, new ContainerId[] { container }, logFiles3, 3, true); @@ -1243,4 +1256,30 @@ public void testLogAggregationServiceWithInterval() throws Exception { assertEquals(0, logAggregationService.getNumAggregators()); dispatcher.stop(); } + + private int numOfLogsAvailable(LogAggregationService logAggregationService, + ApplicationId appId) throws IOException { + Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user); + RemoteIterator nodeFiles = null; + try { + Path qualifiedLogDir = + FileContext.getFileContext(this.conf).makeQualified(appLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf) + .listStatus(appLogDir); + } catch (FileNotFoundException fnf) { + return -1; + } + int count = 0; + while (nodeFiles.hasNext()) { + FileStatus status = nodeFiles.next(); + String filename = status.getPath().getName(); + if (filename.contains(LogAggregationUtils + .getNodeString(logAggregationService.getNodeId())) + && !filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) { + count++; + } + } + return count; + } }