diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7dd5ce3..de4a6d7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -562,6 +562,10 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
+ public static final String ZK_APPID_NODE_SPLIT_INDEX =
+ RM_ZK_PREFIX + "appid-node.split-index";
+ public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
+
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3c30ed3..a8954ec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -606,7 +606,27 @@
- Name of the cluster. In an HA setting,
+ Index at which the last section of the application id (sections are
+ separated by _ in the application id) will be split, so that the application
+ znode in the ZooKeeper RM state store is stored as two different znodes
+ (parent-child). The split is done from the end.
+ For instance, with no split, the appid znode will be of the form
+ application_1352994193343_0001. If the value of this config is 1, the
+ appid znode will be broken into two parts, application_1352994193343_000
+ and 1, with the former being the parent node.
+ application_1352994193343_0002 will then be stored as 2 under the parent
+ node application_1352994193343_000. This config can take values from 0 to 4.
+ 0 means there will be no split. If the configured value is outside this
+ range, it will be treated as 0 (i.e. no split). A value
+ larger than 0 (up to 4) should be configured if you are storing a large number
+ of apps in the ZK based RM state store and state store operations are failing
+ due to LenError in ZooKeeper.
+ yarn.resourcemanager.zk-appid-node.split-index
+ 0
+
+
+
+ Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
election for this cluster and ensures it does not affect
other clusters
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index cf6380f..9c4a3af 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@@ -72,6 +73,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* {@link RMStateStore} implementation backed by ZooKeeper.
@@ -82,6 +84,31 @@
* |--- EPOCH_NODE
* |--- RM_ZK_FENCING_LOCK
* |--- RM_APP_ROOT
+ * | |----- HIERARCHIES
+ * | | |----- 1
+ * | | | |----- (#ApplicationId barring last character)
+ * | | | | |----- (#Last character of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
+ * | | |----- 2
+ * | | | |----- (#ApplicationId barring last 2 characters)
+ * | | | | |----- (#Last 2 characters of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
+ * | | |----- 3
+ * | | | |----- (#ApplicationId barring last 3 characters)
+ * | | | | |----- (#Last 3 characters of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
+ * | | |----- 4
+ * | | | |----- (#ApplicationId barring last 4 characters)
+ * | | | | |----- (#Last 4 characters of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
* | |----- (#ApplicationId1)
* | | |----- (#ApplicationAttemptIds)
* | |
@@ -121,6 +148,7 @@
@Unstable
public class ZKRMStateStore extends RMStateStore {
private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
+
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -129,12 +157,15 @@
"RMDTMasterKeysRoot";
@VisibleForTesting
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
- protected static final Version CURRENT_VERSION_INFO =
- Version.newInstance(1, 3);
+ protected static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(2, 0);
+ @VisibleForTesting
+ static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
/* Znode paths */
private String zkRootNodePath;
private String rmAppRoot;
+ private Map<Integer, String> rmAppRootHierarchies; // split index -> app root
private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
@@ -144,6 +175,7 @@
@VisibleForTesting
protected String znodeWorkingPath;
+ private int appIdNodeSplitIndex = 0;
/* Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -166,6 +198,18 @@
protected CuratorFramework curatorFramework;
/**
+  * Immutable pairing of an app node's full znode path with its split index.
+  */
+ private static final class AppNodeSplitInfo {
+   private final int splitIndex;
+   private final String path;
+   AppNodeSplitInfo(String nodePath, int nodeSplitIndex) {
+     splitIndex = nodeSplitIndex;
+     path = nodePath;
+   }
+ }
+
+ /**
* Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node.
* In the constructed {@link ACL}, all the users allowed by sourceACLs are
@@ -212,11 +256,30 @@ public synchronized void initInternal(Configuration conf)
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
- fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
+ String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES);
+ rmAppRootHierarchies = new HashMap<>(5);
+ rmAppRootHierarchies.put(0, rmAppRoot);
+ for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+ rmAppRootHierarchies.put(splitIndex,
+ getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
+ }
+
+ fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+ appIdNodeSplitIndex =
+     conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
+         YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
+ // Valid range is 0 (no split) to 4; 0 must not be reported as invalid.
+ if (appIdNodeSplitIndex < 0 || appIdNodeSplitIndex > 4) {
+   LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
+       YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. Resetting" +
+       " it to " + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
+   appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
+ }
+
zkAcl = RMZKUtils.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) {
@@ -269,6 +332,10 @@ public synchronized void startInternal() throws Exception {
verifyActiveStatusThread.start();
}
create(rmAppRoot);
+ create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
+ for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+ create(rmAppRootHierarchies.get(splitIndex));
+ }
create(rmDTSecretManagerRoot);
create(dtMasterKeysRootPath);
create(delegationTokensRootPath);
@@ -524,42 +591,62 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
}
}
- private synchronized void loadRMAppState(RMState rmState) throws Exception {
- List childNodes = getChildren(rmAppRoot);
-
- for (String childNodeName : childNodes) {
- String childNodePath = getNodePath(rmAppRoot, childNodeName);
- byte[] childData = getData(childNodePath);
-
- if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
- // application
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading application from znode: " + childNodeName);
- }
-
- ApplicationId appId = ApplicationId.fromString(childNodeName);
- ApplicationStateDataPBImpl appState =
- new ApplicationStateDataPBImpl(
- ApplicationStateDataProto.parseFrom(childData));
+ private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
+ String appIdStr) throws Exception {
+ byte[] appData = getData(appNodePath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application from znode: " + appNodePath);
+ }
+ ApplicationId appId = ApplicationId.fromString(appIdStr);
+ ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(
+ ApplicationStateDataProto.parseFrom(appData));
+ if (!appId.equals(
+ appState.getApplicationSubmissionContext().getApplicationId())) {
+ throw new YarnRuntimeException("The node name is different " +
+ "from the application id");
+ }
+ rmState.appState.put(appId, appState);
+ loadApplicationAttemptState(appState, appNodePath);
+ }
- if (!appId.equals(
- appState.getApplicationSubmissionContext().getApplicationId())) {
- throw new YarnRuntimeException("The child node name is different "
- + "from the application id");
+ private synchronized void loadRMAppState(RMState rmState) throws Exception {
+ for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
+ String appRoot = rmAppRootHierarchies.get(splitIndex);
+ if (appRoot == null) {
+ continue;
+ }
+ List childNodes = getChildren(appRoot);
+ boolean appNodeFound = false;
+ for (String childNodeName : childNodes) {
+ if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+ appNodeFound = true;
+ if (splitIndex == 0) {
+ loadRMAppStateFromAppNode(rmState,
+ getNodePath(appRoot, childNodeName), childNodeName);
+ } else {
+ // If AppId Node is partitioned.
+ String leafNodePath = getNodePath(appRoot, childNodeName);
+ List leafNodes = getChildren(leafNodePath);
+ for (String leafNodeName : leafNodes) {
+ String appIdStr = childNodeName + leafNodeName;
+ loadRMAppStateFromAppNode(rmState,
+ getNodePath(leafNodePath, leafNodeName), appIdStr);
+ }
+ }
}
-
- rmState.appState.put(appId, appState);
- loadApplicationAttemptState(appState, appId);
- } else {
- LOG.info("Unknown child node with name: " + childNodeName);
+ }
+ if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
+ // If no loaded app exists for a particular split index and the split
+ // index for which apps are being loaded is not the one configured, then
+ // we do not need to keep track of this hierarchy for storing/updating/
+ // removing app/app attempt znodes.
+ rmAppRootHierarchies.remove(splitIndex);
}
}
}
private void loadApplicationAttemptState(ApplicationStateData appState,
- ApplicationId appId)
- throws Exception {
- String appPath = getNodePath(rmAppRoot, appId.toString());
+ String appPath) throws Exception {
List attempts = getChildren(appPath);
for (String attemptIDStr : attempts) {
@@ -575,13 +662,66 @@ private void loadApplicationAttemptState(ApplicationStateData appState,
}
}
- LOG.debug("Done loading applications from ZK state store");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done loading applications from ZK state store");
+ }
+ }
+
+ /**
+  * Strips the leaf name and the one character before it from a split path.
+  * @param appIdPath full path of the leaf app node.
+  * @param splitIndex length of the leaf name, i.e. the split index.
+  * @return parent app node path.
+  */
+ private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
+   return appIdPath.substring(0, appIdPath.length() - (splitIndex + 1));
+ }
+
+ /**
+  * Removes the parent app node of a split app node if no leaf node is left
+  * under it. Called while removing application; no-op for split index 0.
+  * @param appIdPath path of app id to be removed.
+  * @param splitIndex split index under which the app node was stored.
+  * @throws Exception if listing or deleting the parent app node fails.
+  */
+ private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
+     throws Exception {
+   if (splitIndex == 0) {
+     return;
+   }
+   String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
+   List<String> children;
+   try {
+     children = getChildren(parentAppNode);
+   } catch (KeeperException.NoNodeException ke) {
+     // Fine to swallow: the parent app node we meant to delete is gone.
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Unable to remove app parent node " + parentAppNode +
+           " as it does not exist.");
+     }
+     return;
+   }
+   if (children != null && children.isEmpty()) {
+     try {
+       safeDelete(parentAppNode);
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("No leaf app node exists. Removing parent node " +
+             parentAppNode);
+       }
+     } catch (KeeperException.NotEmptyException ke) {
+       // Fine to swallow: the parent still has children, so it must stay.
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Unable to remove app parent node " + parentAppNode +
+             " as it has children.");
+       }
+     }
+   }
+ }
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
+ String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -596,7 +736,26 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
protected synchronized void updateApplicationStateInternal(
ApplicationId appId, ApplicationStateData appStateDataPB)
throws Exception {
- String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
+ String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
+ boolean pathExists = true;
+ // Look for paths based on other split indices if path as per split index
+ // does not exist.
+ if (!exists(nodeUpdatePath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
+ if (alternatePathInfo != null) {
+ nodeUpdatePath = alternatePathInfo.path;
+ } else {
+ // No alternate path exists. Create path as per configured split index.
+ pathExists = false;
+ if (appIdNodeSplitIndex != 0) {
+ String rootNode =
+ getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
+ if (!exists(rootNode)) {
+ safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
+ }
+ }
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: "
@@ -605,13 +764,14 @@ protected synchronized void updateApplicationStateInternal(
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- if (exists(nodeUpdatePath)) {
+ if (pathExists) {
safeSetData(nodeUpdatePath, appStateData, -1);
} else {
- safeCreate(nodeUpdatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
- LOG.debug(appId + " znode didn't exist. Created a new znode to"
- + " update the application state.");
+ safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
+ "exist. Creating a new znode to update the application state.");
+ }
}
}
@@ -620,8 +780,17 @@ protected synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
- String appDirPath = getNodePath(rmAppRoot,
- appAttemptId.getApplicationId().toString());
+ String appId = appAttemptId.getApplicationId().toString();
+ String appDirPath = getLeafAppIdNodePath(appId, false);
+ // Look for paths based on other split indices.
+ if (!exists(appDirPath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+ if (alternatePathInfo == null) {
+ throw new YarnRuntimeException("Unexpected Exception. App node for " +
+ "app " + appId + " not found");
+ }
+ appDirPath = alternatePathInfo.path;
+ }
String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
if (LOG.isDebugEnabled()) {
@@ -639,8 +808,17 @@ protected synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString();
+ String appDirPath = getLeafAppIdNodePath(appIdStr, false);
+ // Look for paths based on other split indices.
+ if (!exists(appDirPath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appIdStr);
+ if (alternatePathInfo == null) {
+ throw new YarnRuntimeException("Unexpected Exception. App node for " +
+ "app " + appIdStr + " not found");
+ }
+ appDirPath = alternatePathInfo.path;
+ }
String appAttemptIdStr = appAttemptId.toString();
- String appDirPath = getNodePath(rmAppRoot, appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) {
@@ -655,8 +833,11 @@ protected synchronized void updateApplicationAttemptStateInternal(
} else {
safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
- LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
- + " update the application attempt state.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Path " + nodeUpdatePath + " for " + appAttemptId +
+ " didn't exist. Created a new znode to update the application" +
+ " attempt state.");
+ }
}
}
@@ -664,7 +845,17 @@ protected synchronized void updateApplicationAttemptStateInternal(
protected synchronized void removeApplicationAttemptInternal(
ApplicationAttemptId appAttemptId) throws Exception {
String appId = appAttemptId.getApplicationId().toString();
- String appIdRemovePath = getNodePath(rmAppRoot, appId);
+ String appIdRemovePath = getLeafAppIdNodePath(appId, false);
+ // Look for paths based on other split indices.
+ if (!exists(appIdRemovePath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+ if (alternatePathInfo != null) {
+ appIdRemovePath = alternatePathInfo.path;
+ } else {
+ // Unexpected. Assume that app attempt has been deleted.
+ return;
+ }
+ }
String attemptIdRemovePath =
getNodePath(appIdRemovePath, appAttemptId.toString());
@@ -679,9 +870,22 @@ protected synchronized void removeApplicationAttemptInternal(
@Override
protected synchronized void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception {
- String appId = appState.getApplicationSubmissionContext().getApplicationId()
- .toString();
- String appIdRemovePath = getNodePath(rmAppRoot, appId);
+ String appId = appState.getApplicationSubmissionContext().
+ getApplicationId().toString();
+ String appIdRemovePath = getLeafAppIdNodePath(appId, false);
+ int splitIndex = appIdNodeSplitIndex;
+ // Look for paths based on other split indices if path as per configured
+ // split index does not exist.
+ if (!exists(appIdRemovePath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+ if (alternatePathInfo != null) {
+ appIdRemovePath = alternatePathInfo.path;
+ splitIndex = alternatePathInfo.splitIndex;
+ } else {
+ // Alternate path not found so return.
+ return;
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
@@ -695,6 +899,9 @@ protected synchronized void removeApplicationStateInternal(
}
safeDelete(appIdRemovePath);
+
+ // Check if we should remove the parent app node as well.
+ checkRemoveParentAppNode(appIdRemovePath, splitIndex);
}
@Override
@@ -815,8 +1022,24 @@ public synchronized void deleteStore() throws Exception {
@Override
public synchronized void removeApplication(ApplicationId removeAppId)
throws Exception {
- String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString());
- delete(appIdRemovePath);
+ String appId = removeAppId.toString();
+ String appIdRemovePath = getLeafAppIdNodePath(appId, false);
+ int splitIndex = appIdNodeSplitIndex;
+ // Fall back to other split indices if the configured path is missing.
+ if (!exists(appIdRemovePath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+ if (alternatePathInfo != null) {
+ appIdRemovePath = alternatePathInfo.path;
+ splitIndex = alternatePathInfo.splitIndex;
+ } else {
+ // App node not found under any split index, so nothing to remove.
+ return;
+ }
+ }
+ curatorFramework.delete().deletingChildrenIfNeeded().
+ forPath(appIdRemovePath);
+ // Check if we should remove the parent app node as well.
+ checkRemoveParentAppNode(appIdRemovePath, splitIndex);
}
@VisibleForTesting
@@ -915,6 +1138,79 @@ private void createRootDirRecursively(String path) throws Exception {
}
}
+ /**
+  * Get alternate path for app id if path according to configured split index
+  * does not exist. We look for path based on all other tracked split indices.
+  * @param appId application id string whose node needs to be located.
+  * @return an {@link AppNodeSplitInfo} object containing the path and split
+  * index if it exists, null otherwise.
+  * @throws Exception if the existence check against the state store fails.
+  */
+ private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
+   for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
+     int splitIndex = entry.getKey();
+     // Callers probe the configured split index themselves before this.
+     if (splitIndex != appIdNodeSplitIndex) {
+       String alternatePath =
+           getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
+       if (exists(alternatePath)) {
+         return new AppNodeSplitInfo(alternatePath, splitIndex);
+       }
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Builds the leaf app node path for an app id and split index. For a non-zero
+  * index the last appIdNodeSplitIdx characters of the id become the leaf under
+  * a parent named after the remaining prefix, optionally created on demand.
+  * @param appId application id.
+  * @param rootNode app root node based on split index.
+  * @param appIdNodeSplitIdx split index.
+  * @param createParentIfNotExists whether the parent app node should be
+  * created (as per split) if it does not exist.
+  * @return leaf app node path.
+  * @throws Exception if checking for or creating the parent node fails.
+  */
+ private String getLeafAppIdNodePath(String appId, String rootNode,
+     int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
+   if (appIdNodeSplitIdx == 0) {
+     return getNodePath(rootNode, appId);
+   }
+   int cut = appId.length() - appIdNodeSplitIdx;
+   String parentPath = getNodePath(rootNode, appId.substring(0, cut));
+   if (createParentIfNotExists && !exists(parentPath)) {
+     try {
+       safeCreate(parentPath, null, zkAcl, CreateMode.PERSISTENT);
+     } catch (KeeperException.NodeExistsException e) {
+       // The parent is already there, which is the state we wanted anyway.
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Unable to create app parent node " + parentPath +
+             " as it already exists.");
+       }
+     }
+   }
+   return getNodePath(parentPath, appId.substring(cut));
+ }
+
+ /**
+  * Convenience variant that resolves the leaf app node path using the
+  * configured split index and the app root tracked for that index.
+  * Creates a missing parent app node when createParentIfNotExists is true.
+  * @param appId application id.
+  * @param createParentIfNotExists whether the parent app node should be
+  * created (as per split) if it does not exist.
+  * @return leaf app node path.
+  * @throws Exception if checking for or creating the parent node fails.
+  */
+ private String getLeafAppIdNodePath(String appId,
+     boolean createParentIfNotExists) throws Exception {
+   String root = rmAppRootHierarchies.get(appIdNodeSplitIndex);
+   return getLeafAppIdNodePath(appId, root, appIdNodeSplitIndex,
+       createParentIfNotExists);
+ }
+
@VisibleForTesting
byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path);
@@ -925,11 +1221,13 @@ private void createRootDirRecursively(String path) throws Exception {
return curatorFramework.getACL().forPath(path);
}
- private List getChildren(final String path) throws Exception {
+ @VisibleForTesting
+ List getChildren(final String path) throws Exception {
return curatorFramework.getChildren().forPath(path);
}
- private boolean exists(final String path) throws Exception {
+ @VisibleForTesting
+ boolean exists(final String path) throws Exception {
return curatorFramework.checkExists().forPath(path) != null;
}
@@ -958,6 +1256,11 @@ private void safeCreate(String path, byte[] data, List acl,
}
}
+ /**
+ * Deletes the path. Checks for existence of path as well.
+ * @param path Path to be deleted.
+ * @throws Exception if any problem occurs while performing deletion.
+ */
private void safeDelete(final String path) throws Exception {
if (exists(path)) {
SafeTransaction transaction = new SafeTransaction();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 514e9a0..1fb488d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -200,6 +200,13 @@ protected RMAppAttempt storeAttempt(RMStateStore store,
return mockAttempt;
}
+ protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher,
+ ApplicationAttemptStateData attemptState) {
+ dispatcher.attemptId = attemptState.getAttemptId();
+ store.updateApplicationAttemptState(attemptState);
+ waitNotify(dispatcher);
+ }
+
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
throws Exception {
testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index f71cf25..e9e4393 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -28,6 +28,8 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -40,19 +42,26 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
@@ -61,13 +70,18 @@
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.crypto.SecretKey;
@@ -130,9 +144,21 @@ public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
+ private String getAppNode(String appId, int splitIdx) {
+   String appRoot = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
+       RM_APP_ROOT;
+   if (splitIdx == 0) {
+     // Unsplit app nodes sit directly under the app root.
+     return appRoot + "/" + appId;
+   }
+   int cut = appId.length() - splitIdx;
+   String split = appId.substring(0, cut) + "/" + appId.substring(cut);
+   return appRoot + "/" + RM_APP_ROOT_HIERARCHIES + "/" + splitIdx + "/" +
+       split;
+ }
+
public String getAppNode(String appId) {
- return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
- + appId;
+ return getAppNode(appId, 0);
}
public String getAttemptNode(String appId, String attemptId) {
@@ -149,8 +175,7 @@ public void testRetryingCreateRootDir() throws Exception {
}
- public RMStateStore getRMStateStore() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
+ private RMStateStore createStore(Configuration conf) throws Exception {
workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
@@ -159,6 +184,15 @@ public RMStateStore getRMStateStore() throws Exception {
return this.store;
}
+ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
+ return createStore(conf);
+ }
+
+ public RMStateStore getRMStateStore() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ return createStore(conf);
+ }
+
@Override
public boolean isFinalStateValid() throws Exception {
return 1 ==
@@ -178,8 +212,12 @@ public Version getCurrentVersion() throws Exception {
}
public boolean appExists(RMApp app) throws Exception {
+ String appIdPath = app.getApplicationId().toString();
+ int split =
+ store.getConfig().getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
+ YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
return null != curatorFramework.checkExists()
- .forPath(store.getAppNode(app.getApplicationId().toString()));
+ .forPath(store.getAppNode(appIdPath, split));
}
public boolean attemptExists(RMAppAttempt attempt) throws Exception {
@@ -342,7 +380,6 @@ public void testZKRootPathAcls() throws Exception {
rm.close();
}
- @SuppressWarnings("unchecked")
@Test
public void testFencing() throws Exception {
StateChangeRequestInfo req = new StateChangeRequestInfo(
@@ -527,4 +564,548 @@ public void testDuplicateRMAppDeletion() throws Exception {
}
store.close();
}
+
+  /**
+   * Joins a root path and any number of path components with "/".
+   *
+   * @param root leading part of the path, used as is
+   * @param parts components appended in order, each preceded by "/"
+   * @return {@code root} followed by "/" + part for every part; just
+   *     {@code root} when no parts are given
+   */
+  private static String createPath(String root, String... parts) {
+    StringBuilder path = new StringBuilder().append(root);
+    for (int i = 0; i < parts.length; i++) {
+      path.append("/");
+      path.append(parts[i]);
+    }
+    return path.toString();
+  }
+
+  /**
+   * Creates a fresh {@link YarnConfiguration} whose app node split index
+   * ({@link YarnConfiguration#ZK_APPID_NODE_SPLIT_INDEX}) is set to the given
+   * value.
+   *
+   * @param splitIndex value stored for the split index key
+   * @return the new configuration
+   */
+  private static Configuration createConfForAppNodeSplit(int splitIndex) {
+    final Configuration splitConf = new YarnConfiguration();
+    splitConf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex);
+    return splitConf;
+  }
+
+  /**
+   * Builds a mocked {@link RMApp} that can be passed to
+   * {@code removeApplication}. The mock reports a submission context carrying
+   * {@code appId}, the user "test" and, when attempt ids are given, a map of
+   * mocked attempts keyed by attempt id.
+   *
+   * @param appId id placed into the mocked app's submission context
+   * @param attemptIds ids of the attempts the mocked app should report; when
+   *     empty, {@code getAppAttempts()} is not stubbed
+   * @return the mocked app
+   */
+  private static RMApp createMockAppForRemove(ApplicationId appId,
+      ApplicationAttemptId... attemptIds) {
+    RMApp app = mock(RMApp.class);
+    ApplicationSubmissionContextPBImpl context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appId);
+    when(app.getApplicationSubmissionContext()).thenReturn(context);
+    when(app.getUser()).thenReturn("test");
+    if (attemptIds.length > 0) {
+      // Parameterized (not raw) so that put() and thenReturn() are checked.
+      HashMap<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
+      for (ApplicationAttemptId attemptId : attemptIds) {
+        RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+        when(appAttempt.getAppAttemptId()).thenReturn(attemptId);
+        attempts.put(attemptId, appAttempt);
+      }
+      when(app.getAppAttempts()).thenReturn(attempts);
+    }
+    return app;
+  }
+
+  /**
+   * Asserts that an application state loaded from the store is present and
+   * carries the expected values.
+   *
+   * @param appState loaded application state; must not be null
+   * @param appId expected application id
+   * @param user expected user
+   * @param submitTime expected submit time
+   * @param startTime expected start time
+   * @param state expected application state
+   * @param finishTime expected finish time
+   * @param diagnostics expected diagnostics
+   */
+  private static void verifyLoadedApp(ApplicationStateData appState,
+      ApplicationId appId, String user, long submitTime, long startTime,
+      RMAppState state, long finishTime, String diagnostics) {
+    // Nothing else can be compared unless the app was loaded at all.
+    assertNotNull("App " + appId + " should have been loaded.", appState);
+
+    final long loadedSubmitTime = appState.getSubmitTime();
+    assertEquals("App submit time in app state", submitTime, loadedSubmitTime);
+
+    final long loadedStartTime = appState.getStartTime();
+    assertEquals("App start time in app state", startTime, loadedStartTime);
+
+    final ApplicationId loadedAppId =
+        appState.getApplicationSubmissionContext().getApplicationId();
+    assertEquals("App ID in app state", appId, loadedAppId);
+
+    final RMAppState loadedState = appState.getState();
+    assertEquals("App state", state, loadedState);
+
+    final long loadedFinishTime = appState.getFinishTime();
+    assertEquals("Finish time in app state", finishTime, loadedFinishTime);
+
+    final String loadedUser = appState.getUser();
+    assertEquals("User in app state", user, loadedUser);
+
+    final String loadedDiagnostics = appState.getDiagnostics();
+    assertEquals("Diagnostics in app state", diagnostics, loadedDiagnostics);
+  }
+
+  /**
+   * Asserts that an attempt of a loaded application is present and carries
+   * the expected values.
+   *
+   * @param appState loaded application state the attempt is looked up in
+   * @param appId id of the owning application (not used by the checks)
+   * @param attemptId id of the attempt to verify
+   * @param trackingURL expected final tracking URL
+   * @param masterContainerId expected id of the master container
+   * @param clientTokenKey expected client token key; the key comparison is
+   *     skipped when null
+   * @param state expected attempt state
+   * @param diagnostics expected diagnostics
+   * @param finishTime expected finish time
+   * @param amExitStatus expected AM container exit status
+   * @param finalStatus expected final application status
+   */
+  private static void verifyLoadedAttempt(ApplicationStateData appState,
+      ApplicationId appId, ApplicationAttemptId attemptId, String trackingURL,
+      ContainerId masterContainerId, SecretKey clientTokenKey,
+      RMAppAttemptState state, String diagnostics, long finishTime,
+      int amExitStatus, FinalApplicationStatus finalStatus) {
+    final ApplicationAttemptStateData loaded = appState.getAttempt(attemptId);
+    // The attempt must have been loaded before its fields can be compared.
+    assertNotNull(
+        "Attempt " + attemptId + " should have been loaded.", loaded);
+    assertEquals("Attempt Id in attempt state",
+        attemptId, loaded.getAttemptId());
+
+    final ContainerId loadedMasterId = loaded.getMasterContainer().getId();
+    assertEquals("Master Container Id in attempt state",
+        masterContainerId, loadedMasterId);
+
+    // The client token key is optional; compare it only when one is expected.
+    if (clientTokenKey != null) {
+      final byte[] expectedKey = clientTokenKey.getEncoded();
+      final byte[] loadedKey = loaded.getAppAttemptTokens()
+          .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME);
+      assertArrayEquals("Client token key in attempt state",
+          expectedKey, loadedKey);
+    }
+
+    assertEquals("Attempt state", state, loaded.getState());
+    assertEquals("Finish time in attempt state",
+        finishTime, loaded.getFinishTime());
+    assertEquals("Diagnostics in attempt state",
+        diagnostics, loaded.getDiagnostics());
+    assertEquals("AM Container exit status in attempt state",
+        amExitStatus, loaded.getAMContainerExitStatus());
+    assertEquals("Final app status in attempt state",
+        finalStatus, loaded.getFinalApplicationStatus());
+    assertEquals("Tracking URL in attempt state",
+        trackingURL, loaded.getFinalTrackingUrl());
+  }
+
+  /**
+   * Verifies storing, removing, updating and reloading of apps and app
+   * attempts in the ZK state store when the app node split index is
+   * configured as 1. The store is closed and reopened between the phases so
+   * that every check runs against state loaded back from ZooKeeper.
+   */
+  @Test
+  public void testAppNodeSplit() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+
+    long submitTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis() + 1234;
+    Configuration conf = new YarnConfiguration();
+
+    // Get store with app node split config set as 1.
+    RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    // Create RM Context and app token manager.
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+    AMRMTokenSecretManager appTokenMgr =
+        spy(new AMRMTokenSecretManager(conf, rmContext));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+        new ClientToAMTokenSecretManagerInRM();
+
+    // Store app1.
+    ApplicationAttemptId attemptId1 = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
+    ApplicationId appId1 = attemptId1.getApplicationId();
+    storeApp(store, appId1, submitTime, startTime);
+    waitNotify(dispatcher);
+
+    // Store app2 with app id application_1352994193343_120213.
+    ApplicationId appId21 = ConverterUtils.toApplicationId(
+        "application_1352994193343_120213");
+    storeApp(store, appId21, submitTime, startTime);
+    waitNotify(dispatcher);
+
+    // Store the first attempt of app1.
+    // NOTE(review): Token is a raw type here; presumably it should be
+    // Token<AMRMTokenIdentifier> - confirm the import before parameterizing.
+    Token appAttemptToken1 =
+        generateAMRMToken(attemptId1, appTokenMgr);
+    SecretKey clientTokenKey1 =
+        clientToAMTokenMgr.createMasterKey(attemptId1);
+    ContainerId containerId1 =
+        ConverterUtils.toContainerId("container_1352994193343_0001_01_000001");
+    storeAttempt(store, attemptId1, containerId1.toString(), appAttemptToken1,
+        clientTokenKey1, dispatcher);
+    String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
+    ApplicationAttemptId attemptId2 =
+        ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
+
+    // Store the second attempt of app1 (attemptId2 belongs to app1, not to
+    // app2).
+    Token appAttemptToken2 =
+        generateAMRMToken(attemptId2, appTokenMgr);
+    SecretKey clientTokenKey2 =
+        clientToAMTokenMgr.createMasterKey(attemptId2);
+    Credentials attemptCred = new Credentials();
+    attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME,
+        clientTokenKey2.getEncoded());
+    ContainerId containerId2 =
+        ConverterUtils.toContainerId("container_1352994193343_0001_02_000001");
+    storeAttempt(store, attemptId2, containerId2.toString(), appAttemptToken2,
+        clientTokenKey2, dispatcher);
+
+    // Store another app which will be removed.
+    ApplicationAttemptId attemptIdRemoved = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
+    ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
+    storeApp(store, appIdRemoved, submitTime, startTime);
+    waitNotify(dispatcher);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+    // Remove the app.
+    RMApp mockRemovedApp =
+        createMockAppForRemove(appIdRemoved, attemptIdRemoved);
+    store.removeApplication(mockRemovedApp);
+
+    // Store the same app and attempt again and remove them once more.
+    storeApp(store, appIdRemoved, submitTime, startTime);
+    waitNotify(dispatcher);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+    store.removeApplication(mockRemovedApp);
+    // Close state store
+    store.close();
+
+    // Load state store
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    // Parameterized map: a raw Map would make get() return Object.
+    Map<ApplicationId, ApplicationStateData> rmAppState =
+        state.getApplicationState();
+    // Check if application_1352994193343_120213 (i.e. app2) exists in state
+    // store as per split index.
+    String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
+        ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT,
+        ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, "1",
+        "application_1352994193343_12021", "3");
+    assertTrue("Application with id application_1352994193343_120213" +
+        " does not exist as per split in state store.",
+        ((ZKRMStateStore)store).exists(path));
+
+    // Verify loaded apps and attempts based on the operations we did before
+    // reloading the state store.
+    ApplicationStateData appState = rmAppState.get(appId1);
+    verifyLoadedApp(
+        appState, appId1, "test", submitTime, startTime, null, 0, "");
+    assertEquals("Attempts loaded for " + appId1, 2, appState.attempts.size());
+    verifyLoadedAttempt(appState, appId1, attemptId1, "N/A", containerId1,
+        clientTokenKey1, null, "", 0, -1000, null);
+    verifyLoadedAttempt(appState, appId1, attemptId2, "N/A", containerId2,
+        clientTokenKey2, null, "", 0, -1000, null);
+
+    // Update app state for app1.
+    ApplicationStateData appState2 = ApplicationStateData.newInstance(
+        submitTime, startTime, "test",
+        appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
+        "appDiagnostics", 1234, null);
+    appState2.attempts.putAll(appState.attempts);
+    store.updateApplicationState(appState2);
+    waitNotify(dispatcher);
+
+    // Update state of the second attempt of app1.
+    Container container = new ContainerPBImpl();
+    container.setId(containerId2);
+    ApplicationAttemptStateData newAttemptState =
+        ApplicationAttemptStateData.newInstance(attemptId2, container,
+            attemptCred, startTime, RMAppAttemptState.FINISHED, "myTrackingUrl",
+            "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
+            100, 0, 0, 0, 0, 0);
+    updateAttempt(store, dispatcher, newAttemptState);
+
+    // Test updating app/attempt for app whose initial state is not saved
+    ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+    ApplicationSubmissionContext dummyContext =
+        new ApplicationSubmissionContextPBImpl();
+    dummyContext.setApplicationId(dummyAppId);
+    ApplicationStateData dummyApp = ApplicationStateData.newInstance(
+        appState.getSubmitTime(), appState.getStartTime(), appState.getUser(),
+        dummyContext, RMAppState.FINISHED, "appDiagnostics", 1234, null);
+    store.updateApplicationState(dummyApp);
+    waitNotify(dispatcher);
+
+    ApplicationAttemptId dummyAttemptId =
+        ApplicationAttemptId.newInstance(dummyAppId, 6);
+    container = new ContainerPBImpl();
+    container.setId(ContainerId.newContainerId(dummyAttemptId, 1));
+    ApplicationAttemptStateData dummyAttempt =
+        ApplicationAttemptStateData.newInstance(dummyAttemptId,
+            container, attemptCred, startTime, RMAppAttemptState.FINISHED,
+            "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
+            111, 0, 0, 0, 0, 0);
+    updateAttempt(store, dispatcher, dummyAttempt);
+    // Close the store
+    store.close();
+
+    // Check updated application state.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    store.setRMDispatcher(dispatcher);
+    RMState newRMState = store.loadState();
+    Map<ApplicationId, ApplicationStateData> newRMAppState =
+        newRMState.getApplicationState();
+    ApplicationStateData dummyAppState = newRMAppState.get(dummyAppId);
+    assertNotNull(dummyAppId + " is not there in loaded apps", dummyAppState);
+    verifyLoadedApp(dummyAppState, dummyAppId, "test",
+        submitTime, startTime, RMAppState.FINISHED, 1234, "appDiagnostics");
+    ApplicationStateData updatedAppState = newRMAppState.get(appId1);
+    assertNotNull(appId1 + " is not there in loaded apps", updatedAppState);
+    verifyLoadedApp(updatedAppState, appId1, "test",
+        submitTime, startTime, RMAppState.FINISHED, 1234, "appDiagnostics");
+    // Check updated attempt state.
+    assertEquals("Attempts loaded for app " + dummyAppId, 1,
+        dummyAppState.attempts.size());
+    verifyLoadedAttempt(dummyAppState, dummyAppId, dummyAttemptId,
+        "myTrackingUrl", container.getId(), clientTokenKey2,
+        RMAppAttemptState.FINISHED, "attemptDiagnostics", 0,
+        111, FinalApplicationStatus.SUCCEEDED);
+    assertEquals("Attempts loaded for app " + appId1, 2,
+        updatedAppState.attempts.size());
+    verifyLoadedAttempt(updatedAppState, appId1, attemptId1, "N/A",
+        containerId1, clientTokenKey1, null, "", 0, -1000, null);
+    verifyLoadedAttempt(updatedAppState, appId1, attemptId2, "myTrackingUrl",
+        containerId2, clientTokenKey2, RMAppAttemptState.FINISHED,
+        "attemptDiagnostics", 0, 100, FinalApplicationStatus.SUCCEEDED);
+
+    // assert store is in expected state after everything is cleaned
+    assertTrue("Store is not in expected state", zkTester.isFinalStateValid());
+    store.close();
+  }
+
+  /**
+   * Verifies storing, updating, removing and reloading of apps and app
+   * attempts in the ZK state store while the app node split index changes
+   * across restarts of the store (1, then 2, then 0, then 3). Apps stored
+   * under an earlier split index must still be loaded and removable after
+   * the index has changed.
+   */
+  @Test
+  public void testAppNodeSplitChangeAcrossRestarts() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    long submitTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis() + 1234;
+    Configuration conf = new YarnConfiguration();
+
+    // Create store with app node split set as 1.
+    RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+    AMRMTokenSecretManager appTokenMgr =
+        spy(new AMRMTokenSecretManager(conf, rmContext));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+        new ClientToAMTokenSecretManagerInRM();
+
+    // Store app1.
+    ApplicationAttemptId attemptId1 = ConverterUtils
+        .toApplicationAttemptId("appattempt_1442994194053_0001_000001");
+    ApplicationId appId1 = attemptId1.getApplicationId();
+    storeApp(store, appId1, submitTime, startTime);
+    waitNotify(dispatcher);
+
+    // Store attempts for app1.
+    // NOTE(review): Token is a raw type here; presumably it should be
+    // Token<AMRMTokenIdentifier> - confirm the import before parameterizing.
+    Token appAttemptToken1 =
+        generateAMRMToken(attemptId1, appTokenMgr);
+    SecretKey clientTokenKey1 = clientToAMTokenMgr.createMasterKey(attemptId1);
+    ContainerId containerId1 =
+        ConverterUtils.toContainerId("container_1442994194053_0001_01_000001");
+    storeAttempt(store, attemptId1, containerId1.toString(), appAttemptToken1,
+        clientTokenKey1, dispatcher);
+    String appAttemptIdStr2 = "appattempt_1442994194053_0001_000002";
+    ApplicationAttemptId attemptId2 =
+        ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
+
+    Token appAttemptToken2 =
+        generateAMRMToken(attemptId2, appTokenMgr);
+    SecretKey clientTokenKey2 = clientToAMTokenMgr.createMasterKey(attemptId2);
+    ContainerId containerId2 =
+        ConverterUtils.toContainerId("container_1442994194053_0001_02_000001");
+    storeAttempt(store, attemptId2, containerId2.toString(), appAttemptToken2,
+        clientTokenKey2, dispatcher);
+    Credentials attemptCred = new Credentials();
+    attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME,
+        clientTokenKey2.getEncoded());
+
+    // Store app2 and associated attempt.
+    ApplicationAttemptId attemptId11 = ConverterUtils
+        .toApplicationAttemptId("appattempt_1442994194053_0002_000001");
+    ApplicationId appId11 = attemptId11.getApplicationId();
+    storeApp(store, appId11, submitTime, startTime);
+    waitNotify(dispatcher);
+    ContainerId containerId11 =
+        ConverterUtils.toContainerId("container_1442994194053_0002_01_000001");
+    storeAttempt(store, attemptId11, containerId11.toString(), null, null,
+        dispatcher);
+    // Close state store
+    store.close();
+
+    // Load state store with app node split config of 2.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(2));
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    // Parameterized map: a raw Map would make get() return Object.
+    Map<ApplicationId, ApplicationStateData> rmAppState =
+        state.getApplicationState();
+    ApplicationId appId21 = ConverterUtils.toApplicationId(
+        "application_1442994194053_120213");
+    storeApp(store, appId21, submitTime, startTime);
+    waitNotify(dispatcher);
+
+    // Check if app is loaded correctly despite change in split index.
+    ApplicationStateData appState = rmAppState.get(appId1);
+    verifyLoadedApp(
+        appState, appId1, "test", submitTime, startTime, null, 0, "");
+    assertEquals("Attempts loaded for app " + appId1, 2,
+        appState.attempts.size());
+    verifyLoadedAttempt(appState, appId1, attemptId1, "N/A", containerId1,
+        clientTokenKey1, null, "", 0, -1000, null);
+    verifyLoadedAttempt(appState, appId1, attemptId2, "N/A", containerId2,
+        clientTokenKey2, null, "", 0, -1000, null);
+
+    // Update app/attempt state
+    ApplicationStateData appState2 =
+        ApplicationStateData.newInstance(appState.getSubmitTime(),
+            appState.getStartTime(), appState.getUser(),
+            appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
+            "appDiagnostics", 1234, null);
+    appState2.attempts.putAll(appState.attempts);
+    store.updateApplicationState(appState2);
+    waitNotify(dispatcher);
+
+    Container container = new ContainerPBImpl();
+    container.setId(containerId2);
+    ApplicationAttemptStateData newAttemptState =
+        ApplicationAttemptStateData.newInstance(attemptId2, container,
+            attemptCred, startTime, RMAppAttemptState.FINISHED, "myTrackingUrl",
+            "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
+            100, 0, 0, 0, 0, 0);
+    updateAttempt(store, dispatcher, newAttemptState);
+
+    // Test updating app/attempt for app whose initial state is not saved
+    ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+    ApplicationSubmissionContext dummyContext =
+        new ApplicationSubmissionContextPBImpl();
+    dummyContext.setApplicationId(dummyAppId);
+    ApplicationStateData dummyApp = ApplicationStateData.newInstance(
+        appState.getSubmitTime(), appState.getStartTime(), appState.getUser(),
+        dummyContext, RMAppState.FINISHED, "appDiagnostics", 1234, null);
+    store.updateApplicationState(dummyApp);
+    waitNotify(dispatcher);
+
+    ApplicationAttemptId dummyAttemptId =
+        ApplicationAttemptId.newInstance(dummyAppId, 6);
+    container = new ContainerPBImpl();
+    container.setId(ContainerId.newContainerId(dummyAttemptId, 1));
+    ApplicationAttemptStateData dummyAttempt =
+        ApplicationAttemptStateData.newInstance(dummyAttemptId,
+            container, attemptCred, startTime, RMAppAttemptState.FINISHED,
+            "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
+            111, 0, 0, 0, 0, 0);
+    updateAttempt(store, dispatcher, dummyAttempt);
+    // Close the store
+    store.close();
+
+    // Load state store this time with split index of 0.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(0));
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    assertEquals("Number of Apps loaded", 4,
+        state.getApplicationState().size());
+    rmAppState = state.getApplicationState();
+    appState = rmAppState.get(appId1);
+    assertNotNull(appId1 + " is not there in loaded apps", appState);
+    // Verify if apps and attempts are loaded correctly.
+    verifyLoadedApp(appState, appId1, "test", submitTime, startTime,
+        RMAppState.FINISHED, 1234, "appDiagnostics");
+    assertEquals("Number of attempts loaded for app " + appId1, 2,
+        appState.attempts.size());
+    verifyLoadedAttempt(appState, appId1, attemptId1, "N/A", containerId1,
+        clientTokenKey1, null, "", 0, -1000, null);
+    verifyLoadedAttempt(appState, appId1, attemptId2, "myTrackingUrl",
+        containerId2, clientTokenKey2, RMAppAttemptState.FINISHED,
+        "attemptDiagnostics", 0, 100, FinalApplicationStatus.SUCCEEDED);
+    // Remove attempt1
+    store.removeApplicationAttempt(attemptId1);
+    ApplicationId appId31 = ConverterUtils.toApplicationId(
+        "application_1442994195071_0045");
+    storeApp(store, appId31, submitTime, startTime);
+    waitNotify(dispatcher);
+    // Close state store.
+    store.close();
+
+    // Load state store with split index of 3.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    assertEquals("Number of apps loaded", 5,
+        state.getApplicationState().size());
+    rmAppState = state.getApplicationState();
+    appState = rmAppState.get(dummyAppId);
+    assertNotNull(dummyAppId + " is not there in loaded apps", appState);
+    // Verify if all the apps and attempts are loaded correctly.
+    verifyLoadedApp(appState, dummyAppId, "test", submitTime, startTime,
+        RMAppState.FINISHED, 1234, "appDiagnostics");
+    assertEquals("Number of attempts loaded for app " + dummyAppId, 1,
+        appState.attempts.size());
+    verifyLoadedAttempt(appState, dummyAppId, dummyAttemptId, "myTrackingUrl",
+        container.getId(), clientTokenKey2, RMAppAttemptState.FINISHED,
+        "attemptDiagnostics", 0, 111, FinalApplicationStatus.SUCCEEDED);
+    appState = rmAppState.get(appId31);
+    assertNotNull(appId31 + " is not there in loaded apps", appState);
+    verifyLoadedApp(
+        appState, appId31, "test", submitTime, startTime, null, 0, "");
+    assertEquals("Number of attempts loaded for app " + appId31, 0,
+        appState.attempts.size());
+    appState = rmAppState.get(appId21);
+    assertNotNull(appId21 + " is not there in loaded apps", appState);
+    verifyLoadedApp(
+        appState, appId21, "test", submitTime, startTime, null, 0, "");
+    assertEquals("Number of attempts loaded for app " + appId21, 0,
+        appState.attempts.size());
+    appState = rmAppState.get(appId11);
+    assertNotNull(appId11 + " is not there in loaded apps", appState);
+    verifyLoadedApp(
+        appState, appId11, "test", submitTime, startTime, null, 0, "");
+    assertEquals("Number of attempts loaded for app " + appId11, 1,
+        appState.attempts.size());
+    verifyLoadedAttempt(appState, appId11, attemptId11, "N/A", containerId11,
+        null, null, "", 0, -1000, null);
+    appState = rmAppState.get(appId1);
+    assertNotNull(appId1 + " is not there in loaded apps", appState);
+    verifyLoadedApp(appState, appId1, "test", submitTime, startTime,
+        RMAppState.FINISHED, 1234, "appDiagnostics");
+    assertEquals("Number of attempts loaded for app " + appId1, 1,
+        appState.attempts.size());
+    verifyLoadedAttempt(appState, appId1, attemptId2, "myTrackingUrl",
+        containerId2, clientTokenKey2, RMAppAttemptState.FINISHED,
+        "attemptDiagnostics", 0, 100, FinalApplicationStatus.SUCCEEDED);
+
+    // Store another app.
+    String appRootPath =
+        createPath(((ZKRMStateStore)store).znodeWorkingPath,
+            ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
+    String hierarchicalPath = createPath(appRootPath,
+        ZKRMStateStore.RM_APP_ROOT_HIERARCHIES);
+    ApplicationId appId41 = ConverterUtils.toApplicationId(
+        "application_1442994195087_0001");
+    storeApp(store, appId41, submitTime, startTime);
+    waitNotify(dispatcher);
+    // Check how many apps exist in each of the hierarchy based paths. 0 paths
+    // should exist in "HIERARCHIES/4" path as app split index was never set
+    // as 4 in tests above.
+    assertEquals("Number of childrens for path " + appRootPath, 2,
+        ((ZKRMStateStore)store).getChildren(appRootPath).size());
+    assertEquals("Number of childrens for path " + hierarchicalPath + "/1", 1,
+        ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/1").size());
+    assertEquals("Number of childrens for path " + hierarchicalPath + "/2", 2,
+        ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/2").size());
+    assertEquals("Number of childrens for path " + hierarchicalPath + "/3", 1,
+        ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/3").size());
+    assertEquals("Number of childrens for path " + hierarchicalPath + "/4", 0,
+        ((ZKRMStateStore)store).getChildren(hierarchicalPath + "/4").size());
+    String path = createPath(
+        hierarchicalPath, "3", "application_1442994195087_0", "001");
+    // path is already complete; no need to pass it through createPath again.
+    assertTrue("application_1442994195087_0001 should exist in path " + path,
+        ((ZKRMStateStore)store).exists(path));
+
+    // Remove applications
+    RMApp mockRemovedApp = createMockAppForRemove(appId11, attemptId11);
+    store.removeApplication(mockRemovedApp);
+    // Store app2 and its attempt again, then remove them once more.
+    storeApp(store, appId11, submitTime, startTime);
+    waitNotify(dispatcher);
+    storeAttempt(store, attemptId11, "container_1442994194053_0002_01_000001",
+        null, null, dispatcher);
+    store.removeApplication(mockRemovedApp);
+
+    RMApp mockRemovedApp1 = createMockAppForRemove(appId41);
+    store.removeApplication(mockRemovedApp1);
+
+    mockRemovedApp1 = createMockAppForRemove(appId31);
+    store.removeApplication(mockRemovedApp1);
+
+    mockRemovedApp1 = createMockAppForRemove(appId21);
+    store.removeApplication(mockRemovedApp1);
+
+    mockRemovedApp = createMockAppForRemove(dummyAppId, dummyAttemptId);
+    store.removeApplication(mockRemovedApp);
+
+    mockRemovedApp = createMockAppForRemove(appId1, attemptId1, attemptId2);
+    store.removeApplication(mockRemovedApp);
+    store.close();
+
+    // Load state store with split index of 3 again. As all apps have been
+    // removed nothing should be loaded back.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    assertEquals("Number of apps loaded", 0,
+        state.getApplicationState().size());
+    // Close the state store.
+    store.close();
+  }
}