diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 628d260..94d5e63 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -56,7 +56,6 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; - import com.google.common.annotations.VisibleForTesting; @Private @@ -66,15 +65,49 @@ public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + private static final String VERSION_INFO_ZNODE_NAME = "VersionInfo"; + private static final String RM_DT_DELEGATION_TOKEN_ZNODE_NAME = "RMDTDelegationToken"; + private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = "RMDTSequentialNumber"; + private static final String RM_DT_MASTER_KEY_ZNODE_NAME = "RMDelegationKey"; + private static final int VERSION_INFO = 1; private int numRetries; private String zkHostPort = null; private int zkSessionTimeout; private List zkAcl; + + /** + * + * ROOT_DIR_PATH + * |--- VERSION_INFO + * |--- RM_APP_ROOT + * | |----- (#ApplicationId1) + * | | |----- (#ApplicationAttemptIds) + * | | + * | |----- (#ApplicationId2) + * | | |----- (#ApplicationAttemptIds) + * | .... + * | + * |--- RM_DT_SECRET_MANAGER_ROOT + * |----- RMDTSequenceNumber + * |----- RMDelegationToken + * | |----- Token_1 + * | |----- Token_2 + * | .... + * | + * |----- RMDelegationKey + * |----- Key_1 + * |----- Key_2 + * .... + * + */ private String zkRootNodePath; - private String rmDTSecretManagerRoot; + private String versionInfoPath; private String rmAppRoot; - private String dtSequenceNumberPath = null; + private String rmDTSecretManagerRoot; + private String rmDelegationKeyPath; + private String rmDelegationTokenPath; + private String dtSequenceNumberPath; @VisibleForTesting protected String znodeWorkingPath; @@ -114,8 +147,15 @@ public synchronized void initInternal(Configuration conf) throws Exception { } zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; - rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; + versionInfoPath = zkRootNodePath + "/" + VERSION_INFO_ZNODE_NAME; rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT; + rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; + rmDelegationKeyPath = rmDTSecretManagerRoot + + "/" + RM_DT_MASTER_KEY_ZNODE_NAME; + rmDelegationTokenPath = rmDTSecretManagerRoot + + "/" + RM_DT_DELEGATION_TOKEN_ZNODE_NAME; + dtSequenceNumberPath = rmDTSecretManagerRoot + + "/" + RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME; } @Override @@ -126,13 +166,43 @@ public synchronized void startInternal() throws Exception { // ensure root dirs exist createRootDir(znodeWorkingPath); createRootDir(zkRootNodePath); - createRootDir(rmDTSecretManagerRoot); createRootDir(rmAppRoot); + createRootDir(versionInfoPath); + createRootDir(rmDTSecretManagerRoot); + createRootDir(rmDelegationKeyPath); + createRootDir(rmDelegationTokenPath); + createRootDir(dtSequenceNumberPath); + + ensureVersion(); } - private void createRootDir(String rootPath) throws Exception { + private void ensureVersion() throws Exception { + byte[] versionData = getDataWithRetries(versionInfoPath, false).getData(); + if (versionData != null) { + ByteArrayInputStream versionIs = new ByteArrayInputStream(versionData); + DataInputStream versionIn = new DataInputStream(versionIs); + try { + int currentVersion = versionIn.readInt(); + if (currentVersion != VERSION_INFO) { + throw new YarnRuntimeException("Incompatible version!"); + } + } finally { + versionIn.close(); + } + } else { + ByteArrayOutputStream versionOs = new ByteArrayOutputStream(); + DataOutputStream versionOut = new DataOutputStream(versionOs); + try { + versionOut.writeInt(VERSION_INFO); + } finally { + versionOut.close(); + } + } + } + + private void createRootDir(String rootPath, byte[] data) throws Exception { try { - createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT); + createWithRetries(rootPath, data, zkAcl, CreateMode.PERSISTENT); } catch (KeeperException ke) { if (ke.code() != Code.NODEEXISTS) { throw ke; @@ -140,6 +210,10 @@ private void createRootDir(String rootPath) throws Exception { } } + private void createRootDir(String rootPath) throws Exception { + createRootDir(rootPath, null); + } + private synchronized void closeZkClients() throws IOException { if (zkClient != null) { try { @@ -176,25 +250,67 @@ public synchronized RMState loadState() throws Exception { private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { - List childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true); + loadRMDelegationKeyState(rmState); + loadRMSequentialNumberState(rmState); + loadRMDelegationTokenState(rmState); + } + private void loadRMDelegationKeyState(RMState rmState) throws Exception { + List childNodes = getChildrenWithRetries(rmDelegationKeyPath, true); for (String childNodeName : childNodes) { - if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { - rmState.rmSecretManagerState.dtSequenceNumber = - Integer.parseInt(childNodeName.split("_")[1]); + String childNodePath = getNodePath(rmDelegationKeyPath, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true).getData(); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); continue; } - String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, true); ByteArrayInputStream is = new ByteArrayInputStream(childData); DataInputStream fsIn = new DataInputStream(is); + try { if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { DelegationKey key = new DelegationKey(); key.readFields(fsIn); rmState.rmSecretManagerState.masterKeyState.add(key); - } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + } + } finally { + is.close(); + } + } + } + + private void loadRMSequentialNumberState(RMState rmState) throws Exception { + ZNode seqZnode = getDataWithRetries(dtSequenceNumberPath, false); + if (seqZnode.getData() != null) { + ByteArrayInputStream seqIs = new ByteArrayInputStream(seqZnode.getData()); + DataInputStream seqIn = new DataInputStream(seqIs); + + try { + rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt(); + } finally { + seqIn.close(); + } + } + } + + private void loadRMDelegationTokenState(RMState rmState) throws Exception { + List childNodes = zkClient.getChildren(rmDelegationTokenPath, true); + for (String childNodeName : childNodes) { + String childNodePath = getNodePath(rmDelegationTokenPath, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true).getData(); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); + continue; + } + + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + + try { + if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(); identifier.readFields(fsIn); @@ -209,12 +325,10 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) } private synchronized void loadRMAppState(RMState rmState) throws Exception { - List childNodes = zkClient.getChildren(rmAppRoot, true); - List attempts = - new ArrayList(); + List childNodes = getChildrenWithRetries(rmAppRoot, true); for (String childNodeName : childNodes) { String childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, true); + byte[] childData = getDataWithRetries(childNodePath, true).getData(); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { // application LOG.info("Loading application from znode: " + childNodeName); @@ -234,43 +348,46 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { "from the application id"); } rmState.appState.put(appId, appState); - } else if (childNodeName - .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { - // attempt - LOG.info("Loading application attempt from znode: " + childNodeName); - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(childNodeName); - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl( - ApplicationAttemptStateDataProto.parseFrom(childData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, + loadApplicationAttemptState(rmState, appId); + } else { + LOG.info("Unknown child node with name: " + childNodeName); + } + } + } + + private void loadApplicationAttemptState(RMState rmState, ApplicationId appId) throws Exception { + String appPath = getNodePath(rmAppRoot, appId.toString()); + List attempts = getChildrenWithRetries(appPath, false); + for (String attemptName:attempts) { + String attemptPath = getNodePath(appPath, attemptName); + byte[] attemptData = getDataWithRetries(attemptPath, true).getData(); + + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(attemptName); + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto.parseFrom(attemptData)); + Credentials credentials = null; + if (attemptStateData.getAppAttemptTokens() != null) { + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus()); - if (!attemptId.equals(attemptState.getAttemptId())) { - throw new YarnRuntimeException("The child node name is different " + - "from the application attempt id"); - } - attempts.add(attemptState); - } else { - LOG.info("Unknown child node with name: " + childNodeName); + if (!attemptId.equals(attemptState.getAttemptId())) { + throw new YarnRuntimeException("The child node name is different " + + "from the application attempt id"); } - } - // go through all attempts and add them to their apps - for (ApplicationAttemptState attemptState : attempts) { - ApplicationId appId = attemptState.getAttemptId().getApplicationId(); ApplicationState appState = rmState.appState.get(appId); if (appState != null) { appState.attempts.put(attemptState.getAttemptId(), attemptState); @@ -279,12 +396,13 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { // completed but the RM might have stopped before it could remove the // application attempt znodes LOG.info("Application node not found for attempt: " - + attemptState.getAttemptId()); + + attemptState.getAttemptId()); deleteWithRetries( - getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), - 0); + getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), + 0); } } + } @Override @@ -318,7 +436,12 @@ public synchronized void updateApplicationStateInternal(String appId, public synchronized void storeApplicationAttemptStateInternal( String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(attemptId); + String appDirPath = getNodePath(rmAppRoot, + appAttemptId.getApplicationId().toString()); + String nodeCreatePath = getNodePath(appDirPath, attemptId); + if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptId + " at: " + nodeCreatePath); @@ -332,7 +455,11 @@ public synchronized void storeApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal( String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(attemptId); + String appDirPath = getNodePath(rmAppRoot, + appAttemptId.getApplicationId().toString()); + String nodeCreatePath = getNodePath(appDirPath, attemptId); if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for attempt: " + attemptId + " at: " + nodeCreatePath); @@ -367,38 +494,36 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( ArrayList opList = new ArrayList(); // store RM delegation token String nodeCreatePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + getNodePath(rmDelegationTokenPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream fsOut = new DataOutputStream(os); + ByteArrayOutputStream tokenOs = new ByteArrayOutputStream(); + DataOutputStream tokenOut = new DataOutputStream(tokenOs); + ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); + DataOutputStream seqOut = new DataOutputStream(seqOs); + try { - rmDTIdentifier.write(fsOut); - fsOut.writeLong(renewDate); + rmDTIdentifier.write(tokenOut); + tokenOut.writeLong(renewDate); if (LOG.isDebugEnabled()) { LOG.debug("Storing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl, + + opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl, CreateMode.PERSISTENT)); - } finally { - os.close(); - } - // store sequence number - String latestSequenceNumberPath = - getNodePath(rmDTSecretManagerRoot, - DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + - latestSequenceNumber); - } - if (dtSequenceNumberPath != null) { - opList.add(Op.delete(dtSequenceNumberPath, 0)); + seqOut.writeInt(latestSequenceNumber); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + dtSequenceNumberPath + + ". SequenceNumber: " + latestSequenceNumber); + } + + opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1)); + } finally { + tokenOs.close(); } - opList.add(Op.create(latestSequenceNumberPath, null, zkAcl, - CreateMode.PERSISTENT)); - dtSequenceNumberPath = latestSequenceNumberPath; + doMultiWithRetries(opList); } @@ -406,7 +531,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( protected synchronized void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { String nodeRemovePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + getNodePath(rmDelegationTokenPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationToken_" @@ -419,7 +544,7 @@ protected synchronized void removeRMDelegationTokenState( protected synchronized void storeRMDTMasterKeyState( DelegationKey delegationKey) throws Exception { String nodeCreatePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + getNodePath(rmDelegationKeyPath, DELEGATION_KEY_PREFIX + delegationKey.getKeyId()); ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream fsOut = new DataOutputStream(os); @@ -565,13 +690,14 @@ public Void run() throws KeeperException, InterruptedException { } @VisibleForTesting - public byte[] getDataWithRetries(final String path, final boolean watch) + public ZNode getDataWithRetries(final String path, final boolean watch) throws Exception { - return new ZKAction() { + return new ZKAction() { @Override - public byte[] run() throws KeeperException, InterruptedException { + public ZNode run() throws KeeperException, InterruptedException { Stat stat = new Stat(); - return zkClient.getData(path, watch, stat); + byte[] data = zkClient.getData(path, watch, stat); + return new ZNode(data, stat); } }.runWithRetries(); } @@ -652,4 +778,14 @@ protected synchronized ZooKeeper getNewZooKeeper() zk.register(new ForwardingWatcher()); return zk; } + + private List getChildrenWithRetries( + final String path, final boolean watch) throws Exception { + return new ZKAction>() { + @Override + List run() throws KeeperException, InterruptedException { + return zkClient.getChildren(path, watch); + } + }.runWithRetries(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZNode.java new file mode 100644 index 0000000..32b75d2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZNode.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.zookeeper.data.Stat; + +@Private +@Unstable +class ZNode { + + final byte[] data; + final Stat stat; + + public ZNode(byte[] data, Stat stat) { + this.data = data; + this.stat = stat; + } + + byte[] getData() { + return data; + } + + Stat getStat() { + return stat; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java index 82e550c..503363c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -148,7 +148,7 @@ public void testZKClientDisconnectAndReconnect() zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME); byte[] ret = null; try { - ret = store.getDataWithRetries(path, true); + ret = store.getDataWithRetries(path, true).getData(); } catch (Exception e) { String error = "ZKRMStateStore Session restore failed"; LOG.error(error, e); @@ -182,7 +182,7 @@ public void testZKSessionTimeout() throws Exception { // after this point, expired event has already been processed. try { - byte[] ret = store.getDataWithRetries(path, false); + byte[] ret = store.getDataWithRetries(path, false).getData(); Assert.assertEquals("bytes", new String(ret)); } catch (Exception e) { String error = "New session creation failed";