diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 20887b66483..076c4da013d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -75,30 +75,19 @@
 
   private static final Log LOG = LogFactory
       .getLog(AppLogAggregatorImpl.class);
-  private static final int THREAD_SLEEP_TIME = 1000;
   // This is temporary solution. The configuration will be deleted once
   // we find a more scalable method to only write a single log file per LRS.
   private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
       = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
   private static final int
       DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
-  
-  // This configuration is for debug and test purpose. By setting
-  // this configuration as true. We can break the lower bound of
-  // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
-  private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
-      = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
-  private static final boolean
-      DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
-
-  private static final long
-      NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600;
 
   private final LocalDirsHandlerService dirsHandler;
   private final Dispatcher dispatcher;
   private final ApplicationId appId;
   private final String applicationId;
   private boolean logAggregationDisabled = false;
+  private final LogAggregationService logAggregationService;
   private final Configuration conf;
   private final DeletionService delService;
   private final UserGroupInformation userUgi;
@@ -128,7 +117,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
       ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext, Context context,
-      FileContext lfs) {
+      FileContext lfs, LogAggregationService logAggregationService, long rollingMonitorInterval) {
     this.dispatcher = dispatcher;
     this.conf = conf;
     this.delService = deletionService;
@@ -145,6 +134,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
     this.logAggregationContext = logAggregationContext;
     this.context = context;
     this.nodeId = nodeId;
+    this.logAggregationService = logAggregationService;
     int configuredRentionSize =
         conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
             DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
@@ -154,43 +144,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
     } else {
       this.retentionSize = configuredRentionSize;
     }
-    long configuredRollingMonitorInterval = conf.getLong(
-      YarnConfiguration
-        .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
-      YarnConfiguration
-        .DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
-    boolean debug_mode =
-        conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
-          DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
-    if (configuredRollingMonitorInterval > 0
-        && configuredRollingMonitorInterval <
-          NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
-      if (debug_mode) {
-        this.rollingMonitorInterval = configuredRollingMonitorInterval;
-      } else {
-        LOG.warn(
-            "rollingMonitorIntervall should be more than or equal to "
-            + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
-            + " seconds. Using "
-            + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
-            + " seconds instead.");
-        this.rollingMonitorInterval =
-            NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
-      }
-    } else {
-      if (configuredRollingMonitorInterval <= 0) {
-        LOG.warn("rollingMonitorInterval is set as "
-            + configuredRollingMonitorInterval + ". "
-            + "The log rolling mornitoring interval is disabled. "
-            + "The logs will be aggregated after this application is finished.");
-      } else {
-        LOG.warn("rollingMonitorInterval is set as "
-            + configuredRollingMonitorInterval + ". "
-            + "The logs will be aggregated every "
-            + configuredRollingMonitorInterval + " seconds");
-      }
-      this.rollingMonitorInterval = configuredRollingMonitorInterval;
-    }
+    this.rollingMonitorInterval = rollingMonitorInterval;
   }
 
   private void uploadLogsForContainers() {
@@ -379,32 +333,17 @@ public void run() {
     try {
       doAppLogAggregation();
     } finally {
-      if (!this.appAggregationFinished.get()) {
-        LOG.warn("Aggregation did not complete for application " + appId);
+      if (rollingMonitorInterval <= 0 || appFinishing.get() || aborted.get()) {
+        this.appAggregationFinished.set(true);
       }
-      this.appAggregationFinished.set(true);
     }
   }
 
   @SuppressWarnings("unchecked")
   private void doAppLogAggregation() {
-    while (!this.appFinishing.get() && !this.aborted.get()) {
-      synchronized(this) {
-        try {
-          if (this.rollingMonitorInterval > 0) {
-            wait(this.rollingMonitorInterval * 1000);
-            if (this.appFinishing.get() || this.aborted.get()) {
-              break;
-            }
-            uploadLogsForContainers();
-          } else {
-            wait(THREAD_SLEEP_TIME);
-          }
-        } catch (InterruptedException e) {
-          LOG.warn("PendingContainers queue is interrupted");
-          this.appFinishing.set(true);
-        }
-      }
+    if (rollingMonitorInterval > 0 && !this.appFinishing.get() && !this.aborted.get()) {
+      uploadLogsForContainers();
+      return;
     }
 
     if (this.aborted.get()) {
@@ -446,6 +385,14 @@ private Path getRemoteNodeTmpLogFileForApp() {
         (remoteNodeLogFileForApp.getName() + LogAggregationUtils.TMP_FILE_SUFFIX));
   }
 
+  public AtomicBoolean getAppFinishing() {
+    return appFinishing;
+  }
+
+  public AtomicBoolean getAborted() {
+    return aborted;
+  }
+
   // TODO: The condition: containerId.getId() == 1 to determine an AM container
   // is not always true.
   private boolean shouldUploadLogs(ContainerId containerId,
@@ -492,24 +439,26 @@ public void startContainerLogAggregation(ContainerId containerId,
   }
 
   @Override
-  public synchronized void finishLogAggregation() {
+  public void finishLogAggregation() {
     LOG.info("Application just finished : " + this.applicationId);
     this.appFinishing.set(true);
-    this.notifyAll();
+    this.logAggregationService.triggerAppLogAggregatoion(appId);
   }
 
   @Override
-  public synchronized void abortLogAggregation() {
+  public void abortLogAggregation() {
     LOG.info("Aborting log aggregation for " + this.applicationId);
     this.aborted.set(true);
-    this.notifyAll();
+    this.logAggregationService.triggerAppLogAggregatoion(appId);
   }
 
   @Private
   @VisibleForTesting
-  public synchronized void doLogAggregationOutOfBand() {
+  // This is only used for testing.
+  // Requires rollingMonitorInterval > 0; no roller is created otherwise.
+  public void doLogAggregationOutOfBand() {
     LOG.info("Do OutOfBand log aggregation");
-    this.notifyAll();
+    logAggregationService.logAggregatorRoller.refresh();
   }
 
   private class ContainerLogAggregator {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index bd3e847579d..ac3f76371f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -70,6 +70,16 @@
   private static final Log LOG = LogFactory
       .getLog(LogAggregationService.class);
 
+  // This configuration is for debugging and testing only. Setting it to
+  // true lifts the lower bound otherwise enforced on
+  // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
+  private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
+  private static final boolean
+      DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
+  private static final long
+      NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600;
+
   /*
    * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
    * Group to which NMOwner belongs> App dirs will be created as 770,
@@ -93,15 +103,20 @@
   private final DeletionService deletionService;
   private final Dispatcher dispatcher;
 
+  private long rollingMonitorInterval;
   private LocalDirsHandlerService dirsHandler;
+  @VisibleForTesting
+  LogAggregatorRoller logAggregatorRoller;
   Path remoteRootLogDir;
   String remoteRootLogDirSuffix;
   private NodeId nodeId;
+  private ConcurrentMap<ApplicationId, Runnable> wrappers
+      = new ConcurrentHashMap<ApplicationId, Runnable>();
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
 
   private final ExecutorService threadPool;
-  
+
   public LogAggregationService(Dispatcher dispatcher, Context context,
       DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
     super(LogAggregationService.class.getName());
@@ -125,6 +140,44 @@ protected void serviceInit(Configuration conf) throws Exception {
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
 
+    long configuredRollingMonitorInterval = conf.getLong(
+      YarnConfiguration
+        .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
+      YarnConfiguration
+        .DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
+    boolean debug_mode =
+        conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
+          DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
+    if (configuredRollingMonitorInterval > 0
+        && configuredRollingMonitorInterval <
+          NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
+      if (debug_mode) {
+        this.rollingMonitorInterval = configuredRollingMonitorInterval;
+      } else {
+        LOG.warn(
+            "rollingMonitorIntervall should be more than or equal to "
+            + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+            + " seconds. Using "
+            + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+            + " seconds instead.");
+        this.rollingMonitorInterval =
+            NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
+      }
+    } else {
+      if (configuredRollingMonitorInterval <= 0) {
+        LOG.warn("rollingMonitorInterval is set as "
+            + configuredRollingMonitorInterval + ". "
+            + "The log rolling mornitoring interval is disabled. "
+            + "The logs will be aggregated after this application is finished.");
+      } else {
+        LOG.warn("rollingMonitorInterval is set as "
+            + configuredRollingMonitorInterval + ". "
+            + "The logs will be aggregated every "
+            + configuredRollingMonitorInterval + " seconds");
+      }
+      this.rollingMonitorInterval = configuredRollingMonitorInterval;
+    }
+
     super.serviceInit(conf);
   }
 
@@ -133,18 +186,25 @@ protected void serviceStart() throws Exception {
     // NodeId is only available during start, the following cannot be moved
     // anywhere else.
     this.nodeId = this.context.getNodeId();
+    if (rollingMonitorInterval > 0) {
+      logAggregatorRoller = new LogAggregatorRoller();
+      logAggregatorRoller.start();
+    }
     super.serviceStart();
   }
 
   @Override
   protected void serviceStop() throws Exception {
     LOG.info(this.getName() + " waiting for pending aggregation during exit");
+    if (logAggregatorRoller != null) {
+      logAggregatorRoller.interrupt();
+      logAggregatorRoller.join();
+    }
     stopAggregators();
     super.serviceStop();
   }
 
   private void stopAggregators() {
-    threadPool.shutdown();
     // if recovery on restart is supported then leave outstanding aggregations
     // to the next restart
     boolean shouldAbort = context.getNMStateStore().canRecover()
@@ -157,6 +217,7 @@ private void stopAggregators() {
         aggregator.finishLogAggregation();
       }
     }
+    threadPool.shutdown();
     while (!threadPool.isTerminated()) { // wait for all threads to finish
       for (ApplicationId appId : appLogAggregators.keySet()) {
         LOG.info("Waiting for aggregation to complete for " + appId);
@@ -356,7 +417,7 @@ protected void initAppAggregator(final ApplicationId appId, String user,
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
             appAcls, logAggregationContext, this.context,
-            getLocalFileContext(getConfig()));
+            getLocalFileContext(getConfig()), this, rollingMonitorInterval);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
@@ -383,12 +444,88 @@ public void run() {
         try {
           appLogAggregator.run();
         } finally {
-          appLogAggregators.remove(appId);
-          closeFileSystems(userUgi);
+          if (rollingMonitorInterval <= 0
+              || ((AppLogAggregatorImpl) appLogAggregator).getAborted().get()
+              || ((AppLogAggregatorImpl) appLogAggregator).getAppFinishing().get()) {
+            appLogAggregators.remove(appId);
+            closeFileSystems(userUgi);
+          }
         }
       }
     };
-    this.threadPool.execute(aggregatorWrapper);
+
+    wrappers.put(appId, aggregatorWrapper);
+    if (rollingMonitorInterval > 0) {
+      logAggregatorRoller.add(appId);
+    }
+  }
+
+  void triggerAppLogAggregatoion(ApplicationId appId) {
+    Runnable wrapper = wrappers.get(appId);
+    if (wrapper == null) {
+      return;
+    }
+    this.threadPool.execute(wrapper);
+    AppLogAggregatorImpl appLogAggregator = (AppLogAggregatorImpl) appLogAggregators.get(appId);
+    if (rollingMonitorInterval <= 0) {
+      wrappers.remove(appId);
+    } else if (appLogAggregator != null
+        && (appLogAggregator.getAborted().get()
+        || appLogAggregator.getAppFinishing().get())) {
+      wrappers.remove(appId);
+      logAggregatorRoller.remove(appId);
+    }
+  }
+
+  class LogAggregatorRoller extends Thread {
+    final static long INTERVAL = 60 * 1000;
+    ConcurrentMap<ApplicationId, Long> rollingIntervals
+        = new ConcurrentHashMap<ApplicationId, Long>();
+
+    public void run() {
+      LOG.info("Start LogAggregationRoller, rollingMonitorInterval is "
+          + rollingMonitorInterval);
+      while (!isInterrupted()) {
+        synchronized (this) {
+          long now = System.currentTimeMillis();
+          for (Map.Entry<ApplicationId, Long> entry : rollingIntervals.entrySet()) {
+            ApplicationId appId = entry.getKey();
+            long lastRolling = entry.getValue();
+            if (now - lastRolling >= rollingMonitorInterval * 1000) {
+              triggerAppLogAggregatoion(appId);
+              rollingIntervals.put(appId, now);
+            }
+          }
+          try {
+            wait(INTERVAL);
+          } catch (InterruptedException e) {
+            LOG.warn("Stop LogAggregatorRoller");
+            interrupt();
+          }
+        }
+      }
+    }
+
+    @Private
+    @VisibleForTesting
+    // Test hook: marks every tracked application as due and wakes the roller.
+    public synchronized void refresh() {
+      long last = System.currentTimeMillis() - rollingMonitorInterval * 1000;
+      for (ApplicationId appId : rollingIntervals.keySet()) {
+        LOG.debug("Marking " + appId + " as due for log rolling");
+        rollingIntervals.put(appId, last);
+      }
+      notifyAll();
+    }
+
+    public void add(ApplicationId appId) {
+      rollingIntervals.put(appId, System.currentTimeMillis());
+    }
+
+    public void remove(ApplicationId appId) {
+      rollingIntervals.remove(appId);
+    }
+
   }
 
   protected void closeFileSystems(final UserGroupInformation userUgi) {