diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index c185abc..1017bd0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; import org.junit.Assert; @@ -370,7 +371,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new CustomedResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + this.rmContext.getNMTokenSecretManager(), + this.decommissioningNodesWatcher); } return super.createResourceTrackerService(); } @@ -723,9 +725,11 @@ public CustomedResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager) { + NMTokenSecretManagerInRM nmTokenSecretManager, + DecommissioningNodesWatcher decommissioningNodesWatcher) { super(rmContext, nodesListManager, nmLivelinessMonitor, - containerTokenSecretManager, nmTokenSecretManager); + containerTokenSecretManager, nmTokenSecretManager, + decommissioningNodesWatcher); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java index 9631803..ef35272 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -25,8 +25,10 @@ import java.util.Timer; import java.util.TimerTask; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -35,6 +37,9 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -66,7 +71,8 @@ * host containers of an application that is still running * (the affected map tasks will be rescheduled). 
*/ -public class DecommissioningNodesWatcher { +@InterfaceAudience.Private +public class DecommissioningNodesWatcher implements Recoverable { private static final Log LOG = LogFactory.getLog(DecommissioningNodesWatcher.class); @@ -74,19 +80,19 @@ // Default timeout value in mills. // Negative value indicates no timeout. 0 means immediate. - private long defaultTimeoutMs = + private static long defaultTimeoutMs = 1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT; // Once a RMNode is observed in DECOMMISSIONING state, // All its ContainerStatus update are tracked inside DecomNodeContext. - class DecommissioningNodeContext { + public static class DecommissioningNodeContext { private final NodeId nodeId; // Last known NodeState. private NodeState nodeState; // The moment node is observed in DECOMMISSIONING state. - private final long decommissioningStartTime; + private long decommissioningStartTime; private long lastContainerFinishTime; @@ -116,6 +122,34 @@ void updateTimeout(Integer timeoutSec) { this.timeoutMs = (timeoutSec == null)? defaultTimeoutMs : (1000L * timeoutSec); } + + public NodeId getNodeId() { + return nodeId; + } + + public long getDecommissioningStartTime() { + return decommissioningStartTime; + } + + public void setDecommissioningStartTime(long decommissioningStartTime) { + this.decommissioningStartTime = decommissioningStartTime; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } + + public void setLastUpdateTime(long lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public long getTimeoutMs() { + return timeoutMs; + } + + public void setTimeoutMs(long timeoutMs) { + this.timeoutMs = timeoutMs; + } } // All DECOMMISSIONING nodes to track. 
@@ -123,12 +157,11 @@ void updateTimeout(Integer timeoutSec) { new HashMap(); private Timer pollTimer; - private MonotonicClock mclock; + private static MonotonicClock mclock = new MonotonicClock(); public DecommissioningNodesWatcher(RMContext rmContext) { this.rmContext = rmContext; pollTimer = new Timer(true); - mclock = new MonotonicClock(); } public void init(Configuration conf) { @@ -158,15 +191,15 @@ public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { if (context.decommissionedTime == 0) { context.decommissionedTime = now; } else if (now - context.decommissionedTime > 60000L) { - decomNodes.remove(rmNode.getNodeID()); + remove(rmNode.getNodeID()); } } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { if (context == null) { context = new DecommissioningNodeContext(rmNode.getNodeID()); decomNodes.put(rmNode.getNodeID(), context); - context.nodeState = rmNode.getState(); context.decommissionedTime = 0; } + context.nodeState = rmNode.getState(); context.updateTimeout(rmNode.getDecommissioningTimeout()); context.lastUpdateTime = now; @@ -197,10 +230,12 @@ public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { context.lastContainerFinishTime == 0) { context.lastContainerFinishTime = now; } + + rmContext.getStateStore().updateDecommissioningNode(context); } else { // remove node in other states if (context != null) { - decomNodes.remove(rmNode.getNodeID()); + remove(rmNode.getNodeID()); } } } @@ -210,9 +245,15 @@ public synchronized void remove(NodeId nodeId) { if (context != null) { LOG.info("remove " + nodeId + " in " + context.nodeState); decomNodes.remove(nodeId); + rmContext.getStateStore().removeDecommissioningNode(context); } } + @VisibleForTesting + public synchronized void clearDecomNodes() { + decomNodes.clear(); + } + /** * Status about a specific decommissioning node. 
* @@ -235,6 +276,10 @@ public synchronized void remove(NodeId nodeId) { // The node has already been decommissioned DECOMMISSIONED, + + // TODO: do we need this new status? + // recovered from state store and needs an update + WAIT_UPDATE } public boolean checkReadyToBeDecommissioned(NodeId nodeId) { @@ -249,6 +294,10 @@ public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) { return DecommissioningNodeStatus.NONE; } + if (context.nodeState == null) { + return DecommissioningNodeStatus.WAIT_UPDATE; + } + if (context.nodeState == NodeState.DECOMMISSIONED) { return DecommissioningNodeStatus.DECOMMISSIONED; } @@ -300,6 +349,7 @@ public void run() { if (d.nodeState != NodeState.DECOMMISSIONING) { LOG.debug("remove " + d.nodeState + " " + d.nodeId); it.remove(); + rmContext.getStateStore().removeDecommissioningNode(d); continue; } else if (now - d.lastUpdateTime > 60000L) { // Node DECOMMISSIONED could become stale, remove as necessary. @@ -308,6 +358,7 @@ public void run() { rmNode.getState() == NodeState.DECOMMISSIONED) { LOG.debug("remove " + rmNode.getState() + " " + d.nodeId); it.remove(); + rmContext.getStateStore().removeDecommissioningNode(d); continue; } } @@ -436,4 +487,43 @@ private void readDecommissioningTimeout(Configuration conf) { LOG.info("Error readDecommissioningTimeout ", e); } } + + @Override + public synchronized void recover(RMStateStore.RMState rmState) + throws Exception { + LOG.info("recovering DecommissioningNodesWatcher."); + for (Map.Entry entry : + rmState.getDecommissioningNodesState().entrySet()) { + NodeId nodeId = getNodeId(entry.getKey()); + if (nodeId != null) { + DecommissioningNodeContext context = + entry.getValue().getDecommissioningNodeContext(nodeId); + decomNodes.put(nodeId, context); + } else { + LOG.warn("Could not find node " + entry.getKey()); + // Remove the stale/invalid node from the store + DecommissioningNodeContext nodeContext = + new DecommissioningNodeContext(NodeId.fromString(entry.getKey())); + 
rmContext.getStateStore().removeDecommissioningNode(nodeContext); + } + } + rmContext.getNodesListManager().refreshNodes( + rmContext.getYarnConfiguration(), true); + } + + public boolean isNodeDecommissioning(NodeId nodeId) { + DecommissioningNodeStatus nodeStatus = checkDecommissioningStatus(nodeId); + return !(nodeStatus == DecommissioningNodeStatus.NONE + || nodeStatus == DecommissioningNodeStatus.DECOMMISSIONED + || nodeStatus == DecommissioningNodeStatus.READY); + } + + private NodeId getNodeId(String id) { + for (NodeId nodeId : rmContext.getRMNodes().keySet()) { + if (nodeId.toString().equals(id)) { + return nodeId; + } + } + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 7d69f93..80fb1f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -100,21 +100,10 @@ protected void serviceInit(Configuration conf) throws Exception { addIfService(resolver); } - // Read the hosts/exclude files to restrict access to the RM - try { - this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - this.excludesFile = conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); - this.hostsReader = - createHostsFileReader(this.includesFile, this.excludesFile); - setDecomissionedNMs(); - printConfiguredHosts(); - } catch (YarnException ex) 
{ - disableHostsFileReader(ex); - } catch (IOException ioe) { - disableHostsFileReader(ioe); - } + this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); + this.excludesFile = conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); final int nodeRemovalTimeout = conf.getInt( @@ -177,8 +166,25 @@ private void decrInactiveNMMetrics(RMNode rmNode) { } @Override - public void serviceStop() { + protected void serviceStart() throws Exception { + // Read the hosts/exclude files to restrict access to the RM + try { + this.hostsReader = + createHostsFileReader(this.includesFile, this.excludesFile); + setDecomissionedNMs(); + printConfiguredHosts(); + } catch (YarnException ex) { + disableHostsFileReader(ex); + } catch (IOException ioe) { + disableHostsFileReader(ioe); + } + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { removalTimer.cancel(); + super.serviceStop(); } private void printConfiguredHosts() { @@ -220,22 +226,24 @@ private void refreshHostsReader( if (null == yarnConf) { yarnConf = new YarnConfiguration(); } - includesFile = - yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - excludesFile = - yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); - LOG.info("refreshNodes excludesFile " + excludesFile); - hostsReader.refresh(includesFile, excludesFile); - printConfiguredHosts(); - - LOG.info("hostsReader include:{" + - StringUtils.join(",", hostsReader.getHosts()) + - "} exclude:{" + - StringUtils.join(",", hostsReader.getExcludedHosts()) + "}"); - - handleExcludeNodeList(graceful, timeout); + if (hostsReader != null) { + includesFile = + yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); + excludesFile 
= +             yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, +                 YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); +         LOG.info("refreshNodes excludesFile " + excludesFile); +         hostsReader.refresh(includesFile, excludesFile); +         printConfiguredHosts(); + +         LOG.info("hostsReader include:{" + +             StringUtils.join(",", hostsReader.getHosts()) + +             "} exclude:{" + +             StringUtils.join(",", hostsReader.getExcludedHosts()) + "}"); + +         handleExcludeNodeList(graceful, timeout); +     } } private void setDecomissionedNMs() { @@ -314,17 +322,22 @@ private void handleExcludeNodeList(boolean graceful, Integer timeout) { } for (RMNode n : nodesToDecom) { -       RMNodeEvent e; -       if (graceful) { -         Integer timeoutToUse = (excludes.get(n.getHostName()) != null)? -             excludes.get(n.getHostName()) : timeout; -         e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse); -       } else { -         RMNodeEventType eventType = isUntrackedNode(n.getHostName())? -             RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; -         e = new RMNodeEvent(n.getNodeID(), eventType); +       // ignore nodes that the DecommissioningNodesWatcher already knows about +       if (!rmContext.getResourceTrackerService() +           .getDecommissioningNodesWatcher() +           .isNodeDecommissioning(n.getNodeID())) { +         RMNodeEvent e; +         if (graceful) { +           Integer timeoutToUse = (excludes.get(n.getHostName()) != null) ? +               excludes.get(n.getHostName()) : timeout; +           e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse); +         } else { +           RMNodeEventType eventType = isUntrackedNode(n.getHostName()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; + e = new RMNodeEvent(n.getNodeID(), eventType); + } + this.rmContext.getDispatcher().getEventHandler().handle(e); } - this.rmContext.getDispatcher().getEventHandler().handle(e); } updateInactiveNodes(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 81c3f1b..643f25e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -188,6 +188,7 @@ protected RMAppManager rmAppManager; protected ApplicationACLsManager applicationACLsManager; protected QueueACLsManager queueACLsManager; + protected DecommissioningNodesWatcher decommissioningNodesWatcher; private WebApp webApp; private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; @@ -675,6 +676,8 @@ protected void serviceInit(Configuration configuration) throws Exception { nmLivelinessMonitor = createNMLivelinessMonitor(); addService(nmLivelinessMonitor); + decommissioningNodesWatcher = new DecommissioningNodesWatcher(rmContext); + resourceTracker = createResourceTrackerService(); addService(resourceTracker); rmContext.setResourceTrackerService(resourceTracker); @@ -1294,7 +1297,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + 
this.rmContext.getNMTokenSecretManager(), + this.decommissioningNodesWatcher); } protected ClientRMService createClientRMService() { @@ -1415,6 +1419,9 @@ public void recover(RMState state) throws Exception { // recover applications rmAppManager.recover(state); + // recover decommissioning nodes + resourceTracker.getDecommissioningNodesWatcher().recover(state); + setSchedulerRecoveryStartAndWaitTime(state, conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 40bd610..c1a8d54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -113,7 +113,7 @@ private int minAllocMb; private int minAllocVcores; - private DecommissioningNodesWatcher decommissioningWatcher; + private DecommissioningNodesWatcher decommissioningNodesWatcher; private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; @@ -123,7 +123,8 @@ public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager) { + NMTokenSecretManagerInRM nmTokenSecretManager, + DecommissioningNodesWatcher decommissioningNodesWatcher) { super(ResourceTrackerService.class.getName()); this.rmContext = rmContext; this.nodesListManager = nodesListManager; @@ -133,7 +134,7 @@ public 
ResourceTrackerService(RMContext rmContext, ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); - this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext); + this.decommissioningNodesWatcher = decommissioningNodesWatcher; } @Override @@ -173,7 +174,7 @@ protected void serviceInit(Configuration conf) throws Exception { } loadDynamicResourceConfiguration(conf); - decommissioningWatcher.init(conf); + decommissioningNodesWatcher.init(conf); super.serviceInit(conf); } @@ -499,7 +500,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); - this.decommissioningWatcher.update(rmNode, remoteNodeStatus); + this.decommissioningNodesWatcher.update(rmNode, remoteNodeStatus); // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); @@ -534,7 +535,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. 
if (rmNode.getState() == NodeState.DECOMMISSIONING && - decommissioningWatcher.checkReadyToBeDecommissioned( + decommissioningNodesWatcher.checkReadyToBeDecommissioned( rmNode.getNodeID())) { String message = "DECOMMISSIONING " + nodeId + " is ready to be decommissioned"; @@ -780,4 +781,8 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + public DecommissioningNodesWatcher getDecommissioningNodesWatcher() { + return decommissioningNodesWatcher; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 9591945..097649a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -56,10 +56,12 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -81,6 +83,8 @@ * Also, AMRMToken has been removed from ApplicationAttemptState. * * Changes from 1.2 to 1.3, Addition of ReservationSystem state. + * + * Changes from 1.3 to 1.4, Addition of Decommissioning Nodes */ public class FileSystemRMStateStore extends RMStateStore { @@ -88,7 +92,7 @@ protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 3); + .newInstance(1, 4); protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = "AMRMTokenSecretManagerNode"; @@ -114,6 +118,7 @@ Path amrmTokenSecretManagerRoot; private Path reservationRoot; + private Path decomNodesRoot; @Override public synchronized void initInternal(Configuration conf) @@ -125,6 +130,7 @@ public synchronized void initInternal(Configuration conf) amrmTokenSecretManagerRoot = new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); reservationRoot = new Path(rootDirPath, RESERVATION_SYSTEM_ROOT); + decomNodesRoot = new Path(rootDirPath, DECOMMISSIONING_NODES_ROOT); fsNumRetries = conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES); @@ -157,6 +163,7 @@ protected synchronized void startInternal() throws Exception { mkdirsWithRetries(rmAppRoot); mkdirsWithRetries(amrmTokenSecretManagerRoot); mkdirsWithRetries(reservationRoot); + mkdirsWithRetries(decomNodesRoot); } @Override @@ -228,6 +235,8 @@ public synchronized RMState loadState() throws Exception { loadAMRMTokenSecretManagerState(rmState); // recover reservation state 
loadReservationSystemState(rmState); + // recover decommissioning nodes + loadDecommissioningNodesState(rmState); return rmState; } @@ -290,6 +299,19 @@ private void loadRMAppState(RMState rmState) throws Exception { } } + private void loadDecommissioningNodesState(RMState rmState) throws Exception { + try { + final RMDecommissioningNodesFileProcessor fileProcessor = new + RMDecommissioningNodesFileProcessor(rmState); + final Path rootDirectory = this.decomNodesRoot; + + processDirectoriesOfFiles(fileProcessor, rootDirectory); + } catch (Exception e) { + LOG.error("Failed to load state.", e); + throw e; + } + } + private void processDirectoriesOfFiles( RMStateFileProcessor rmAppStateFileProcessor, Path rootDirectory) throws Exception { @@ -882,6 +904,47 @@ protected void removeReservationState( deleteFileWithRetries(reservationPath); } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + Path nodeCreatePath = getNodePath(decomNodesRoot, + nodeIdToFilename(nodeContext.getNodeId().toString())); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + nodeContext.getNodeId().toString()); + } + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + writeFileWithRetries(nodeCreatePath, data.toByteArray(), false); + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + Path nodeRemovePath = getNodePath(decomNodesRoot, + nodeIdToFilename(nodeContext.getNodeId().toString())); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + nodeContext.getNodeId().toString()); + } + + deleteFileWithRetries(nodeRemovePath); + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + Path nodeUpdatePath = getNodePath(decomNodesRoot, + 
nodeIdToFilename(nodeContext.getNodeId().toString())); + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + if (existsWithRetries(nodeUpdatePath)) { + updateFile(nodeUpdatePath, data.toByteArray(), false); + } else { + writeFileWithRetries(nodeUpdatePath, data.toByteArray(), false); + } + } + @VisibleForTesting public int getNumRetries() { return fsNumRetries; @@ -972,4 +1035,34 @@ void processChildNode(String appDirName, String childNodeName, byte[] childData) throws IOException; } + + private static class RMDecommissioningNodesFileProcessor + implements RMStateFileProcessor { + private RMState rmState; + + public RMDecommissioningNodesFileProcessor(RMState rmState) { + this.rmState = rmState; + } + + @Override + public void processChildNode(String appDirName, String childNodeName, + byte[] childData) throws IOException { + childNodeName = filenameToNodeId(childNodeName); + RMDecommissioningNodeData nodeData = new RMDecommissioningNodeData(); + nodeData.readFields(childData); + rmState.getDecommissioningNodesState().put(childNodeName, nodeData); + } + } + + private static String nodeIdToFilename(String nodeId) { + // replace :, which is not allowed in Paths, with a space, + // which is allowed in Paths, but not in hostnames, so there should + // be no collision + return nodeId.replace(":", " "); + } + + private static String filenameToNodeId(String filename) { + // convert back from #nodeIdToFilename + return filename.replace(" ", ":"); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 2ca53db..6ecca5c 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -55,10 +55,12 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -77,6 +79,8 @@ /** * Changes from 1.0 to 1.1, Addition of ReservationSystem state. 
+ * + * Changes from 1.1 to 1.2, Addition of Decommissioning Nodes */ public class LeveldbRMStateStore extends RMStateStore { @@ -95,9 +99,11 @@ RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix; private static final String RM_RESERVATION_KEY_PREFIX = RESERVATION_SYSTEM_ROOT + SEPARATOR; + private static final String RM_DECOMMISSIONING_NODES_PREFIX = + DECOMMISSIONING_NODES_ROOT + SEPARATOR; private static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 1); + .newInstance(1, 2); private DB db; private Timer compactionTimer; @@ -131,6 +137,10 @@ private String getReservationNodeKey(String planName, + reservationId; } + private String getDecomNodeNodeKey(String nodeId) { + return RM_DECOMMISSIONING_NODES_PREFIX + nodeId; + } + @Override protected void initInternal(Configuration conf) throws Exception { compactionIntervalMsec = conf.getLong( @@ -273,10 +283,11 @@ public synchronized long getAndIncrementEpoch() throws Exception { @Override public RMState loadState() throws Exception { RMState rmState = new RMState(); - loadRMDTSecretManagerState(rmState); - loadRMApps(rmState); - loadAMRMTokenSecretManagerState(rmState); + loadRMDTSecretManagerState(rmState); + loadRMApps(rmState); + loadAMRMTokenSecretManagerState(rmState); loadReservationState(rmState); + loadDecommissioningNodesState(rmState); return rmState; } @@ -324,6 +335,34 @@ private void loadReservationState(RMState rmState) throws IOException { LOG.info("Recovered " + numReservations + " reservations"); } + private void loadDecommissioningNodesState(RMState rmState) throws Exception { + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(RM_DECOMMISSIONING_NODES_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(RM_DECOMMISSIONING_NODES_PREFIX)) { + break; + } + RMDecommissioningNodeData nodeData = new RMDecommissioningNodeData(); + nodeData.readFields(entry.getValue()); + 
key = key.substring(RM_DECOMMISSIONING_NODES_PREFIX.length()); + rmState.decommissioningNodesState.put(key, nodeData); + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded decommissioning node " + key); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + } + private void loadRMDTSecretManagerState(RMState state) throws IOException { int numKeys = loadRMDTSecretManagerKeys(state); LOG.info("Recovered " + numKeys + " RM delegation token master keys"); @@ -882,4 +921,49 @@ public void log(String message) { LOG.info(message); } } + + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + try { + WriteBatch batch = db.createWriteBatch(); + try { + String key = getDecomNodeNodeKey(nodeContext.getNodeId().toString()); + RMDecommissioningNodeData data = + new RMDecommissioningNodeData(nodeContext); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + nodeContext.getNodeId().toString()); + } + batch.put(bytes(key), data.toByteArray()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + String key = getDecomNodeNodeKey(nodeContext.getNodeId().toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + nodeContext.getNodeId().toString()); + } + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + storeDecommissioningNodeState(nodeContext); + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5f3328b..6739f9e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -34,11 +34,13 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; @Private @Unstable @@ -78,6 +80,8 @@ public synchronized RMState loadState() throws Exception { state.amrmTokenSecretManagerState == null ? 
null : AMRMTokenSecretManagerState .newInstance(state.amrmTokenSecretManagerState); + returnState.decommissioningNodesState.putAll( + state.getDecommissioningNodesState()); return returnState; } @@ -308,4 +312,28 @@ public void deleteStore() throws Exception { public void removeApplication(ApplicationId removeAppId) throws Exception { } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + state.getDecommissioningNodesState().put(nodeContext.getNodeId().toString(), + data); + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + state.getDecommissioningNodesState().remove( + nodeContext.getNodeId().toString()); + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + storeDecommissioningNodeState(nodeContext); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 4e134ac..912a3f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import 
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -174,4 +175,25 @@ public void deleteStore() throws Exception { public void removeApplication(ApplicationId removeAppId) throws Exception { // Do nothing } + + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + // Do nothing + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + // Do nothing + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + // Do nothing + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5e3cf22..8f61a8a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -56,12 +56,14 @@ import 
org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeContext; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -97,6 +99,8 @@ "AMRMTokenSecretManagerRoot"; protected static final String RESERVATION_SYSTEM_ROOT = "ReservationSystemRoot"; + protected static final String DECOMMISSIONING_NODES_ROOT = + "DecommissioningNodesRoot"; protected static final String VERSION_NODE = "RMVersionNode"; protected static final String EPOCH_NODE = "EpochNode"; protected ResourceManager resourceManager; @@ -175,6 +179,18 @@ EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_RESERVATION, new RemoveReservationAllocationTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.STORE_DECOMMISSIONING_NODE, + new StoreDecommissioningNodeTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + 
RMStateStoreEventType.REMOVE_DECOMMISSIONING_NODE, + new RemoveDecommissioningNodeTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.UPDATE_DECOMMISSIONING_NODE, + new UpdateDecommissioningNodeTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED, RMStateStoreEventType.FENCED) .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED, @@ -192,7 +208,10 @@ RMStateStoreEventType.UPDATE_DELEGATION_TOKEN, RMStateStoreEventType.UPDATE_AMRM_TOKEN, RMStateStoreEventType.STORE_RESERVATION, - RMStateStoreEventType.REMOVE_RESERVATION)); + RMStateStoreEventType.REMOVE_RESERVATION, + RMStateStoreEventType.STORE_DECOMMISSIONING_NODE, + RMStateStoreEventType.REMOVE_DECOMMISSIONING_NODE, + RMStateStoreEventType.UPDATE_DECOMMISSIONING_NODE)); private final StateMachine> reservationState = new TreeMap<>(); + Map decommissioningNodesState = + new HashMap<>(); + public Map getApplicationState() { return appState; } @@ -658,6 +680,11 @@ public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() { getReservationState() { return reservationState; } + + public Map + getDecommissioningNodesState() { + return decommissioningNodesState; + } } private Dispatcher rmDispatcher; @@ -1210,4 +1237,131 @@ public RMStateStoreState getRMStateStoreState() { protected EventHandler getRMStateStoreEventHandler() { return dispatcher.getEventHandler(); } + + private static class StoreDecommissioningNodeTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreDecommissioningNodeEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + RMStateStoreDecommissioningNodeEvent dnEvent = + (RMStateStoreDecommissioningNodeEvent) event; + try { + 
LOG.info("Storing Decommissioning Node"); + store.storeDecommissioningNodeState(dnEvent.getNodeContext()); + } catch (Exception e) { + LOG.error("Error While Storing Decommissioning Node ", e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + + private static class RemoveDecommissioningNodeTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreDecommissioningNodeEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + RMStateStoreDecommissioningNodeEvent dnEvent = + (RMStateStoreDecommissioningNodeEvent) event; + try { + LOG.info("Removing Decommissioning Node"); + store.removeDecommissioningNodeState(dnEvent.getNodeContext()); + } catch (Exception e) { + LOG.error("Error While Removing Decommissioning Node ", e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + + private static class UpdateDecommissioningNodeTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreDecommissioningNodeEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + RMStateStoreDecommissioningNodeEvent dnEvent = + (RMStateStoreDecommissioningNodeEvent) event; + try { + LOG.info("Updating Decommissioning Node"); + store.updateDecommissioningNodeState(dnEvent.getNodeContext()); + } catch (Exception e) { + LOG.error("Error While Updating Decommissioning Node ", e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + /** + * DecommissioningNodesWatcher call this to store the state of a 
+ * Decommissioning Node + */ + public void storeDecommissioningNode(DecommissioningNodeContext nodeContext) { + handleStoreEvent(new RMStateStoreDecommissioningNodeEvent(nodeContext, + RMStateStoreEventType.STORE_DECOMMISSIONING_NODE)); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of a + * Decommissioning Node + */ + protected abstract void storeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception; + + /** + * DecommissioningNodesWatcher call this to remove the state of a + * Decommissioning Node + */ + public void removeDecommissioningNode(DecommissioningNodeContext nodeContext) { + handleStoreEvent(new RMStateStoreDecommissioningNodeEvent(nodeContext, + RMStateStoreEventType.REMOVE_DECOMMISSIONING_NODE)); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of a + * Decommissioning Node + */ + protected abstract void removeDecommissioningNodeState( + DecommissioningNodeContext nodeContext) throws Exception; + + /** + * DecommissioningNodesWatcher call this to update the state of a + * Decommissioning Node + */ + public void updateDecommissioningNode(DecommissioningNodeContext nodeContext) { + handleStoreEvent(new RMStateStoreDecommissioningNodeEvent(nodeContext, + RMStateStoreEventType.UPDATE_DECOMMISSIONING_NODE)); + } + + /** + * Blocking API + * Derived classes must implement this method to update the state of a + * Decommissioning Node + */ + protected abstract void updateDecommissioningNodeState( + DecommissioningNodeContext nodeContext) + throws Exception; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreDecommissioningNodeEvent.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreDecommissioningNodeEvent.java new file mode 100644 index 0000000..29204d2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreDecommissioningNodeEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeContext; + +public class RMStateStoreDecommissioningNodeEvent extends RMStateStoreEvent { + private DecommissioningNodeContext nodeContext; + + public RMStateStoreDecommissioningNodeEvent(RMStateStoreEventType type) { + super(type); + } + + public RMStateStoreDecommissioningNodeEvent( + DecommissioningNodeContext nodeContext, + RMStateStoreEventType type) { + this(type); + this.nodeContext = nodeContext; + } + + public DecommissioningNodeContext getNodeContext() { + return nodeContext; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index b34634d..af04cf6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -36,4 +36,7 @@ UPDATE_AMRM_TOKEN, STORE_RESERVATION, REMOVE_RESERVATION, + STORE_DECOMMISSIONING_NODE, + REMOVE_DECOMMISSIONING_NODE, + UPDATE_DECOMMISSIONING_NODE } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1212a91..41ff093 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -33,6 +33,7 @@ import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -46,11 +47,13 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -104,18 +107,25 @@ * |----- currentMasterKey * |----- nextMasterKey * - * |-- RESERVATION_SYSTEM_ROOT - * |------PLAN_1 - * | |------ RESERVATION_1 - * | |------ RESERVATION_2 + * |--- RESERVATION_SYSTEM_ROOT + * |----- PLAN_1 + * | |----- RESERVATION_1 + * | |----- RESERVATION_2 * | .... - * |------PLAN_2 + * |----- PLAN_2 * .... + * |--- DECOMMISSIONING_NODES_ROOT + * |----- (#NodeId1) + * |----- (#NodeId2) + * .... + * * Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state has been saved * separately. The currentMasterkey and nextMasterkey have been stored. * Also, AMRMToken has been removed from ApplicationAttemptState. * * Changes from 1.2 to 1.3, Addition of ReservationSystem state. + * + * Changes from 1.3 to 1.4, Addition of Decommissioning Nodes */ @Private @Unstable @@ -130,7 +140,7 @@ @VisibleForTesting public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = - Version.newInstance(1, 3); + Version.newInstance(1, 4); /* Znode paths */ private String zkRootNodePath; @@ -141,6 +151,7 @@ private String dtSequenceNumberPath; private String amrmTokenSecretManagerRoot; private String reservationRoot; + private String decomNodesRoot; @VisibleForTesting protected String znodeWorkingPath; @@ -249,6 +260,7 @@ public synchronized void initInternal(Configuration conf) amrmTokenSecretManagerRoot = getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); + decomNodesRoot = getNodePath(zkRootNodePath, DECOMMISSIONING_NODES_ROOT); curatorFramework = resourceManager.getCurator(); if (curatorFramework == null) { @@ -275,6 +287,7 @@ public synchronized void startInternal() throws Exception { create(dtSequenceNumberPath); create(amrmTokenSecretManagerRoot); create(reservationRoot); + create(decomNodesRoot); } private void 
logRootNodeAcls(String prefix) throws Exception { @@ -385,7 +398,8 @@ public synchronized RMState loadState() throws Exception { loadAMRMTokenSecretManagerState(rmState); // recover reservation state loadReservationSystemState(rmState); - + // recover decommissioning nodes + loadDecommissioningNodesState(rmState); return rmState; } @@ -578,6 +592,32 @@ private void loadApplicationAttemptState(ApplicationStateData appState, LOG.debug("Done loading applications from ZK state store"); } + private synchronized void loadDecommissioningNodesState(RMState rmState) + throws Exception { + List childNodes = getChildren(decomNodesRoot); + + for (String childNodeName : childNodes) { + String childNodePath = getNodePath(decomNodesRoot, childNodeName); + byte[] childData = getData(childNodePath); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); + continue; + } + + try (DataInputStream fsIn = + new DataInputStream(new ByteArrayInputStream(childData))) { + RMDecommissioningNodeData nodeData = new RMDecommissioningNodeData(); + nodeData.readFields(fsIn); + rmState.getDecommissioningNodesState().put(childNodeName, nodeData); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded decommissioning node " + childNodeName); + } + } + } + } + @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { @@ -904,6 +944,63 @@ private void addOrUpdateReservationState( } } + @Override + protected void storeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + SafeTransaction trx = new SafeTransaction(); + String nodeCreatePath = + getNodePath(decomNodesRoot, nodeContext.getNodeId().toString()); + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + trx.create(nodeCreatePath, data.toByteArray(), zkAcl, + CreateMode.PERSISTENT); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + 
nodeContext.getNodeId().toString()); + } + trx.commit(); + } + + @Override + protected void removeDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + String nodeRemovePath = + getNodePath(decomNodesRoot, nodeContext.getNodeId().toString()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + nodeContext.getNodeId().toString()); + } + + safeDelete(nodeRemovePath); + } + + @Override + protected void updateDecommissioningNodeState( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) + throws Exception { + SafeTransaction trx = new SafeTransaction(); + String nodeUpdatePath = + getNodePath(decomNodesRoot, nodeContext.getNodeId().toString()); + + RMDecommissioningNodeData data = new RMDecommissioningNodeData(nodeContext); + if (exists(nodeUpdatePath)) { + // in case znode exists + if (LOG.isDebugEnabled()) { + LOG.debug("Updating " + nodeContext.getNodeId().toString()); + } + trx.setData(nodeUpdatePath, data.toByteArray(), -1); + } else { + // in case znode doesn't exist + trx.create(nodeUpdatePath, data.toByteArray(), zkAcl, + CreateMode.PERSISTENT); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + nodeContext.getNodeId().toString()); + } + } + + trx.commit(); + } + /** * Utility function to ensure that the configured base znode exists. * This recursively creates the znode as well as all of its parents. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMDecommissioningNodeData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMDecommissioningNodeData.java new file mode 100644 index 0000000..63db5df --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMDecommissioningNodeData.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMDecommissioningNodeDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; + +public class RMDecommissioningNodeData { + RMDecommissioningNodeDataProto.Builder builder = + RMDecommissioningNodeDataProto.newBuilder(); + + public RMDecommissioningNodeData() {} + + public RMDecommissioningNodeData( + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext) { + builder.setDecommissioningStartTime( + nodeContext.getDecommissioningStartTime()); + builder.setLastUpdateTime(nodeContext.getLastUpdateTime()); + builder.setTimeoutMs(nodeContext.getTimeoutMs()); + } + + public void readFields(DataInput in) throws IOException { + builder.mergeFrom((DataInputStream) in); + } + + public void readFields(byte[] in) throws IOException { + builder.mergeFrom(in); + } + + public byte[] toByteArray() throws IOException { + return builder.build().toByteArray(); + } + + public DecommissioningNodesWatcher.DecommissioningNodeContext + getDecommissioningNodeContext(NodeId nodeId) throws IOException { + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext = + new DecommissioningNodesWatcher.DecommissioningNodeContext(nodeId); + nodeContext.setDecommissioningStartTime( + builder.getDecommissioningStartTime()); + nodeContext.setLastUpdateTime(builder.getLastUpdateTime()); + nodeContext.setTimeoutMs(builder.getTimeoutMs()); + return nodeContext; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 247cd21..2b4948e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -102,3 +102,9 @@ message RMDelegationTokenIdentifierDataProto { optional YARNDelegationTokenIdentifierProto token_identifier = 1; optional int64 renewDate = 2; } + +message RMDecommissioningNodeDataProto { + required int64 decommissioning_start_time = 1; + required int64 timeout_ms = 2; + required int64 last_update_time = 3; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 2ff4fb2..be14244 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -975,7 +975,7 @@ protected ResourceTrackerService createResourceTrackerService() { nmTokenSecretManager.rollMasterKey(); return new ResourceTrackerService(getRMContext(), nodesListManager, this.nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager) { + nmTokenSecretManager, decommissioningNodesWatcher) { @Override protected void serviceStart() { diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java index 690de30..275878e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -33,10 +33,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -49,14 +52,62 @@ @Test public void testDecommissioningNodesWatcher() throws Exception { + testDecommissioningNodesWatcher(false); + } + + @Test + public void testDecommissioningNodesWatcherWithRecovery() throws Exception { + testDecommissioningNodesWatcher(true); + } + + private class IgnoringUpdatesDecommissioningNodesWatcher + extends 
DecommissioningNodesWatcher { + private boolean ignoreUpdates = false; + + public IgnoringUpdatesDecommissioningNodesWatcher(RMContext rmContext) { + super(rmContext); + } + + public synchronized void setIgnoreUpdates(boolean ignoreUpdates) { + this.ignoreUpdates = ignoreUpdates; + } + + @Override + public synchronized void update(RMNode rmNode, + NodeStatus remoteNodeStatus) { + if (!ignoreUpdates) { + super.update(rmNode, remoteNodeStatus); + } + } + } + + private void testDecommissioningNodesWatcher(boolean withRecovery) + throws Exception { Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40"); - rm = new MockRM(conf); + MemoryRMStateStore store = new MemoryRMStateStore(); + store.init(conf); + rm = new MockRM(conf, store) { + @Override + protected ResourceTrackerService createResourceTrackerService() { + RMContainerTokenSecretManager containerTokenSecretManager = + getRMContext().getContainerTokenSecretManager(); + containerTokenSecretManager.rollMasterKey(); + NMTokenSecretManagerInRM nmTokenSecretManager = + getRMContext().getNMTokenSecretManager(); + nmTokenSecretManager.rollMasterKey(); + return new ResourceTrackerService(getRMContext(), nodesListManager, + this.nmLivelinessMonitor, containerTokenSecretManager, + nmTokenSecretManager, + new IgnoringUpdatesDecommissioningNodesWatcher(rmContext)); + } + }; rm.start(); - DecommissioningNodesWatcher watcher = - new DecommissioningNodesWatcher(rm.getRMContext()); + IgnoringUpdatesDecommissioningNodesWatcher watcher = + (IgnoringUpdatesDecommissioningNodesWatcher) + rm.getResourceTrackerService().getDecommissioningNodesWatcher(); MockNM nm1 = rm.registerNode("host1:1234", 10240); RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); @@ -77,19 +128,32 @@ public void testDecommissioningNodesWatcher() throws Exception { watcher.update(node1, createNodeStatus(id1, app, 11)); Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); + if 
(withRecovery) { + // Simulate a failover from the perspective of the + // DecommissioningNodesWatcher and do a recovery + watcher.setIgnoreUpdates(true); + watcher.clearDecomNodes(); + Assert.assertEquals(DecommissioningNodeStatus.NONE, + watcher.checkDecommissioningStatus(id1)); + watcher.recover(rm.getRMContext().getStateStore().loadState()); + Assert.assertEquals(DecommissioningNodeStatus.WAIT_UPDATE, + watcher.checkDecommissioningStatus(id1)); + watcher.setIgnoreUpdates(false); + } + watcher.update(node1, createNodeStatus(id1, app, 1)); Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER, - watcher.checkDecommissioningStatus(id1)); + watcher.checkDecommissioningStatus(id1)); watcher.update(node1, createNodeStatus(id1, app, 0)); Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, - watcher.checkDecommissioningStatus(id1)); + watcher.checkDecommissioningStatus(id1)); // Set app to be FINISHED and verified DecommissioningNodeStatus is READY. MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); Assert.assertEquals(DecommissioningNodeStatus.READY, - watcher.checkDecommissioningStatus(id1)); + watcher.checkDecommissioningStatus(id1)); } @After diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index f807217..49243e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -559,7 +559,8 @@ protected ResourceTrackerService 
createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + this.rmContext.getNMTokenSecretManager(), + this.decommissioningNodesWatcher) { @Override protected void serviceStart() throws Exception { throw new Exception("ResourceTracker service failed"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 139e2da..5174a88 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2057,7 +2057,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + this.rmContext.getNMTokenSecretManager(), + this.decommissioningNodesWatcher) { @Override protected void serviceStart() throws Exception { // send the container_finished event as soon as the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 
514e9a0..2a7cdcc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -62,12 +63,14 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDecommissioningNodeData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; @@ -551,7 +554,7 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } - + public void testEpoch(RMStateStoreHelper stateStoreHelper) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); @@ -925,4 +928,83 @@ void assertAllocationStateEqual( ReservationSystemUtil.toAllocations( actual.getAllocationRequestsList())); } + + public void testDecommissioningNodes(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + + NodeId nodeId1 = mock(NodeId.class); + when(nodeId1.toString()).thenReturn("host1:1234"); + NodeId nodeId2 = mock(NodeId.class); + when(nodeId2.toString()).thenReturn("host2:1234"); + + // 1. Load empty store + RMState state = store.loadState(); + Map<String, RMDecommissioningNodeData> decomNodesState = + state.getDecommissioningNodesState(); + Assert.assertEquals(0, decomNodesState.size()); + + // 2. Store single node + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext1 = + new DecommissioningNodesWatcher.DecommissioningNodeContext(nodeId1); + store.storeDecommissioningNode(nodeContext1); + state = store.loadState(); + Assert.assertEquals(1, state.getDecommissioningNodesState().size()); + RMDecommissioningNodeData readNodeData1 = + state.getDecommissioningNodesState().get(nodeId1.toString()); + Assert.assertNotNull(readNodeData1); + DecommissioningNodesWatcher.DecommissioningNodeContext readNodeContext1 = + readNodeData1.getDecommissioningNodeContext(nodeId1); + assertDecommissioningNodeContextEquals(nodeContext1, readNodeContext1); + + // 3.
Update node + nodeContext1.setTimeoutMs(System.currentTimeMillis() + 1000*60*10); + nodeContext1.setLastUpdateTime(System.currentTimeMillis()); + nodeContext1.setDecommissioningStartTime( + System.currentTimeMillis() - 1000*60*20); + store.updateDecommissioningNode(nodeContext1); + state = store.loadState(); + Assert.assertEquals(1, state.getDecommissioningNodesState().size()); + readNodeData1 = + state.getDecommissioningNodesState().get(nodeId1.toString()); + Assert.assertNotNull(readNodeData1); + readNodeContext1 = readNodeData1.getDecommissioningNodeContext(nodeId1); + assertDecommissioningNodeContextEquals(nodeContext1, readNodeContext1); + + // 4. add a second one and remove the first one + DecommissioningNodesWatcher.DecommissioningNodeContext nodeContext2 = + new DecommissioningNodesWatcher.DecommissioningNodeContext(nodeId2); + store.storeDecommissioningNode(nodeContext2); + state = store.loadState(); + Assert.assertEquals(2, state.getDecommissioningNodesState().size()); + readNodeData1 = + state.getDecommissioningNodesState().get(nodeId1.toString()); + Assert.assertNotNull(readNodeData1); + readNodeContext1 = readNodeData1.getDecommissioningNodeContext(nodeId1); + assertDecommissioningNodeContextEquals(nodeContext1, readNodeContext1); + RMDecommissioningNodeData readNodeData2 = + state.getDecommissioningNodesState().get(nodeId2.toString()); + Assert.assertNotNull(readNodeData2); + DecommissioningNodesWatcher.DecommissioningNodeContext readNodeContext2 = + readNodeData2.getDecommissioningNodeContext(nodeId2); + assertDecommissioningNodeContextEquals(nodeContext2, readNodeContext2); + + store.removeDecommissioningNode(nodeContext1); + state = store.loadState(); + Assert.assertEquals(1, state.getDecommissioningNodesState().size()); + readNodeData2 = + state.getDecommissioningNodesState().get(nodeId2.toString()); + Assert.assertNotNull(readNodeData2); + readNodeContext2 = readNodeData2.getDecommissioningNodeContext(nodeId2); + 
assertDecommissioningNodeContextEquals(nodeContext2, readNodeContext2); + } + + void assertDecommissioningNodeContextEquals( + DecommissioningNodesWatcher.DecommissioningNodeContext expected, + DecommissioningNodesWatcher.DecommissioningNodeContext actual) { + Assert.assertEquals(expected.getDecommissioningStartTime(), actual.getDecommissioningStartTime()); + Assert.assertEquals(expected.getLastUpdateTime(), actual.getLastUpdateTime()); + Assert.assertEquals(expected.getNodeId(), actual.getNodeId()); + Assert.assertEquals(expected.getTimeoutMs(), actual.getTimeoutMs()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 5eeb528..fedac5d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -203,6 +203,7 @@ public void testFSRMStateStore() throws Exception { testRemoveAttempt(fsTester); testAMRMTokenSecretManagerStateStore(fsTester); testReservationStateStore(fsTester); + testDecommissioningNodes(fsTester); } finally { cluster.shutdown(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index 
e3d0f9c..e09df8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -123,6 +123,12 @@ public void testReservation() throws Exception { } @Test(timeout = 60000) + public void testDecommissioningNodes() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testDecommissioningNodes(tester); + } + + @Test(timeout = 60000) public void testCompactionCycle() throws Exception { final DB mockdb = mock(DB.class); conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 6d5d2d7..919f82d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -204,6 +204,7 @@ public void testZKRMStateStoreRealZK() throws Exception { testRemoveAttempt(zkTester); testAMRMTokenSecretManagerStateStore(zkTester); testReservationStateStore(zkTester); + testDecommissioningNodes(zkTester); ((TestZKRMStateStoreTester.TestZKRMStateStoreInternal) zkTester.getRMStateStore()).testRetryingCreateRootDir(); } diff --git
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index c837450..500ab03 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -89,9 +90,11 @@ public void setUp() { NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.start(); + DecommissioningNodesWatcher decommissioningNodesWatcher = + new DecommissioningNodesWatcher(context); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, decommissioningNodesWatcher); resourceTrackerService.init(conf); resourceTrackerService.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 6a7325c..6840d47 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; @@ -109,9 +110,11 @@ public void setUp() { NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.start(); + DecommissioningNodesWatcher decommissioningNodesWatcher = + new DecommissioningNodesWatcher(context); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, decommissioningNodesWatcher); resourceTrackerService.init(conf); resourceTrackerService.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 4f94695..e431e1a 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -82,7 +83,8 @@ public void handle(Event event) { resourceTrackerService = new ResourceTrackerService(context, nodesListManager, new NMLivelinessMonitor(dispatcher), context.getContainerTokenSecretManager(), - context.getNMTokenSecretManager()); + context.getNMTokenSecretManager(), + new DecommissioningNodesWatcher(context)); resourceTrackerService.init(conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index f1adb5e..1f87ac4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -783,10 +784,12 @@ private ResourceTrackerService getPrivateResourceTrackerService( NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.start(); + DecommissioningNodesWatcher decommissioningNodesWatcher = + new DecommissioningNodesWatcher(privateContext); ResourceTrackerService privateResourceTrackerService = new ResourceTrackerService(privateContext, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, decommissioningNodesWatcher); privateResourceTrackerService.init(conf); privateResourceTrackerService.start(); rm.getResourceScheduler().setRMContext(privateContext);