diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 23cefd3..e8433e0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -281,11 +281,11 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { } @Override - public synchronized void storeApplicationStateInternal(String appId, + public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - Path appDirPath = getAppDir(rmAppRoot, appId); + Path appDirPath = getAppDir(rmAppRoot, appId.toString()); fs.mkdirs(appDirPath); - Path nodeCreatePath = getNodePath(appDirPath, appId); + Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -300,10 +300,10 @@ public synchronized void storeApplicationStateInternal(String appId, } @Override - public synchronized void updateApplicationStateInternal(String appId, + public synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - Path appDirPath = getAppDir(rmAppRoot, appId); - Path nodeCreatePath = getNodePath(appDirPath, appId); + Path appDirPath = getAppDir(rmAppRoot, appId.toString()); + Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -319,14 +319,13 @@ public synchronized void updateApplicationStateInternal(String appId, @Override public synchronized void storeApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - ApplicationAttemptId appAttemptId = - ConverterUtils.toApplicationAttemptId(attemptId); Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); - Path nodeCreatePath = getNodePath(appDirPath, attemptId); - LOG.info("Storing info for attempt: " + attemptId + " at: " + Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); + LOG.info("Storing info for attempt: " + appAttemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); try { @@ -334,21 +333,20 @@ public synchronized void storeApplicationAttemptStateInternal( // based on whether we have lost the right to write to FS writeFile(nodeCreatePath, attemptStateData); } catch (Exception e) { - LOG.info("Error storing info for attempt: " + attemptId, e); + LOG.info("Error storing info for attempt: " + appAttemptId, e); throw e; } } @Override public synchronized void updateApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - ApplicationAttemptId appAttemptId = - ConverterUtils.toApplicationAttemptId(attemptId); Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); - Path nodeCreatePath = getNodePath(appDirPath, attemptId); - LOG.info("Updating info for attempt: " + attemptId + " at: " + Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); + LOG.info("Updating info for attempt: " + appAttemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); try { @@ -356,7 +354,7 @@ public synchronized void updateApplicationAttemptStateInternal( // based on whether we have lost the right to write to FS updateFile(nodeCreatePath, attemptStateData); } catch (Exception e) { - LOG.info("Error updating info for attempt: " + attemptId, e); + LOG.info("Error updating info for attempt: " + appAttemptId, e); throw e; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index d5ff5ed..344d285 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -80,7 +80,7 @@ protected synchronized void closeInternal() throws Exception { } @Override - public void storeApplicationStateInternal(String appId, + public void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationState appState = @@ -88,11 +88,11 @@ public void storeApplicationStateInternal(String appId, appStateData.getStartTime(), appStateData.getApplicationSubmissionContext(), appStateData.getUser()); - state.appState.put(appState.getAppId(), appState); + state.appState.put(appId, appState); } @Override - public void updateApplicationStateInternal(String appId, + public void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationState updatedAppState = new ApplicationState(appStateData.getSubmitTime(), @@ -102,21 +102,19 @@ public void updateApplicationStateInternal(String appId, appStateData.getDiagnostics(), appStateData.getFinishTime()); LOG.info("Updating final state " + appStateData.getState() + " for app: " + appId); - ApplicationId applicationId = updatedAppState.getAppId(); - if (state.appState.get(applicationId) != null) { + if (state.appState.get(appId) != null) { // add the earlier attempts back updatedAppState.attempts - .putAll(state.appState.get(applicationId).attempts); + .putAll(state.appState.get(appId).attempts); } - state.appState.put(applicationId, updatedAppState); + state.appState.put(appId, updatedAppState); } @Override - public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr, - ApplicationAttemptStateDataPBImpl attemptStateData) - throws Exception { - ApplicationAttemptId attemptId = ConverterUtils - .toApplicationAttemptId(attemptIdStr); + public synchronized void storeApplicationAttemptStateInternal( + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception { Credentials credentials = null; if(attemptStateData.getAppAttemptTokens() != null){ DataInputByteBuffer dibb = new DataInputByteBuffer(); @@ -125,7 +123,7 @@ public synchronized void storeApplicationAttemptStateInternal(String attemptIdSt credentials.readTokenStorageStream(dibb); } ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, + new ApplicationAttemptState(appAttemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime()); @@ -139,10 +137,9 @@ public synchronized void storeApplicationAttemptStateInternal(String attemptIdSt @Override public synchronized void updateApplicationAttemptStateInternal( - String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(attemptIdStr); Credentials credentials = null; if (attemptStateData.getAppAttemptTokens() != null) { DataInputByteBuffer dibb = new DataInputByteBuffer(); @@ -151,7 +148,7 @@ public synchronized void updateApplicationAttemptStateInternal( credentials.readTokenStorageStream(dibb); } ApplicationAttemptState updatedAttemptState = - new ApplicationAttemptState(attemptId, + new ApplicationAttemptState(appAttemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index c212c1f..10767c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -51,13 +53,13 @@ public RMState loadState() throws Exception { } @Override - protected void storeApplicationStateInternal(String appId, + protected void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { // Do nothing } @Override - protected void storeApplicationAttemptStateInternal(String attemptId, + protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { // Do nothing } @@ -92,13 +94,13 @@ public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Excepti } @Override - protected void updateApplicationStateInternal(String appId, + protected void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { // Do nothing } @Override - protected void updateApplicationAttemptStateInternal(String attemptId, + protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5e0e944..9757005 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -388,10 +388,10 @@ public synchronized void updateApplicationState(ApplicationState appState) { * Derived classes must implement this method to store the state of an * application. */ - protected abstract void storeApplicationStateInternal(String appId, + protected abstract void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception; - protected abstract void updateApplicationStateInternal(String appId, + protected abstract void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception; @SuppressWarnings("unchecked") @@ -425,10 +425,12 @@ public synchronized void updateApplicationAttemptState( * Derived classes must implement this method to store the state of an * application attempt */ - protected abstract void storeApplicationAttemptStateInternal(String attemptId, + protected abstract void storeApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; - protected abstract void updateApplicationAttemptStateInternal(String attemptId, + protected abstract void updateApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; /** @@ -600,11 +602,11 @@ protected void handleStoreEvent(RMStateStoreEvent event) { LOG.info("Storing info for app: " + appId); try { if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - storeApplicationStateInternal(appId.toString(), appStateData); + storeApplicationStateInternal(appId, appStateData); notifyDoneStoringApplication(appId, storedException); } else { assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - updateApplicationStateInternal(appId.toString(), appStateData); + updateApplicationStateInternal(appId, appStateData); notifyDoneUpdatingApplication(appId, storedException); } } catch (Exception e) { @@ -645,15 +647,15 @@ protected void handleStoreEvent(RMStateStoreEvent event) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - storeApplicationAttemptStateInternal(attemptState.getAttemptId() - .toString(), attemptStateData); + storeApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), storedException); } else { assert event.getType().equals( RMStateStoreEventType.UPDATE_APP_ATTEMPT); - updateApplicationAttemptStateInternal(attemptState.getAttemptId() - .toString(), attemptStateData); + updateApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), storedException); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index f419ff0..6783b2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -78,16 +78,47 @@ protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion .newInstance(1, 0); + private static final String RM_DT_DELEGATION_TOKEN_ZNODE_NAME = "RMDTDelegationToken"; + private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = "RMDTSequentialNumber"; + private static final String RM_DT_MASTER_KEY_ZNODE_NAME = "RMDelegationKey"; private int numRetries; private String zkHostPort = null; private int zkSessionTimeout; private long zkRetryInterval; private List zkAcl; + + /** + * + * ROOT_DIR_PATH + * |--- VERSION_INFO + * |--- RM_APP_ROOT + * | |----- (#ApplicationId1) + * | | |----- (#ApplicationAttemptIds) + * | | + * | |----- (#ApplicationId2) + * | | |----- (#ApplicationAttemptIds) + * | .... + * | + * |--- RM_DT_SECRET_MANAGER_ROOT + * |----- RMDTSequenceNumber + * |----- RMDelegationToken + * | |----- Token_1 + * | |----- Token_2 + * | .... + * | + * |----- RMDelegationKey + * |----- Key_1 + * |----- Key_2 + * .... + * + */ private String zkRootNodePath; - private String rmDTSecretManagerRoot; private String rmAppRoot; - private String dtSequenceNumberPath = null; + private String rmDTSecretManagerRoot; + private String rmDelegationKeyPath; + private String rmDelegationTokenPath; + private String dtSequenceNumberPath; @VisibleForTesting protected String znodeWorkingPath; @@ -179,7 +210,6 @@ public synchronized void initInternal(Configuration conf) throws Exception { } zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; - rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT; /* Initialize fencing related paths, acls, and ops */ @@ -204,6 +234,14 @@ public synchronized void initInternal(Configuration conf) throws Exception { zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl); } } + + rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; + rmDelegationKeyPath = rmDTSecretManagerRoot + + "/" + RM_DT_MASTER_KEY_ZNODE_NAME; + rmDelegationTokenPath = rmDTSecretManagerRoot + + "/" + RM_DT_DELEGATION_TOKEN_ZNODE_NAME; + dtSequenceNumberPath = rmDTSecretManagerRoot + + "/" + RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME; } @Override @@ -217,8 +255,11 @@ public synchronized void startInternal() throws Exception { if (HAUtil.isHAEnabled(getConfig())){ fence(); } - createRootDir(rmDTSecretManagerRoot); createRootDir(rmAppRoot); + createRootDir(rmDTSecretManagerRoot); + createRootDir(rmDelegationKeyPath); + createRootDir(rmDelegationTokenPath); + createRootDir(dtSequenceNumberPath); } private void createRootDir(final String rootPath) throws Exception { @@ -350,26 +391,67 @@ public synchronized RMState loadState() throws Exception { private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { - List childNodes = - getChildrenWithRetries(rmDTSecretManagerRoot, true); + loadRMDelegationKeyState(rmState); + loadRMSequentialNumberState(rmState); + loadRMDelegationTokenState(rmState); + } + private void loadRMDelegationKeyState(RMState rmState) throws Exception { + List childNodes = getChildrenWithRetries(rmDelegationKeyPath, true); for (String childNodeName : childNodes) { - if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { - rmState.rmSecretManagerState.dtSequenceNumber = - Integer.parseInt(childNodeName.split("_")[1]); + String childNodePath = getNodePath(rmDelegationKeyPath, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); continue; } - String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, true); ByteArrayInputStream is = new ByteArrayInputStream(childData); DataInputStream fsIn = new DataInputStream(is); + try { if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { DelegationKey key = new DelegationKey(); key.readFields(fsIn); rmState.rmSecretManagerState.masterKeyState.add(key); - } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + } + } finally { + is.close(); + } + } + } + + private void loadRMSequentialNumberState(RMState rmState) throws Exception { + byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false); + if (seqData != null) { + ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData); + DataInputStream seqIn = new DataInputStream(seqIs); + + try { + rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt(); + } finally { + seqIn.close(); + } + } + } + + private void loadRMDelegationTokenState(RMState rmState) throws Exception { + List childNodes = zkClient.getChildren(rmDelegationTokenPath, true); + for (String childNodeName : childNodes) { + String childNodePath = getNodePath(rmDelegationTokenPath, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); + continue; + } + + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + + try { + if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(); identifier.readFields(fsIn); @@ -385,13 +467,10 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) private synchronized void loadRMAppState(RMState rmState) throws Exception { List childNodes = getChildrenWithRetries(rmAppRoot, true); - List attempts = - new ArrayList(); for (String childNodeName : childNodes) { String childNodePath = getNodePath(rmAppRoot, childNodeName); byte[] childData = getDataWithRetries(childNodePath, true); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { - // application LOG.info("Loading application from znode: " + childNodeName); ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); ApplicationStateDataPBImpl appStateData = @@ -409,43 +488,47 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { "from the application id"); } rmState.appState.put(appId, appState); - } else if (childNodeName - .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { - // attempt - LOG.info("Loading application attempt from znode: " + childNodeName); - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(childNodeName); - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl( - ApplicationAttemptStateDataProto.parseFrom(childData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, + loadApplicationAttemptState(rmState, appId); + } else { + LOG.info("Unknown child node with name: " + childNodeName); + } + } + } + + private void loadApplicationAttemptState(RMState rmState, ApplicationId appId) + throws Exception { + String appPath = getNodePath(rmAppRoot, appId.toString()); + List attempts = getChildrenWithRetries(appPath, false); + for (String attemptName:attempts) { + String attemptPath = getNodePath(appPath, attemptName); + byte[] attemptData = getDataWithRetries(attemptPath, true); + + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(attemptName); + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto.parseFrom(attemptData)); + Credentials credentials = null; + if (attemptStateData.getAppAttemptTokens() != null) { + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus()); - if (!attemptId.equals(attemptState.getAttemptId())) { - throw new YarnRuntimeException("The child node name is different " + - "from the application attempt id"); - } - attempts.add(attemptState); - } else { - LOG.info("Unknown child node with name: " + childNodeName); + if (!attemptId.equals(attemptState.getAttemptId())) { + throw new YarnRuntimeException("The child node name is different " + + "from the application attempt id"); } - } - // go through all attempts and add them to their apps - for (ApplicationAttemptState attemptState : attempts) { - ApplicationId appId = attemptState.getAttemptId().getApplicationId(); ApplicationState appState = rmState.appState.get(appId); if (appState != null) { appState.attempts.put(attemptState.getAttemptId(), attemptState); @@ -454,18 +537,19 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { // completed but the RM might have stopped before it could remove the // application attempt znodes LOG.info("Application node not found for attempt: " - + attemptState.getAttemptId()); + + attemptState.getAttemptId()); deleteWithRetries( - getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), - 0); + getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), + 0); } } + } @Override - public synchronized void storeApplicationStateInternal(String appId, + public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, appId); + String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); @@ -477,9 +561,9 @@ public synchronized void storeApplicationStateInternal(String appId, } @Override - public synchronized void updateApplicationStateInternal(String appId, + public synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, appId); + String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for app: " + appId + " at: " @@ -491,11 +575,15 @@ public synchronized void updateApplicationStateInternal(String appId, @Override public synchronized void storeApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + String appDirPath = getNodePath(rmAppRoot, + appAttemptId.getApplicationId().toString()); + String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); + if (LOG.isDebugEnabled()) { - LOG.debug("Storing info for attempt: " + attemptId + " at: " + LOG.debug("Storing info for attempt: " + appAttemptId + " at: " + nodeCreatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -505,11 +593,14 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + String appDirPath = getNodePath(rmAppRoot, + appAttemptId.getApplicationId().toString()); + String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); if (LOG.isDebugEnabled()) { - LOG.debug("Storing final state info for attempt: " + attemptId + " at: " + LOG.debug("Storing final state info for attempt: " + appAttemptId + " at: " + nodeCreatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -520,16 +611,18 @@ public synchronized void updateApplicationAttemptStateInternal( public synchronized void removeApplicationState(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); - String nodeRemovePath = getNodePath(rmAppRoot, appId); + String appIdRemovePath = getNodePath(rmAppRoot, appId); ArrayList opList = new ArrayList(); - opList.add(Op.delete(nodeRemovePath, 0)); for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { - String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString()); + String attemptRemovePath = getNodePath(appIdRemovePath, + attemptId.toString()); opList.add(Op.delete(attemptRemovePath, 0)); } + opList.add(Op.delete(appIdRemovePath, 0)); + if (LOG.isDebugEnabled()) { - LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath + LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath + " and its attempts."); } doMultiWithRetries(opList); @@ -542,38 +635,37 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( ArrayList opList = new ArrayList(); // store RM delegation token String nodeCreatePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + getNodePath(rmDelegationTokenPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream fsOut = new DataOutputStream(os); + ByteArrayOutputStream tokenOs = new ByteArrayOutputStream(); + DataOutputStream tokenOut = new DataOutputStream(tokenOs); + ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); + DataOutputStream seqOut = new DataOutputStream(seqOs); + try { - rmDTIdentifier.write(fsOut); - fsOut.writeLong(renewDate); + rmDTIdentifier.write(tokenOut); + tokenOut.writeLong(renewDate); if (LOG.isDebugEnabled()) { LOG.debug("Storing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl, + + opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl, CreateMode.PERSISTENT)); - } finally { - os.close(); - } - // store sequence number - String latestSequenceNumberPath = - getNodePath(rmDTSecretManagerRoot, - DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + - latestSequenceNumber); - } - if (dtSequenceNumberPath != null) { - opList.add(Op.delete(dtSequenceNumberPath, 0)); + seqOut.writeInt(latestSequenceNumber); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + dtSequenceNumberPath + + ". SequenceNumber: " + latestSequenceNumber); + } + + opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1)); + } finally { + tokenOs.close(); + seqOs.close(); } - opList.add(Op.create(latestSequenceNumberPath, null, zkAcl, - CreateMode.PERSISTENT)); - dtSequenceNumberPath = latestSequenceNumberPath; + doMultiWithRetries(opList); } @@ -581,7 +673,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( protected synchronized void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { String nodeRemovePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + getNodePath(rmDelegationTokenPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationToken_" @@ -594,7 +686,7 @@ protected synchronized void removeRMDelegationTokenState( protected synchronized void storeRMDTMasterKeyState( DelegationKey delegationKey) throws Exception { String nodeCreatePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + getNodePath(rmDelegationKeyPath, DELEGATION_KEY_PREFIX + delegationKey.getKeyId()); ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream fsOut = new DataOutputStream(os); @@ -753,8 +845,7 @@ public void setDataWithRetries(final String path, final byte[] data, return new ZKAction() { @Override public byte[] run() throws KeeperException, InterruptedException { - Stat stat = new Stat(); - return zkClient.getData(path, watch, stat); + return zkClient.getData(path, watch, null); } }.runWithRetries(); } @@ -861,4 +952,5 @@ protected synchronized ZooKeeper getNewZooKeeper() zk.register(new ForwardingWatcher()); return zk; } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 417fdb1..974237e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -231,6 +231,12 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) attempts.put(attemptIdRemoved, mockRemovedAttempt); store.removeApplication(mockRemovedApp); + // remove application directory recursively. + storeApp(store, appIdRemoved, submitTime, startTime); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", null, null, dispatcher); + store.removeApplication(mockRemovedApp); + // let things settle down Thread.sleep(1000); store.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 4df1c3b..579e89f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -164,10 +165,11 @@ public void testFSRMStateStoreClientRetry() throws Exception { @Override public void run() { try { - store.storeApplicationStateInternal("application1", - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(111, 111, "user", null, - RMAppState.ACCEPTED, "diagnostics", 333)); + store.storeApplicationStateInternal( + ApplicationId.newInstance(100L, 1), + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(111, 111, "user", null, + RMAppState.ACCEPTED, "diagnostics", 333)); } catch (Exception e) { // TODO 0 datanode exception will not be retried by dfs client, fix // that separately.