diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3f84a23..96fd0f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1008,8 +1008,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - - /** + + /** App log aggregator class. */ + public static final String APP_LOG_AGGREGATOR_CLASS = + YARN_PREFIX + "app-log-aggregator.class"; + public static final String DEFAULT_APP_LOG_AGGREGATOR_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.containermanager" + + ".logaggregation.AppLogAggregatorImpl"; + + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. 
*/ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 668821d..17c3d10 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -91,6 +91,8 @@ public void initializeMemberVariables() { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_APP_LOG_AGGREGATOR_CLASS); // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index 34c9100..50bf24d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -18,27 +18,126 @@ package org.apache.hadoop.yarn.logaggregation; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; @Private public class LogAggregationUtils { + /** + * Permissions for the Application directory. + */ + public static final FsPermission APP_DIR_PERMISSIONS = FsPermission + .createImmutable((short) 0770); + public static final String TMP_FILE_SUFFIX = ".tmp"; + static void createDir(FileSystem fs, Path path, FsPermission fsPerm) + throws IOException { + FsPermission dirPerm = new FsPermission(fsPerm); + fs.mkdirs(path, dirPerm); + FsPermission umask = FsPermission.getUMask(fs.getConf()); + if (!dirPerm.equals(dirPerm.applyUMask(umask))) { + fs.setPermission(path, new FsPermission(fsPerm)); + } + } + + static boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm) + throws IOException { + boolean exists = true; + try { + FileStatus appDirStatus = fs.getFileStatus(path); + if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) { + fs.setPermission(path, APP_DIR_PERMISSIONS); + } + } catch (FileNotFoundException fnfe) { + exists = false; + } + return exists; + } + + /** + * @param user user + * @param appId appId + * @param remoteRootLogDir remoteRootLogDir + * @param remoteRootLogDirSuffix remoteRootLogDirSuffix + * @param userUgi userUgi + * @param conf conf + * @param remoteFS remoteFS + */ + public static void createAppDir(final String user, final ApplicationId appId, + final Path remoteRootLogDir, final String remoteRootLogDirSuffix, + UserGroupInformation userUgi, final Configuration conf, + final FileSystem remoteFS) { + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() 
throws Exception { + try { + // Only creating directories if they are missing to avoid + // unnecessary load on the filesystem from all of the nodes + Path appDir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogDir, appId, user, remoteRootLogDirSuffix); + appDir = appDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils + .getRemoteLogUserDir(remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + } + + } catch (IOException e) { + throw new YarnException( + "Failed to setup application log directory for " + appId, e); + } + return null; + } + }); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + /** * Constructs the full filename for an application's log file per node. - * @param remoteRootLogDir - * @param appId - * @param user - * @param nodeId - * @param suffix + * @param remoteRootLogDir remoteRootLogDir + * @param appId appId + * @param user user + * @param nodeId nodeId + * @param suffix suffix * @return the remote log file. */ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, @@ -49,10 +148,10 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, /** * Gets the remote app log dir. 
- * @param remoteRootLogDir - * @param appId - * @param user - * @param suffix + * @param remoteRootLogDir remoteRootLogDir + * @param appId appId + * @param user user + * @param suffix suffix * @return the remote application specific log dir. */ public static Path getRemoteAppLogDir(Path remoteRootLogDir, @@ -63,9 +162,9 @@ public static Path getRemoteAppLogDir(Path remoteRootLogDir, /** * Gets the remote suffixed log dir for the user. - * @param remoteRootLogDir - * @param user - * @param suffix + * @param remoteRootLogDir remoteRootLogDir + * @param user user + * @param suffix suffix * @return the remote suffixed log dir. */ public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir, @@ -87,6 +186,9 @@ public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir, * @return the remote per user log dir. */ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) { + if (user == null || user.isEmpty()) { + return remoteRootLogDir; + } return new Path(remoteRootLogDir, user); } @@ -100,7 +202,6 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) { YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); } - /** * Converts a nodeId to a form used in the app log file name. * @param nodeId diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c7076e5..3198578 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1132,6 +1132,12 @@ + The class to use for AppLogAggregator + yarn.app-log-aggregator.class + org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl + + + How long to keep aggregation logs before deleting them. -1 disables. Be careful set this too small and you will spam the name node. 
yarn.log-aggregation.retain-seconds diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 0178699..148c25a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -18,15 +18,68 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; +/** + * This API is used by LogAggregationService to do log aggregation + * for an application. + */ +@Public +@Unstable public interface AppLogAggregator extends Runnable { + /** + * Initialize the application log aggregator. 
+ * + * @param appStartEvent application start event + * @param conf configuration + * @param dispatcher dispatcher + * @param deletionService deletion service + * @param nodeId nodeId + * @param dirsHandler dirsHandler of NM + * @param remoteRootLogDir remote root directory for log aggregation + * @param context context of NM + * @param lfs local file system context + * @param remoteFS remote file system + * @param rollingMonitorInterval rolling monitor interval + */ + void init(LogHandlerAppStartedEvent appStartEvent, Configuration conf, + Dispatcher dispatcher, DeletionService deletionService, NodeId nodeId, + LocalDirsHandlerService dirsHandler, Path remoteRootLogDir, + Context context, FileContext lfs, FileSystem remoteFS, + long rollingMonitorInterval); + + /** + * Start log aggregation for a finished container. + * + * @param logContext container log context + */ void startContainerLogAggregation(ContainerLogContext logContext); + /** + * Abort the log aggregation for the application. + */ void abortLogAggregation(); + /** + * Finish the log aggregation for the application. + */ void finishLogAggregation(); + /** + * Disable the log aggregation for the application. 
+ */ void disableLogAggregation(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 872b805..2f52fa9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -64,12 +64,12 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -97,40 +97,44 @@ private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; - private final LocalDirsHandlerService dirsHandler; - private final Dispatcher dispatcher; - 
private final ApplicationId appId; - private final String applicationId; - private boolean logAggregationDisabled = false; - private final Configuration conf; - private final DeletionService delService; - private final UserGroupInformation userUgi; - private final Path remoteNodeLogFileForApp; - private final Path remoteNodeTmpLogFileForApp; - - private final BlockingQueue pendingContainers; - private final AtomicBoolean appFinishing = new AtomicBoolean(); - private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); - private final AtomicBoolean aborted = new AtomicBoolean(); - private final Map appAcls; - private final FileContext lfs; - private final LogAggregationContext logAggregationContext; - private final Context context; - private final int retentionSize; - private final long rollingMonitorInterval; - private final boolean logAggregationInRolling; - private final NodeId nodeId; - - // These variables are only for testing - private final AtomicBoolean waiting = new AtomicBoolean(false); + protected LocalDirsHandlerService dirsHandler; + protected Dispatcher dispatcher; + protected ApplicationId appId; + protected String user; + protected String applicationId; + protected boolean logAggregationDisabled = false; + protected Configuration conf; + protected DeletionService delService; + protected UserGroupInformation userUgi; + protected Path remoteRootLogDir; + protected String remoteRootLogDirSuffix; + protected Path remoteNodeLogFileForApp; + protected Path remoteNodeTmpLogFileForApp; + + protected BlockingQueue pendingContainers; + protected AtomicBoolean appFinishing = new AtomicBoolean(); + protected AtomicBoolean appAggregationFinished = new AtomicBoolean(); + protected AtomicBoolean aborted = new AtomicBoolean(); + protected Map appAcls; + protected FileContext lfs; + protected FileSystem remoteFS; + protected LogAggregationContext logAggregationContext; + protected Context context; + protected int retentionSize; + protected long 
rollingMonitorInterval; + protected boolean logAggregationInRolling; + protected NodeId nodeId; + + // This variable is only for testing + private AtomicBoolean waiting = new AtomicBoolean(false); private int logAggregationTimes = 0; private int cleanupOldLogTimes = 0; - private boolean renameTemporaryLogFileFailed = false; + protected boolean renameTemporaryLogFileFailed = false; - private final Map containerLogAggregators = + protected Map containerLogAggregators = new HashMap(); - private final ContainerLogAggregationPolicy logAggPolicy; + protected ContainerLogAggregationPolicy logAggPolicy; /** @@ -138,43 +142,41 @@ * log files if log retention is enabled. Files older than retention policy * will not be uploaded but scheduled for cleaning up. -1 if not recovered. */ - private final long recoveredLogInitedTime; - - public AppLogAggregatorImpl(Dispatcher dispatcher, - DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, - LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, - Map appAcls, - LogAggregationContext logAggregationContext, Context context, - FileContext lfs, long rollingMonitorInterval) { - this(dispatcher, deletionService, conf, appId, userUgi, nodeId, - dirsHandler, remoteNodeLogFileForApp, appAcls, - logAggregationContext, context, lfs, rollingMonitorInterval, -1); - } + protected long recoveredLogInitedTime; - public AppLogAggregatorImpl(Dispatcher dispatcher, - DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, - LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, - Map appAcls, - LogAggregationContext logAggregationContext, Context context, - FileContext lfs, long rollingMonitorInterval, - long recoveredLogInitedTime) { + @Override + public void init(LogHandlerAppStartedEvent appStartEvent, Configuration conf, + Dispatcher dispatcher, DeletionService deletionService, NodeId 
nodeId, + LocalDirsHandlerService dirsHandler, Path remoteRootLogDir, + Context context, FileContext lfs, FileSystem remoteFS, + long rollingMonitorInterval) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; - this.appId = appId; + this.appId = appStartEvent.getApplicationId(); + this.user = appStartEvent.getUser(); this.applicationId = appId.toString(); - this.userUgi = userUgi; this.dirsHandler = dirsHandler; - this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; - this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); this.pendingContainers = new LinkedBlockingQueue(); - this.appAcls = appAcls; + this.appAcls = appStartEvent.getApplicationAcls(); this.lfs = lfs; - this.logAggregationContext = logAggregationContext; + this.remoteFS = remoteFS; + this.logAggregationContext = appStartEvent.getLogAggregationContext(); this.context = context; this.nodeId = nodeId; + this.remoteRootLogDir = remoteRootLogDir; + this.remoteRootLogDirSuffix = + LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + this.remoteNodeLogFileForApp = getRemoteNodeLogFileForApp(); + this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); + + // Get user's FileSystem credentials + this.userUgi = UserGroupInformation.createRemoteUser(user); + Credentials credentials = appStartEvent.getCredentials(); + if (credentials != null) { + userUgi.addCredentials(credentials); + } + int configuredRentionSize = conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP, DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP); @@ -191,10 +193,10 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, || this.logAggregationContext.getRolledLogsIncludePattern() .isEmpty() ? 
false : true; this.logAggPolicy = getLogAggPolicy(conf); - this.recoveredLogInitedTime = recoveredLogInitedTime; + this.recoveredLogInitedTime = appStartEvent.getRecoveredAppLogInitedTime(); } - private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { + protected ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf); String params = getLogAggPolicyParameters(conf); if (params != null) { @@ -205,7 +207,7 @@ private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { // Use the policy class specified in LogAggregationContext if available. // Otherwise use the cluster-wide default policy class. - private ContainerLogAggregationPolicy getLogAggPolicyInstance( + protected ContainerLogAggregationPolicy getLogAggPolicyInstance( Configuration conf) { Class policyClass = null; if (this.logAggregationContext != null) { @@ -242,7 +244,7 @@ private ContainerLogAggregationPolicy getLogAggPolicyInstance( // Use the policy parameters specified in LogAggregationContext if available. // Otherwise use the cluster-wide default policy parameters. 
- private String getLogAggPolicyParameters(Configuration conf) { + protected String getLogAggPolicyParameters(Configuration conf) { String params = null; if (this.logAggregationContext != null) { params = this.logAggregationContext.getLogAggregationPolicyParameters(); @@ -253,7 +255,7 @@ private String getLogAggPolicyParameters(Configuration conf) { return params; } - private void uploadLogsForContainers(boolean appFinished) { + protected void uploadLogsForContainers(boolean appFinished) { if (this.logAggregationDisabled) { return; } @@ -368,7 +370,6 @@ private void uploadLogsForContainers(boolean appFinished) { userUgi.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { - FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf); if (rename) { remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); } else { @@ -423,7 +424,7 @@ protected LogWriter createLogWriter() throws IOException { this.userUgi); } - private void sendLogAggregationReport( + protected void sendLogAggregationReport( LogAggregationStatus logAggregationStatus, String diagnosticMessage) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); @@ -433,10 +434,8 @@ private void sendLogAggregationReport( this.context.getLogAggregationStatusForApps().add(report); } - private void cleanOldLogs() { + protected void cleanOldLogs() { try { - final FileSystem remoteFS = - this.remoteNodeLogFileForApp.getFileSystem(conf); Path appDir = this.remoteNodeLogFileForApp.getParent().makeQualified( remoteFS.getUri(), remoteFS.getWorkingDirectory()); @@ -491,11 +490,19 @@ public Object run() throws Exception { @Override public void run() { try { + // Create the app dir + LogAggregationUtils.createAppDir(this.user, this.appId, + this.remoteRootLogDir, this.remoteRootLogDirSuffix, this.userUgi, + conf, remoteFS); + doAppLogAggregation(); } catch (Exception e) { - // do post clean up of log directories on any exception LOG.error("Error occured while 
aggregating the log for the application " + appId, e); + this.dispatcher.getEventHandler().handle(new ApplicationEvent(this.appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); + + // do post clean up of log directories on any exception doAppLogAggregationPostCleanUp(); } finally { if (!this.appAggregationFinished.get()) { @@ -505,11 +512,20 @@ public void run() { ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); } this.appAggregationFinished.set(true); + closeFileSystems(userUgi); + } + } + + void closeFileSystems(final UserGroupInformation uri) { + try { + FileSystem.closeAllForUGI(uri); + } catch (IOException e) { + LOG.warn("Failed to close filesystems: ", e); } } @SuppressWarnings("unchecked") - private void doAppLogAggregation() { + protected void doAppLogAggregation() { while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { @@ -531,6 +547,8 @@ private void doAppLogAggregation() { } if (this.aborted.get()) { + this.dispatcher.getEventHandler().handle(new ApplicationEvent(this.appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); return; } @@ -545,7 +563,7 @@ private void doAppLogAggregation() { this.appAggregationFinished.set(true); } - private void doAppLogAggregationPostCleanUp() { + protected void doAppLogAggregationPostCleanUp() { // Remove the local app-log-dirs List localAppLogDirs = new ArrayList(); for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { @@ -568,14 +586,22 @@ private void doAppLogAggregationPostCleanUp() { } } - private Path getRemoteNodeTmpLogFileForApp() { + protected Path getRemoteNodeLogFileForApp() { + return LogAggregationUtils.getRemoteNodeLogFileForApp(this.remoteRootLogDir, + this.appId, this.user, this.nodeId, this.remoteRootLogDirSuffix); + } + + protected Path getRemoteAppLogDir() { + return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, + this.appId, this.user, this.remoteRootLogDirSuffix); + } + + protected Path 
getRemoteNodeTmpLogFileForApp() { return new Path(remoteNodeLogFileForApp.getParent(), (remoteNodeLogFileForApp.getName() + LogAggregationUtils.TMP_FILE_SUFFIX)); } - // TODO: The condition: containerId.getId() == 1 to determine an AM container - // is not always true. - private boolean shouldUploadLogs(ContainerLogContext logContext) { + protected boolean shouldUploadLogs(ContainerLogContext logContext) { return logAggPolicy.shouldDoLogAggregation(logContext); } @@ -698,4 +724,9 @@ public int getLogAggregationTimes() { int getCleanupOldLogTimes() { return this.cleanupOldLogTimes; } + + @VisibleForTesting + public NodeId getNodeId() { + return this.nodeId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index a4ae643..58f9147 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -20,8 +20,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -32,23 +30,18 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; -import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopExecutors; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.api.ContainerLogContext; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -91,22 +84,16 @@ */ private static final FsPermission TLDIR_PERMISSIONS = FsPermission .createImmutable((short) 01777); - /** - * Permissions for the Application directory. 
- */ - private static final FsPermission APP_DIR_PERMISSIONS = FsPermission - .createImmutable((short) 0770); - private final Context context; - private final DeletionService deletionService; - private final Dispatcher dispatcher; + private Context context; + private DeletionService deletionService; + private Dispatcher dispatcher; private LocalDirsHandlerService dirsHandler; Path remoteRootLogDir; - String remoteRootLogDirSuffix; private NodeId nodeId; - private final ConcurrentMap appLogAggregators; + private ConcurrentMap appLogAggregators; private boolean logPermError = true; @VisibleForTesting @@ -127,9 +114,6 @@ protected void serviceInit(Configuration conf) throws Exception { this.remoteRootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - this.remoteRootLogDirSuffix = - conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); int threadPoolSize = getAggregatorThreadPoolSize(conf); this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize, new ThreadFactoryBuilder() @@ -265,108 +249,16 @@ void verifyAndCreateRemoteLogDir(Configuration conf) { } } - Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) { - return LogAggregationUtils.getRemoteNodeLogFileForApp( - this.remoteRootLogDir, appId, user, this.nodeId, - this.remoteRootLogDirSuffix); - } - - Path getRemoteAppLogDir(ApplicationId appId, String user) { - return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, appId, - user, this.remoteRootLogDirSuffix); - } - - private void createDir(FileSystem fs, Path path, FsPermission fsPerm) - throws IOException { - FsPermission dirPerm = new FsPermission(fsPerm); - fs.mkdirs(path, dirPerm); - FsPermission umask = FsPermission.getUMask(fs.getConf()); - if (!dirPerm.equals(dirPerm.applyUMask(umask))) { - fs.setPermission(path, new FsPermission(fsPerm)); - } - } - - private boolean checkExists(FileSystem 
fs, Path path, FsPermission fsPerm) - throws IOException { - boolean exists = true; - try { - FileStatus appDirStatus = fs.getFileStatus(path); - if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) { - fs.setPermission(path, APP_DIR_PERMISSIONS); - } - } catch (FileNotFoundException fnfe) { - exists = false; - } - return exists; - } - - protected void createAppDir(final String user, final ApplicationId appId, - UserGroupInformation userUgi) { - try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - try { - // TODO: Reuse FS for user? - FileSystem remoteFS = getFileSystem(getConfig()); - - // Only creating directories if they are missing to avoid - // unnecessary load on the filesystem from all of the nodes - Path appDir = LogAggregationUtils.getRemoteAppLogDir( - LogAggregationService.this.remoteRootLogDir, appId, user, - LogAggregationService.this.remoteRootLogDirSuffix); - appDir = appDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - LogAggregationService.this.remoteRootLogDir, user, - LogAggregationService.this.remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - LogAggregationService.this.remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); - } - - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); - } - - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); - } - - } catch (IOException e) { - LOG.error("Failed to setup application log directory for " - + appId, e); - throw e; - } - return 
null; - } - }); - } catch (Exception e) { - throw new YarnRuntimeException(e); - } - } - @SuppressWarnings("unchecked") - private void initApp(final ApplicationId appId, String user, - Credentials credentials, Map appAcls, - LogAggregationContext logAggregationContext, - long recoveredLogInitedTime) { + private void initApp(LogHandlerAppStartedEvent appStartEvent) { + ApplicationId appId = appStartEvent.getApplicationId(); ApplicationEvent eventResponse; try { verifyAndCreateRemoteLogDir(getConfig()); - initAppAggregator(appId, user, credentials, appAcls, - logAggregationContext, recoveredLogInitedTime); + initAppAggregator(appId, appStartEvent); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); - } catch (YarnRuntimeException e) { + } catch (Exception e) { LOG.warn("Application failed to init aggregation", e); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED); @@ -382,48 +274,42 @@ FileContext getLocalFileContext(Configuration conf) { } } - protected void initAppAggregator(final ApplicationId appId, String user, - Credentials credentials, Map appAcls, - LogAggregationContext logAggregationContext, - long recoveredLogInitedTime) { - - // Get user's FileSystem credentials - final UserGroupInformation userUgi = - UserGroupInformation.createRemoteUser(user); - if (credentials != null) { - userUgi.addCredentials(credentials); + protected AppLogAggregator createAppAggregator(Configuration conf, + LogHandlerAppStartedEvent appStartEvent) { + try { + Class clazz = Class + .forName(conf.get(YarnConfiguration.APP_LOG_AGGREGATOR_CLASS, + YarnConfiguration.DEFAULT_APP_LOG_AGGREGATOR_CLASS)); + LOG.info( + "Created AppLogAggregator class: " + clazz.getCanonicalName()); + if (!AppLogAggregator.class.isAssignableFrom(clazz)) { + throw new YarnRuntimeException("Class: " + clazz.getCanonicalName() + + " not instance of " + AppLogAggregator.class.getCanonicalName()); + } + 
AppLogAggregator appLogAggregator = + (AppLogAggregator) ReflectionUtils.newInstance(clazz, conf); + return appLogAggregator; + } catch (ClassNotFoundException e) { + LOG.error("App log aggregator not initialized", e); + throw new YarnRuntimeException(e); } + } + protected void initAppAggregator(final ApplicationId appId, + LogHandlerAppStartedEvent appStartEvent) throws IOException { + Configuration conf = getConfig(); // New application final AppLogAggregator appLogAggregator = - new AppLogAggregatorImpl(this.dispatcher, this.deletionService, - getConfig(), appId, userUgi, this.nodeId, dirsHandler, - getRemoteNodeLogFileForApp(appId, user), - appAcls, logAggregationContext, this.context, - getLocalFileContext(getConfig()), this.rollingMonitorInterval, - recoveredLogInitedTime); + createAppAggregator(conf, appStartEvent); + appLogAggregator.init(appStartEvent, conf, this.dispatcher, + this.deletionService, this.nodeId, dirsHandler, this.remoteRootLogDir, + this.context, getLocalFileContext(getConfig()), + getFileSystem(getConfig()), + this.rollingMonitorInterval); + if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } - // wait until check for existing aggregator to create dirs - YarnRuntimeException appDirException = null; - try { - // Create the app dir - createAppDir(user, appId, userUgi); - } catch (Exception e) { - appLogAggregator.disableLogAggregation(); - if (!(e instanceof YarnRuntimeException)) { - appDirException = new YarnRuntimeException(e); - } else { - appDirException = (YarnRuntimeException)e; - } - appLogAggregators.remove(appId); - closeFileSystems(userUgi); - throw appDirException; - } - - // TODO Get the user configuration for the list of containers that need log - // aggregation. // Schedule the aggregator. 
Runnable aggregatorWrapper = new Runnable() { @@ -432,21 +318,12 @@ public void run() { appLogAggregator.run(); } finally { appLogAggregators.remove(appId); - closeFileSystems(userUgi); } } }; this.threadPool.execute(aggregatorWrapper); } - protected void closeFileSystems(final UserGroupInformation userUgi) { - try { - FileSystem.closeAllForUGI(userUgi); - } catch (IOException e) { - LOG.warn("Failed to close filesystems: ", e); - } - } - // for testing only @Private int getNumAggregators() { @@ -497,28 +374,24 @@ private void stopApp(ApplicationId appId) { @Override public void handle(LogHandlerEvent event) { switch (event.getType()) { - case APPLICATION_STARTED: - LogHandlerAppStartedEvent appStartEvent = - (LogHandlerAppStartedEvent) event; - initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), - appStartEvent.getCredentials(), - appStartEvent.getApplicationAcls(), - appStartEvent.getLogAggregationContext(), - appStartEvent.getRecoveredAppLogInitedTime()); - break; - case CONTAINER_FINISHED: - LogHandlerContainerFinishedEvent containerFinishEvent = - (LogHandlerContainerFinishedEvent) event; - stopContainer(containerFinishEvent.getContainerId(), - containerFinishEvent.getExitCode()); - break; - case APPLICATION_FINISHED: - LogHandlerAppFinishedEvent appFinishedEvent = - (LogHandlerAppFinishedEvent) event; - stopApp(appFinishedEvent.getApplicationId()); - break; - default: - ; // Ignore + case APPLICATION_STARTED: + LogHandlerAppStartedEvent appStartEvent = + (LogHandlerAppStartedEvent) event; + initApp(appStartEvent); + break; + case CONTAINER_FINISHED: + LogHandlerContainerFinishedEvent containerFinishEvent = + (LogHandlerContainerFinishedEvent) event; + stopContainer(containerFinishEvent.getContainerId(), + containerFinishEvent.getExitCode()); + break; + case APPLICATION_FINISHED: + LogHandlerAppFinishedEvent appFinishedEvent = + (LogHandlerAppFinishedEvent) event; + stopApp(appFinishedEvent.getApplicationId()); + break; + default: + // Ignore } 
} @@ -528,12 +401,6 @@ public void handle(LogHandlerEvent event) { return this.appLogAggregators; } - @VisibleForTesting - public NodeId getNodeId() { - return this.nodeId; - } - - private int getAggregatorThreadPoolSize(Configuration conf) { int threadPoolSize; try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index 88d9688..f70ebd6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -42,11 +43,11 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; @@ -79,23 +80,23 @@ private static final File LOCAL_LOG_DIR = new File("target", TestAppLogAggregatorImpl.class.getName() + "-localLogDir"); - private static final File REMOTE_LOG_FILE = new File("target", - TestAppLogAggregatorImpl.class.getName() + "-remoteLogFile"); + private static final File REMOTE_LOG_ROOT_DIR = new File("target", + TestAppLogAggregatorImpl.class.getName() + "-remoteLogDir"); @Before public void setUp() throws IOException { if(LOCAL_LOG_DIR.exists()) { FileUtils.cleanDirectory(LOCAL_LOG_DIR); } - if(REMOTE_LOG_FILE.exists()) { - FileUtils.cleanDirectory(REMOTE_LOG_FILE); + if(REMOTE_LOG_ROOT_DIR.exists()) { + FileUtils.cleanDirectory(REMOTE_LOG_ROOT_DIR); } } @After public void cleanUp() throws IOException { FileUtils.deleteDirectory(LOCAL_LOG_DIR); - FileUtils.deleteQuietly(REMOTE_LOG_FILE); + FileUtils.deleteQuietly(REMOTE_LOG_ROOT_DIR); } @Test @@ -274,25 +275,30 @@ private static AppLogAggregatorInTest createAppLogAggregator( DeletionService deletionServiceWithFilesToExpect) throws IOException { - final Dispatcher dispatcher = createNullDispatcher(); - final NodeId nodeId = NodeId.newInstance("localhost", 0); - final String userId = "AppLogAggregatorTest"; - final UserGroupInformation ugi = + Dispatcher dispatcher = createNullDispatcher(); + NodeId nodeId = NodeId.newInstance("localhost", 0); + String userId = "AppLogAggregatorTest"; + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(userId); - final LocalDirsHandlerService dirsService = + LocalDirsHandlerService dirsService = createLocalDirsHandlerService(config, rootLogDir); - final DeletionService deletionService = deletionServiceWithFilesToExpect; - final LogAggregationContext logAggregationContext = null; - final Map appAcls 
= new HashMap<>(); - - final Context context = createContext(config); - final FileContext fakeLfs = mock(FileContext.class); - final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); - - return new AppLogAggregatorInTest(dispatcher, deletionService, - config, applicationId, ugi, nodeId, dirsService, - remoteLogDirForApp, appAcls, logAggregationContext, - context, fakeLfs, recoveredLogInitedTimeMillis); + DeletionService deletionService = deletionServiceWithFilesToExpect; + LogAggregationContext logAggregationContext = null; + Map appAcls = new HashMap<>(); + + Context context = createContext(config); + FileContext fakeLfs = mock(FileContext.class); + Path remoteLogRootDir = new Path(REMOTE_LOG_ROOT_DIR.getAbsolutePath()); + FileSystem remoteFS = remoteLogRootDir.getFileSystem(config); + + LogHandlerAppStartedEvent appStartEvent = + new LogHandlerAppStartedEvent(applicationId, userId, null, appAcls, + logAggregationContext, recoveredLogInitedTimeMillis); + + AppLogAggregatorInTest appAggregator = new AppLogAggregatorInTest(); + appAggregator.init(appStartEvent, config, dispatcher, deletionService, + nodeId, dirsService, remoteLogRootDir, context, fakeLfs, remoteFS, -1); + return appAggregator; } /** @@ -396,34 +402,16 @@ private static Context createContext(YarnConfiguration conf) { new NMNullStateStoreService(), false, conf); } - private static final class AppLogAggregatorInTest extends - AppLogAggregatorImpl { - - final DeletionService deletionService; - final ApplicationId applicationId; - final LogWriter logWriter; - final ArgumentCaptor logValue; - - public AppLogAggregatorInTest(Dispatcher dispatcher, - DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation ugi, NodeId nodeId, - LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, - Map appAcls, - LogAggregationContext logAggregationContext, Context context, - FileContext lfs, long recoveredLogInitedTime) throws IOException { - 
super(dispatcher, deletionService, conf, appId, ugi, nodeId, - dirsHandler, remoteNodeLogFileForApp, appAcls, - logAggregationContext, context, lfs, recoveredLogInitedTime); - this.applicationId = appId; - this.deletionService = deletionService; - - this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp); - this.logValue = ArgumentCaptor.forClass(LogValue.class); - } + private static final class AppLogAggregatorInTest + extends AppLogAggregatorImpl { + + private LogWriter logWriter = null; @Override - protected LogWriter createLogWriter() { - return this.logWriter; + protected LogWriter createLogWriter() throws IOException { + logWriter = + getSpiedLogWriter(conf, this.userUgi, this.remoteNodeLogFileForApp); + return logWriter; } private LogWriter getSpiedLogWriter(Configuration conf, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 1edb841..94b16c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -93,6 +93,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import 
org.apache.hadoop.yarn.event.EventHandler; @@ -111,6 +112,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; @@ -126,7 +128,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -200,6 +201,10 @@ private void verifyLocalFileDeletion( .handle(new LogHandlerAppStartedEvent( application1, this.user, null, this.acls)); + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(application1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(application1, 1); ContainerId container11 = createContainer(appAttemptId, 1, @@ -216,11 +221,10 @@ private void verifyLocalFileDeletion( logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); // ensure filesystems were closed - verify(logAggregationService).closeFileSystems( - any(UserGroupInformation.class)); + verify(aggregator).closeFileSystems(any(UserGroupInformation.class)); verify(delSrvc).delete(eq(user), eq((Path) null), - eq(new Path(app1LogDir.getAbsolutePath()))); - + eq(new Path(app1LogDir.getAbsolutePath()))); + String containerIdStr = container11.toString(); File containerLogDir = new File(app1LogDir, containerIdStr); int count = 0; @@ -242,9 
+246,7 @@ private void verifyLocalFileDeletion( Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted", app1LogDir.exists()); - Path logFilePath = - logAggregationService.getRemoteNodeLogFileForApp(application1, - this.user); + Path logFilePath = aggregator.getRemoteNodeLogFileForApp(); Assert.assertTrue("Log file [" + logFilePath + "] not found", new File( logFilePath.toUri().getPath()).exists()); @@ -273,9 +275,9 @@ public void testLocalFileDeletionAfterUpload() throws Exception { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - LogAggregationService logAggregationService = spy( - new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler)); + LogAggregationService logAggregationService = + spy(new TestableLogAggregationService(dispatcher, this.context, + this.delSrvc, super.dirsHandler)); verifyLocalFileDeletion(logAggregationService); } @@ -293,9 +295,9 @@ public void testLocalFileDeletionOnDiskFull() throws Exception { // directory in full log dirs. 
when(dirsHandler.getLogDirs()).thenReturn(new ArrayList()); when(dirsHandler.getLogDirsForRead()).thenReturn(logDirs); - LogAggregationService logAggregationService = spy( - new LogAggregationService(dispatcher, this.context, this.delSrvc, - dirsHandler)); + LogAggregationService logAggregationService = + spy(new TestableLogAggregationService(dispatcher, this.context, + this.delSrvc, dirsHandler)); verifyLocalFileDeletion(logAggregationService); } @@ -358,15 +360,19 @@ public void testNoContainerOnNode() throws Exception { .handle(new LogHandlerAppStartedEvent( application1, this.user, null, this.acls)); + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(application1); + logAggregationService.handle(new LogHandlerAppFinishedEvent( application1)); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - Assert.assertFalse(new File(logAggregationService - .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) - .exists()); + Assert.assertFalse( + new File(aggregator.getRemoteNodeLogFileForApp().toUri().getPath()) + .exists()); dispatcher.await(); @@ -501,6 +507,15 @@ public void testMultipleAppsLogAggregation() throws Exception { logAggregationService.handle( new LogHandlerContainerFinishedEvent(container33, 0)); + Map aggregators = + logAggregationService.getAppLogAggregators(); + AppLogAggregatorImpl aggregator1 = + (AppLogAggregatorImpl) aggregators.get(application1); + AppLogAggregatorImpl aggregator2 = + (AppLogAggregatorImpl) aggregators.get(application2); + AppLogAggregatorImpl aggregator3 = + (AppLogAggregatorImpl) aggregators.get(application3); + logAggregationService.handle(new LogHandlerAppFinishedEvent( application2)); logAggregationService.handle(new LogHandlerAppFinishedEvent( @@ -511,13 +526,11 @@ public void testMultipleAppsLogAggregation() throws Exception { logAggregationService.stop(); assertEquals(0, 
logAggregationService.getNumAggregators()); - verifyContainerLogs(logAggregationService, application1, + verifyContainerLogs(aggregator1, application1, new ContainerId[] { container11, container12 }, fileNames, 3, false); - - verifyContainerLogs(logAggregationService, application2, + verifyContainerLogs(aggregator2, application2, new ContainerId[] { container21 }, fileNames, 3, false); - - verifyContainerLogs(logAggregationService, application3, + verifyContainerLogs(aggregator3, application3, new ContainerId[] { container31, container32 }, fileNames, 3, false); dispatcher.await(); @@ -664,6 +677,9 @@ protected FileSystem getFileSystem(Configuration conf) { AllContainerLogAggregationPolicy.class.getName()); aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAllContainers)); + + // Sleep and then verify because createAppDir is called async + Thread.sleep(500); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); @@ -673,6 +689,9 @@ protected FileSystem getFileSystem(Configuration conf) { Path appDir2 = new Path(suffixDir, appId2.toString()); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAllContainers)); + + // Sleep and then verify because createAppDir is called async + Thread.sleep(500); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); // start another application with the app dir already created and verify @@ -682,7 +701,11 @@ protected FileSystem getFileSystem(Configuration conf) { new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, this.acls, contextWithAllContainers)); + + // Sleep and then verify because createAppDir is called async + Thread.sleep(500); verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); + aggSvc.stop(); aggSvc.close(); dispatcher.stop(); @@ 
-708,8 +731,7 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { (int) (Math.random() * 1000)); doThrow(new YarnRuntimeException("KABOOM!")) .when(logAggregationService).initAppAggregator( - eq(appId), eq(user), any(Credentials.class), - anyMap(), any(LogAggregationContext.class), anyLong()); + eq(appId), any(LogHandlerAppStartedEvent.class)); LogAggregationContext contextWithAMAndFailed = Records.newRecord(LogAggregationContext.class); contextWithAMAndFailed.setLogAggregationPolicyClassName( @@ -724,9 +746,7 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); - // no filesystems instantiated yet - verify(logAggregationService, never()).closeFileSystems( - any(UserGroupInformation.class)); + assertEquals(0, logAggregationService.getNumAggregators()); // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM @@ -739,9 +759,9 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { } @Test - public void testLogAggregationCreateDirsFailsWithoutKillingNM() + public void testCreateAppLogAggregatorFailsWithoutKillingNM() throws Exception { - + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); @@ -762,9 +782,8 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() appLogDir.mkdir(); Exception e = new RuntimeException("KABOOM!"); - doThrow(e) - .when(logAggregationService).createAppDir(any(String.class), - any(ApplicationId.class), any(UserGroupInformation.class)); + doThrow(e).when(logAggregationService).createAppAggregator( + any(Configuration.class), any(LogHandlerAppStartedEvent.class)); LogAggregationContext contextWithAMAndFailed = Records.newRecord(LogAggregationContext.class); 
contextWithAMAndFailed.setLogAggregationPolicyClassName( @@ -779,6 +798,7 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); + assertEquals(0, logAggregationService.getNumAggregators()); // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM @@ -795,8 +815,6 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() // continue due to aggregated log dir creation failure on remoteFS. verify(spyDelSrvc, never()).delete(eq(user), any(Path.class), Mockito.anyVararg()); - verify(logAggregationService).closeFileSystems( - any(UserGroupInformation.class)); // make sure local log dir is not deleted in case log aggregation // service cannot be initiated. assertTrue(appLogDir.exists()); @@ -818,11 +836,11 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId, } private LogFileStatusInLastCycle verifyContainerLogs( - LogAggregationService logAggregationService, + AppLogAggregatorImpl aggregator, ApplicationId appId, ContainerId[] expectedContainerIds, String[] logFiles, int numOfLogsPerContainer, boolean multiLogs) throws IOException { - return verifyContainerLogs(logAggregationService, appId, + return verifyContainerLogs(aggregator, appId, expectedContainerIds, expectedContainerIds.length, expectedContainerIds.length, logFiles, numOfLogsPerContainer, multiLogs); @@ -833,12 +851,12 @@ private LogFileStatusInLastCycle verifyContainerLogs( // Verify the size of the actual list is in the range of // [minNumOfContainers, maxNumOfContainers]. 
private LogFileStatusInLastCycle verifyContainerLogs( - LogAggregationService logAggregationService, + AppLogAggregatorImpl aggregator, ApplicationId appId, ContainerId[] expectedContainerIds, int minNumOfContainers, int maxNumOfContainers, String[] logFiles, int numOfLogsPerContainer, boolean multiLogs) throws IOException { - Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user); + Path appLogDir = aggregator.getRemoteAppLogDir(); RemoteIterator nodeFiles = null; try { Path qualifiedLogDir = @@ -858,8 +876,8 @@ private LogFileStatusInLastCycle verifyContainerLogs( FileStatus targetNodeFile = null; if (! multiLogs) { targetNodeFile = nodeFiles.next(); - Assert.assertTrue(targetNodeFile.getPath().getName().equals( - LogAggregationUtils.getNodeString(logAggregationService.getNodeId()))); + Assert.assertTrue(targetNodeFile.getPath().getName() + .equals(LogAggregationUtils.getNodeString(aggregator.getNodeId()))); } else { long fileCreateTime = 0; while (nodeFiles.hasNext()) { @@ -877,7 +895,7 @@ private LogFileStatusInLastCycle verifyContainerLogs( String[] fileName = targetNodeFile.getPath().getName().split("_"); Assert.assertTrue(fileName.length == 3); Assert.assertEquals(fileName[0] + ":" + fileName[1], - logAggregationService.getNodeId().toString()); + aggregator.getNodeId().toString()); } AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath()); @@ -1378,8 +1396,6 @@ public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception { logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - verify(logAggregationService).closeFileSystems( - any(UserGroupInformation.class)); ApplicationEvent expectedEvents[] = new ApplicationEvent[] { @@ -1537,6 +1553,17 @@ public void testLogAggregationServiceWithPatterns() throws Exception { "getApplicationID"); reset(appEventHandler); + Map aggregators = + logAggregationService.getAppLogAggregators(); + 
AppLogAggregatorImpl aggregator1 = + (AppLogAggregatorImpl) aggregators.get(application1); + AppLogAggregatorImpl aggregator2 = + (AppLogAggregatorImpl) aggregators.get(application2); + AppLogAggregatorImpl aggregator3 = + (AppLogAggregatorImpl) aggregators.get(application3); + AppLogAggregatorImpl aggregator4 = + (AppLogAggregatorImpl) aggregators.get(application4); + logAggregationService.handle(new LogHandlerAppFinishedEvent(application1)); logAggregationService.handle(new LogHandlerAppFinishedEvent(application2)); logAggregationService.handle(new LogHandlerAppFinishedEvent(application3)); @@ -1545,20 +1572,20 @@ public void testLogAggregationServiceWithPatterns() throws Exception { assertEquals(0, logAggregationService.getNumAggregators()); String[] logFiles = new String[] { "stdout", "syslog" }; - verifyContainerLogs(logAggregationService, application1, - new ContainerId[] { container1 }, logFiles, 2, false); + verifyContainerLogs(aggregator1, application1, + new ContainerId[] { container1 }, logFiles, 2, false); logFiles = new String[] { "stderr" }; - verifyContainerLogs(logAggregationService, application2, - new ContainerId[] { container2 }, logFiles, 1, false); + verifyContainerLogs(aggregator2, application2, + new ContainerId[] { container2 }, logFiles, 1, false); logFiles = new String[] { "out.log", "err.log" }; - verifyContainerLogs(logAggregationService, application3, - new ContainerId[] { container3 }, logFiles, 2, false); + verifyContainerLogs(aggregator3, application3, + new ContainerId[] { container3 }, logFiles, 2, false); logFiles = new String[] { "sys.log" }; - verifyContainerLogs(logAggregationService, application4, - new ContainerId[] { container4 }, logFiles, 1, false); + verifyContainerLogs(aggregator4, application4, + new ContainerId[] { container4 }, logFiles, 1, false); dispatcher.await(); @@ -1621,7 +1648,7 @@ public void testLogAggregationServiceWithPatternsAndIntervals() // AppLogDir should be created File appLogDir = - new 
File(localLogDir, ConverterUtils.toString(application)); + new File(localLogDir, application.toString()); appLogDir.mkdir(); logAggregationService.handle(new LogHandlerAppStartedEvent(application, this.user, null, this.acls, logAggregationContext)); @@ -1640,12 +1667,12 @@ public void testLogAggregationServiceWithPatternsAndIntervals() aggregator.doLogAggregationOutOfBand(); - Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 1, false, null)); + Assert.assertTrue( + waitAndCheckLogNum(aggregator, application, 50, 1, false, null)); String[] logFiles = new String[] { "stdout" }; - verifyContainerLogs(logAggregationService, application, - new ContainerId[] {container}, logFiles, 1, true); + verifyContainerLogs(aggregator, application, + new ContainerId[] { container }, logFiles, 1, true); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container, 0)); @@ -1657,14 +1684,14 @@ public void testLogAggregationServiceWithPatternsAndIntervals() // even if the app is running but the container finishes. aggregator.doLogAggregationOutOfBand(); - Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 2, false, null)); + Assert.assertTrue( + waitAndCheckLogNum(aggregator, application, 50, 2, false, null)); // This container finishes. // The log "std_final" should be aggregated this time. 
String[] logFinalLog = new String[] {"std_final"}; - verifyContainerLogs(logAggregationService, application, - new ContainerId[] {container}, logFinalLog, 1, true); + verifyContainerLogs(aggregator, application, + new ContainerId[] { container }, logFinalLog, 1, true); logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); @@ -1683,10 +1710,11 @@ public void testNoneContainerPolicy() throws Exception { ContainerId container1 = finishContainer(appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0, logFiles); - finishApplication(appId, logAggregationService); + AppLogAggregatorImpl aggregator = + finishApplication(appId, logAggregationService); - verifyContainerLogs(logAggregationService, appId, - new ContainerId[] {container1}, logFiles, 0, false); + verifyContainerLogs(aggregator, appId, new ContainerId[] { container1 }, + logFiles, 0, false); verifyLogAggFinishEvent(appId); } @@ -1707,10 +1735,11 @@ public void testFailedContainerPolicy() throws Exception { finishContainer(appId, logAggregationService, ContainerType.TASK, 3, ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); - finishApplication(appId, logAggregationService); + AppLogAggregatorImpl aggregator = + finishApplication(appId, logAggregationService); - verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1 }, logFiles, 1, false); + verifyContainerLogs(aggregator, appId, new ContainerId[] { container1 }, + logFiles, 1, false); verifyLogAggFinishEvent(appId); } @@ -1731,9 +1760,10 @@ public void testAMOrFailedContainerPolicy() throws Exception { finishContainer(appId, logAggregationService, ContainerType.TASK, 3, ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); - finishApplication(appId, logAggregationService); + AppLogAggregatorImpl aggregator = + finishApplication(appId, logAggregationService); - verifyContainerLogs(logAggregationService, appId, + verifyContainerLogs(aggregator, appId, new ContainerId[] { 
container1, container2 }, logFiles, 1, false); verifyLogAggFinishEvent(appId); @@ -1755,9 +1785,10 @@ public void testFailedOrKilledContainerPolicy() throws Exception { ContainerType.TASK, 3, ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); - finishApplication(appId, logAggregationService); + AppLogAggregatorImpl aggregator = + finishApplication(appId, logAggregationService); - verifyContainerLogs(logAggregationService, appId, + verifyContainerLogs(aggregator, appId, new ContainerId[] { container2, container3 }, logFiles, 1, false); verifyLogAggFinishEvent(appId); @@ -1797,10 +1828,11 @@ public void testAMOnlyContainerPolicy() throws Exception { finishContainer(appId, logAggregationService, ContainerType.TASK, 3, 0, logFiles); - finishApplication(appId, logAggregationService); + AppLogAggregatorImpl aggregator = + finishApplication(appId, logAggregationService); - verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1 }, logFiles, 1, false); + verifyContainerLogs(aggregator, appId, new ContainerId[] { container1 }, + logFiles, 1, false); verifyLogAggFinishEvent(appId); } @@ -1945,11 +1977,12 @@ private void verifyDefaultPolicy(ApplicationId appId, ContainerType.TASK, 3, ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles); - finishApplication(appId, logAggregationService); + AppLogAggregatorImpl aggregator = + finishApplication(appId, logAggregationService); - verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1, container2, container3 }, - logFiles, 1, false); + verifyContainerLogs(aggregator, appId, + new ContainerId[] { container1, container2, container3 }, logFiles, 1, + false); verifyLogAggFinishEvent(appId); } @@ -2014,7 +2047,8 @@ private void setupAndTestSampleContainerPolicy(int successfulContainers, ContainerType.TASK, cid++, ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles)); - finishApplication(appId, logAggregationService); + AppLogAggregatorImpl 
aggregator = + finishApplication(appId, logAggregationService); // The number of containers with logs should be 3(AM + failed + killed) + // DEFAULT_SAMPLE_MIN_THRESHOLD + @@ -2028,10 +2062,9 @@ private void setupAndTestSampleContainerPolicy(int successfulContainers, (int)((successfulContainers - minThreshold) * sampleRate / 2); int maxOfContainersWithLogs = 3 + minThreshold + (int)((successfulContainers - minThreshold) * sampleRate * 2); - verifyContainerLogs(logAggregationService, appId, + verifyContainerLogs(aggregator, appId, containerIds.toArray(new ContainerId[containerIds.size()]), - minOfContainersWithLogs, maxOfContainersWithLogs, - logFiles, 1, false); + minOfContainersWithLogs, maxOfContainersWithLogs, logFiles, 1, false); verifyLogAggFinishEvent(appId); } @@ -2130,7 +2163,7 @@ private ContainerId finishContainer(ApplicationId application1, return containerIds; } - private void finishApplication(ApplicationId appId, + private AppLogAggregatorImpl finishApplication(ApplicationId appId, LogAggregationService logAggregationService) throws Exception { dispatcher.await(); ApplicationEvent expectedInitEvents[] = @@ -2140,9 +2173,14 @@ private void finishApplication(ApplicationId appId, "getApplicationID"); reset(appEventHandler); + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(appId); logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); + + return aggregator; } private void verifyLogAggFinishEvent(ApplicationId appId) throws Exception { @@ -2240,14 +2278,14 @@ private void testLogAggregationService(boolean retentionSizeLimitation) aggregator.doLogAggregationOutOfBand(); if (retentionSizeLimitation) { - Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 1, true, null)); + Assert.assertTrue( + waitAndCheckLogNum(aggregator, application, 50, 1, true, null)); } else { 
- Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 1, false, null)); + Assert.assertTrue( + waitAndCheckLogNum(aggregator, application, 50, 1, false, null)); } // Container logs should be uploaded - logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application, + logFileStatusInLastCycle = verifyContainerLogs(aggregator, application, new ContainerId[] { container }, logFiles1, 3, true); for(String logFile : logFiles1) { Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle() @@ -2266,7 +2304,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) // Only one aggregated log file in Remote file directory. Assert.assertTrue( "Only one aggregated log file in Remote file directory expected", - waitAndCheckLogNum(logAggregationService, application, 50, 1, true, + waitAndCheckLogNum(aggregator, application, 50, 1, true, null)); Thread.sleep(2000); @@ -2278,14 +2316,14 @@ private void testLogAggregationService(boolean retentionSizeLimitation) aggregator.doLogAggregationOutOfBand(); if (retentionSizeLimitation) { - Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle())); + Assert.assertTrue(waitAndCheckLogNum(aggregator, application, 50, 1, true, + logFileStatusInLastCycle.getLogFilePathInLastCycle())); } else { - Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 2, false, null)); + Assert.assertTrue( + waitAndCheckLogNum(aggregator, application, 50, 2, false, null)); } // Container logs should be uploaded - logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application, + logFileStatusInLastCycle = verifyContainerLogs(aggregator, application, new ContainerId[] { container }, logFiles2, 3, true); for(String logFile : logFiles2) { @@ -2308,17 +2346,17 @@ private void testLogAggregationService(boolean retentionSizeLimitation) dispatcher.await(); 
logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); if (retentionSizeLimitation) { - Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle())); + Assert.assertTrue(waitAndCheckLogNum(aggregator, application, 50, 1, true, + logFileStatusInLastCycle.getLogFilePathInLastCycle())); } else { - Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 3, false, null)); + Assert.assertTrue( + waitAndCheckLogNum(aggregator, application, 50, 3, false, null)); } // the app is finished. The log "std_final" should be aggregated this time. String[] logFiles3WithFinalLog = new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" }; - verifyContainerLogs(logAggregationService, application, + verifyContainerLogs(aggregator, application, new ContainerId[] { container }, logFiles3WithFinalLog, 4, true); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); @@ -2440,10 +2478,10 @@ private void verifySkipUnnecessaryNNOperations( aggregator.getCleanupOldLogTimes()); } - private int numOfLogsAvailable(LogAggregationService logAggregationService, + private int numOfLogsAvailable(AppLogAggregatorImpl aggregator, ApplicationId appId, boolean sizeLimited, String lastLogFile) throws IOException { - Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user); + Path appLogDir = aggregator.getRemoteAppLogDir(); RemoteIterator nodeFiles = null; try { Path qualifiedLogDir = @@ -2466,8 +2504,8 @@ private int numOfLogsAvailable(LogAggregationService logAggregationService, LOG.info("lastLogFile :" + lastLogFile); return -1; } - if (filename.contains(LogAggregationUtils - .getNodeString(logAggregationService.getNodeId()))) { + if (filename.contains( + LogAggregationUtils.getNodeString(aggregator.getNodeId()))) { LOG.info("Node list filename :" + filename); count++; } @@ -2476,20 +2514,19 @@ private int 
numOfLogsAvailable(LogAggregationService logAggregationService, return count; } - private boolean waitAndCheckLogNum( - LogAggregationService logAggregationService, ApplicationId application, - int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile) + private boolean waitAndCheckLogNum(AppLogAggregatorImpl aggregator, + ApplicationId application, int maxAttempts, int expectNum, + boolean sizeLimited, String lastLogFile) throws IOException, InterruptedException { int count = 0; - int logFiles=numOfLogsAvailable(logAggregationService, application, sizeLimited, + int logFiles = numOfLogsAvailable(aggregator, application, sizeLimited, lastLogFile); while ((logFiles != expectNum) && (count <= maxAttempts)) { Thread.sleep(500); count++; logFiles = - numOfLogsAvailable(logAggregationService, application, sizeLimited, - lastLogFile); + numOfLogsAvailable(aggregator, application, sizeLimited, lastLogFile); } return (logFiles == expectNum); } @@ -2512,4 +2549,21 @@ public String getLogFilePathInLastCycle() { return this.logFileTypesInLastCycle; } } + + /** + * A testable LogAggregationService class with spied AppLogAggregator. + */ + private static class TestableLogAggregationService + extends LogAggregationService { + public TestableLogAggregationService(Dispatcher dispatcher, Context context, + DeletionService deletionService, LocalDirsHandlerService dirsHandler) { + super(dispatcher, context, deletionService, dirsHandler); + } + + @Override + protected AppLogAggregator createAppAggregator(Configuration conf, + LogHandlerAppStartedEvent appStartEvent) { + return spy(super.createAppAggregator(conf, appStartEvent)); + } + } }