diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 181094e..5eb2a5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -1370,7 +1370,6 @@ public void run() { if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) context).setSystemCrendentialsForApps( parseCredentials(systemCredentials)); - context.getContainerManager().handleCredentialUpdate(); } List containersToUpdate = response.getContainersToUpdate(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java index 356c2e0..2aeb245 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -44,5 +44,4 @@ ContainerScheduler getContainerScheduler(); - void handleCredentialUpdate(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index eb862ef..83023cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; @@ -1966,14 +1965,6 @@ public ContainerScheduler getContainerScheduler() { } @Override - public void handleCredentialUpdate() { - Set invalidApps = logHandler.getInvalidTokenApps(); - if (!invalidApps.isEmpty()) { - dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent()); - } - } - - @Override public GetLocalizationStatusesResponse getLocalizationStatuses( GetLocalizationStatusesRequest request) throws YarnException, IOException { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 245dc10..3e138b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -28,6 +28,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,8 +94,8 @@ private final Configuration conf; private final DeletionService delService; private final UserGroupInformation userUgi; - private final Path remoteNodeLogFileForApp; - private final Path remoteNodeTmpLogFileForApp; + private Path remoteNodeLogFileForApp; + private Path remoteNodeTmpLogFileForApp; private final BlockingQueue pendingContainers; private final AtomicBoolean appFinishing = new AtomicBoolean(); @@ -103,7 +106,7 @@ private final LogAggregationContext logAggregationContext; private final Context context; private final NodeId nodeId; - private final LogAggregationFileControllerContext logControllerContext; + private LogAggregationFileControllerContext logControllerContext; // These variables are only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); @@ -115,8 +118,11 
@@ new HashMap(); private final ContainerLogAggregationPolicy logAggPolicy; - private final LogAggregationFileController logAggregationFileController; + private LogAggregationFileController logAggregationFileController; + private final long rollingMonitorInterval; + private final boolean isLogAggregationInRolling; + private AtomicBoolean isInited; /** * The value recovered from state store to determine the age of application @@ -175,42 +181,58 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.nodeId = nodeId; this.logAggPolicy = getLogAggPolicy(conf); this.recoveredLogInitedTime = recoveredLogInitedTime; - this.logFileSizeThreshold = - conf.getLong(YarnConfiguration.LOG_AGGREGATION_DEBUG_FILESIZE, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE); - if (logAggregationFileController == null) { - // by default, use T-File Controller - this.logAggregationFileController = new LogAggregationTFileController(); - this.logAggregationFileController.initialize(conf, "TFile"); - this.logAggregationFileController.verifyAndCreateRemoteLogDir(); - this.logAggregationFileController.createAppDir( - this.userUgi.getShortUserName(), appId, userUgi); - this.remoteNodeLogFileForApp = this.logAggregationFileController - .getRemoteNodeLogFileForApp(appId, - this.userUgi.getShortUserName(), nodeId); - this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); - } else { - this.logAggregationFileController = logAggregationFileController; - this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; - this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); - } - boolean logAggregationInRolling = - rollingMonitorInterval > 0 && this.logAggregationContext != null - && this.logAggregationContext.getRolledLogsIncludePattern() != null - && !this.logAggregationContext.getRolledLogsIncludePattern() - .isEmpty(); - if (logAggregationInRolling) { - LOG.info("Rolling mode is turned on with include pattern {}", - 
this.logAggregationContext.getRolledLogsIncludePattern()); - } else { - LOG.debug("Rolling mode is turned off"); + this.rollingMonitorInterval = rollingMonitorInterval; + this.logAggregationFileController = logAggregationFileController; + this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; + this.isLogAggregationInRolling = + rollingMonitorInterval > 0 && this.logAggregationContext != null + && this.logAggregationContext.getRolledLogsIncludePattern() != null + && !this.logAggregationContext.getRolledLogsIncludePattern() + .isEmpty(); + logControllerContext = null; + remoteNodeTmpLogFileForApp = null; + this.isInited = new AtomicBoolean(false); + } + + /** + * Runs the one-time remote log directory setup the first time logs are uploaded for this application. + * Any failure disables log aggregation for this application and is rethrown as a YarnRuntimeException. + */ + private void lazyInit() { + if (isInited.compareAndSet(false, true)) { + LOG.info("lazy inited app " + appId.toString() + "'s AppLogAggregatorImpl"); + try { + if (logAggregationFileController == null) { + // by default, use T-File Controller + this.logAggregationFileController = new LogAggregationTFileController(); + this.logAggregationFileController.initialize(conf, "TFile"); + this.logAggregationFileController.verifyAndCreateRemoteLogDir(); + this.logAggregationFileController.createAppDir( + this.userUgi.getShortUserName(), appId, userUgi); + this.remoteNodeLogFileForApp = this.logAggregationFileController + .getRemoteNodeLogFileForApp(appId, + this.userUgi.getShortUserName(), nodeId); + this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); + } else { + this.logAggregationFileController.verifyAndCreateRemoteLogDir(); + this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); + // Create the app dir + logAggregationFileController.createAppDir(userUgi.getShortUserName(), appId, userUgi); + } +
logControllerContext = new LogAggregationFileControllerContext( + this.remoteNodeLogFileForApp, + this.remoteNodeTmpLogFileForApp, + isLogAggregationInRolling, + rollingMonitorInterval, + this.appId, this.appAcls, this.nodeId, this.userUgi); + } catch (Exception e) { + // isInited is already set, so setup is never retried: disable aggregation instead of continuing half-initialized. + this.disableLogAggregation(); + throw e instanceof YarnRuntimeException + ? (YarnRuntimeException) e : new YarnRuntimeException(e); + } } - logControllerContext = new LogAggregationFileControllerContext( - this.remoteNodeLogFileForApp, - this.remoteNodeTmpLogFileForApp, - logAggregationInRolling, - rollingMonitorInterval, - this.appId, this.appAcls, this.nodeId, this.userUgi); } private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { @@ -277,8 +299,8 @@ private void uploadLogsForContainers(boolean appFinished) if (this.logAggregationDisabled) { return; } - addCredentials(); + lazyInit(); // Create a set of Containers whose logs will be uploaded in this cycle. // It includes: @@ -490,8 +512,8 @@ private void doAppLogAggregation() throws LogAggregationDFSException { synchronized(this) { try { waiting.set(true); - if (logControllerContext.isLogAggregationInRolling()) { - wait(logControllerContext.getRollingMonitorInterval() * 1000); + if (isLogAggregationInRolling) { + wait(rollingMonitorInterval * 1000); if (this.appFinishing.get() || this.aborted.get()) { break; } @@ -701,4 +723,14 @@ public LogAggregationFileController getLogAggregationFileController() { getLogAggregationFileControllerContext() { return this.logControllerContext; } + + @VisibleForTesting + public boolean isAppAggregationFinished() { + return this.appAggregationFinished.get(); + } + + @VisibleForTesting + public ApplicationId getAppId() { + return appId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index
2280e75..f88dc7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -20,14 +20,11 @@ import java.io.IOException; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.security.token.SecretManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,9 +84,6 @@ private final ConcurrentMap appLogAggregators; - // Holds applications whose aggregation is disable due to invalid Token - private final Set invalidTokenApps; - @VisibleForTesting ExecutorService threadPool; @@ -102,7 +96,6 @@ public LogAggregationService(Dispatcher dispatcher, Context context, this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); - this.invalidTokenApps = ConcurrentHashMap.newKeySet(); } private static long calculateRollingMonitorInterval(Configuration conf) { @@ -254,7 +247,6 @@ protected void initAppAggregator(final ApplicationId appId, String user, LogAggregationFileController logAggregationFileController = getLogAggregationFileController(getConfig()); - logAggregationFileController.verifyAndCreateRemoteLogDir(); // New application final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, @@ -266,24 +258,6 @@ protected void initAppAggregator(final ApplicationId appId, String user, if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } - // wait until check for existing 
aggregator to create dirs - YarnRuntimeException appDirException = null; - try { - // Create the app dir - logAggregationFileController.createAppDir(user, appId, userUgi); - } catch (Exception e) { - appLogAggregator.disableLogAggregation(); - - // add to disabled aggregators if due to InvalidToken - if (e.getCause() instanceof SecretManager.InvalidToken) { - invalidTokenApps.add(appId); - } - if (!(e instanceof YarnRuntimeException)) { - appDirException = new YarnRuntimeException(e); - } else { - appDirException = (YarnRuntimeException)e; - } - } // TODO Get the user configuration for the list of containers that need log // aggregation. @@ -300,10 +274,6 @@ public void run() { } }; this.threadPool.execute(aggregatorWrapper); - - if (appDirException != null) { - throw appDirException; - } } protected void closeFileSystems(final UserGroupInformation userUgi) { @@ -341,20 +311,16 @@ private void stopApp(ApplicationId appId) { // App is complete. Finish up any containers' pending log aggregation and // close the application specific logFile. 
- try { - AppLogAggregator aggregator = this.appLogAggregators.get(appId); - if (aggregator == null) { - LOG.warn("Log aggregation is not initialized for " + appId - + ", did it fail to start?"); - this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); - return; - } - aggregator.finishLogAggregation(); - } finally { - // Remove invalid Token Apps - invalidTokenApps.remove(appId); + AppLogAggregator aggregator = this.appLogAggregators.get(appId); + if (aggregator == null) { + LOG.warn("Log aggregation is not initialized for " + appId + + ", did it fail to start?"); + this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); + return; } + aggregator.finishLogAggregation(); + } @Override @@ -381,47 +347,12 @@ public void handle(LogHandlerEvent event) { (LogHandlerAppFinishedEvent) event; stopApp(appFinishedEvent.getApplicationId()); break; - case LOG_AGG_TOKEN_UPDATE: - checkAndEnableAppAggregators(); - break; default: ; // Ignore } } - private void checkAndEnableAppAggregators() { - for (ApplicationId appId : invalidTokenApps) { - try { - AppLogAggregator aggregator = appLogAggregators.get(appId); - if (aggregator != null) { - Credentials credentials = - context.getSystemCredentialsForApps().get(appId); - if (credentials != null) { - // Create the app dir again with - LogAggregationFileController logAggregationFileController = - getLogAggregationFileController(getConfig()); - UserGroupInformation userUgi = - aggregator.updateCredentials(credentials); - logAggregationFileController - .createAppDir(userUgi.getShortUserName(), appId, userUgi); - aggregator.enableLogAggregation(); - } - invalidTokenApps.remove(appId); - LOG.info("LogAggregation enabled for application {}", appId); - } - } catch (Exception e) { - //Ignore exception - LOG.warn("Enable aggregators failed {}", appId); - } - } - } - - @Override - public Set 
getInvalidTokenApps() { - return invalidTokenApps; - } - @VisibleForTesting public ConcurrentMap getAppLogAggregators() { return this.appLogAggregators; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java index 459fdf4..55b2187 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java @@ -18,16 +18,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; - -import java.util.Set; - public interface LogHandler extends EventHandler { public void handle(LogHandlerEvent event); - - public Set getInvalidTokenApps(); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 9898f8f..8ca657e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -19,12 +19,8 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -206,11 +202,6 @@ public void handle(LogHandlerEvent event) { } } - @Override - public Set getInvalidTokenApps() { - return Collections.emptySet(); - } - ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( Configuration conf) { ThreadFactory tf = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java index ec477c2..1cfa3889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java @@ -21,5 +21,5 @@ public enum LogHandlerEventType { APPLICATION_STARTED, CONTAINER_FINISHED, - APPLICATION_FINISHED, 
LOG_AGG_TOKEN_UPDATE + APPLICATION_FINISHED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java deleted file mode 100644 index 772a463..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; - -public class LogHandlerTokenUpdatedEvent extends LogHandlerEvent { - - public LogHandlerTokenUpdatedEvent() { - super(LogHandlerEventType.LOG_AGG_TOKEN_UPDATE); - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 1acf3e9..65a7693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -190,10 +190,6 @@ public void handle(LogHandlerEvent event) { } } - @Override - public Set getInvalidTokenApps() { - return Collections.emptySet(); - } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 0a2d63e..f7f51c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -56,12 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -80,7 +74,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -136,7 +129,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import 
org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -596,10 +588,24 @@ public LogAggregationFileController getLogAggregationFileController( logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); - + logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); + AppLogAggregatorImpl appLogAggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators().get(appId); + dispatcher.await(); + //wait for lazyInitLogAggregationFileController completed + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator.isAppAggregationFinished()); + } + }, + 1000, 10 * 1000); // Verify that it failed ApplicationEvent[] expectedEvents = new ApplicationEvent[] { - new ApplicationEvent(appId, + new ApplicationEvent(appId, // original failure + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) }; checkEvents(appEventHandler, expectedEvents, false, @@ -621,9 +627,11 @@ public LogAggregationFileController getLogAggregationFileController( // Verify that it worked expectedEvents = new ApplicationEvent[] { new ApplicationEvent(appId, // original failure - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(appId2, // success - ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent(appId, // original failure + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); @@ -656,7 +664,19 @@ public void testVerifyAndCreateRemoteDirNonExistence() logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, 
this.acls, contextWithAMAndFailed)); dispatcher.await(); - + logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); + dispatcher.await(); + //wait for lazyInitLogAggregationFileController completed + AppLogAggregatorImpl appLogAggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators().get(appId); + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator.isAppAggregationFinished()); + } + }, + 100, 10 * 1000); boolean existsAfter = aNewFile.exists(); assertTrue("The new aggregate file is not successfully created", existsAfter); aNewFile.delete(); //housekeeping @@ -665,7 +685,7 @@ public void testVerifyAndCreateRemoteDirNonExistence() @Test public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() - throws IOException { + throws IOException, TimeoutException, InterruptedException { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); Path aNewFile = new Path(String.valueOf("tmp"+System.currentTimeMillis())); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, aNewFile.getName()); @@ -684,7 +704,19 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); - + AppLogAggregatorImpl appLogAggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators().get(appId); + logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); + dispatcher.await(); + // wait for lazyInitLogAggregationFileController completed + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator.isAppAggregationFinished()); + } + }, + 1000, 10 * 1000); String targetGroup = UserGroupInformation.getLoginUser().getPrimaryGroupName(); FileSystem fs = FileSystem.get(this.conf); @@ -750,6 +782,18 @@ public LogAggregationFileController 
getLogAggregationFileController( AllContainerLogAggregationPolicy.class.getName()); aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAllContainers)); + AppLogAggregatorImpl appLogAggregator = + (AppLogAggregatorImpl) aggSvc.getAppLogAggregators().get(appId); + aggSvc.handle(new LogHandlerAppFinishedEvent(appId)); + // wait for lazyInitLogAggregationFileController completed + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator.isAppAggregationFinished()); + } + }, + 1000, 10 * 1000); verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); @@ -762,6 +806,19 @@ public LogAggregationFileController getLogAggregationFileController( appId2, this.user, inputSuffix)); aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAllContainers)); + AppLogAggregatorImpl appLogAggregator2 = + (AppLogAggregatorImpl) aggSvc.getAppLogAggregators().get(appId2); + aggSvc.handle(new LogHandlerAppFinishedEvent(appId2)); + // wait for lazyInitLogAggregationFileController completed + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator2 + .isAppAggregationFinished()); + } + }, + 1000, 10 * 1000); verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); // start another application with the app dir already created and verify @@ -773,6 +830,19 @@ public LogAggregationFileController getLogAggregationFileController( new File(appDir3.toUri().getPath()).mkdir(); aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, this.acls, contextWithAllContainers)); + AppLogAggregatorImpl appLogAggregator3 = + (AppLogAggregatorImpl) aggSvc.getAppLogAggregators().get(appId3); + aggSvc.handle(new LogHandlerAppFinishedEvent(appId3)); + // wait for 
lazyInitLogAggregationFileController completed + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator3 + .isAppAggregationFinished()); + } + }, + 1000, 10 * 1000); verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); @@ -786,6 +856,19 @@ public LogAggregationFileController getLogAggregationFileController( aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, this.acls, contextWithAllContainers)); + AppLogAggregatorImpl appLogAggregator4 = + (AppLogAggregatorImpl) aggSvc.getAppLogAggregators().get(appId4); + aggSvc.handle(new LogHandlerAppFinishedEvent(appId4)); + // wait for lazyInitLogAggregationFileController completed + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator4 + .isAppAggregationFinished()); + } + }, + 1000, 10 * 1000); verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class)); verify(spyFs).mkdirs(eq(appDir4), isA(FsPermission.class)); @@ -861,27 +944,26 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() .getFileControllerForWrite(); LogAggregationFileController spyLogAggregationFileFormat = spy(logAggregationFileFormat); - Exception e = - new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!")); + Exception e = new RuntimeException("KABOOM!"); doThrow(e).when(spyLogAggregationFileFormat) .createAppDir(any(String.class), any(ApplicationId.class), any(UserGroupInformation.class)); LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, spyDelSrvc, super.dirsHandler){ - @Override - public LogAggregationFileController getLogAggregationFileController( - Configuration conf) { - return spyLogAggregationFileFormat; - } - }); + @Override + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyLogAggregationFileFormat; + } + }); 
logAggregationService.init(this.conf); logAggregationService.start(); ApplicationId appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), - (int) (Math.random() * 1000)); + (int) (Math.random() * 1000)); File appLogDir = new File(localLogDir, appId.toString()); @@ -895,46 +977,50 @@ public LogAggregationFileController getLogAggregationFileController( this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); + AppLogAggregatorImpl appLogAggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators().get(appId); + logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); + dispatcher.await(); + // wait for lazyInitLogAggregationFileController completed + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return new Boolean(appLogAggregator.isAppAggregationFinished()); + } + }, + 1000, 10 * 1000); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); - assertThat(logAggregationService.getInvalidTokenApps()).hasSize(1); + // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM logAggregationService.handle(new LogHandlerContainerFinishedEvent( BuilderUtils.newContainerId(4, 1, 1, 1), ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); - - AppLogAggregator appAgg = - logAggregationService.getAppLogAggregators().get(appId); - Assert.assertFalse("Aggregation should be disabled", - appAgg.isAggregationEnabled()); - - // Enabled aggregation - logAggregationService.handle(new LogHandlerTokenUpdatedEvent()); - dispatcher.await(); - - appAgg = - logAggregationService.getAppLogAggregators().get(appId); - Assert.assertFalse("Aggregation should be 
enabled", - appAgg.isAggregationEnabled()); - - // Check disabled apps are cleared - Assert.assertEquals(0, logAggregationService.getInvalidTokenApps().size()); - logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); dispatcher.await(); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - verify(spyDelSrvc).delete(any(FileDeletionTask.class)); + // local log dir shouldn't be deleted given log aggregation cannot + // continue due to aggregated log dir creation failure on remoteFS. + FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user, + null, null); + verify(spyDelSrvc, never()).delete(deletionTask); verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); + // make sure local log dir is not deleted in case log aggregation + // service cannot be initiated. + assertTrue(appLogDir.exists()); } private void writeContainerLogs(File appLogDir, ContainerId containerId,