diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index d536757..c669cc6 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -20,6 +20,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; @@ -462,20 +463,32 @@ protected Dispatcher createDispatcher() { } @Override - protected EventHandler createSchedulerEventDispatcher() { - // Dispatch inline for test sanity - return new EventHandler() { - @Override - public void handle(SchedulerEvent event) { - scheduler.handle(event); - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); } - @Override - protected ResourceScheduler createScheduler() { - return new MyFifoScheduler(this.getRMContext()); + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected EventHandler createSchedulerEventDispatcher() { + // Dispatch inline for test sanity + return new EventHandler() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + @Override + protected ResourceScheduler createScheduler() { + return new MyFifoScheduler(this.rmContext); + } } - + MyFifoScheduler getMyFifoScheduler() { return (MyFifoScheduler) scheduler; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 4f3cab2..0f2ce37 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.client; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.RMActiveServices; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -334,26 +337,52 @@ protected ResourceManager createResourceManager() { protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcases. } + @Override - protected ClientRMService createClientRMService() { - if (overrideClientRMService) { - return new CustomedClientRMService(this.rmContext, this.scheduler, - this.rmAppManager, this.applicationACLsManager, - this.queueACLsManager, - this.rmContext.getRMDelegationTokenSecretManager()); - } - return super.createClientRMService(); + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); } - @Override - protected ResourceTrackerService createResourceTrackerService() { - if (overrideRTS) { - return new CustomedResourceTrackerService(this.rmContext, - this.nodesListManager, this.nmLivelinessMonitor, - this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + + class RMActiveServicesForTest extends RMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected ClientRMService createClientRMService() { + if (overrideClientRMService) { + return new CustomedClientRMService(this.rmContext, this.scheduler, + this.rmAppManager, this.applicationACLsManager, + this.queueACLsManager, + this.rmContext.getRMDelegationTokenSecretManager()); + } + return new ClientRMService(getRMContext(), getResourceScheduler(), + rmAppManager, applicationACLsManager, queueACLsManager, + getRMContext().getRMDelegationTokenSecretManager()) { + @Override + protected void serviceStart() { + // override to not start rpc handler + } + + @Override + protected void serviceStop() { + // don't do anything + } + }; + } + @Override + protected ResourceTrackerService createResourceTrackerService() { + if (overrideRTS) { + return new CustomedResourceTrackerService(this.rmContext, + this.nodesListManager, this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()); + } + return super.createResourceTrackerService(); } - return super.createResourceTrackerService(); } + }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServices.java new file mode 100644 index 0000000..b34c0cd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServices.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.io.IOException; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; + +/** + * RMActiveServices handles all the Active services in the RM. + */ +@InterfaceAudience.Private +public class RMActiveServices extends CompositeService { + Log LOG = LogFactory.getLog(RMActiveServices.class.getName()); + + private Configuration conf; + protected Dispatcher rmDispatcher; + protected RMContextImpl rmContext; + private RMSecretManagerService rmSecretManagerService; + private DelegationTokenRenewer delegationTokenRenewer; + private EventHandler schedulerDispatcher; + private ApplicationMasterLauncher applicationMasterLauncher; + private ContainerAllocationExpirer containerAllocationExpirer; + protected ResourceTrackerService resourceTracker; + + protected ResourceScheduler scheduler; + private ClientRMService clientRM; + protected ApplicationMasterService masterService; + protected NMLivelinessMonitor nmLivelinessMonitor; + protected NodesListManager nodesListManager; + protected RMAppManager rmAppManager; + protected ApplicationACLsManager applicationACLsManager; + protected QueueACLsManager queueACLsManager; + + private boolean recoveryEnabled; + + public RMActiveServices(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super("RMActiveServices"); + this.rmContext = rmContext; + this.rmDispatcher = rmDispatcher; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + + rmSecretManagerService = createRMSecretManagerService(); + addService(rmSecretManagerService); + + containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); + addService(containerAllocationExpirer); + rmContext.setContainerAllocationExpirer(containerAllocationExpirer); + + AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); + addService(amLivelinessMonitor); + rmContext.setAMLivelinessMonitor(amLivelinessMonitor); + + AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); + addService(amFinishingMonitor); + rmContext.setAMFinishingMonitor(amFinishingMonitor); + + boolean isRecoveryEnabled = conf.getBoolean( + YarnConfiguration.RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); + + RMStateStore rmStore = null; + if(isRecoveryEnabled) { + recoveryEnabled = true; + rmStore = RMStateStoreFactory.getStore(conf); + } else { + recoveryEnabled = false; + rmStore = new NullRMStateStore(); + } + + try { + rmStore.init(conf); + rmStore.setRMDispatcher(rmDispatcher); + } catch (Exception e) { + // the Exception from stateStore.init() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to init state store", e); + throw e; + } + rmContext.setStateStore(rmStore); + + if (UserGroupInformation.isSecurityEnabled()) { + delegationTokenRenewer = createDelegationTokenRenewer(); + rmContext.setDelegationTokenRenewer(delegationTokenRenewer); + } + + RMApplicationHistoryWriter rmApplicationHistoryWriter = + createRMApplicationHistoryWriter(); + addService(rmApplicationHistoryWriter); + rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + + // Register event handler for NodesListManager + nodesListManager = new NodesListManager(rmContext); + rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); + addService(nodesListManager); + rmContext.setNodesListManager(nodesListManager); + + // Initialize the scheduler + scheduler = createScheduler(); + rmContext.setScheduler(scheduler); + + schedulerDispatcher = createSchedulerEventDispatcher(); + addIfService(schedulerDispatcher); + rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); + + // Register event handler for RmAppEvents + rmDispatcher.register(RMAppEventType.class, + new ResourceManager.ApplicationEventDispatcher(rmContext)); + + // Register event handler for RmAppAttemptEvents + rmDispatcher.register(RMAppAttemptEventType.class, + new ResourceManager.ApplicationAttemptEventDispatcher(rmContext)); + + // Register event handler for RmNodes + rmDispatcher.register( + RMNodeEventType.class, new ResourceManager.NodeEventDispatcher(rmContext)); + + nmLivelinessMonitor = createNMLivelinessMonitor(); + addService(nmLivelinessMonitor); + + resourceTracker = createResourceTrackerService(); + addService(resourceTracker); + rmContext.setResourceTrackerService(resourceTracker); + + DefaultMetricsSystem.initialize("ResourceManager"); + JvmMetrics.initSingleton("ResourceManager", null); + + try { + scheduler.reinitialize(conf, rmContext); + } catch (IOException ioe) { + throw new RuntimeException("Failed to initialize scheduler", ioe); + } + + // creating monitors that handle preemption + createPolicyMonitors(); + + masterService = createApplicationMasterService(); + addService(masterService) ; + rmContext.setApplicationMasterService(masterService); + + applicationACLsManager = new ApplicationACLsManager(conf); + rmContext.setApplicationACLsManager(applicationACLsManager); + + queueACLsManager = createQueueACLsManager(scheduler, conf); + rmContext.setQueueACLsManager(queueACLsManager); + + rmAppManager = createRMAppManager(); + rmContext.setRMAppManager(rmAppManager); + // Register event handler for RMAppManagerEvents + rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); + + clientRM = createClientRMService(); + rmContext.setClientRMService(clientRM); + addService(clientRM); + rmContext.setClientRMService(clientRM); + + applicationMasterLauncher = createAMLauncher(); + rmDispatcher.register(AMLauncherEventType.class, + applicationMasterLauncher); + + addService(applicationMasterLauncher); + if (UserGroupInformation.isSecurityEnabled()) { + addService(delegationTokenRenewer); + delegationTokenRenewer.setRMContext(rmContext); + } + + new RMNMInfo(rmContext, scheduler); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + RMStateStore rmStore = rmContext.getStateStore(); + // The state store needs to start irrespective of recoveryEnabled as apps + // need events to move to further states. + rmStore.start(); + + if(recoveryEnabled) { + try { + rmStore.checkVersion(); + RMStateStore.RMState state = rmStore.loadState(); + recover(state); + } catch (Exception e) { + // the Exception from loadState() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to load/recover state", e); + throw e; + } + } + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + + DefaultMetricsSystem.shutdown(); + + if (rmContext != null) { + RMStateStore store = rmContext.getStateStore(); + try { + store.close(); + } catch (Exception e) { + LOG.error("Error closing store.", e); + } + } + + super.serviceStop(); + } + + protected void createPolicyMonitors() { + if (scheduler instanceof PreemptableResourceScheduler + && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { + LOG.info("Loading policy monitors"); + List policies = conf.getInstances( + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + SchedulingEditPolicy.class); + if (policies.size() > 0) { + rmDispatcher.register(ContainerPreemptEventType.class, + new ResourceManager.RMContainerPreemptEventDispatcher( + (PreemptableResourceScheduler) scheduler)); + for (SchedulingEditPolicy policy : policies) { + LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); + policy.init(conf, rmContext.getDispatcher().getEventHandler(), + (PreemptableResourceScheduler) scheduler); + // periodically check whether we need to take action to guarantee + // constraints + SchedulingMonitor mon = new SchedulingMonitor(policy); + addService(mon); + } + } else { + LOG.warn("Policy monitors configured (" + + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + + ") but none specified (" + + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); + } + } + } + + private void recover(RMStateStore.RMState state) throws Exception { + // recover RMdelegationTokenSecretManager + rmContext.getRMDelegationTokenSecretManager().recover(state); + + // recover applications + rmAppManager.recover(state); + } + + protected ApplicationMasterLauncher createAMLauncher() { + return new ApplicationMasterLauncher(this.rmContext); + } + + private NMLivelinessMonitor createNMLivelinessMonitor() { + return new NMLivelinessMonitor(this.rmContext + .getDispatcher()); + } + + protected AMLivelinessMonitor createAMLivelinessMonitor() { + return new AMLivelinessMonitor(this.rmDispatcher); + } + + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer(); + } + + protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { + return new RMApplicationHistoryWriter(); + } + + + protected ApplicationMasterService createApplicationMasterService() { + return new ApplicationMasterService(this.rmContext, scheduler); + } + + protected RMSecretManagerService createRMSecretManagerService() { + return new RMSecretManagerService(conf, rmContext); + } + + protected ResourceScheduler createScheduler() { + String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER, + YarnConfiguration.DEFAULT_RM_SCHEDULER); + LOG.info("Using Scheduler: " + schedulerClassName); + try { + Class schedulerClazz = Class.forName(schedulerClassName); + if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) { + return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz, + this.conf); + } else { + throw new YarnRuntimeException("Class: " + schedulerClassName + + " not instance of " + ResourceScheduler.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Scheduler: " + + schedulerClassName, e); + } + } + + protected EventHandler createSchedulerEventDispatcher() { + return new ResourceManager.SchedulerEventDispatcher(this.scheduler); + } + + + protected ResourceTrackerService createResourceTrackerService() { + return new ResourceTrackerService(this.rmContext, this.nodesListManager, + this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()); + } + + protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, + Configuration conf) { + return new QueueACLsManager(scheduler, conf); + } + + protected RMAppManager createRMAppManager() { + return new RMAppManager(this.rmContext, this.scheduler, this.masterService, + this.applicationACLsManager, this.conf); + } + + protected ClientRMService createClientRMService() { + return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, + this.applicationACLsManager, this.queueACLsManager, + this.rmContext.getRMDelegationTokenSecretManager()); + } + +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 79fb5df..d2bb831 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -36,8 +36,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; /** * Context of the ResourceManager. @@ -99,4 +101,10 @@ void setRMApplicationHistoryWriter( RMApplicationHistoryWriter rmApplicationHistoryWriter); ConfigurationProvider getConfigurationProvider(); + + ApplicationACLsManager getApplicationACLsManager(); + + QueueACLsManager getQueueACLsManager(); + + RMAppManager getRMAppManager(); } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1eb4b75..0fbbc7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -41,10 +41,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; public class RMContextImpl implements RMContext { @@ -81,6 +83,9 @@ private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private ConfigurationProvider configurationProvider; + private ApplicationACLsManager applicationACLsManager; + private QueueACLsManager queueACLsManager; + private RMAppManager rmAppManager; /** * Default constructor. To be used in conjunction with setter methods for @@ -227,6 +232,21 @@ public ResourceTrackerService getResourceTrackerService() { return resourceTrackerService; } + @Override + public ApplicationACLsManager getApplicationACLsManager() { + return applicationACLsManager; + } + + @Override + public QueueACLsManager getQueueACLsManager() { + return queueACLsManager; + } + + @Override + public RMAppManager getRMAppManager() { + return rmAppManager; + } + void setHAEnabled(boolean isHAEnabled) { this.isHAEnabled = isHAEnabled; } @@ -317,6 +337,20 @@ void setResourceTrackerService( this.resourceTrackerService = resourceTrackerService; } + void setApplicationACLsManager( + ApplicationACLsManager applicationACLsManager) { + this.applicationACLsManager = applicationACLsManager; + } + + void setQueueACLsManager(QueueACLsManager queueACLsManager) { + this.queueACLsManager = queueACLsManager; + } + + void setRMAppManager(RMAppManager rmAppManager) { + this.rmAppManager = rmAppManager; + } + + @Override public boolean isHAEnabled() { return isHAEnabled; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 054ec04..345831a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -22,7 +22,6 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; -import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -32,8 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -42,7 +39,6 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; @@ -57,36 +53,21 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -123,7 +104,7 @@ */ @VisibleForTesting protected RMContextImpl rmContext; - private Dispatcher rmDispatcher; + protected Dispatcher rmDispatcher; @VisibleForTesting protected AdminService adminService; @@ -136,19 +117,10 @@ * in Active state. */ protected RMActiveServices activeServices; - protected RMSecretManagerService rmSecretManagerService; protected ResourceScheduler scheduler; - private ClientRMService clientRM; - protected ApplicationMasterService masterService; - protected NMLivelinessMonitor nmLivelinessMonitor; - protected NodesListManager nodesListManager; - protected RMAppManager rmAppManager; - protected ApplicationACLsManager applicationACLsManager; - protected QueueACLsManager queueACLsManager; private WebApp webApp; private AppReportFetcher fetcher = null; - protected ResourceTrackerService resourceTracker; private String webAppAddress; private ConfigurationProvider configurationProvider = null; @@ -231,11 +203,6 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(this.conf); } - - protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, - Configuration conf) { - return new QueueACLsManager(scheduler, conf); - } @VisibleForTesting protected void setRMStateStore(RMStateStore rmStore) { @@ -243,59 +210,10 @@ protected void setRMStateStore(RMStateStore rmStore) { rmContext.setStateStore(rmStore); } - protected EventHandler createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler); - } - protected Dispatcher createDispatcher() { return new AsyncDispatcher(); } - protected ResourceScheduler createScheduler() { - String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER, - YarnConfiguration.DEFAULT_RM_SCHEDULER); - LOG.info("Using Scheduler: " + schedulerClassName); - try { - Class schedulerClazz = Class.forName(schedulerClassName); - if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) { - return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz, - this.conf); - } else { - throw new YarnRuntimeException("Class: " + schedulerClassName - + " not instance of " + ResourceScheduler.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Could not instantiate Scheduler: " - + schedulerClassName, e); - } - } - - protected ApplicationMasterLauncher createAMLauncher() { - return new ApplicationMasterLauncher(this.rmContext); - } - - private NMLivelinessMonitor createNMLivelinessMonitor() { - return new NMLivelinessMonitor(this.rmContext - .getDispatcher()); - } - - protected AMLivelinessMonitor createAMLivelinessMonitor() { - return new AMLivelinessMonitor(this.rmDispatcher); - } - - protected DelegationTokenRenewer createDelegationTokenRenewer() { - return new DelegationTokenRenewer(); - } - - protected RMAppManager createRMAppManager() { - return new RMAppManager(this.rmContext, this.scheduler, this.masterService, - this.applicationACLsManager, this.conf); - } - - protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { - return new RMApplicationHistoryWriter(); - } - // sanity check for configurations protected static void validateConfigs(Configuration conf) { // validate max-attempts @@ -323,223 +241,6 @@ protected static void validateConfigs(Configuration conf) { } } - /** - * RMActiveServices handles all the Active services in the RM. - */ - @Private - class RMActiveServices extends CompositeService { - - private DelegationTokenRenewer delegationTokenRenewer; - private EventHandler schedulerDispatcher; - private ApplicationMasterLauncher applicationMasterLauncher; - private ContainerAllocationExpirer containerAllocationExpirer; - - private boolean recoveryEnabled; - - RMActiveServices() { - super("RMActiveServices"); - } - - @Override - protected void serviceInit(Configuration configuration) throws Exception { - conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); - - rmSecretManagerService = createRMSecretManagerService(); - addService(rmSecretManagerService); - - containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); - addService(containerAllocationExpirer); - rmContext.setContainerAllocationExpirer(containerAllocationExpirer); - - AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); - addService(amLivelinessMonitor); - rmContext.setAMLivelinessMonitor(amLivelinessMonitor); - - AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); - addService(amFinishingMonitor); - rmContext.setAMFinishingMonitor(amFinishingMonitor); - - boolean isRecoveryEnabled = conf.getBoolean( - YarnConfiguration.RECOVERY_ENABLED, - YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); - - RMStateStore rmStore = null; - if(isRecoveryEnabled) { - recoveryEnabled = true; - rmStore = RMStateStoreFactory.getStore(conf); - } else { - recoveryEnabled = false; - rmStore = new NullRMStateStore(); - } - - try { - rmStore.init(conf); - rmStore.setRMDispatcher(rmDispatcher); - } catch (Exception e) { - // the Exception from stateStore.init() needs to be handled for - // HA and we need to give up master status if we got fenced - LOG.error("Failed to init state store", e); - throw e; - } - rmContext.setStateStore(rmStore); - - if (UserGroupInformation.isSecurityEnabled()) { - delegationTokenRenewer = createDelegationTokenRenewer(); - rmContext.setDelegationTokenRenewer(delegationTokenRenewer); - } - - RMApplicationHistoryWriter rmApplicationHistoryWriter = - createRMApplicationHistoryWriter(); - addService(rmApplicationHistoryWriter); - rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - - // Register event handler for NodesListManager - nodesListManager = new NodesListManager(rmContext); - rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); - addService(nodesListManager); - rmContext.setNodesListManager(nodesListManager); - - // Initialize the scheduler - scheduler = createScheduler(); - rmContext.setScheduler(scheduler); - - schedulerDispatcher = createSchedulerEventDispatcher(); - addIfService(schedulerDispatcher); - rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); - - // Register event handler for RmAppEvents - rmDispatcher.register(RMAppEventType.class, - new ApplicationEventDispatcher(rmContext)); - - // Register event handler for RmAppAttemptEvents - rmDispatcher.register(RMAppAttemptEventType.class, - new ApplicationAttemptEventDispatcher(rmContext)); - - // Register event handler for RmNodes - rmDispatcher.register( - RMNodeEventType.class, new NodeEventDispatcher(rmContext)); - - nmLivelinessMonitor = createNMLivelinessMonitor(); - addService(nmLivelinessMonitor); - - resourceTracker = createResourceTrackerService(); - addService(resourceTracker); - rmContext.setResourceTrackerService(resourceTracker); - - DefaultMetricsSystem.initialize("ResourceManager"); - JvmMetrics.initSingleton("ResourceManager", null); - - try { - scheduler.reinitialize(conf, rmContext); - } catch (IOException ioe) { - throw new RuntimeException("Failed to initialize scheduler", ioe); - } - - // creating monitors that handle preemption - createPolicyMonitors(); - - masterService = createApplicationMasterService(); - addService(masterService) ; - rmContext.setApplicationMasterService(masterService); - - applicationACLsManager = new ApplicationACLsManager(conf); - - queueACLsManager = createQueueACLsManager(scheduler, conf); - - rmAppManager = createRMAppManager(); - // Register event handler for RMAppManagerEvents - rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); - - clientRM = createClientRMService(); - rmContext.setClientRMService(clientRM); - addService(clientRM); - rmContext.setClientRMService(clientRM); - - applicationMasterLauncher = createAMLauncher(); - rmDispatcher.register(AMLauncherEventType.class, - applicationMasterLauncher); - - addService(applicationMasterLauncher); - if (UserGroupInformation.isSecurityEnabled()) { - addService(delegationTokenRenewer); - delegationTokenRenewer.setRMContext(rmContext); - } - - new RMNMInfo(rmContext, scheduler); - - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - RMStateStore rmStore = rmContext.getStateStore(); - // The state store needs to start irrespective of recoveryEnabled as apps - // need events to move to further states. - rmStore.start(); - - if(recoveryEnabled) { - try { - rmStore.checkVersion(); - RMState state = rmStore.loadState(); - recover(state); - } catch (Exception e) { - // the Exception from loadState() needs to be handled for - // HA and we need to give up master status if we got fenced - LOG.error("Failed to load/recover state", e); - throw e; - } - } - - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - - DefaultMetricsSystem.shutdown(); - - if (rmContext != null) { - RMStateStore store = rmContext.getStateStore(); - try { - store.close(); - } catch (Exception e) { - LOG.error("Error closing store.", e); - } - } - - super.serviceStop(); - } - - protected void createPolicyMonitors() { - if (scheduler instanceof PreemptableResourceScheduler - && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { - LOG.info("Loading policy monitors"); - List policies = conf.getInstances( - YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, - SchedulingEditPolicy.class); - if (policies.size() > 0) { - rmDispatcher.register(ContainerPreemptEventType.class, - new RMContainerPreemptEventDispatcher( - (PreemptableResourceScheduler) scheduler)); - for (SchedulingEditPolicy policy : policies) { - LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); - policy.init(conf, rmContext.getDispatcher().getEventHandler(), - (PreemptableResourceScheduler) scheduler); - // periodically check whether we need to take action to guarantee - // constraints - SchedulingMonitor mon = new SchedulingMonitor(policy); - addService(mon); - } - } else { - LOG.warn("Policy monitors configured (" + - YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + - ") but none specified (" + - YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); - } - } - } - } @Private public static class SchedulerEventDispatcher extends AbstractService @@ -787,7 +488,8 @@ public void handle(RMNodeEvent event) { protected void startWepApp() { Builder builder = WebApps - .$for("cluster", ApplicationMasterService.class, masterService, + .$for("cluster", ApplicationMasterService.class, + rmContext.getApplicationMasterService(), "ws") .with(conf) .withHttpSpnegoPrincipalKey( @@ -818,8 +520,9 @@ protected void startWepApp() { * instance of {@link RMActiveServices} and initializes it. * @throws Exception */ - void createAndInitActiveServices() throws Exception { - activeServices = new RMActiveServices(); + @VisibleForTesting + public void createAndInitActiveServices() throws Exception { + activeServices = new RMActiveServices(rmContext, rmDispatcher); activeServices.init(conf); } @@ -948,35 +651,15 @@ protected void serviceStop() throws Exception { transitionToStandby(false); rmContext.setHAServiceState(HAServiceState.STOPPING); } - - protected ResourceTrackerService createResourceTrackerService() { - return new ResourceTrackerService(this.rmContext, this.nodesListManager, - this.nmLivelinessMonitor, - this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); - } - - protected ClientRMService createClientRMService() { - return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, - this.applicationACLsManager, this.queueACLsManager, - this.rmContext.getRMDelegationTokenSecretManager()); - } - - protected ApplicationMasterService createApplicationMasterService() { - return new ApplicationMasterService(this.rmContext, scheduler); - } protected AdminService createAdminService() { return new AdminService(this, rmContext); } - protected RMSecretManagerService createRMSecretManagerService() { - return new RMSecretManagerService(conf, rmContext); - } @Private public ClientRMService getClientRMService() { - return this.clientRM; + return this.rmContext.getClientRMService(); } /** @@ -985,7 +668,7 @@ public ClientRMService getClientRMService() { */ @Private public ResourceScheduler getResourceScheduler() { - return this.scheduler; + return this.rmContext.getScheduler(); } /** @@ -994,22 +677,22 @@ public ResourceScheduler getResourceScheduler() { */ @Private public ResourceTrackerService getResourceTrackerService() { - return this.resourceTracker; + return this.rmContext.getResourceTrackerService(); } @Private public ApplicationMasterService getApplicationMasterService() { - return this.masterService; + return this.rmContext.getApplicationMasterService(); } @Private public ApplicationACLsManager getApplicationACLsManager() { - return this.applicationACLsManager; + return this.rmContext.getApplicationACLsManager(); } @Private public QueueACLsManager getQueueACLsManager() { - return this.queueACLsManager; + return this.rmContext.getQueueACLsManager(); } @Private @@ -1023,7 +706,7 @@ public void recover(RMState state) throws Exception { rmContext.getRMDelegationTokenSecretManager().recover(state); // recover applications - rmAppManager.recover(state); + rmContext.getRMAppManager().recover(state); } public static void main(String argv[]) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index caee228..ae8a7b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.Map; +import org.apache.hadoop.yarn.event.Dispatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -366,7 +367,8 @@ public KillApplicationResponse killApp(ApplicationId appId) throws Exception { // from AMLauncher public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(getRMContext(), + getRMContext().getApplicationMasterService(), appAttemptId); am.waitForState(RMAppAttemptState.ALLOCATED); getRMContext() .getDispatcher() @@ -378,88 +380,102 @@ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(getRMContext(), + getRMContext().getApplicationMasterService(), appAttemptId); am.waitForState(RMAppAttemptState.ALLOCATED); getRMContext().getDispatcher().getEventHandler() .handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed")); } @Override - protected ClientRMService createClientRMService() { - return new ClientRMService(getRMContext(), getResourceScheduler(), - rmAppManager, applicationACLsManager, queueACLsManager, - getRMContext().getRMDelegationTokenSecretManager()) { - @Override - protected void serviceStart() { - // override to not start rpc handler - } - - @Override - protected void serviceStop() { - // don't do anything - } - }; + public void createAndInitActiveServices() { + activeServices = new MockRMActiveServices(rmContext, rmDispatcher); + activeServices.init(getConfig()); } - @Override - protected ResourceTrackerService createResourceTrackerService() { - Configuration conf = new Configuration(); - - RMContainerTokenSecretManager containerTokenSecretManager = - getRMContext().getContainerTokenSecretManager(); - containerTokenSecretManager.rollMasterKey(); - NMTokenSecretManagerInRM nmTokenSecretManager = - getRMContext().getNMTokenSecretManager(); - nmTokenSecretManager.rollMasterKey(); - return new ResourceTrackerService(getRMContext(), nodesListManager, - this.nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager) { + public class MockRMActiveServices extends RMActiveServices { - @Override - protected void serviceStart() { - // override to not start rpc handler - } + public MockRMActiveServices(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } - @Override - protected void serviceStop() { - // don't do anything - } - }; - } + @Override + protected ClientRMService createClientRMService() { + return new ClientRMService(getRMContext(), getResourceScheduler(), + rmAppManager, applicationACLsManager, queueACLsManager, + getRMContext().getRMDelegationTokenSecretManager()) { + @Override + protected void serviceStart() { + // override to not start rpc handler + } - @Override - protected ApplicationMasterService createApplicationMasterService() { - return new ApplicationMasterService(getRMContext(), scheduler) { - @Override - protected void serviceStart() { - // override to not start rpc handler - } + @Override + protected void serviceStop() { + // don't do anything + } + }; + } - @Override - protected void serviceStop() { - // don't do anything - } - }; - } + @Override + protected ResourceTrackerService createResourceTrackerService() { + Configuration conf = new Configuration(); + + RMContainerTokenSecretManager containerTokenSecretManager = + getRMContext().getContainerTokenSecretManager(); + containerTokenSecretManager.rollMasterKey(); + NMTokenSecretManagerInRM nmTokenSecretManager = + getRMContext().getNMTokenSecretManager(); + nmTokenSecretManager.rollMasterKey(); + return new ResourceTrackerService(getRMContext(), nodesListManager, + this.nmLivelinessMonitor, containerTokenSecretManager, + nmTokenSecretManager) { + + @Override + protected void serviceStart() { + // override to not start rpc handler + } - @Override - protected ApplicationMasterLauncher createAMLauncher() { - return new ApplicationMasterLauncher(getRMContext()) { - @Override - protected void serviceStart() { - // override to not start rpc handler - } + @Override + protected void serviceStop() { + // don't do anything + } + }; + } - @Override - public void handle(AMLauncherEvent appEvent) { - // don't do anything - } + @Override + protected ApplicationMasterService createApplicationMasterService() { + return new ApplicationMasterService(getRMContext(), scheduler) { + @Override + protected void serviceStart() { + // override to not start rpc handler + } - @Override - protected void serviceStop() { - // don't do anything - } - }; + @Override + protected void serviceStop() { + // don't do anything + } + }; + } + + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new ApplicationMasterLauncher(getRMContext()) { + @Override + protected void serviceStart() { + // override to not start rpc handler + } + + @Override + public void handle(AMLauncherEvent appEvent) { + // don't do anything + } + + @Override + protected void serviceStop() { + // don't do anything + } + }; + } } @Override @@ -483,7 +499,7 @@ protected EmbeddedElectorService createEmbeddedElectorService() { } public NodesListManager getNodesListManager() { - return this.nodesListManager; + return this.rmContext.getNodesListManager(); } public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { @@ -491,7 +507,7 @@ public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { } public RMAppManager getRMAppManager() { - return this.rmAppManager; + return this.rmContext.getRMAppManager(); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java index 0ea2b5e..e76a4ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; @@ -47,30 +48,43 @@ public MockRMWithCustomAMLauncher(Configuration conf, } @Override - protected ApplicationMasterLauncher createAMLauncher() { - return new ApplicationMasterLauncher(getRMContext()) { - @Override - protected Runnable createRunnableLauncher(RMAppAttempt application, - AMLauncherEventType event) { - return new AMLauncher(context, application, event, getConfig()) { - @Override - protected ContainerManagementProtocol getContainerMgrProxy( - ContainerId containerId) { - return containerManager; - } - @Override - protected Token getAMRMToken() { - Token amRmToken = super.getAMRMToken(); - InetSocketAddress serviceAddr = - getConfig().getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - SecurityUtil.setTokenService(amRmToken, serviceAddr); - return amRmToken; - } - }; - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new ApplicationMasterLauncher(getRMContext()) { + @Override + protected Runnable createRunnableLauncher(RMAppAttempt application, + AMLauncherEventType event) { + return new AMLauncher(context, application, event, getConfig()) { + @Override + protected ContainerManagementProtocol getContainerMgrProxy( + ContainerId containerId) { + return containerManager; + } + @Override + protected Token getAMRMToken() { + Token amRmToken = super.getAMRMToken(); + InetSocketAddress serviceAddr = + getConfig().getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + SecurityUtil.setTokenService(amRmToken, serviceAddr); + return amRmToken; + } + }; + } + }; + } + } + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java index e8f1425..192b580 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.yarn.event.Dispatcher; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -83,11 +84,24 @@ public void setup() throws InterruptedException, IOException { conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString()); resourceManager = new MockRM(conf) { - protected ClientRMService createClientRMService() { - return new ClientRMService(getRMContext(), this.scheduler, - this.rmAppManager, this.applicationACLsManager, - this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager()); - }; + @Override + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected ClientRMService createClientRMService() { + return new ClientRMService(getRMContext(), this.scheduler, + this.rmAppManager, this.applicationACLsManager, + this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager()); + }; + } @Override protected void doSecureLogin() throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 58258ac..8b9a090 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -114,9 +115,21 @@ protected void startRMsWithCustomizedRMAppManager() throws IOException { rm1 = new MockRM(conf1) { @Override - protected RMAppManager createRMAppManager() { - return new MyRMAppManager(this.rmContext, this.scheduler, - this.masterService, this.applicationACLsManager, conf1); + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected RMAppManager createRMAppManager() { + return new MyRMAppManager(this.rmContext, this.scheduler, + this.masterService, this.applicationACLsManager, conf1); + } } }; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index a9f1c1a..af56e2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -143,8 +144,20 @@ protected void doSecureLogin() throws IOException { } @Override - protected ApplicationMasterService createApplicationMasterService() { - return new ApplicationMasterService(getRMContext(), this.scheduler); + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected ApplicationMasterService createApplicationMasterService() { + return new ApplicationMasterService(getRMContext(), this.scheduler); + } } @SuppressWarnings("unchecked") diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java index a288c57..8aa4444 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.Dispatcher; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Matchers.any; @@ -105,24 +106,38 @@ public static void setup() throws InterruptedException, IOException { resourceManager = new MockRM(conf) { @Override - protected QueueACLsManager createQueueACLsManager( - ResourceScheduler scheduler, - Configuration conf) { - QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); - when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), - any(QueueACL.class), anyString())).thenAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - return isQueueUser; - } - }); - return mockQueueACLsManager; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected QueueACLsManager createQueueACLsManager( + ResourceScheduler scheduler, + Configuration conf) { + QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); + when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), + any(QueueACL.class), anyString())).thenAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + return isQueueUser; + } + }); + return mockQueueACLsManager; + } + + @Override + protected ClientRMService createClientRMService() { + return new ClientRMService(getRMContext(), this.scheduler, + this.rmAppManager, this.applicationACLsManager, + this.queueACLsManager, null); + }; } - protected ClientRMService createClientRMService() { - return new ClientRMService(getRMContext(), this.scheduler, - this.rmAppManager, this.applicationACLsManager, - this.queueACLsManager, null); - }; }; new Thread() { public void run() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 30ae089..f757596 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -138,13 +138,25 @@ public void testContainerCleanup() throws Exception { final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRM() { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { - @Override - public void handle(SchedulerEvent event) { - scheduler.handle(event); - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 13ab17c..2834e9b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -158,7 +158,9 @@ public void testGetClusterNodes() throws Exception { MockRM rm = new MockRM() { protected ClientRMService createClientRMService() { return new ClientRMService(this.rmContext, scheduler, - this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, + this.getRMContext().getRMAppManager(), + this.getApplicationACLsManager(), + this.getQueueACLsManager(), this.getRMContext().getRMDelegationTokenSecretManager()); }; }; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java index 149ddf5..ca7b298 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.Dispatcher; import static org.junit.Assert.fail; import java.io.IOException; @@ -192,10 +193,22 @@ private void startRMsWithCustomizedClientRMService() throws IOException { rm1 = new MockRM(conf1) { @Override - protected ClientRMService createClientRMService() { - return new MyClientRMService(this.rmContext, this.scheduler, - this.rmAppManager, this.applicationACLsManager, - this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager()); + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected ClientRMService createClientRMService() { + return new MyClientRMService(this.rmContext, this.scheduler, + this.rmAppManager, this.applicationACLsManager, + this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager()); + } } }; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 49eff8b..3d06fbd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.Dispatcher; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -889,8 +890,20 @@ public void testRMRestartGetApplicationList() throws Exception { MockRM rm2 = new MockRM(conf, memStore) { @Override - protected RMAppManager createRMAppManager() { - return spy(super.createRMAppManager()); + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected RMAppManager createRMAppManager() { + return spy(super.createRMAppManager()); + } } }; @@ -1790,26 +1803,39 @@ public void testSynchronouslyRenewDTOnRecovery() throws Exception { MockRM rm2 = new MockRM(conf, memStore) { @Override - protected ResourceTrackerService createResourceTrackerService() { - return new ResourceTrackerService(this.rmContext, - this.nodesListManager, this.nmLivelinessMonitor, - this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { - @Override - protected void serviceStart() throws Exception { - // send the container_finished event as soon as the - // ResourceTrackerService is started. - super.serviceStart(); - nm1.setResourceTrackerService(getResourceTrackerService()); - List status = new ArrayList(); - ContainerId amContainer = - ContainerId.newInstance(am0.getApplicationAttemptId(), 1); - status.add(ContainerStatus.newInstance(amContainer, - ContainerState.COMPLETE, "AM container exit", 143)); - nm1.registerNode(status); - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected ResourceTrackerService createResourceTrackerService() { + return new ResourceTrackerService(this.rmContext, + this.nodesListManager, this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()) { + @Override + protected void serviceStart() throws Exception { + // send the container_finished event as soon as the + // ResourceTrackerService is started. + super.serviceStart(); + nm1.setResourceTrackerService(getResourceTrackerService()); + List status = new ArrayList(); + ContainerId amContainer = + ContainerId.newInstance(am0.getApplicationAttemptId(), 1); + status.add(ContainerStatus.newInstance(amContainer, + ContainerState.COMPLETE, "AM container exit", 143)); + nm1.registerNode(status); + } + }; + } } + }; // Re-start RM rm2.start(); @@ -1883,22 +1909,35 @@ public void init(Configuration conf) { } @Override - protected ClientRMService createClientRMService() { - return new ClientRMService(getRMContext(), getResourceScheduler(), - rmAppManager, applicationACLsManager, null, - getRMContext().getRMDelegationTokenSecretManager()){ - @Override - protected void serviceStart() throws Exception { - // do nothing - } + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } - @Override - protected void serviceStop() throws Exception { - //do nothing - } - }; + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected ClientRMService createClientRMService() { + return new ClientRMService(getRMContext(), getResourceScheduler(), + rmAppManager, applicationACLsManager, null, + getRMContext().getRMDelegationTokenSecretManager()){ + @Override + protected void serviceStart() throws Exception { + // do nothing + } + + @Override + protected void serviceStop() throws Exception { + //do nothing + } + }; + } } + @Override protected void doSecureLogin() throws IOException { // Do nothing. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 2f16b85..b371fa6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -21,8 +21,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -33,7 +31,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -61,7 +58,6 @@ import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -539,13 +535,25 @@ public void testReconnectNode() throws Exception { final DrainDispatcher dispatcher = new DrainDispatcher(); rm = new MockRM() { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { - @Override - public void handle(SchedulerEvent event) { - scheduler.handle(event); - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java index d8f2a37..90d63ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.ahs; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -357,35 +358,49 @@ public void testRMWritingMassiveHistory() throws Exception { YarnConfiguration conf = new YarnConfiguration(); // don't process history events MockRM rm = new MockRM(conf) { + @Override - protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { - return new RMApplicationHistoryWriter() { - @Override - public void applicationStarted(RMApp app) { - } - - @Override - public void applicationFinished(RMApp app, RMAppState finalState) { - } - - @Override - public void applicationAttemptStarted(RMAppAttempt appAttempt) { - } - - @Override - public void applicationAttemptFinished( - RMAppAttempt appAttempt, RMAppAttemptState finalState) { - } - - @Override - public void containerStarted(RMContainer container) { - } - - @Override - public void containerFinished(RMContainer container) { - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { + return new RMApplicationHistoryWriter() { + @Override + public void applicationStarted(RMApp app) { + } + + @Override + public void applicationFinished(RMApp app, RMAppState finalState) { + } + + @Override + public void applicationAttemptStarted(RMAppAttempt appAttempt) { + } + + @Override + public void applicationAttemptFinished( + RMAppAttempt appAttempt, RMAppAttemptState finalState) { + } + + @Override + public void containerStarted(RMContainer container) { + } + + @Override + public void containerFinished(RMContainer container) { + } + }; + } } + }; long startTime1 = System.currentTimeMillis(); testRMWritingMassiveHistory(rm); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index fe4ddb7..cd03a44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,6 +21,7 @@ import java.security.PrivilegedExceptionAction; import java.util.List; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.junit.Assert; import org.apache.hadoop.security.UserGroupInformation; @@ -55,13 +56,25 @@ public void setUp() { dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { - @Override - public void handle(SchedulerEvent event) { - scheduler.handle(event); - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); + } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 8a37fe9..5e1f375 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.event.Dispatcher; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -221,9 +222,22 @@ public void testAMContainerAllocationWhenDNSUnavailable() throws Exception { final YarnConfiguration conf = new YarnConfiguration(); MockRM rm1 = new MockRM(conf) { @Override - protected RMSecretManagerService createRMSecretManagerService() { - return new TestRMSecretManagerService(conf, rmContext); + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); } + + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected RMSecretManagerService createRMSecretManagerService() { + return new TestRMSecretManagerService(conf, rmContext); + } + } + }; rm1.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index 0dcd228..f951e04 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -167,7 +167,9 @@ public void testClientToAMTokens() throws Exception { MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) { protected ClientRMService createClientRMService() { return new ClientRMService(this.rmContext, scheduler, - this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, + this.getRMContext().getRMAppManager(), + this.getApplicationACLsManager(), + this.getQueueACLsManager(), getRMContext().getRMDelegationTokenSecretManager()); }; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index cf11e48..38e0557 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -170,21 +172,34 @@ public MyMockRM(Configuration conf, RMStateStore store) { } @Override - protected RMSecretManagerService createRMSecretManagerService() { - return new RMSecretManagerService(conf, rmContext) { - - @Override - protected RMDelegationTokenSecretManager - createRMDelegationTokenSecretManager(Configuration conf, - RMContext rmContext) { - // KeyUpdateInterval-> 1 seconds - // TokenMaxLifetime-> 2 seconds. - return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000, - rmContext); - } - }; + public void createAndInitActiveServices() { + activeServices = new RMActiveServicesForTest(rmContext, rmDispatcher); + activeServices.init(getConfig()); } + class RMActiveServicesForTest extends MockRM.MockRMActiveServices { + public RMActiveServicesForTest(RMContextImpl rmContext, Dispatcher rmDispatcher) { + super(rmContext, rmDispatcher); + } + + @Override + protected RMSecretManagerService createRMSecretManagerService() { + return new RMSecretManagerService(conf, rmContext) { + + @Override + protected RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(Configuration conf, + RMContext rmContext) { + // KeyUpdateInterval-> 1 seconds + // TokenMaxLifetime-> 2 seconds. + return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000, + rmContext); + } + }; + } + } + + } public class TestRMDelegationTokenSecretManager extends