diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7dd5ce3..de4a6d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -562,6 +562,10 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms"; public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000; + public static final String ZK_APPID_NODE_SPLIT_INDEX = + RM_ZK_PREFIX + "appid-node.split-index"; + public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0; + public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl"; public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 3c30ed3..a8954ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -606,7 +606,27 @@ - Name of the cluster. In an HA setting, + Index at which last section of application id(with each section + separated by _ in application id) will be split so that application znode + stored in zookeeper RM state store will be stored as two different znodes + (parent-child). Split is done from the end. + For instance, with no split, appid znode will be of the form + application_1352994193343_0001. If the value of this config is 1, the + appid znode will be broken into two parts application_1352994193343_000 + and 1 respectively with former being the parent node. + application_1352994193343_0002 will then be stored as 2 under the parent + node application_1352994193343_000. This config can take values from 0 to 4. + 0 means there will be no split. If configuration value is outside this + range, it will be treated as config value of 0(i.e. no split). A value + larger than 0 (up to 4) should be configured if you are storing a large number + of apps in ZK based RM state store and state store operations are failing due to + LenError in Zookeeper. + yarn.resourcemanager.zk-appid-node.split-index + 0 + + + + Name of the cluster. In a HA setting, this is used to ensure the RM participates in leader election for this cluster and ensures it does not affect other clusters diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index cf6380f..9c4a3af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -72,6 +73,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; /** * {@link RMStateStore} implementation backed by ZooKeeper. @@ -82,6 +84,31 @@ * |--- EPOCH_NODE * |--- RM_ZK_FENCING_LOCK * |--- RM_APP_ROOT + * | |----- HIERARCHIES + * | | |----- 1 + * | | | |----- (#ApplicationId barring last character) + * | | | | |----- (#Last character of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | + * | | |----- 2 + * | | | |----- (#ApplicationId barring last 2 characters) + * | | | | |----- (#Last 2 characters of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | + * | | |----- 3 + * | | | |----- (#ApplicationId barring last 3 characters) + * | | | | |----- (#Last 3 characters of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | + * | | |----- 4 + * | | | |----- (#ApplicationId barring last 4 characters) + * | | | | |----- (#Last 4 characters of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | * | |----- (#ApplicationId1) * | | |----- (#ApplicationAttemptIds) * | | @@ -121,6 +148,7 @@ @Unstable public class ZKRMStateStore extends RMStateStore { private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); + private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = @@ -129,12 +157,15 @@ "RMDTMasterKeysRoot"; @VisibleForTesting public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; - protected static final Version CURRENT_VERSION_INFO = - Version.newInstance(1, 3); + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(2, 0); + @VisibleForTesting + static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES"; /* Znode paths */ private String zkRootNodePath; private String rmAppRoot; + private Map rmAppRootHierarchies; private String rmDTSecretManagerRoot; private String dtMasterKeysRootPath; private String delegationTokensRootPath; @@ -144,6 +175,7 @@ @VisibleForTesting protected String znodeWorkingPath; + private int appIdNodeSplitIndex = 0; /* Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; @@ -166,6 +198,18 @@ protected CuratorFramework curatorFramework; /** + * Encapsulates full app node path and corresponding split index. + */ + private final static class AppNodeSplitInfo { + private final String path; + private final int splitIndex; + AppNodeSplitInfo(String path, int splitIndex) { + this.path = path; + this.splitIndex = splitIndex; + } + } + + /** * Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for * ZooKeeper access, construct the {@link ACL}s for the store's root node. * In the constructed {@link ACL}, all the users allowed by sourceACLs are @@ -212,11 +256,30 @@ public synchronized void initInternal(Configuration conf) conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME); - fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT); + String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES); + rmAppRootHierarchies = new HashMap<>(5); + rmAppRootHierarchies.put(0, rmAppRoot); + for (int splitIndex = 1; splitIndex <= 4; splitIndex++) { + rmAppRootHierarchies.put(splitIndex, + getNodePath(hierarchiesPath, Integer.toString(splitIndex))); + } + + fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); + appIdNodeSplitIndex = + conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); + if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) { + LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " + + YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " + + "Resetting it to " + + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); + appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX; + } + zkAcl = RMZKUtils.getZKAcls(conf); if (HAUtil.isHAEnabled(conf)) { @@ -269,6 +332,10 @@ public synchronized void startInternal() throws Exception { verifyActiveStatusThread.start(); } create(rmAppRoot); + create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES)); + for (int splitIndex = 1; splitIndex <= 4; splitIndex++) { + create(rmAppRootHierarchies.get(splitIndex)); + } create(rmDTSecretManagerRoot); create(dtMasterKeysRootPath); create(delegationTokensRootPath); @@ -524,42 +591,62 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception { } } - private synchronized void loadRMAppState(RMState rmState) throws Exception { - List childNodes = getChildren(rmAppRoot); - - for (String childNodeName : childNodes) { - String childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = getData(childNodePath); - - if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { - // application - if (LOG.isDebugEnabled()) { - LOG.debug("Loading application from znode: " + childNodeName); - } - - ApplicationId appId = ApplicationId.fromString(childNodeName); - ApplicationStateDataPBImpl appState = - new ApplicationStateDataPBImpl( - ApplicationStateDataProto.parseFrom(childData)); + private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath, + String appIdStr) throws Exception { + byte[] appData = getData(appNodePath); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application from znode: " + appNodePath); + } + ApplicationId appId = ApplicationId.fromString(appIdStr); + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(appData)); + if (!appId.equals( + appState.getApplicationSubmissionContext().getApplicationId())) { + throw new YarnRuntimeException("The node name is different " + + "from the application id"); + } + rmState.appState.put(appId, appState); + loadApplicationAttemptState(appState, appNodePath); + } - if (!appId.equals( - appState.getApplicationSubmissionContext().getApplicationId())) { - throw new YarnRuntimeException("The child node name is different " - + "from the application id"); + private synchronized void loadRMAppState(RMState rmState) throws Exception { + for (int splitIndex = 0; splitIndex <= 4; splitIndex++) { + String appRoot = rmAppRootHierarchies.get(splitIndex); + if (appRoot == null) { + continue; + } + List childNodes = getChildren(appRoot); + boolean appNodeFound = false; + for (String childNodeName : childNodes) { + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { + appNodeFound = true; + if (splitIndex == 0) { + loadRMAppStateFromAppNode(rmState, + getNodePath(appRoot, childNodeName), childNodeName); + } else { + // If AppId Node is partitioned. + String leafNodePath = getNodePath(appRoot, childNodeName); + List leafNodes = getChildren(leafNodePath); + for (String leafNodeName : leafNodes) { + String appIdStr = childNodeName + leafNodeName; + loadRMAppStateFromAppNode(rmState, + getNodePath(leafNodePath, leafNodeName), appIdStr); + } + } } - - rmState.appState.put(appId, appState); - loadApplicationAttemptState(appState, appId); - } else { - LOG.info("Unknown child node with name: " + childNodeName); + } + if (splitIndex != appIdNodeSplitIndex && !appNodeFound) { + // If no loaded app exists for a particular split index and the split + // index for which apps are being loaded is not the one configured, then + // we do not need to keep track of this hierarchy for storing/updating/ + // removing app/app attempt znodes. + rmAppRootHierarchies.remove(splitIndex); } } } private void loadApplicationAttemptState(ApplicationStateData appState, - ApplicationId appId) - throws Exception { - String appPath = getNodePath(rmAppRoot, appId.toString()); + String appPath) throws Exception { List attempts = getChildren(appPath); for (String attemptIDStr : attempts) { @@ -575,13 +662,66 @@ private void loadApplicationAttemptState(ApplicationStateData appState, } } - LOG.debug("Done loading applications from ZK state store"); + if (LOG.isDebugEnabled()) { + LOG.debug("Done loading applications from ZK state store"); + } + } + + /** + * Get parent app node path based on full path and split index supplied. + * @param appIdPath App id path for which parent needs to be returned. + * @param splitIndex split index. + * @return parent app node path. + */ + private String getSplitAppNodeParent(String appIdPath, int splitIndex) { + return appIdPath.substring(0, appIdPath.length() - splitIndex - 1); + } + + /** + * Checks if parent app node has no leaf nodes and if it does not have, + * removes it. Called while removing application. + * @param appIdPath path of app id to be removed. + * @param splitIndex split index. + * @throws Exception + */ + private void checkRemoveParentAppNode(String appIdPath, int splitIndex) + throws Exception { + if (splitIndex != 0) { + String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex); + List children = null; + try { + children = getChildren(parentAppNode); + } catch (KeeperException.NoNodeException ke) { + // It should be fine to swallow this exception as the parent app node we + // intend to delete is already deleted. + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to remove app parent node " + parentAppNode + + " as it does not exist."); + } + return; + } + // No apps stored under parent path. + if (children != null && children.isEmpty()) { + try { + safeDelete(parentAppNode); + if (LOG.isDebugEnabled()) { + LOG.debug("No leaf app node exists. Removing parent node " + + parentAppNode); + } + } catch (KeeperException.NotEmptyException ke) { + // It should be fine to swallow this exception as the parent app node + // has to be deleted only if it has no children. And this node has. + LOG.debug("Unable to remove app parent node " + parentAppNode + + " as it has children."); + } + } + } } @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); + String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); @@ -596,7 +736,26 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, protected synchronized void updateApplicationStateInternal( ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); + String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false); + boolean pathExists = true; + // Look for paths based on other split indices if path as per split index + // does not exist. + if (!exists(nodeUpdatePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString()); + if (alternatePathInfo != null) { + nodeUpdatePath = alternatePathInfo.path; + } else { + // No alternate path exists. Create path as per configured split index. + pathExists = false; + if (appIdNodeSplitIndex != 0) { + String rootNode = + getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex); + if (!exists(rootNode)) { + safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT); + } + } + } + } if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for app: " + appId + " at: " @@ -605,13 +764,14 @@ protected synchronized void updateApplicationStateInternal( byte[] appStateData = appStateDataPB.getProto().toByteArray(); - if (exists(nodeUpdatePath)) { + if (pathExists) { safeSetData(nodeUpdatePath, appStateData, -1); } else { - safeCreate(nodeUpdatePath, appStateData, zkAcl, - CreateMode.PERSISTENT); - LOG.debug(appId + " znode didn't exist. Created a new znode to" - + " update the application state."); + safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + if (LOG.isDebugEnabled()) { + LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " + + "exist. Creating a new znode to update the application state."); + } } } @@ -620,8 +780,17 @@ protected synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, ApplicationAttemptStateData attemptStateDataPB) throws Exception { - String appDirPath = getNodePath(rmAppRoot, - appAttemptId.getApplicationId().toString()); + String appId = appAttemptId.getApplicationId().toString(); + String appDirPath = getLeafAppIdNodePath(appId, false); + // Look for paths based on other split indices. + if (!exists(appDirPath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId); + if (alternatePathInfo == null) { + throw new YarnRuntimeException("Unexpected Exception. App node for " + + "app " + appId + " not found"); + } + appDirPath = alternatePathInfo.path; + } String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); if (LOG.isDebugEnabled()) { @@ -639,8 +808,17 @@ protected synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptStateData attemptStateDataPB) throws Exception { String appIdStr = appAttemptId.getApplicationId().toString(); + String appDirPath = getLeafAppIdNodePath(appIdStr, false); + // Look for paths based on other split indices. + if (!exists(appDirPath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appIdStr); + if (alternatePathInfo == null) { + throw new YarnRuntimeException("Unexpected Exception. App node for " + + "app " + appIdStr + " not found"); + } + appDirPath = alternatePathInfo.path; + } String appAttemptIdStr = appAttemptId.toString(); - String appDirPath = getNodePath(rmAppRoot, appIdStr); String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr); if (LOG.isDebugEnabled()) { @@ -655,8 +833,11 @@ protected synchronized void updateApplicationAttemptStateInternal( } else { safeCreate(nodeUpdatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); - LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to" - + " update the application attempt state."); + if (LOG.isDebugEnabled()) { + LOG.debug("Path " + nodeUpdatePath + " for " + appAttemptId + + " didn't exist. Created a new znode to update the application" + + " attempt state."); + } } } @@ -664,7 +845,17 @@ protected synchronized void updateApplicationAttemptStateInternal( protected synchronized void removeApplicationAttemptInternal( ApplicationAttemptId appAttemptId) throws Exception { String appId = appAttemptId.getApplicationId().toString(); - String appIdRemovePath = getNodePath(rmAppRoot, appId); + String appIdRemovePath = getLeafAppIdNodePath(appId, false); + // Look for paths based on other split indices. + if (!exists(appIdRemovePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId); + if (alternatePathInfo != null) { + appIdRemovePath = alternatePathInfo.path; + } else { + // Unexpected. Assume that app attempt has been deleted. + return; + } + } String attemptIdRemovePath = getNodePath(appIdRemovePath, appAttemptId.toString()); @@ -679,9 +870,22 @@ protected synchronized void removeApplicationAttemptInternal( @Override protected synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { - String appId = appState.getApplicationSubmissionContext().getApplicationId() - .toString(); - String appIdRemovePath = getNodePath(rmAppRoot, appId); + String appId = appState.getApplicationSubmissionContext(). + getApplicationId().toString(); + String appIdRemovePath = getLeafAppIdNodePath(appId, false); + int splitIndex = appIdNodeSplitIndex; + // Look for paths based on other split indices if path as per configured + // split index does not exist. + if (!exists(appIdRemovePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId); + if (alternatePathInfo != null) { + appIdRemovePath = alternatePathInfo.path; + splitIndex = alternatePathInfo.splitIndex; + } else { + // Alternate path not found so return. + return; + } + } if (LOG.isDebugEnabled()) { LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath @@ -695,6 +899,9 @@ protected synchronized void removeApplicationStateInternal( } safeDelete(appIdRemovePath); + + // Check if we should remove the parent app node as well. + checkRemoveParentAppNode(appIdRemovePath, splitIndex); } @Override @@ -815,8 +1022,24 @@ public synchronized void deleteStore() throws Exception { @Override public synchronized void removeApplication(ApplicationId removeAppId) throws Exception { - String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString()); - delete(appIdRemovePath); + String appId = removeAppId.toString(); + String appIdRemovePath = getLeafAppIdNodePath(appId, false); + int splitIndex = appIdNodeSplitIndex; + // Look for paths based on other split indices. + if (!exists(appIdRemovePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId); + if (alternatePathInfo != null) { + appIdRemovePath = alternatePathInfo.path; + splitIndex = alternatePathInfo.splitIndex; + } else { + // Alternate path not found so return. + return; + } + } + curatorFramework.delete().deletingChildrenIfNeeded(). + forPath(appIdRemovePath); + // Check if we should remove the root node as well. + checkRemoveParentAppNode(appIdRemovePath, splitIndex); } @VisibleForTesting @@ -915,6 +1138,79 @@ private void createRootDirRecursively(String path) throws Exception { } } + /** + * Get alternate path for app id if path according to configured split index + * does not exist. We look for path based on all possible split indices. + * @param appId + * @return a {@link AppNodeSplitInfo} object containing the path and split + * index if it exists, null otherwise. + * @throws Exception + */ + private AppNodeSplitInfo getAlternatePath(String appId) throws Exception { + for (Map.Entry entry : rmAppRootHierarchies.entrySet()) { + // Look for other paths + int splitIndex = entry.getKey(); + if (splitIndex != appIdNodeSplitIndex) { + String alternatePath = + getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false); + if (exists(alternatePath)) { + return new AppNodeSplitInfo(alternatePath, splitIndex); + } + } + } + return null; + } + + /** + * Returns leaf app node path based on app id and passed split index. If the + * passed flag createParentIfNotExists is true, also creates the parent app + * node if it does not exist. + * @param appId application id. + * @param rootNode app root node based on split index. + * @param appIdNodeSplitIdx split index. + * @param createParentIfNotExists flag which determines if parent app node + * needs to be created(as per split) if it does not exist. + * @return leaf app node path. + * @throws Exception + */ + private String getLeafAppIdNodePath(String appId, String rootNode, + int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception { + if (appIdNodeSplitIdx == 0) { + return getNodePath(rootNode, appId); + } + String nodeName = appId; + int splitIdx = nodeName.length() - appIdNodeSplitIdx; + rootNode = getNodePath(rootNode, nodeName.substring(0, splitIdx)); + if (createParentIfNotExists && !exists(rootNode)) { + try { + safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to create app parent node " + rootNode + " as it " + + "already exists."); + } + } + } + return getNodePath(rootNode, nodeName.substring(splitIdx)); + } + + /** + * Returns leaf app node path based on app id and configured split index. If + * the passed flag createParentIfNotExists is true, also creates the parent + * app node if it does not exist. + * @param appId application id. + * @param createParentIfNotExists flag which determines if parent app node + * needs to be created(as per split) if it does not exist. + * @return leaf app node path. + * @throws Exception + */ + private String getLeafAppIdNodePath(String appId, + boolean createParentIfNotExists) throws Exception { + return getLeafAppIdNodePath(appId, + rmAppRootHierarchies.get(appIdNodeSplitIndex), + appIdNodeSplitIndex, createParentIfNotExists); + } + @VisibleForTesting byte[] getData(final String path) throws Exception { return curatorFramework.getData().forPath(path); @@ -925,11 +1221,13 @@ private void createRootDirRecursively(String path) throws Exception { return curatorFramework.getACL().forPath(path); } - private List getChildren(final String path) throws Exception { + @VisibleForTesting + List getChildren(final String path) throws Exception { return curatorFramework.getChildren().forPath(path); } - private boolean exists(final String path) throws Exception { + @VisibleForTesting + boolean exists(final String path) throws Exception { return curatorFramework.checkExists().forPath(path) != null; } @@ -958,6 +1256,11 @@ private void safeCreate(String path, byte[] data, List acl, } } + /** + * Deletes the path. Checks for existence of path as well. + * @param path Path to be deleted. + * @throws Exception if any problem occurs while performing deletion. + */ private void safeDelete(final String path) throws Exception { if (exists(path)) { SafeTransaction transaction = new SafeTransaction(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 514e9a0..1fb488d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -200,6 +200,13 @@ protected RMAppAttempt storeAttempt(RMStateStore store, return mockAttempt; } + protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher, + ApplicationAttemptStateData attemptState) { + dispatcher.attemptId = attemptState.getAttemptId(); + store.updateApplicationAttemptState(attemptState); + waitNotify(dispatcher); + } + void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { testRMAppStateStore(stateStoreHelper, new StoreStateVerifier()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index f71cf25..e9e4393 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -28,6 +28,8 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -40,19 +42,26 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.MasterKeyData; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.data.ACL; @@ -61,13 +70,18 @@ import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.crypto.SecretKey; @@ -130,9 +144,21 @@ public Version getCurrentVersion() { return CURRENT_VERSION_INFO; } + private String getAppNode(String appId, int splitIdx) { + String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" + + RM_APP_ROOT; + String appPath = appId; + if (splitIdx != 0) { + int idx = appId.length() - splitIdx; + appPath = appId.substring(0, idx) + "/" + appId.substring(idx); + return rootPath + "/" + RM_APP_ROOT_HIERARCHIES + "/" + + Integer.toString(splitIdx) + "/" + appPath; + } + return rootPath + "/" + appPath; + } + public String getAppNode(String appId) { - return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" - + appId; + return getAppNode(appId, 0); } public String getAttemptNode(String appId, String attemptId) { @@ -149,8 +175,7 @@ public void testRetryingCreateRootDir() throws Exception { } - public RMStateStore getRMStateStore() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); + private RMStateStore createStore(Configuration conf) throws Exception { workingZnode = "/jira/issue/3077/rmstore"; conf.set(YarnConfiguration.RM_ZK_ADDRESS, curatorTestingServer.getConnectString()); @@ -159,6 +184,15 @@ public RMStateStore getRMStateStore() throws Exception { return this.store; } + public RMStateStore getRMStateStore(Configuration conf) throws Exception { + return createStore(conf); + } + + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + return createStore(conf); + } + @Override public boolean isFinalStateValid() throws Exception { return 1 == @@ -178,8 +212,12 @@ public Version getCurrentVersion() throws Exception { } public boolean appExists(RMApp app) throws Exception { + String appIdPath = app.getApplicationId().toString(); + int split = + store.getConfig().getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); return null != curatorFramework.checkExists() - .forPath(store.getAppNode(app.getApplicationId().toString())); + .forPath(store.getAppNode(appIdPath, split)); } public boolean attemptExists(RMAppAttempt attempt) throws Exception { @@ -342,7 +380,6 @@ public void testZKRootPathAcls() throws Exception { rm.close(); } - @SuppressWarnings("unchecked") @Test public void testFencing() throws Exception { StateChangeRequestInfo req = new StateChangeRequestInfo( @@ -527,4 +564,548 @@ public void testDuplicateRMAppDeletion() throws Exception { } store.close(); } + + private static String createPath(String root, String... parts) { + StringBuilder sb = new StringBuilder(); + sb.append(root); + for (String part : parts) { + sb.append("/").append(part); + } + return sb.toString(); + } + + private static Configuration createConfForAppNodeSplit(int splitIndex) { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex); + return conf; + } + + private static RMApp createMockAppForRemove(ApplicationId appId, + ApplicationAttemptId...attemptIds) { + RMApp app = mock(RMApp.class); + ApplicationSubmissionContextPBImpl context = + new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(appId); + when(app.getApplicationSubmissionContext()).thenReturn(context); + when(app.getUser()).thenReturn("test"); + if (attemptIds.length > 0) { + HashMap attempts = new HashMap<>(); + for (ApplicationAttemptId attemptId : attemptIds) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(attemptId); + attempts.put(attemptId, appAttempt); + } + when(app.getAppAttempts()).thenReturn(attempts); + } + return app; + } + + private static void verifyLoadedApp(ApplicationStateData appState, + ApplicationId appId, String user, long submitTime, long startTime, + RMAppState state, long finishTime, String diagnostics) { + // Check if app is loaded correctly + assertNotNull("App " + appId + " should have been loaded.", appState); + assertEquals("App submit time in app state", submitTime, + appState.getSubmitTime()); + assertEquals("App start time in app state", startTime, + appState.getStartTime()); + assertEquals("App ID in app state", appId, + appState.getApplicationSubmissionContext().getApplicationId()); + assertEquals("App state", state, appState.getState()); + assertEquals("Finish time in app state", finishTime, + appState.getFinishTime()); + assertEquals("User in app state", user, appState.getUser()); + assertEquals("Diagnostics in app state", diagnostics, + appState.getDiagnostics()); + } + + private static void verifyLoadedAttempt(ApplicationStateData appState, + ApplicationId appId, ApplicationAttemptId attemptId, String trackingURL, + ContainerId masterContainerId, SecretKey clientTokenKey, + RMAppAttemptState state, String diagnostics, long finishTime, + int amExitStatus, FinalApplicationStatus finalStatus) { + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId); + // Check if attempt is loaded correctly + assertNotNull( + "Attempt " + attemptId + " should have been loaded.", attemptState); + assertEquals("Attempt Id in attempt state", + attemptId, attemptState.getAttemptId()); + assertEquals("Master Container Id in attempt state", + masterContainerId, attemptState.getMasterContainer().getId()); + if (null != clientTokenKey) { + assertArrayEquals("Client token key in attempt state", + clientTokenKey.getEncoded(), attemptState.getAppAttemptTokens(). + getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + } + assertEquals("Attempt state", state, attemptState.getState()); + assertEquals("Finish time in attempt state", finishTime, + attemptState.getFinishTime()); + assertEquals("Diagnostics in attempt state", diagnostics, + attemptState.getDiagnostics()); + assertEquals("AM Container exit status in attempt state", amExitStatus, + attemptState.getAMContainerExitStatus()); + assertEquals("Final app status in attempt state", finalStatus, + attemptState.getFinalApplicationStatus()); + assertEquals("Tracking URL in attempt state", trackingURL, + attemptState.getFinalTrackingUrl()); + } + + // Test to verify storing of apps and app attempts in ZK state store with app + // node split index configured more than 0. + @Test + public void testAppNodeSplit() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + Configuration conf = new YarnConfiguration(); + + // Get store with app node split config set as 1. + RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + // Create RM Context and app token manager. + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + AMRMTokenSecretManager appTokenMgr = + spy(new AMRMTokenSecretManager(conf, rmContext)); + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + + // Store app1. + ApplicationAttemptId attemptId1 = ConverterUtils + .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); + ApplicationId appId1 = attemptId1.getApplicationId(); + storeApp(store, appId1, submitTime, startTime); + waitNotify(dispatcher); + + // Store app2 with app id application_1352994193343_120213. + ApplicationId appId21 = ConverterUtils.toApplicationId( + "application_1352994193343_120213"); + storeApp(store, appId21, submitTime, startTime); + waitNotify(dispatcher); + + // Store attempt associated with app1. + Token appAttemptToken1 = + generateAMRMToken(attemptId1, appTokenMgr); + SecretKey clientTokenKey1 = + clientToAMTokenMgr.createMasterKey(attemptId1); + ContainerId containerId1 = + ConverterUtils.toContainerId("container_1352994193343_0001_01_000001"); + storeAttempt(store, attemptId1, containerId1.toString(), appAttemptToken1, + clientTokenKey1, dispatcher); + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; + ApplicationAttemptId attemptId2 = + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + // Store attempt associated with app2. + Token appAttemptToken2 = + generateAMRMToken(attemptId2, appTokenMgr); + SecretKey clientTokenKey2 = + clientToAMTokenMgr.createMasterKey(attemptId2); + Credentials attemptCred = new Credentials(); + attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME, + clientTokenKey2.getEncoded()); + ContainerId containerId2 = + ConverterUtils.toContainerId("container_1352994193343_0001_02_000001"); + storeAttempt(store, attemptId2, containerId2.toString(), appAttemptToken2, + clientTokenKey2, dispatcher); + + // Store another app which will be removed. + ApplicationAttemptId attemptIdRemoved = ConverterUtils + .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); + ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); + storeApp(store, appIdRemoved, submitTime, startTime); + waitNotify(dispatcher); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", null, null, dispatcher); + // Remove the app. + RMApp mockRemovedApp = + createMockAppForRemove(appIdRemoved, attemptIdRemoved); + store.removeApplication(mockRemovedApp); + + // Remove application + storeApp(store, appIdRemoved, submitTime, startTime); + waitNotify(dispatcher); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", null, null, dispatcher); + store.removeApplication(mockRemovedApp); + // Close state store + store.close(); + + // Load state store + store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + java.util.Map rmAppState = + state.getApplicationState(); + // Check if application_1352994193343_120213 (i.e. app2) exists in state + // store as per split index. + String path = createPath(((ZKRMStateStore)store).znodeWorkingPath, + ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT, + ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, "1", + "application_1352994193343_12021", "3"); + assertTrue("Application with id application_1352994193343_120213" + + " does not exist as per split in state store.", + ((ZKRMStateStore)store).exists(path)); + + // Verify loaded apps and attempts based on the operations we did before + // reloading the state store. + ApplicationStateData appState = rmAppState.get(appId1); + verifyLoadedApp( + appState, appId1, "test", submitTime, startTime, null, 0, ""); + assertEquals("Attempts loaded for " + appId1, 2, appState.attempts.size()); + verifyLoadedAttempt(appState, appId1, attemptId1, "N/A", containerId1, + clientTokenKey1, null, "", 0, -1000, null); + verifyLoadedAttempt(appState, appId1, attemptId2, "N/A", containerId2, + clientTokenKey2, null, "", 0, -1000, null); + + // Update app state for app1. + ApplicationStateData appState2 = ApplicationStateData.newInstance( + submitTime, startTime, "test", + appState.getApplicationSubmissionContext(), RMAppState.FINISHED, + "appDiagnostics", 1234, null); + appState2.attempts.putAll(appState.attempts); + store.updateApplicationState(appState2); + waitNotify(dispatcher); + + // Update attempt state for app2. + Container container = new ContainerPBImpl(); + container.setId(containerId2); + ApplicationAttemptStateData newAttemptState = + ApplicationAttemptStateData.newInstance(attemptId2, container, + attemptCred, startTime, RMAppAttemptState.FINISHED, "myTrackingUrl", + "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, + 100, 0, 0, 0, 0, 0); + updateAttempt(store, dispatcher, newAttemptState); + + // Test updating app/attempt for app whose initial state is not saved + ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10); + ApplicationSubmissionContext dummyContext = + new ApplicationSubmissionContextPBImpl(); + dummyContext.setApplicationId(dummyAppId); + ApplicationStateData dummyApp = ApplicationStateData.newInstance( + appState.getSubmitTime(), appState.getStartTime(), appState.getUser(), + dummyContext, RMAppState.FINISHED, "appDiagnostics", 1234, null); + store.updateApplicationState(dummyApp); + waitNotify(dispatcher); + + ApplicationAttemptId dummyAttemptId = + ApplicationAttemptId.newInstance(dummyAppId, 6); + container = new ContainerPBImpl(); + container.setId(ContainerId.newContainerId(dummyAttemptId, 1)); + ApplicationAttemptStateData dummyAttempt = + ApplicationAttemptStateData.newInstance(dummyAttemptId, + container, attemptCred, startTime, RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, + 111, 0, 0, 0, 0, 0); + updateAttempt(store, dispatcher, dummyAttempt); + // Close the store + store.close(); + + // Check updated application state. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + store.setRMDispatcher(dispatcher); + RMState newRMState = store.loadState(); + Map newRMAppState = + newRMState.getApplicationState(); + ApplicationStateData dummyAppState = newRMAppState.get(dummyAppId); + assertNotNull(dummyAppId + " is not there in loaded apps", dummyAppState); + verifyLoadedApp(dummyAppState, dummyAppId, "test", + submitTime, startTime, RMAppState.FINISHED, 1234, "appDiagnostics"); + ApplicationStateData updatedAppState = newRMAppState.get(appId1); + assertNotNull(appId1 + " is not there in loaded apps", updatedAppState); + verifyLoadedApp(updatedAppState, appId1, "test", + submitTime, startTime, RMAppState.FINISHED, 1234, "appDiagnostics"); + // Check updated attempt state. + assertEquals("Attempts loaded for app " + dummyAppId, 1, + dummyAppState.attempts.size()); + verifyLoadedAttempt(dummyAppState, dummyAppId, dummyAttemptId, + "myTrackingUrl", container.getId(), clientTokenKey2, + RMAppAttemptState.FINISHED, "attemptDiagnostics", 0, + 111, FinalApplicationStatus.SUCCEEDED); + assertEquals("Attempts loaded for app " + appId1, 2, + updatedAppState.attempts.size()); + verifyLoadedAttempt(updatedAppState, appId1, attemptId1, "N/A", + containerId1, clientTokenKey1, null, "", 0, -1000, null); + verifyLoadedAttempt(updatedAppState, appId1, attemptId2, "myTrackingUrl", + containerId2, clientTokenKey2, RMAppAttemptState.FINISHED, + "attemptDiagnostics", 0, 100, FinalApplicationStatus.SUCCEEDED); + + // assert store is in expected state after everything is cleaned + assertTrue("Store is not in expected state", zkTester.isFinalStateValid()); + store.close(); + } + + // Test to verify storing of apps and app attempts in ZK state store with app + // node split index config changing across restarts. + @Test + public void testAppNodeSplitChangeAcrossRestarts() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + Configuration conf = new YarnConfiguration(); + + // Create store with app node split set as 1. + RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + AMRMTokenSecretManager appTokenMgr = + spy(new AMRMTokenSecretManager(conf, rmContext)); + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + + // Store app1. + ApplicationAttemptId attemptId1 = ConverterUtils + .toApplicationAttemptId("appattempt_1442994194053_0001_000001"); + ApplicationId appId1 = attemptId1.getApplicationId(); + storeApp(store, appId1, submitTime, startTime); + waitNotify(dispatcher); + + // Store attempts for app1. + Token appAttemptToken1 = + generateAMRMToken(attemptId1, appTokenMgr); + SecretKey clientTokenKey1 = clientToAMTokenMgr.createMasterKey(attemptId1); + ContainerId containerId1 = + ConverterUtils.toContainerId("container_1442994194053_0001_01_000001"); + storeAttempt(store, attemptId1, containerId1.toString(), appAttemptToken1, + clientTokenKey1, dispatcher); + String appAttemptIdStr2 = "appattempt_1442994194053_0001_000002"; + ApplicationAttemptId attemptId2 = + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + Token appAttemptToken2 = + generateAMRMToken(attemptId2, appTokenMgr); + SecretKey clientTokenKey2 = clientToAMTokenMgr.createMasterKey(attemptId2); + ContainerId containerId2 = + ConverterUtils.toContainerId("container_1442994194053_0001_02_000001"); + storeAttempt(store, attemptId2, containerId2.toString(), appAttemptToken2, + clientTokenKey2, dispatcher); + Credentials attemptCred = new Credentials(); + attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME, + clientTokenKey2.getEncoded()); + + // Store app2 and associated attempt. + ApplicationAttemptId attemptId11 = ConverterUtils + .toApplicationAttemptId("appattempt_1442994194053_0002_000001"); + ApplicationId appId11 = attemptId11.getApplicationId(); + storeApp(store, appId11, submitTime, startTime); + waitNotify(dispatcher); + ContainerId containerId11 = + ConverterUtils.toContainerId("container_1442994194053_0002_01_000001"); + storeAttempt(store, attemptId11, containerId11.toString(), null, null, + dispatcher); + // Close state store + store.close(); + + // Load state store with app node split config of 2. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(2)); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + Map rmAppState = + state.getApplicationState(); + ApplicationId appId21 = ConverterUtils.toApplicationId( + "application_1442994194053_120213"); + storeApp(store, appId21, submitTime, startTime); + waitNotify(dispatcher); + + // Check if app is loaded correctly despite change in split index. + ApplicationStateData appState = rmAppState.get(appId1); + verifyLoadedApp( + appState, appId1, "test", submitTime, startTime, null, 0, ""); + assertEquals("Attempts loaded for app " + appId1, 2, + appState.attempts.size()); + verifyLoadedAttempt(appState, appId1, attemptId1, "N/A", containerId1, + clientTokenKey1, null, "", 0, -1000, null); + verifyLoadedAttempt(appState, appId1, attemptId2, "N/A", containerId2, + clientTokenKey2, null, "", 0, -1000, null); + + // Update app/attempt state + ApplicationStateData appState2 = + ApplicationStateData.newInstance(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), + appState.getApplicationSubmissionContext(), RMAppState.FINISHED, + "appDiagnostics", 1234, null); + appState2.attempts.putAll(appState.attempts); + store.updateApplicationState(appState2); + waitNotify(dispatcher); + + Container container = new ContainerPBImpl(); + container.setId(containerId2); + ApplicationAttemptStateData newAttemptState = + ApplicationAttemptStateData.newInstance(attemptId2, container, + attemptCred, startTime, RMAppAttemptState.FINISHED, "myTrackingUrl", + "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, + 100, 0, 0, 0, 0, 0); + updateAttempt(store, dispatcher, newAttemptState); + + // Test updating app/attempt for app whose initial state is not saved + ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10); + ApplicationSubmissionContext dummyContext = + new ApplicationSubmissionContextPBImpl(); + dummyContext.setApplicationId(dummyAppId); + ApplicationStateData dummyApp = ApplicationStateData.newInstance( + appState.getSubmitTime(), appState.getStartTime(), appState.getUser(), + dummyContext, RMAppState.FINISHED, "appDiagnostics", 1234, null); + store.updateApplicationState(dummyApp); + waitNotify(dispatcher); + + ApplicationAttemptId dummyAttemptId = + ApplicationAttemptId.newInstance(dummyAppId, 6); + container = new ContainerPBImpl(); + container.setId(ContainerId.newContainerId(dummyAttemptId, 1)); + ApplicationAttemptStateData dummyAttempt = + ApplicationAttemptStateData.newInstance(dummyAttemptId, + container, attemptCred, startTime, RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, + 111, 0, 0, 0, 0, 0); + updateAttempt(store, dispatcher, dummyAttempt); + // Close the store + store.close(); + + // Load state store this time with split index of 0. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(0)); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + assertEquals("Number of Apps loaded", 4, + state.getApplicationState().size()); + rmAppState = state.getApplicationState(); + appState = rmAppState.get(appId1); + assertNotNull(appId1 + " is not there in loaded apps", appState); + // Verify if apps and attempts are loaded correctly. + verifyLoadedApp(appState, appId1, "test", submitTime, startTime, + RMAppState.FINISHED, 1234, "appDiagnostics"); + assertEquals("Number of attempts loaded for app " + appId1, 2, + appState.attempts.size()); + verifyLoadedAttempt(appState, appId1, attemptId1, "N/A", containerId1, + clientTokenKey1, null, "", 0, -1000, null); + verifyLoadedAttempt(appState, appId1, attemptId2, "myTrackingUrl", + containerId2, clientTokenKey2, RMAppAttemptState.FINISHED, + "attemptDiagnostics", 0, 100, FinalApplicationStatus.SUCCEEDED); + // Remove attempt1 + store.removeApplicationAttempt(attemptId1); + ApplicationId appId31 = ConverterUtils.toApplicationId( + "application_1442994195071_0045"); + storeApp(store, appId31, submitTime, startTime); + waitNotify(dispatcher); + // Close state store. + store.close(); + + // Load state store with split index of 3. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(3)); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + assertEquals("Number of apps loaded", 5, + state.getApplicationState().size()); + rmAppState = state.getApplicationState(); + appState = rmAppState.get(dummyAppId); + assertNotNull(dummyAppId + " is not there in loaded apps", appState); + // Verify if all the apps and attempts are loaded correctly. + verifyLoadedApp(appState, dummyAppId, "test", submitTime, startTime, + RMAppState.FINISHED, 1234, "appDiagnostics"); + assertEquals("Number of attempts loaded for app " + dummyAppId, 1, + appState.attempts.size()); + verifyLoadedAttempt(appState, dummyAppId, dummyAttemptId, "myTrackingUrl", + container.getId(), clientTokenKey2, RMAppAttemptState.FINISHED, + "attemptDiagnostics", 0, 111, FinalApplicationStatus.SUCCEEDED); + appState = rmAppState.get(appId31); + assertNotNull(appId31 + " is not there in loaded apps", appState); + verifyLoadedApp( + appState, appId31, "test", submitTime, startTime, null, 0, ""); + assertEquals("Number of attempts loaded for app " + appId31, 0, + appState.attempts.size()); + appState = rmAppState.get(appId21); + assertNotNull(appId21 + " is not there in loaded apps", appState); + verifyLoadedApp( + appState, appId21, "test", submitTime, startTime, null, 0, ""); + assertEquals("Number of attempts loaded for app " + appId21, 0, + appState.attempts.size()); + appState = rmAppState.get(appId11); + assertNotNull(appId11 + " is not there in loaded apps", appState); + verifyLoadedApp( + appState, appId11, "test", submitTime, startTime, null, 0, ""); + assertEquals("Number of attempts loaded for app " + appId11, 1, + appState.attempts.size()); + verifyLoadedAttempt(appState, appId11, attemptId11, "N/A", containerId11, + null, null, "", 0, -1000, null); + appState = rmAppState.get(appId1); + assertNotNull(appId1 + " is not there in loaded apps", appState); + verifyLoadedApp(appState, appId1, "test", submitTime, startTime, + RMAppState.FINISHED, 1234, "appDiagnostics"); + assertEquals("Number of attempts loaded for app " + appId1, 1, + appState.attempts.size()); + verifyLoadedAttempt(appState, appId1, attemptId2, "myTrackingUrl", + containerId2, clientTokenKey2, RMAppAttemptState.FINISHED, + "attemptDiagnostics", 0, 100, FinalApplicationStatus.SUCCEEDED); + + // Store another app. + String appRootPath = + createPath(((ZKRMStateStore)store).znodeWorkingPath, + ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT); + String hierarchicalPath = createPath(appRootPath, + ZKRMStateStore.RM_APP_ROOT_HIERARCHIES); + ApplicationId appId41 = ConverterUtils.toApplicationId( + "application_1442994195087_0001"); + storeApp(store, appId41, submitTime, startTime); + waitNotify(dispatcher); + // Check how many apps exist in each of the hierarchy based paths. 0 paths + // should exist in "HIERARCHIES/4" path as app split index was never set + // as 4 in tests above. + assertEquals("Number of childrens for path " + appRootPath, 2, + ((ZKRMStateStore)store).getChildren(appRootPath).size()); + assertEquals("Number of childrens for path " + hierarchicalPath + "/1", 1, + ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/1").size()); + assertEquals("Number of childrens for path " + hierarchicalPath + "/2", 2, + ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/2").size()); + assertEquals("Number of childrens for path " + hierarchicalPath + "/3", 1, + ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/3").size()); + assertEquals("Number of childrens for path " + hierarchicalPath + "/4", 0, + ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/4").size()); + String path = createPath( + hierarchicalPath, "3", "application_1442994195087_0", "001"); + assertTrue("application_1442994195087_0001 should exist in path " + path, + ((ZKRMStateStore)store).exists(createPath(path))); + + // Remove applications + RMApp mockRemovedApp = createMockAppForRemove(appId11, attemptId11); + store.removeApplication(mockRemovedApp); + storeApp(store, appId11, submitTime, startTime); + waitNotify(dispatcher); + storeAttempt(store, attemptId11, "container_1442994194053_0002_01_000001", + null, null, dispatcher); + store.removeApplication(mockRemovedApp); + + RMApp mockRemovedApp1 = createMockAppForRemove(appId41); + store.removeApplication(mockRemovedApp1); + + mockRemovedApp1 = createMockAppForRemove(appId31); + store.removeApplication(mockRemovedApp1); + + mockRemovedApp1 = createMockAppForRemove(appId21); + store.removeApplication(mockRemovedApp1); + + mockRemovedApp = createMockAppForRemove(dummyAppId, dummyAttemptId); + store.removeApplication(mockRemovedApp); + + mockRemovedApp = createMockAppForRemove(appId1, attemptId1, attemptId2); + store.removeApplication(mockRemovedApp); + store.close(); + + // Load state store with split index of 3 again. As all apps have been + // removed nothing should be loaded back. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(3)); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + assertEquals("Number of apps loaded", 0, + state.getApplicationState().size()); + // Close the state store. + store.close(); + } }