diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 46e3323..1c2ef7c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1000,8 +1000,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
-
- /**
+
+ /** Log aggregation service class. */
+ public static final String LOG_AGGREGATION_SERVICE_CLASS =
+ YARN_PREFIX + "log-aggregation-service.class";
+ public static final String DEFAULT_LOG_AGGREGATION_SERVICE_CLASS =
+ "org.apache.hadoop.yarn.server.nodemanager.containermanager."
+ + "logaggregation.LogAggregationService";
+
+ /**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 668821d..8e306b1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -91,6 +91,8 @@ public void initializeMemberVariables() {
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_LOG_AGGREGATION_SERVICE_CLASS);
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e956507..891b6d5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1129,6 +1129,12 @@
+ The service class for log aggregation.
+ yarn.log-aggregation-service.class
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService
+
+
+
Whether to enable log aggregation. Log aggregation collects
each container's logs and moves these logs onto a file-system, for e.g.
HDFS, after the application completes. Users can configure the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index cb63ae3..07ea743 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -22,6 +22,8 @@
import java.io.DataInputStream;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
@@ -83,6 +85,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
@@ -421,8 +424,25 @@ protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
- return new LogAggregationService(this.dispatcher, context,
- deletionService, dirsHandler);
+ try {
+ Class> clazz = Class
+ .forName(conf.get(YarnConfiguration.LOG_AGGREGATION_SERVICE_CLASS,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_SERVICE_CLASS));
+ LOG.info(
+ "Created LogAggregationService class: " + clazz.getCanonicalName());
+ Constructor> cons =
+ clazz.getConstructor(Dispatcher.class, Context.class,
+ DeletionService.class, LocalDirsHandlerService.class);
+ LogAggregationService logAggregationService =
+ (LogAggregationService) cons.newInstance(this.dispatcher, context,
+ deletionService, dirsHandler);
+ return logAggregationService;
+ } catch (ClassNotFoundException | NoSuchMethodException
+ | InstantiationException | IllegalAccessException
+ | InvocationTargetException e) {
+ LOG.error("Log aggregation service not initialized", e);
+ throw new YarnRuntimeException(e);
+ }
} else {
return new NonAggregatingLogHandler(this.dispatcher, deletionService,
dirsHandler,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 872b805..d0c4e4f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -97,29 +97,29 @@
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
- private final LocalDirsHandlerService dirsHandler;
- private final Dispatcher dispatcher;
- private final ApplicationId appId;
- private final String applicationId;
- private boolean logAggregationDisabled = false;
- private final Configuration conf;
- private final DeletionService delService;
- private final UserGroupInformation userUgi;
- private final Path remoteNodeLogFileForApp;
- private final Path remoteNodeTmpLogFileForApp;
-
- private final BlockingQueue pendingContainers;
- private final AtomicBoolean appFinishing = new AtomicBoolean();
- private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
- private final AtomicBoolean aborted = new AtomicBoolean();
- private final Map appAcls;
- private final FileContext lfs;
- private final LogAggregationContext logAggregationContext;
- private final Context context;
- private final int retentionSize;
- private final long rollingMonitorInterval;
- private final boolean logAggregationInRolling;
- private final NodeId nodeId;
+ protected final LocalDirsHandlerService dirsHandler;
+ protected final Dispatcher dispatcher;
+ protected final ApplicationId appId;
+ protected final String applicationId;
+ protected boolean logAggregationDisabled = false;
+ protected final Configuration conf;
+ protected final DeletionService delService;
+ protected final UserGroupInformation userUgi;
+ protected final Path remoteNodeLogFileForApp;
+ protected final Path remoteNodeTmpLogFileForApp;
+
+ protected final BlockingQueue pendingContainers;
+ protected final AtomicBoolean appFinishing = new AtomicBoolean();
+ protected final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+ protected final AtomicBoolean aborted = new AtomicBoolean();
+ protected final Map appAcls;
+ protected final FileContext lfs;
+ protected final LogAggregationContext logAggregationContext;
+ protected final Context context;
+ protected final int retentionSize;
+ protected final long rollingMonitorInterval;
+ protected final boolean logAggregationInRolling;
+ protected final NodeId nodeId;
// These variables are only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
@@ -194,9 +194,9 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.recoveredLogInitedTime = recoveredLogInitedTime;
}
- private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
- ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf);
- String params = getLogAggPolicyParameters(conf);
+ protected ContainerLogAggregationPolicy getLogAggPolicy(Configuration myConf) {
+ ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(myConf);
+ String params = getLogAggPolicyParameters(myConf);
if (params != null) {
policy.parseParameters(params);
}
@@ -205,7 +205,7 @@ private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
// Use the policy class specified in LogAggregationContext if available.
// Otherwise use the cluster-wide default policy class.
- private ContainerLogAggregationPolicy getLogAggPolicyInstance(
+ protected ContainerLogAggregationPolicy getLogAggPolicyInstance(
Configuration conf) {
Class extends ContainerLogAggregationPolicy> policyClass = null;
if (this.logAggregationContext != null) {
@@ -242,18 +242,18 @@ private ContainerLogAggregationPolicy getLogAggPolicyInstance(
// Use the policy parameters specified in LogAggregationContext if available.
// Otherwise use the cluster-wide default policy parameters.
- private String getLogAggPolicyParameters(Configuration conf) {
+ protected String getLogAggPolicyParameters(Configuration myConf) {
String params = null;
if (this.logAggregationContext != null) {
params = this.logAggregationContext.getLogAggregationPolicyParameters();
}
if (params == null) {
- params = conf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS);
+ params = myConf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS);
}
return params;
}
- private void uploadLogsForContainers(boolean appFinished) {
+ protected void uploadLogsForContainers(boolean appFinished) {
if (this.logAggregationDisabled) {
return;
}
@@ -423,7 +423,7 @@ protected LogWriter createLogWriter() throws IOException {
this.userUgi);
}
- private void sendLogAggregationReport(
+ protected void sendLogAggregationReport(
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
LogAggregationReport report =
Records.newRecord(LogAggregationReport.class);
@@ -433,7 +433,7 @@ private void sendLogAggregationReport(
this.context.getLogAggregationStatusForApps().add(report);
}
- private void cleanOldLogs() {
+ protected void cleanOldLogs() {
try {
final FileSystem remoteFS =
this.remoteNodeLogFileForApp.getFileSystem(conf);
@@ -509,7 +509,7 @@ public void run() {
}
@SuppressWarnings("unchecked")
- private void doAppLogAggregation() {
+ protected void doAppLogAggregation() {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
@@ -545,7 +545,7 @@ private void doAppLogAggregation() {
this.appAggregationFinished.set(true);
}
- private void doAppLogAggregationPostCleanUp() {
+ protected void doAppLogAggregationPostCleanUp() {
// Remove the local app-log-dirs
List localAppLogDirs = new ArrayList();
for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
@@ -575,7 +575,7 @@ private Path getRemoteNodeTmpLogFileForApp() {
// TODO: The condition: containerId.getId() == 1 to determine an AM container
// is not always true.
- private boolean shouldUploadLogs(ContainerLogContext logContext) {
+ protected boolean shouldUploadLogs(ContainerLogContext logContext) {
return logAggPolicy.shouldDoLogAggregation(logContext);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index a4ae643..72b8bf5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -89,28 +89,28 @@
* Permissions for the top level directory under which app directories will be
* created.
*/
- private static final FsPermission TLDIR_PERMISSIONS = FsPermission
+ public static final FsPermission TLDIR_PERMISSIONS = FsPermission
.createImmutable((short) 01777);
/**
* Permissions for the Application directory.
*/
- private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
+ public static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0770);
- private final Context context;
- private final DeletionService deletionService;
- private final Dispatcher dispatcher;
+ protected final Context context;
+ protected final DeletionService deletionService;
+ protected final Dispatcher dispatcher;
- private LocalDirsHandlerService dirsHandler;
- Path remoteRootLogDir;
- String remoteRootLogDirSuffix;
- private NodeId nodeId;
+ protected LocalDirsHandlerService dirsHandler;
+ protected Path remoteRootLogDir;
+ protected String remoteRootLogDirSuffix;
+ protected NodeId nodeId;
- private final ConcurrentMap appLogAggregators;
- private boolean logPermError = true;
+ protected final ConcurrentMap appLogAggregators;
+ protected boolean logPermError = true;
@VisibleForTesting
- ExecutorService threadPool;
+ protected ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@@ -182,7 +182,7 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
- private void stopAggregators() {
+ protected void stopAggregators() {
threadPool.shutdown();
boolean supervised = getConfig().getBoolean(
YarnConfiguration.NM_RECOVERY_SUPERVISED,
@@ -217,24 +217,26 @@ private void stopAggregators() {
}
}
- protected FileSystem getFileSystem(Configuration conf) throws IOException {
- return this.remoteRootLogDir.getFileSystem(conf);
+ protected FileSystem getFileSystem(Configuration conf, Path myRemoteRootLogDir)
+ throws IOException {
+ return myRemoteRootLogDir.getFileSystem(conf);
}
- void verifyAndCreateRemoteLogDir(Configuration conf) {
+ protected void verifyAndCreateRemoteLogDir(Configuration conf,
+ Path myRemoteRootLogDir) {
// Checking the existence of the TLD
FileSystem remoteFS = null;
try {
- remoteFS = getFileSystem(conf);
+ remoteFS = getFileSystem(conf, myRemoteRootLogDir);
} catch (IOException e) {
throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
}
boolean remoteExists = true;
try {
FsPermission perms =
- remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
+ remoteFS.getFileStatus(myRemoteRootLogDir).getPermission();
if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
- LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ LOG.warn("Remote Root Log Dir [" + myRemoteRootLogDir
+ "] already exist, but with incorrect permissions. "
+ "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+ "]." + " The cluster may have problems with multiple users.");
@@ -247,20 +249,20 @@ void verifyAndCreateRemoteLogDir(Configuration conf) {
} catch (IOException e) {
throw new YarnRuntimeException(
"Failed to check permissions for dir ["
- + this.remoteRootLogDir + "]", e);
+ + myRemoteRootLogDir + "]", e);
}
if (!remoteExists) {
- LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ LOG.warn("Remote Root Log Dir [" + myRemoteRootLogDir
+ "] does not exist. Attempting to create it.");
try {
Path qualified =
- this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
+ myRemoteRootLogDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
} catch (IOException e) {
throw new YarnRuntimeException("Failed to create remoteLogDir ["
- + this.remoteRootLogDir + "]", e);
+ + myRemoteRootLogDir + "]", e);
}
}
}
@@ -301,6 +303,7 @@ private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
}
protected void createAppDir(final String user, final ApplicationId appId,
+ final Path myRemoteRootLogDir, final String myRemoteRootLogDirSuffix,
UserGroupInformation userUgi) {
try {
userUgi.doAs(new PrivilegedExceptionAction