diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a7f485d..ce524c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -406,6 +406,10 @@ private static void addDeprecatedKeys() {
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
+ public static final String ZK_APPID_NODE_SPLIT_INDEX =
+ RM_ZK_PREFIX + "appid-node.split-index";
+ public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 4;
+
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 66400c8..377e5b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -482,6 +482,22 @@
+ Index at which last section of application id znode stored in
+ zookeeper RM state store will be split and stored as two different znodes
+ (parent-child). Here each section is separated by _ in application id. For
+ instance, with no split appid znode will be of the form
+ application_1352994193343_0001. If the value of this config is 1, the
+ appid znode will be broken into two parts application_1352994193343_000
+ and 1 respectively with former being the parent node.
+ application_1352994193343_0002 will then be stored as 2 under the parent
+ node application_1352994193343_000. This config can take values from 1 to 4.
+ 4 means there will be no split. If configuration value is outside this
+ range, no split will be done.
+ yarn.resourcemanager.zk-appid-node.split-index
+ 4
+
+
+
Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
election for this cluster and ensures it does not affect
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 9da6400..c6b89b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -93,7 +93,7 @@
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO = Version
- .newInstance(1, 2);
+ .newInstance(2, 0);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -147,6 +147,8 @@
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
private String amrmTokenSecretManagerRoot;
+ private static final int NO_APPID_NODE_SPLIT = 4;
+ private int appIdNodeSplitIndex = NO_APPID_NODE_SPLIT;
@VisibleForTesting
protected String znodeWorkingPath;
@@ -195,8 +197,8 @@
List zkRootNodeAcl = new ArrayList();
for (ACL acl : sourceACLs) {
zkRootNodeAcl.add(new ACL(
- ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
- acl.getId()));
+ ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
+ acl.getId()));
}
zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
@@ -227,6 +229,13 @@ public synchronized void initInternal(Configuration conf) throws Exception {
conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+ appIdNodeSplitIndex =
+ conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
+ YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
+ if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 3) {
+ appIdNodeSplitIndex = NO_APPID_NODE_SPLIT;
+ }
+
if (HAUtil.isHAEnabled(conf)) {
zkRetryInterval = zkSessionTimeout / numRetries;
} else {
@@ -550,37 +559,64 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
}
}
+ private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
+ String appIdStr) throws Exception {
+ byte[] appData = getDataWithRetries(appNodePath, true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application from znode: " + appNodePath);
+ }
+ ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+ ApplicationStateDataPBImpl appState =
+ new ApplicationStateDataPBImpl(
+ ApplicationStateDataProto.parseFrom(appData));
+ if (!appId.equals(
+ appState.getApplicationSubmissionContext().getApplicationId())) {
+ throw new YarnRuntimeException("The node name is different " +
+ "from the application id");
+ }
+ rmState.appState.put(appId, appState);
+ loadApplicationAttemptState(appState, appNodePath);
+ }
+
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List childNodes = getChildrenWithRetries(rmAppRoot, false);
for (String childNodeName : childNodes) {
- String childNodePath = getNodePath(rmAppRoot, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, false);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
- // application
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading application from znode: " + childNodeName);
- }
- ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
- ApplicationStateDataPBImpl appState =
- new ApplicationStateDataPBImpl(
- ApplicationStateDataProto.parseFrom(childData));
- if (!appId.equals(
- appState.getApplicationSubmissionContext().getApplicationId())) {
- throw new YarnRuntimeException("The child node name is different " +
- "from the application id");
+ int appIdLastSectionLen = childNodeName.substring(
+ childNodeName.lastIndexOf("_") + 1).length();
+ if (appIdNodeSplitIndex == NO_APPID_NODE_SPLIT &&
+ appIdLastSectionLen == 4) {
+ loadRMAppStateFromAppNode(rmState,
+ getNodePath(rmAppRoot, childNodeName), childNodeName);
+ } else {
+ // If AppId Node is partitioned as per config
+ String leafNodePath = getNodePath(rmAppRoot, childNodeName);
+ List leafNodes = getChildrenWithRetries(leafNodePath, true);
+ if (leafNodes.size() == 0) {
+ // There must always be leaf nodes
+ throw new YarnRuntimeException("Application ID node " +
+ "split is not as per config. Pls format the state store.");
+ }
+ for (String leafNodeName : leafNodes) {
+ if (leafNodeName.length() == appIdNodeSplitIndex) {
+ String appIdStr = childNodeName + leafNodeName;
+ loadRMAppStateFromAppNode(rmState,
+ getNodePath(leafNodePath, leafNodeName), appIdStr);
+ } else {
+ throw new YarnRuntimeException("Application ID node split is " +
+ "not as per config. Please format the state store.");
+ }
+ }
}
- rmState.appState.put(appId, appState);
- loadApplicationAttemptState(appState, appId);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
+ LOG.debug("Done loading applications from ZK state store");
}
}
private void loadApplicationAttemptState(ApplicationStateData appState,
- ApplicationId appId)
- throws Exception {
- String appPath = getNodePath(rmAppRoot, appId.toString());
+ String appPath) throws Exception {
List attempts = getChildrenWithRetries(appPath, false);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
@@ -594,27 +630,25 @@ private void loadApplicationAttemptState(ApplicationStateData appState,
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
- LOG.debug("Done loading applications from ZK state store");
}
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
+ String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), false);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
createWithRetries(nodeCreatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
-
+ CreateMode.PERSISTENT);
}
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
+ String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), true);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: "
@@ -637,8 +671,8 @@ public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
- String appDirPath = getNodePath(rmAppRoot,
- appAttemptId.getApplicationId().toString());
+ String appDirPath =
+ getLeafAppIdNodePath(appAttemptId.getApplicationId().toString());
String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
if (LOG.isDebugEnabled()) {
@@ -647,7 +681,7 @@ public synchronized void storeApplicationAttemptStateInternal(
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
}
@Override
@@ -657,7 +691,7 @@ public synchronized void updateApplicationAttemptStateInternal(
throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString();
- String appDirPath = getNodePath(rmAppRoot, appIdStr);
+ String appDirPath = getLeafAppIdNodePath(appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
@@ -671,7 +705,7 @@ public synchronized void updateApplicationAttemptStateInternal(
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
- + " update the application attempt state.");
+ + " update the application attempt state.");
}
}
@@ -681,7 +715,19 @@ public synchronized void removeApplicationStateInternal(
throws Exception {
String appId = appState.getApplicationSubmissionContext().getApplicationId()
.toString();
- String appIdRemovePath = getNodePath(rmAppRoot, appId);
+ String rootPath = rmAppRoot;
+ String appIdPath = appId;
+ String appIdPartRemovePath;
+ if (appIdNodeSplitIndex != NO_APPID_NODE_SPLIT) {
+ int lastSegLen = appId.length() - appId.lastIndexOf("_") - 1;
+ int splitIdx = lastSegLen - appIdNodeSplitIndex;
+ appIdPartRemovePath = getNodePath(rmAppRoot, appId.substring(0,
+ appId.lastIndexOf("_") + splitIdx + 1));
+ rootPath = appIdPartRemovePath;
+ appIdPath = appId.substring(
+ appId.lastIndexOf("_") + splitIdx + 1);
+ }
+ String appIdRemovePath = getNodePath(rootPath, appIdPath);
ArrayList opList = new ArrayList();
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
@@ -694,6 +740,15 @@ public synchronized void removeApplicationStateInternal(
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts.");
}
+
+ // Check if we should remove the root node as well.. Try to
+ // get this done within the same fencing.
+ if (appIdNodeSplitIndex != NO_APPID_NODE_SPLIT &&
+ getChildrenWithRetries(rootPath, true).isEmpty()) {
+ opList.add(Op.delete(rootPath, -1));
+ LOG.info("No leaf app node exists, removing parent node " +
+ rootPath);
+ }
doMultiWithRetries(opList);
}
@@ -711,7 +766,7 @@ protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
ArrayList opList = new ArrayList();
String nodeRemovePath =
- getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
@@ -903,6 +958,38 @@ public synchronized void processWatchEvent(ZooKeeper zk,
}
}
+ private String getLeafAppIdNodePath(String appId) {
+ String path = null;
+ try {
+ path = getLeafAppIdNodePath(appId, false);
+ } catch (Exception e) {
+ // This should not happen (since no ZK node is being created..)
+ throw new YarnRuntimeException("Unexpected Exception", e);
+ }
+ return path;
+ }
+
+ private String getLeafAppIdNodePath(String appId, boolean createIfNotExists)
+ throws Exception {
+ String rootNode = rmAppRoot;
+ String nodeName = appId;
+ if (appIdNodeSplitIndex != NO_APPID_NODE_SPLIT) {
+ int lastSegLen = nodeName.length() - nodeName.lastIndexOf("_") - 1;
+ int splitIdx = lastSegLen - appIdNodeSplitIndex;
+ String partNodeCreatePath = getNodePath(rmAppRoot, nodeName.substring(0,
+ nodeName.lastIndexOf("_") + splitIdx + 1));
+ if (!createIfNotExists ||
+ existsWithRetries(partNodeCreatePath, true) == null) {
+ createWithRetries(partNodeCreatePath, null, zkAcl,
+ CreateMode.PERSISTENT);
+ }
+ rootNode = partNodeCreatePath;
+ nodeName = nodeName.substring(
+ nodeName.lastIndexOf("_") + splitIdx + 1);
+ }
+ return getNodePath(rootNode, nodeName);
+ }
+
@VisibleForTesting
@Private
@Unstable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 6b09d39..0507b36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -18,14 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.crypto.SecretKey;
@@ -35,28 +41,37 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
@@ -102,8 +117,7 @@ public String getAppNode(String appId) {
}
}
- public RMStateStore getRMStateStore() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
+ private RMStateStore createStore(Configuration conf) throws Exception {
workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
@@ -112,6 +126,16 @@ public RMStateStore getRMStateStore() throws Exception {
return this.store;
}
+ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
+ return createStore(conf);
+ }
+
+
+ public RMStateStore getRMStateStore() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ return createStore(conf);
+ }
+
@Override
public boolean isFinalStateValid() throws Exception {
List nodes = client.getChildren(store.znodeWorkingPath, false);
@@ -218,7 +242,6 @@ private Configuration createHARMConf(
return conf;
}
- @SuppressWarnings("unchecked")
@Test
public void testFencing() throws Exception {
StateChangeRequestInfo req = new StateChangeRequestInfo(
@@ -370,4 +393,270 @@ public void testFencedState() throws Exception {
store.close();
}
+
+ private Configuration createConfForAppNodeSplit() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, 1);
+ return conf;
+ }
+
+ @Test
+ public void testAppNodeSplit() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+
+ long submitTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis() + 1234;
+ Configuration conf = new YarnConfiguration();
+
+ RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit());
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(store);
+
+ AMRMTokenSecretManager appTokenMgr =
+ spy(new AMRMTokenSecretManager(conf, rmContext));
+
+ MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+ when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
+ ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+ new ClientToAMTokenSecretManagerInRM();
+
+ ApplicationAttemptId attemptId1 = ConverterUtils
+ .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
+ ApplicationId appId1 = attemptId1.getApplicationId();
+ storeApp(store, appId1, submitTime, startTime);
+
+ Token appAttemptToken1 =
+ generateAMRMToken(attemptId1, appTokenMgr);
+ SecretKey clientTokenKey1 =
+ clientToAMTokenMgr.createMasterKey(attemptId1);
+
+ ContainerId containerId1 = storeAttempt(store, attemptId1,
+ "container_1352994193343_0001_01_000001",
+ appAttemptToken1, clientTokenKey1, dispatcher);
+
+ String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
+ ApplicationAttemptId attemptId2 =
+ ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
+
+ Token appAttemptToken2 =
+ generateAMRMToken(attemptId2, appTokenMgr);
+ SecretKey clientTokenKey2 =
+ clientToAMTokenMgr.createMasterKey(attemptId2);
+
+ ContainerId containerId2 = storeAttempt(store, attemptId2,
+ "container_1352994193343_0001_02_000001",
+ appAttemptToken2, clientTokenKey2, dispatcher);
+
+ ApplicationAttemptId attemptIdRemoved = ConverterUtils
+ .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
+ ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
+ storeApp(store, appIdRemoved, submitTime, startTime);
+ storeAttempt(store, attemptIdRemoved,
+ "container_1352994193343_0002_01_000001", null, null, dispatcher);
+
+ RMApp mockRemovedApp = mock(RMApp.class);
+ RMAppAttemptMetrics mockRmAppAttemptMetrics =
+ mock(RMAppAttemptMetrics.class);
+ HashMap attempts =
+ new HashMap();
+ ApplicationSubmissionContext context =
+ new ApplicationSubmissionContextPBImpl();
+ context.setApplicationId(appIdRemoved);
+ when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
+ when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
+ when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
+ when(mockRemovedApp.getUser()).thenReturn("user1");
+ RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
+ when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
+ when(mockRemovedAttempt.getRMAppAttemptMetrics())
+ .thenReturn(mockRmAppAttemptMetrics);
+ when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
+ .thenReturn(new AggregateAppResourceUsage(0,0));
+ attempts.put(attemptIdRemoved, mockRemovedAttempt);
+ store.removeApplication(mockRemovedApp);
+
+ // Remove application
+ storeApp(store, appIdRemoved, submitTime, startTime);
+ storeAttempt(store, attemptIdRemoved,
+ "container_1352994193343_0002_01_000001", null, null, dispatcher);
+ store.removeApplication(mockRemovedApp);
+
+ // Close state store
+ Thread.sleep(1000);
+ store.close();
+
+ // Load state store
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit());
+ store.setRMDispatcher(dispatcher);
+ RMState state = store.loadState();
+ java.util.Map rmAppState =
+ state.getApplicationState();
+
+ ApplicationStateData appState = rmAppState.get(appId1);
+ // Check if app is loaded correctly
+ assertNotNull(appState);
+ assertEquals(submitTime, appState.getSubmitTime());
+ assertEquals(startTime, appState.getStartTime());
+ assertEquals(appId1,
+ appState.getApplicationSubmissionContext().getApplicationId());
+ ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1);
+ // Check if attempt1 is loaded correctly
+ assertNotNull(attemptState);
+ assertEquals(attemptId1, attemptState.getAttemptId());
+ assertEquals(-1000, attemptState.getAMContainerExitStatus());
+ assertEquals(containerId1, attemptState.getMasterContainer().getId());
+ assertArrayEquals(
+ clientTokenKey1.getEncoded(),
+ attemptState.getAppAttemptTokens()
+ .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+
+ attemptState = appState.getAttempt(attemptId2);
+ // Check if attempt2 is loaded correctly
+ assertNotNull(attemptState);
+ assertEquals(attemptId2, attemptState.getAttemptId());
+ assertEquals(containerId2, attemptState.getMasterContainer().getId());
+ assertArrayEquals(
+ clientTokenKey2.getEncoded(),
+ attemptState.getAppAttemptTokens()
+ .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+
+ // Update app/attempt state
+ ApplicationStateData appState2 =
+ ApplicationStateData.newInstance(appState.getSubmitTime(),
+ appState.getStartTime(), appState.getUser(),
+ appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
+ "appDiagnostics", 1234);
+ appState2.attempts.putAll(appState.attempts);
+ store.updateApplicationState(appState2);
+
+ ApplicationAttemptStateData oldAttemptState = attemptState;
+ ApplicationAttemptStateData newAttemptState =
+ ApplicationAttemptStateData.newInstance(
+ oldAttemptState.getAttemptId(),
+ oldAttemptState.getMasterContainer(),
+ oldAttemptState.getAppAttemptTokens(),
+ oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
+ "myTrackingUrl", "attemptDiagnostics",
+ FinalApplicationStatus.SUCCEEDED, 100,
+ oldAttemptState.getFinishTime(), 0, 0);
+ store.updateApplicationAttemptState(newAttemptState);
+
+ // Test updating app/attempt for app whose initial state is not saved
+ ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+ ApplicationSubmissionContext dummyContext =
+ new ApplicationSubmissionContextPBImpl();
+ dummyContext.setApplicationId(dummyAppId);
+ ApplicationStateData dummyApp =
+ ApplicationStateData.newInstance(appState.getSubmitTime(),
+ appState.getStartTime(), appState.getUser(), dummyContext,
+ RMAppState.FINISHED, "appDiagnostics", 1234);
+ store.updateApplicationState(dummyApp);
+
+ ApplicationAttemptId dummyAttemptId =
+ ApplicationAttemptId.newInstance(dummyAppId, 6);
+ ApplicationAttemptStateData dummyAttempt =
+ ApplicationAttemptStateData.newInstance(dummyAttemptId,
+ oldAttemptState.getMasterContainer(),
+ oldAttemptState.getAppAttemptTokens(),
+ oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
+ "myTrackingUrl", "attemptDiagnostics",
+ FinalApplicationStatus.SUCCEEDED, 111,
+ oldAttemptState.getFinishTime(), 0, 0);
+ store.updateApplicationAttemptState(dummyAttempt);
+
+ // Close the store
+ Thread.sleep(1000);
+ store.close();
+
+ // Check updated application state.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit());
+ store.setRMDispatcher(dispatcher);
+ RMState newRMState = store.loadState();
+ Map newRMAppState =
+ newRMState.getApplicationState();
+ assertNotNull(newRMAppState.get(
+ dummyApp.getApplicationSubmissionContext().getApplicationId()));
+ ApplicationStateData updatedAppState = newRMAppState.get(appId1);
+ assertEquals(appState.getApplicationSubmissionContext().getApplicationId(),
+ updatedAppState.getApplicationSubmissionContext().getApplicationId());
+ assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime());
+ assertEquals(appState.getStartTime(), updatedAppState.getStartTime());
+ assertEquals(appState.getUser(), updatedAppState.getUser());
+ assertEquals( RMAppState.FINISHED, updatedAppState.getState());
+ assertEquals("appDiagnostics", updatedAppState.getDiagnostics());
+ assertEquals(1234, updatedAppState.getFinishTime());
+
+ // Check updated attempt state
+ assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext
+ ().getApplicationId()).getAttempt(dummyAttemptId));
+ ApplicationAttemptStateData updatedAttemptState =
+ updatedAppState.getAttempt(newAttemptState.getAttemptId());
+ assertEquals(oldAttemptState.getAttemptId(),
+ updatedAttemptState.getAttemptId());
+ assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId());
+ assertArrayEquals(
+ clientTokenKey2.getEncoded(),
+ attemptState.getAppAttemptTokens()
+ .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+ // new attempt state fields
+ assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
+ assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
+ assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics());
+ assertEquals(100, updatedAttemptState.getAMContainerExitStatus());
+ assertEquals(FinalApplicationStatus.SUCCEEDED,
+ updatedAttemptState.getFinalApplicationStatus());
+
+ // assert store is in expected state after everything is cleaned
+ assertTrue(zkTester.isFinalStateValid());
+ store.close();
+ }
+
+ @Test
+ public void testSplitConfigChangeException() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+
+ long submitTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis() + 1234;
+
+ RMStateStore store = zkTester.getRMStateStore();
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(store);
+
+ ApplicationId appId1 =
+ ConverterUtils.toApplicationId("application_1352994193343_111111");
+ storeApp(store, appId1, submitTime, startTime);
+
+ // Close state store
+ Thread.sleep(1000);
+ store.close();
+
+ // Load state store. Exception thrown because state store was not formatted.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit());
+ try {
+ store.loadState();
+ Assert.fail("Exception should be thrown" +
+ "warning user to format state store.");
+ } catch(YarnRuntimeException e) {
+ // Ignore
+ }
+ // Close state store
+ Thread.sleep(1000);
+ store.close();
+
+ // Delete state store and load the store again.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit());
+ store.deleteStore();
+ store.close();
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit());
+ store.loadState();
+ assertTrue(zkTester.isFinalStateValid());
+ store.close();
+ }
}