diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1a2aa1d..8367cc8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -725,6 +725,13 @@
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs";
/**
+ * Defines how often the logAggregationService uploads container logs, in seconds.
+ */
+ public static final String NM_LOG_ROLLING_INTERVAL_SECONDS =
+ NM_PREFIX + "log.rolling-interval-seconds";
+ public static final long DEFAULT_NM_LOG_ROLLING_INTERVAL_SECONDS = -1;
+
+ /**
* The remote log dir will be created at
* NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}
*/
@@ -1098,7 +1105,7 @@
* OS environment expansion syntax.
*
*
- * Note: Use {@link DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for
+ * Note: Use {@link #DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for
* cross-platform practice i.e. submit an application from a Windows client to
* a Linux/Unix server or vice versa.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index 590cfe2..4c1d152 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -24,38 +24,53 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A service that periodically deletes aggregated logs.
*/
-@Private
+@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
public class AggregatedLogDeletionService extends AbstractService {
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
private Timer timer = null;
private long checkIntervalMsecs;
+ private LogDeletionTask task;
static class LogDeletionTask extends TimerTask {
private Configuration conf;
private long retentionMillis;
private String suffix = null;
private Path remoteRootLogDir = null;
+ private ApplicationClientProtocol rmClient = null;
- public LogDeletionTask(Configuration conf, long retentionSecs) {
+ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ this.rmClient = rmClient;
}
@Override
@@ -64,11 +79,10 @@ public void run() {
LOG.info("aggregated log deletion started.");
try {
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
-
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
if(userDir.isDirectory()) {
Path userDirPath = new Path(userDir.getPath(), suffix);
- deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs);
+ deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
}
}
} catch (IOException e) {
@@ -79,18 +93,36 @@ public void run() {
}
private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
- FileSystem fs) {
+ FileSystem fs, ApplicationClientProtocol rmClient) {
try {
for(FileStatus appDir : fs.listStatus(dir)) {
if(appDir.isDirectory() &&
appDir.getModificationTime() < cutoffMillis) {
- if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
+ boolean appTerminated =
+ isApplicationTerminated(ConverterUtils.toApplicationId(appDir
+ .getPath().getName()), rmClient);
+ if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
try {
LOG.info("Deleting aggregated logs in "+appDir.getPath());
fs.delete(appDir.getPath(), true);
} catch (IOException e) {
logIOException("Could not delete "+appDir.getPath(), e);
}
+ } else if (!appTerminated){
+ try {
+ for(FileStatus node: fs.listStatus(appDir.getPath())) {
+ if(node.getModificationTime() < cutoffMillis) {
+ try {
+ fs.delete(node.getPath(), true);
+ } catch (IOException ex) {
+                  logIOException("Could not delete "+node.getPath(), ex);
+ }
+ }
+ }
+ } catch(IOException e) {
+ logIOException(
+ "Error reading the contents of " + appDir.getPath(), e);
+ }
}
}
}
@@ -115,6 +147,29 @@ private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis,
}
return shouldDelete;
}
+
+ private static boolean isApplicationTerminated(ApplicationId appId,
+ ApplicationClientProtocol rmClient) throws IOException {
+ ApplicationReport appReport = null;
+ try {
+ appReport =
+ rmClient.getApplicationReport(
+ GetApplicationReportRequest.newInstance(appId))
+ .getApplicationReport();
+ } catch (ApplicationNotFoundException e) {
+ return true;
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ YarnApplicationState currentState = appReport.getYarnApplicationState();
+ return currentState == YarnApplicationState.FAILED
+ || currentState == YarnApplicationState.KILLED
+ || currentState == YarnApplicationState.FINISHED;
+ }
+
+ public ApplicationClientProtocol getRMClient() {
+ return this.rmClient;
+ }
}
private static void logIOException(String comment, IOException e) {
@@ -140,6 +195,7 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
+ stopRMClient();
stopTimer();
super.serviceStop();
}
@@ -156,10 +212,11 @@ private void setLogAggCheckIntervalMsecs(long retentionSecs) {
}
}
- public void refreshLogRetentionSettings() {
+ public void refreshLogRetentionSettings() throws IOException {
if (getServiceState() == STATE.STARTED) {
Configuration conf = createConf();
setConfig(conf);
+ stopRMClient();
stopTimer();
scheduleLogDeletionTask();
} else {
@@ -167,7 +224,7 @@ public void refreshLogRetentionSettings() {
}
}
- private void scheduleLogDeletionTask() {
+ private void scheduleLogDeletionTask() throws IOException {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@@ -183,7 +240,7 @@ private void scheduleLogDeletionTask() {
return;
}
setLogAggCheckIntervalMsecs(retentionSecs);
- TimerTask task = new LogDeletionTask(conf, retentionSecs);
+ task = new LogDeletionTask(conf, retentionSecs, creatRMClient());
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
}
@@ -201,4 +258,20 @@ public long getCheckIntervalMsecs() {
protected Configuration createConf() {
return new Configuration();
}
+
+ // Directly create and use ApplicationClientProtocol.
+  // We have already marked ApplicationClientProtocol.getApplicationReport
+  // as @Idempotent, so it will automatically take care of RM restart/failover.
+ @VisibleForTesting
+ protected ApplicationClientProtocol creatRMClient() throws IOException {
+ return ClientRMProxy.createRMProxy(getConfig(),
+ ApplicationClientProtocol.class);
+ }
+
+ @VisibleForTesting
+ protected void stopRMClient() {
+ if (task != null && task.getRMClient() != null) {
+ RPC.stopProxy(task.getRMClient());
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 79244ad..e8f9e74 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1363,4 +1363,15 @@
yarn.nodemanager.container-monitor.procfs-tree.smaps-based-rss.enabledfalse
+
+
+ Defines how often NMs wake up to upload log files.
+ The default value is -1. By default, the logs will be uploaded when
+    the application is finished. By setting this configuration, logs can be
+    uploaded periodically while the application is running. The minimum value
+    that rolling-interval-seconds can be set to is 3600.
+
+ yarn.nodemanager.log.rolling-interval-seconds
+ -1
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index 05c7e71..30e9bf7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +30,12 @@
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +60,7 @@ public void testDeletion() throws Exception {
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
- Configuration conf = new Configuration();
+ final Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
@@ -69,22 +78,37 @@ public void testDeletion() throws Exception {
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
-
+
+ ApplicationId appId1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
- Path app1Dir = new Path(userLogDir, "application_1_1");
+ Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
- Path app2Dir = new Path(userLogDir, "application_1_2");
+ ApplicationId appId2 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 2);
+ Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
- Path app3Dir = new Path(userLogDir, "application_1_3");
+ ApplicationId appId3 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 3);
+ Path app3Dir = new Path(userLogDir, appId3.toString());
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
- Path app4Dir = new Path(userLogDir, "application_1_4");
+ ApplicationId appId4 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 4);
+ Path app4Dir = new Path(userLogDir, appId4.toString());
FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
+ ApplicationId appId5 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 5);
+ Path app5Dir = new Path(userLogDir, appId5.toString());
+ FileStatus app5DirStatus =
+ new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
+
when(mockFs.listStatus(userLogDir)).thenReturn(
- new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus});
+ new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
+ app4DirStatus, app5DirStatus });
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{});
@@ -117,20 +141,55 @@ public void testDeletion() throws Exception {
when(mockFs.listStatus(app4Dir)).thenReturn(
new FileStatus[]{app4Log1Status, app4Log2Status});
+
+ Path app5Log1 = new Path(app5Dir, "host1");
+ FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1);
- AggregatedLogDeletionService.LogDeletionTask task =
- new AggregatedLogDeletionService.LogDeletionTask(conf, 1800);
-
- task.run();
-
- verify(mockFs).delete(app1Dir, true);
- verify(mockFs, times(0)).delete(app2Dir, true);
- verify(mockFs).delete(app3Dir, true);
- verify(mockFs).delete(app4Dir, true);
+ Path app5Log2 = new Path(app5Dir, "host2");
+ FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2);
+
+ when(mockFs.listStatus(app5Dir)).thenReturn(
+ new FileStatus[]{app5Log1Status, app5Log2Status});
+
+ final List finishedApplications =
+ Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
+ appId4));
+ final List runningApplications =
+ Collections.unmodifiableList(Arrays.asList(appId5));
+
+ AggregatedLogDeletionService deletionService =
+ new AggregatedLogDeletionService() {
+ @Override
+ protected ApplicationClientProtocol creatRMClient()
+ throws IOException {
+ try {
+ return createMockRMClient(finishedApplications,
+ runningApplications);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ protected void stopRMClient() {
+ // DO NOTHING
+ }
+ };
+ deletionService.init(conf);
+ deletionService.start();
+
+ verify(mockFs, timeout(2000)).delete(app1Dir, true);
+ verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
+ verify(mockFs, timeout(2000)).delete(app3Dir, true);
+ verify(mockFs, timeout(2000)).delete(app4Dir, true);
+ verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
+ verify(mockFs, timeout(2000)).delete(app5Log1, true);
+ verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
+
+ deletionService.stop();
}
@Test
- public void testRefreshLogRetentionSettings() throws IOException {
+ public void testRefreshLogRetentionSettings() throws Exception {
long now = System.currentTimeMillis();
//time before 2000 sec
long before2000Secs = now - (2000 * 1000);
@@ -163,13 +222,17 @@ public void testRefreshLogRetentionSettings() throws IOException {
Path userLogDir = new Path(userDir, suffix);
+ ApplicationId appId1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
//Set time last modified of app1Dir directory and its files to before2000Secs
- Path app1Dir = new Path(userLogDir, "application_1_1");
+ Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
app1Dir);
+ ApplicationId appId2 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 2);
//Set time last modified of app1Dir directory and its files to before50Secs
- Path app2Dir = new Path(userLogDir, "application_1_2");
+ Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
app2Dir);
@@ -190,11 +253,27 @@ public void testRefreshLogRetentionSettings() throws IOException {
when(mockFs.listStatus(app2Dir)).thenReturn(
new FileStatus[] { app2Log1Status });
+ final List finishedApplications =
+ Collections.unmodifiableList(Arrays.asList(appId1, appId2));
+
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
@Override
protected Configuration createConf() {
return conf;
}
+ @Override
+ protected ApplicationClientProtocol creatRMClient()
+ throws IOException {
+ try {
+ return createMockRMClient(finishedApplications, null);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ protected void stopRMClient() {
+ // DO NOTHING
+ }
};
deletionSvc.init(conf);
@@ -253,8 +332,10 @@ public void testCheckInterval() throws Exception {
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
+ ApplicationId appId1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
- Path app1Dir = new Path(userLogDir, "application_1_1");
+ Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
@@ -266,8 +347,25 @@ public void testCheckInterval() throws Exception {
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{app1Log1Status});
+ final List finishedApplications =
+ Collections.unmodifiableList(Arrays.asList(appId1));
+
AggregatedLogDeletionService deletionSvc =
- new AggregatedLogDeletionService();
+ new AggregatedLogDeletionService() {
+ @Override
+ protected ApplicationClientProtocol creatRMClient()
+ throws IOException {
+ try {
+ return createMockRMClient(finishedApplications, null);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ protected void stopRMClient() {
+ // DO NOTHING
+ }
+ };
deletionSvc.init(conf);
deletionSvc.start();
@@ -286,11 +384,61 @@ public void testCheckInterval() throws Exception {
deletionSvc.stop();
}
-
+
static class MockFileSystem extends FilterFileSystem {
MockFileSystem() {
super(mock(FileSystem.class));
}
public void initialize(URI name, Configuration conf) throws IOException {}
}
+
+ private ApplicationClientProtocol createMockRMClient(
+ List finishedApplicaitons,
+ List runningApplications) throws Exception {
+ final ApplicationClientProtocol mockProtocol =
+ mock(ApplicationClientProtocol.class);
+ if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) {
+ for (ApplicationId appId : finishedApplicaitons) {
+ GetApplicationReportRequest request =
+ GetApplicationReportRequest.newInstance(appId);
+ GetApplicationReportResponse response =
+ createApplicationReportWithFinishedApplication();
+ when(mockProtocol.getApplicationReport(request))
+ .thenReturn(response);
+ }
+ }
+ if (runningApplications != null && !runningApplications.isEmpty()) {
+ for (ApplicationId appId : runningApplications) {
+ GetApplicationReportRequest request =
+ GetApplicationReportRequest.newInstance(appId);
+ GetApplicationReportResponse response =
+ createApplicationReportWithRunningApplication();
+ when(mockProtocol.getApplicationReport(request))
+ .thenReturn(response);
+ }
+ }
+ return mockProtocol;
+ }
+
+ private GetApplicationReportResponse
+ createApplicationReportWithRunningApplication() {
+ ApplicationReport report = mock(ApplicationReport.class);
+ when(report.getYarnApplicationState()).thenReturn(
+ YarnApplicationState.RUNNING);
+ GetApplicationReportResponse response =
+ mock(GetApplicationReportResponse.class);
+ when(response.getApplicationReport()).thenReturn(report);
+ return response;
+ }
+
+ private GetApplicationReportResponse
+ createApplicationReportWithFinishedApplication() {
+ ApplicationReport report = mock(ApplicationReport.class);
+ when(report.getYarnApplicationState()).thenReturn(
+ YarnApplicationState.FINISHED);
+ GetApplicationReportResponse response =
+ mock(GetApplicationReportResponse.class);
+ when(response.getApplicationReport()).thenReturn(report);
+ return response;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 318caf2..58bc6ad 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -20,6 +20,10 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -33,6 +37,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@@ -41,6 +46,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@@ -65,6 +72,14 @@
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
+  // This is a temporary solution. The configuration will be deleted once
+ // we find a more scalable method to only write a single log file per LRS.
+ private static final String NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP
+ = YarnConfiguration.NM_PREFIX + "history-log.retention-size.per-app";
+ private static final int
+ DEFAULT_NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP = 30;
+
+ private static final long MIN_NM_LOG_ROLLING_INTERVAL_SECONDS = 3600;
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
@@ -85,13 +100,16 @@
private final Map appAcls;
private final LogAggregationContext logAggregationContext;
private final Context context;
+ private final int retentionSize;
+ private final long rollingInterval;
+ private final NodeId nodeId;
private final Map containerLogAggregators =
new HashMap();
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
- ApplicationId appId, UserGroupInformation userUgi,
+ ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map appAcls,
@@ -111,6 +129,38 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
this.context = context;
+ this.nodeId = nodeId;
+ int configuredRentionSize =
+ conf.getInt(NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP,
+ DEFAULT_NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP);
+ if (configuredRentionSize <= 0) {
+ this.retentionSize =
+ DEFAULT_NM_LOG_AGGREGATION_RETAIN_RETENTION_SIZE_PER_APP;
+ } else {
+ this.retentionSize = configuredRentionSize;
+ }
+ long configuredInterval =
+ conf.getLong(YarnConfiguration.NM_LOG_ROLLING_INTERVAL_SECONDS,
+ YarnConfiguration.DEFAULT_NM_LOG_ROLLING_INTERVAL_SECONDS);
+ if (configuredInterval > 0
+ && configuredInterval < MIN_NM_LOG_ROLLING_INTERVAL_SECONDS) {
+ LOG.warn("Log Rolling Interval should be more than or equal to "
+ + MIN_NM_LOG_ROLLING_INTERVAL_SECONDS + " seconds. Using "
+ + MIN_NM_LOG_ROLLING_INTERVAL_SECONDS + " seconds instead.");
+ this.rollingInterval = MIN_NM_LOG_ROLLING_INTERVAL_SECONDS;
+ } else {
+ if (configuredInterval <= 0) {
+ LOG.warn("LogRollingInterval is set as " + configuredInterval + ". "
+ + YarnConfiguration.NM_LOG_ROLLING_INTERVAL_SECONDS
+ + " is disabled. The logs will be aggregated "
+ + "after this application is finished.");
+ } else {
+ LOG.warn("LogRollingInterval is set as " + configuredInterval + ". "
+ + "The logs will be aggregated every " + configuredInterval
+ + " seconds");
+ }
+ this.rollingInterval = configuredInterval;
+ }
}
private void uploadLogsForContainers() {
@@ -181,12 +231,17 @@ private void uploadLogsForContainers() {
}
}
+      // Before uploading logs, make sure the number of existing logs
+ // is smaller than the configured NM log aggregation retention size.
+ if (uploadedLogsInThisCycle) {
+ cleanOldLogs();
+ }
+
if (writer != null) {
writer.close();
}
- final Path renamedPath = logAggregationContext == null ||
- logAggregationContext.getRollingIntervalSeconds() <= 0
+ final Path renamedPath = this.rollingInterval <= 0
? remoteNodeLogFileForApp : new Path(
remoteNodeLogFileForApp.getParent(),
remoteNodeLogFileForApp.getName() + "_"
@@ -198,9 +253,12 @@ private void uploadLogsForContainers() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = FileSystem.get(conf);
- if (remoteFS.exists(remoteNodeTmpLogFileForApp)
- && rename) {
- remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+ if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
+ if (rename) {
+ remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+ } else {
+ remoteFS.delete(remoteNodeTmpLogFileForApp, false);
+ }
}
return null;
}
@@ -218,6 +276,60 @@ public Object run() throws Exception {
}
}
+ private void cleanOldLogs() {
+ try {
+ final FileSystem remoteFS =
+ this.remoteNodeLogFileForApp.getFileSystem(conf);
+ Path appDir =
+ this.remoteNodeLogFileForApp.getParent().makeQualified(
+ remoteFS.getUri(), remoteFS.getWorkingDirectory());
+ Set status =
+ new HashSet(Arrays.asList(remoteFS.listStatus(appDir)));
+
+ Iterable mask =
+ Iterables.filter(status, new Predicate() {
+ @Override
+ public boolean apply(FileStatus next) {
+ return next.getPath().getName()
+ .contains(LogAggregationUtils.getNodeString(nodeId))
+ && !next.getPath().getName().contains(
+ LogAggregationUtils.TMP_FILE_SUFFIX);
+ }
+ });
+ status = Sets.newHashSet(mask);
+      // Normally, we just need to delete the oldest log
+      // before we upload a new log.
+      // If we cannot delete the older logs in this cycle,
+      // we will delete them in the next cycle.
+ if (status.size() >= this.retentionSize) {
+ // sort by the lastModificationTime ascending
+ List statusList = new ArrayList(status);
+ Collections.sort(statusList, new Comparator() {
+ public int compare(FileStatus s1, FileStatus s2) {
+ return s1.getModificationTime() < s2.getModificationTime() ? -1
+ : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
+ }
+ });
+ for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
+ final FileStatus remove = statusList.get(i);
+ try {
+ userUgi.doAs(new PrivilegedExceptionAction