diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java new file mode 100644 index 0000000..658629b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; + +import com.google.common.annotations.VisibleForTesting; + +public class RMActiveServiceContext { + + private static final Log LOG = LogFactory + .getLog(RMActiveServiceContext.class); + + private final ConcurrentMap applications = + new ConcurrentHashMap(); + + private final ConcurrentMap nodes = + new ConcurrentHashMap(); + + private final ConcurrentMap inactiveNodes = + new ConcurrentHashMap(); + + private final ConcurrentMap systemCredentials = + new ConcurrentHashMap(); + + private boolean isWorkPreservingRecoveryEnabled; + + private AMLivelinessMonitor amLivelinessMonitor; + private AMLivelinessMonitor amFinishingMonitor; + private RMStateStore stateStore = null; + private ContainerAllocationExpirer containerAllocationExpirer; + private DelegationTokenRenewer delegationTokenRenewer; + private AMRMTokenSecretManager amRMTokenSecretManager; + private RMContainerTokenSecretManager containerTokenSecretManager; + private NMTokenSecretManagerInRM nmTokenSecretManager; + private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; + private ClientRMService clientRMService; + private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; + private ResourceScheduler scheduler; + private ReservationSystem reservationSystem; + private NodesListManager nodesListManager; + private ResourceTrackerService resourceTrackerService; + private ApplicationMasterService applicationMasterService; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; + private SystemMetricsPublisher systemMetricsPublisher; + private RMNodeLabelsManager nodeLabelManager; + private long epoch; + private Clock systemClock = new SystemClock(); + private long schedulerRecoveryStartTime = 0; + private long schedulerRecoveryWaitTime = 0; + private boolean printLog = true; + private boolean isSchedulerReady = false; + + public RMActiveServiceContext() { + + } + + @VisibleForTesting + // helper constructor for tests + public RMActiveServiceContext(Dispatcher rmDispatcher, + ContainerAllocationExpirer containerAllocationExpirer, + AMLivelinessMonitor amLivelinessMonitor, + AMLivelinessMonitor amFinishingMonitor, + DelegationTokenRenewer delegationTokenRenewer, + AMRMTokenSecretManager appTokenSecretManager, + RMContainerTokenSecretManager containerTokenSecretManager, + NMTokenSecretManagerInRM nmTokenSecretManager, + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this(); + this.setContainerAllocationExpirer(containerAllocationExpirer); + this.setAMLivelinessMonitor(amLivelinessMonitor); + this.setAMFinishingMonitor(amFinishingMonitor); + this.setDelegationTokenRenewer(delegationTokenRenewer); + this.setAMRMTokenSecretManager(appTokenSecretManager); + this.setContainerTokenSecretManager(containerTokenSecretManager); + this.setNMTokenSecretManager(nmTokenSecretManager); + this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); + this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + + RMStateStore nullStore = new NullRMStateStore(); + nullStore.setRMDispatcher(rmDispatcher); + try { + nullStore.init(new YarnConfiguration()); + setStateStore(nullStore); + } catch (Exception e) { + assert false; + } + } + + @VisibleForTesting + public void setStateStore(RMStateStore store) { + stateStore = store; + } + + public ClientRMService getClientRMService() { + return clientRMService; + } + + public ApplicationMasterService getApplicationMasterService() { + return applicationMasterService; + } + + public ResourceTrackerService getResourceTrackerService() { + return resourceTrackerService; + } + + public RMStateStore getStateStore() { + return stateStore; + } + + public ConcurrentMap getRMApps() { + return this.applications; + } + + public ConcurrentMap getRMNodes() { + return this.nodes; + } + + public ConcurrentMap getInactiveRMNodes() { + return this.inactiveNodes; + } + + public ContainerAllocationExpirer getContainerAllocationExpirer() { + return this.containerAllocationExpirer; + } + + public AMLivelinessMonitor getAMLivelinessMonitor() { + return this.amLivelinessMonitor; + } + + public AMLivelinessMonitor getAMFinishingMonitor() { + return this.amFinishingMonitor; + } + + public DelegationTokenRenewer getDelegationTokenRenewer() { + return delegationTokenRenewer; + } + + public AMRMTokenSecretManager getAMRMTokenSecretManager() { + return this.amRMTokenSecretManager; + } + + public RMContainerTokenSecretManager getContainerTokenSecretManager() { + return this.containerTokenSecretManager; + } + + public NMTokenSecretManagerInRM getNMTokenSecretManager() { + return this.nmTokenSecretManager; + } + + public ResourceScheduler getScheduler() { + return this.scheduler; + } + + public ReservationSystem getReservationSystem() { + return this.reservationSystem; + } + + public NodesListManager getNodesListManager() { + return this.nodesListManager; + } + + public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { + return this.clientToAMTokenSecretManager; + } + + public void setClientRMService(ClientRMService clientRMService) { + this.clientRMService = clientRMService; + } + + public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { + return this.rmDelegationTokenSecretManager; + } + + public void setRMDelegationTokenSecretManager( + RMDelegationTokenSecretManager delegationTokenSecretManager) { + this.rmDelegationTokenSecretManager = delegationTokenSecretManager; + } + + void setContainerAllocationExpirer( + ContainerAllocationExpirer containerAllocationExpirer) { + this.containerAllocationExpirer = containerAllocationExpirer; + } + + void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) { + this.amLivelinessMonitor = amLivelinessMonitor; + } + + void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { + this.amFinishingMonitor = amFinishingMonitor; + } + + void setContainerTokenSecretManager( + RMContainerTokenSecretManager containerTokenSecretManager) { + this.containerTokenSecretManager = containerTokenSecretManager; + } + + void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) { + this.nmTokenSecretManager = nmTokenSecretManager; + } + + void setScheduler(ResourceScheduler scheduler) { + this.scheduler = scheduler; + } + + void setReservationSystem(ReservationSystem reservationSystem) { + this.reservationSystem = reservationSystem; + } + + void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) { + this.delegationTokenRenewer = delegationTokenRenewer; + } + + void setClientToAMTokenSecretManager( + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { + this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; + } + + void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) { + this.amRMTokenSecretManager = amRMTokenSecretManager; + } + + void setNodesListManager(NodesListManager nodesListManager) { + this.nodesListManager = nodesListManager; + } + + void setApplicationMasterService( + ApplicationMasterService applicationMasterService) { + this.applicationMasterService = applicationMasterService; + } + + void setResourceTrackerService(ResourceTrackerService resourceTrackerService) { + this.resourceTrackerService = resourceTrackerService; + } + + public void setWorkPreservingRecoveryEnabled(boolean enabled) { + this.isWorkPreservingRecoveryEnabled = enabled; + } + + public boolean isWorkPreservingRecoveryEnabled() { + return this.isWorkPreservingRecoveryEnabled; + } + + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return rmApplicationHistoryWriter; + } + + public void setSystemMetricsPublisher( + SystemMetricsPublisher systemMetricsPublisher) { + this.systemMetricsPublisher = systemMetricsPublisher; + } + + public SystemMetricsPublisher getSystemMetricsPublisher() { + return systemMetricsPublisher; + } + + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; + } + + public long getEpoch() { + return this.epoch; + } + + void setEpoch(long epoch) { + this.epoch = epoch; + } + + public RMNodeLabelsManager getNodeLabelManager() { + return nodeLabelManager; + } + + public void setNodeLabelManager(RMNodeLabelsManager mgr) { + nodeLabelManager = mgr; + } + + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { + this.schedulerRecoveryStartTime = systemClock.getTime(); + this.schedulerRecoveryWaitTime = waitTime; + } + + public boolean isSchedulerReadyForAllocatingContainers() { + if (isSchedulerReady) { + return isSchedulerReady; + } + isSchedulerReady = + (systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime; + if (!isSchedulerReady && printLog) { + LOG.info("Skip allocating containers. Scheduler is waiting for recovery."); + printLog = false; + } + if (isSchedulerReady) { + LOG.info("Scheduler recovery is done. Start allocating new containers."); + } + return isSchedulerReady; + } + + @Private + @VisibleForTesting + public void setSystemClock(Clock clock) { + this.systemClock = clock; + } + + public ConcurrentMap getSystemCredentialsForApps() { + return systemCredentials; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 7c1db3d..e799703 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -19,11 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; @@ -31,12 +28,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,7 +46,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import com.google.common.annotations.VisibleForTesting; @@ -59,52 +53,16 @@ private Dispatcher rmDispatcher; - private final ConcurrentMap applications - = new ConcurrentHashMap(); - - private final ConcurrentMap nodes - = new ConcurrentHashMap(); - - private final ConcurrentMap inactiveNodes - = new ConcurrentHashMap(); - - private final ConcurrentMap systemCredentials = - new ConcurrentHashMap(); - private boolean isHAEnabled; - private boolean isWorkPreservingRecoveryEnabled; + private HAServiceState haServiceState = HAServiceProtocol.HAServiceState.INITIALIZING; - - private AMLivelinessMonitor amLivelinessMonitor; - private AMLivelinessMonitor amFinishingMonitor; - private RMStateStore stateStore = null; - private ContainerAllocationExpirer containerAllocationExpirer; - private DelegationTokenRenewer delegationTokenRenewer; - private AMRMTokenSecretManager amRMTokenSecretManager; - private RMContainerTokenSecretManager containerTokenSecretManager; - private NMTokenSecretManagerInRM nmTokenSecretManager; - private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; + private AdminService adminService; - private ClientRMService clientRMService; - private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; - private ResourceScheduler scheduler; - private ReservationSystem reservationSystem; - private NodesListManager nodesListManager; - private ResourceTrackerService resourceTrackerService; - private ApplicationMasterService applicationMasterService; - private RMApplicationHistoryWriter rmApplicationHistoryWriter; - private SystemMetricsPublisher systemMetricsPublisher; + private ConfigurationProvider configurationProvider; - private RMNodeLabelsManager nodeLabelManager; - private long epoch; - private Clock systemClock = new SystemClock(); - private long schedulerRecoveryStartTime = 0; - private long schedulerRecoveryWaitTime = 0; - private boolean printLog = true; - private boolean isSchedulerReady = false; - private static final Log LOG = LogFactory.getLog(RMContextImpl.class); + private RMActiveServiceContext activeServiceContext; /** * Default constructor. To be used in conjunction with setter methods for @@ -128,24 +86,11 @@ public RMContextImpl(Dispatcher rmDispatcher, RMApplicationHistoryWriter rmApplicationHistoryWriter) { this(); this.setDispatcher(rmDispatcher); - this.setContainerAllocationExpirer(containerAllocationExpirer); - this.setAMLivelinessMonitor(amLivelinessMonitor); - this.setAMFinishingMonitor(amFinishingMonitor); - this.setDelegationTokenRenewer(delegationTokenRenewer); - this.setAMRMTokenSecretManager(appTokenSecretManager); - this.setContainerTokenSecretManager(containerTokenSecretManager); - this.setNMTokenSecretManager(nmTokenSecretManager); - this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); - this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - - RMStateStore nullStore = new NullRMStateStore(); - nullStore.setRMDispatcher(rmDispatcher); - try { - nullStore.init(new YarnConfiguration()); - setStateStore(nullStore); - } catch (Exception e) { - assert false; - } + setActiveServiceContext(new RMActiveServiceContext(rmDispatcher, containerAllocationExpirer, + amLivelinessMonitor, amFinishingMonitor, delegationTokenRenewer, + appTokenSecretManager, containerTokenSecretManager, + nmTokenSecretManager, clientToAMTokenSecretManager, + rmApplicationHistoryWriter)); ConfigurationProvider provider = new LocalConfigurationProvider(); setConfigurationProvider(provider); @@ -155,105 +100,105 @@ public RMContextImpl(Dispatcher rmDispatcher, public Dispatcher getDispatcher() { return this.rmDispatcher; } - - @Override + + @Override public RMStateStore getStateStore() { - return stateStore; + return activeServiceContext.getStateStore(); } @Override public ConcurrentMap getRMApps() { - return this.applications; + return activeServiceContext.getRMApps(); } @Override public ConcurrentMap getRMNodes() { - return this.nodes; + return activeServiceContext.getRMNodes(); } - + @Override public ConcurrentMap getInactiveRMNodes() { - return this.inactiveNodes; + return activeServiceContext.getInactiveRMNodes(); } @Override public ContainerAllocationExpirer getContainerAllocationExpirer() { - return this.containerAllocationExpirer; + return activeServiceContext.getContainerAllocationExpirer(); } @Override public AMLivelinessMonitor getAMLivelinessMonitor() { - return this.amLivelinessMonitor; + return activeServiceContext.getAMLivelinessMonitor(); } @Override public AMLivelinessMonitor getAMFinishingMonitor() { - return this.amFinishingMonitor; + return activeServiceContext.getAMFinishingMonitor(); } @Override public DelegationTokenRenewer getDelegationTokenRenewer() { - return delegationTokenRenewer; + return activeServiceContext.getDelegationTokenRenewer(); } @Override public AMRMTokenSecretManager getAMRMTokenSecretManager() { - return this.amRMTokenSecretManager; + return activeServiceContext.getAMRMTokenSecretManager(); } @Override public RMContainerTokenSecretManager getContainerTokenSecretManager() { - return this.containerTokenSecretManager; + return activeServiceContext.getContainerTokenSecretManager(); } - + @Override public NMTokenSecretManagerInRM getNMTokenSecretManager() { - return this.nmTokenSecretManager; + return activeServiceContext.getNMTokenSecretManager(); } @Override public ResourceScheduler getScheduler() { - return this.scheduler; + return activeServiceContext.getScheduler(); } @Override public ReservationSystem getReservationSystem() { - return this.reservationSystem; + return activeServiceContext.getReservationSystem(); } - + @Override public NodesListManager getNodesListManager() { - return this.nodesListManager; + return activeServiceContext.getNodesListManager(); } @Override public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { - return this.clientToAMTokenSecretManager; + return activeServiceContext.getClientToAMTokenSecretManager(); } @Override public AdminService getRMAdminService() { - return this.adminService; + return adminService; } @VisibleForTesting public void setStateStore(RMStateStore store) { - stateStore = store; + activeServiceContext.setStateStore(store); } - + @Override public ClientRMService getClientRMService() { - return this.clientRMService; + return activeServiceContext.getClientRMService(); } @Override public ApplicationMasterService getApplicationMasterService() { - return applicationMasterService; + return activeServiceContext.getApplicationMasterService(); } @Override public ResourceTrackerService getResourceTrackerService() { - return resourceTrackerService; + return activeServiceContext.getResourceTrackerService(); } void setHAEnabled(boolean isHAEnabled) { @@ -276,78 +221,43 @@ void setRMAdminService(AdminService adminService) { @Override public void setClientRMService(ClientRMService clientRMService) { - this.clientRMService = clientRMService; + activeServiceContext.setClientRMService(clientRMService); } - + @Override public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { - return this.rmDelegationTokenSecretManager; + return activeServiceContext.getRMDelegationTokenSecretManager(); } - + @Override public void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager delegationTokenSecretManager) { - this.rmDelegationTokenSecretManager = delegationTokenSecretManager; + activeServiceContext + .setRMDelegationTokenSecretManager(delegationTokenSecretManager); } - void setContainerAllocationExpirer( - ContainerAllocationExpirer containerAllocationExpirer) { - this.containerAllocationExpirer = containerAllocationExpirer; - } - - void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) { - this.amLivelinessMonitor = amLivelinessMonitor; - } - - void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { - this.amFinishingMonitor = amFinishingMonitor; - } void setContainerTokenSecretManager( RMContainerTokenSecretManager containerTokenSecretManager) { - this.containerTokenSecretManager = containerTokenSecretManager; + activeServiceContext + .setContainerTokenSecretManager(containerTokenSecretManager); } - void setNMTokenSecretManager( NMTokenSecretManagerInRM nmTokenSecretManager) { - this.nmTokenSecretManager = nmTokenSecretManager; + activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager); } - - void setScheduler(ResourceScheduler scheduler) { - this.scheduler = scheduler; - } - - void setReservationSystem(ReservationSystem reservationSystem) { - this.reservationSystem = reservationSystem; - } - - void setDelegationTokenRenewer( - DelegationTokenRenewer delegationTokenRenewer) { - this.delegationTokenRenewer = delegationTokenRenewer; - } - void setClientToAMTokenSecretManager( ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { - this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; + activeServiceContext + .setClientToAMTokenSecretManager(clientToAMTokenSecretManager); } - void setAMRMTokenSecretManager( AMRMTokenSecretManager amRMTokenSecretManager) { - this.amRMTokenSecretManager = amRMTokenSecretManager; + activeServiceContext.setAMRMTokenSecretManager(amRMTokenSecretManager); } void setNodesListManager(NodesListManager nodesListManager) { - this.nodesListManager = nodesListManager; - } - - void setApplicationMasterService( - ApplicationMasterService applicationMasterService) { - this.applicationMasterService = applicationMasterService; - } - - void setResourceTrackerService( - ResourceTrackerService resourceTrackerService) { - this.resourceTrackerService = resourceTrackerService; + activeServiceContext.setNodesListManager(nodesListManager); } @Override @@ -362,35 +272,32 @@ public HAServiceState getHAServiceState() { } } - public void setWorkPreservingRecoveryEnabled(boolean enabled) { - this.isWorkPreservingRecoveryEnabled = enabled; - } - @Override public boolean isWorkPreservingRecoveryEnabled() { - return this.isWorkPreservingRecoveryEnabled; + return activeServiceContext.isWorkPreservingRecoveryEnabled(); } @Override public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { - return rmApplicationHistoryWriter; + return activeServiceContext.getRMApplicationHistoryWriter(); } @Override public void setSystemMetricsPublisher( SystemMetricsPublisher systemMetricsPublisher) { - this.systemMetricsPublisher = systemMetricsPublisher; + activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher); } @Override public SystemMetricsPublisher getSystemMetricsPublisher() { - return systemMetricsPublisher; + return activeServiceContext.getSystemMetricsPublisher(); } @Override public void setRMApplicationHistoryWriter( RMApplicationHistoryWriter rmApplicationHistoryWriter) { - this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; + activeServiceContext + .setRMApplicationHistoryWriter(rmApplicationHistoryWriter); } @Override @@ -405,51 +312,47 @@ public void setConfigurationProvider( @Override public long getEpoch() { - return this.epoch; + return activeServiceContext.getEpoch(); } void setEpoch(long epoch) { - this.epoch = epoch; + activeServiceContext.setEpoch(epoch); } @Override public RMNodeLabelsManager getNodeLabelManager() { - return nodeLabelManager; + return activeServiceContext.getNodeLabelManager(); } - + @Override public void setNodeLabelManager(RMNodeLabelsManager mgr) { - nodeLabelManager = mgr; + activeServiceContext.setNodeLabelManager(mgr); } public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { - this.schedulerRecoveryStartTime = systemClock.getTime(); - this.schedulerRecoveryWaitTime = waitTime; + activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } public boolean isSchedulerReadyForAllocatingContainers() { - if (isSchedulerReady) { - return isSchedulerReady; - } - isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime) - > schedulerRecoveryWaitTime; - if (!isSchedulerReady && printLog) { - LOG.info("Skip allocating containers. Scheduler is waiting for recovery."); - printLog = false; - } - if (isSchedulerReady) { - LOG.info("Scheduler recovery is done. Start allocating new containers."); - } - return isSchedulerReady; + return activeServiceContext.isSchedulerReadyForAllocatingContainers(); } @Private @VisibleForTesting public void setSystemClock(Clock clock) { - this.systemClock = clock; + activeServiceContext.setSystemClock(clock); } public ConcurrentMap getSystemCredentialsForApps() { - return systemCredentials; + return activeServiceContext.getSystemCredentialsForApps(); } + + public RMActiveServiceContext getActiveServiceContext() { + return activeServiceContext; + } + + void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { + this.activeServiceContext = activeServiceContext; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e0840b6..a487eb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -400,6 +400,7 @@ protected static void validateConfigs(Configuration conf) { private ContainerAllocationExpirer containerAllocationExpirer; private ResourceManager rm; private boolean recoveryEnabled; + private RMActiveServiceContext activeServiceContext; RMActiveServices(ResourceManager rm) { super("RMActiveServices"); @@ -408,6 +409,9 @@ protected static void validateConfigs(Configuration conf) { @Override protected void serviceInit(Configuration configuration) throws Exception { + activeServiceContext = new RMActiveServiceContext(); + rmContext.setActiveServiceContext(activeServiceContext); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); rmSecretManagerService = createRMSecretManagerService(); @@ -415,15 +419,16 @@ protected void serviceInit(Configuration configuration) throws Exception { containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); addService(containerAllocationExpirer); - rmContext.setContainerAllocationExpirer(containerAllocationExpirer); + activeServiceContext + .setContainerAllocationExpirer(containerAllocationExpirer); AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); addService(amLivelinessMonitor); - rmContext.setAMLivelinessMonitor(amLivelinessMonitor); + activeServiceContext.setAMLivelinessMonitor(amLivelinessMonitor); AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); - rmContext.setAMFinishingMonitor(amFinishingMonitor); + activeServiceContext.setAMFinishingMonitor(amFinishingMonitor); RMNodeLabelsManager nlm = createNodeLabelManager(); addService(nlm); @@ -441,7 +446,7 @@ protected void serviceInit(Configuration configuration) throws Exception { conf.getBoolean( YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); - rmContext + activeServiceContext .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled); } else { recoveryEnabled = false; @@ -462,7 +467,7 @@ protected void serviceInit(Configuration configuration) throws Exception { if (UserGroupInformation.isSecurityEnabled()) { delegationTokenRenewer = createDelegationTokenRenewer(); - rmContext.setDelegationTokenRenewer(delegationTokenRenewer); + activeServiceContext.setDelegationTokenRenewer(delegationTokenRenewer); } RMApplicationHistoryWriter rmApplicationHistoryWriter = @@ -478,13 +483,13 @@ protected void serviceInit(Configuration configuration) throws Exception { nodesListManager = new NodesListManager(rmContext); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); addService(nodesListManager); - rmContext.setNodesListManager(nodesListManager); + activeServiceContext.setNodesListManager(nodesListManager); // Initialize the scheduler scheduler = createScheduler(); scheduler.setRMContext(rmContext); addIfService(scheduler); - rmContext.setScheduler(scheduler); + activeServiceContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher(); addIfService(schedulerDispatcher); @@ -507,7 +512,7 @@ protected void serviceInit(Configuration configuration) throws Exception { resourceTracker = createResourceTrackerService(); addService(resourceTracker); - rmContext.setResourceTrackerService(resourceTracker); + activeServiceContext.setResourceTrackerService(resourceTracker); DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); @@ -519,7 +524,7 @@ protected void serviceInit(Configuration configuration) throws Exception { if (reservationSystem != null) { reservationSystem.setRMContext(rmContext); addIfService(reservationSystem); - rmContext.setReservationSystem(reservationSystem); + activeServiceContext.setReservationSystem(reservationSystem); LOG.info("Initialized Reservation system"); } } @@ -529,7 +534,7 @@ protected void serviceInit(Configuration configuration) throws Exception { masterService = createApplicationMasterService(); addService(masterService) ; - rmContext.setApplicationMasterService(masterService); + activeServiceContext.setApplicationMasterService(masterService); applicationACLsManager = new ApplicationACLsManager(conf); @@ -1008,11 +1013,15 @@ void stopActiveServices() throws Exception { if (activeServices != null) { activeServices.stop(); activeServices = null; - rmContext.getRMNodes().clear(); - rmContext.getInactiveRMNodes().clear(); - rmContext.getRMApps().clear(); - ClusterMetrics.destroy(); - QueueMetrics.clearQueueMetrics(); + } + } + + void reinitialize(boolean initialize) throws Exception { + ClusterMetrics.destroy(); + QueueMetrics.clearQueueMetrics(); + if (initialize) { + resetDispatcher(); + createAndInitActiveServices(); } } @@ -1036,8 +1045,7 @@ public Void run() throws Exception { startActiveServices(); return null; } catch (Exception e) { - resetDispatcher(); - createAndInitActiveServices(); + reinitialize(true); throw e; } } @@ -1059,10 +1067,7 @@ synchronized void transitionToStandby(boolean initialize) if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) { stopActiveServices(); - if (initialize) { - resetDispatcher(); - createAndInitActiveServices(); - } + reinitialize(initialize); } rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); LOG.info("Transitioned to standby state"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index c6d7d09..122eb60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -513,6 +513,68 @@ public void run() { rm.stop(); } + @Test + public void testFailoverClearsRMContext() throws Exception { + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + Configuration conf = new YarnConfiguration(configuration); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // 1. start RM + rm = new MockRM(conf, memStore); + rm.init(conf); + rm.start(); + + StateChangeRequestInfo requestInfo = + new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + + // 2. Transition to active + rm.adminService.transitionToActive(requestInfo); + checkMonitorHealth(); + checkActiveRMFunctionality(); + verifyClusterMetrics(1, 1, 1, 1, 2048, 1); + assertEquals(1, rm.getRMContext().getRMNodes().size()); + assertEquals(1, rm.getRMContext().getRMApps().size()); + + // 3. Create new RM + rm = new MockRM(conf, memStore) { + @Override + protected ResourceTrackerService createResourceTrackerService() { + return new ResourceTrackerService(this.rmContext, + this.nodesListManager, this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()) { + @Override + protected void serviceStart() throws Exception { + throw new Exception("ResourceTracker service failed"); + } + }; + } + }; + rm.init(conf); + rm.start(); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + + // 4. Try Transition to active, throw exception + try { + rm.adminService.transitionToActive(requestInfo); + Assert.fail("Transitioned to Active should throw exception."); + } catch (Exception e) { + assertTrue("Error when transitioning to Active mode".contains(e + .getMessage())); + } + // 5. Clears the metrics + verifyClusterMetrics(0, 0, 0, 0, 0, 0); + assertEquals(0, rm.getRMContext().getRMNodes().size()); + assertEquals(0, rm.getRMContext().getRMApps().size()); + } + public void innerTestHAWithRMHostName(boolean includeBindHost) { //this is run two times, with and without a bind host configured if (includeBindHost) { -- 1.9.2.msysgit.0