diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a7f485d..ce524c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -406,6 +406,10 @@ private static void addDeprecatedKeys() { public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms"; public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000; + public static final String ZK_APPID_NODE_SPLIT_INDEX = + RM_ZK_PREFIX + "appid-node.split-index"; + public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 4; + public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl"; public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 66400c8..377e5b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -482,6 +482,22 @@ + Index at which last section of application id znode stored in + zookeeper RM state store will be split and stored as two different znodes + (parent-child). Here each section is separated by _ in application id. For + instance, with no split appid znode will be of the form + application_1352994193343_0001. If the value of this config is 1, the + appid znode will be broken into two parts application_1352994193343_000 + and 1 respectively with former being the parent node. + application_1352994193343_0002 will then be stored as 2 under the parent + node application_1352994193343_000. This config can take values from 1 to 4. + 4 means there will be no split. If configuration value is outside this + range, no split will be done. + yarn.resourcemanager.zk-appid-node.split-index + 4 + + + Name of the cluster. In a HA setting, this is used to ensure the RM participates in leader election for this cluster and ensures it does not affect diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 9da6400..c6b89b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -93,7 +93,7 @@ protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 2); + .newInstance(2, 0); private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = @@ -147,6 +147,8 @@ private String delegationTokensRootPath; private String dtSequenceNumberPath; private String amrmTokenSecretManagerRoot; + private static final int NO_APPID_NODE_SPLIT = 4; + private int appIdNodeSplitIndex = NO_APPID_NODE_SPLIT; @VisibleForTesting protected String znodeWorkingPath; @@ -195,8 +197,8 @@ List zkRootNodeAcl = new ArrayList(); for (ACL acl : sourceACLs) { zkRootNodeAcl.add(new ACL( - ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), - acl.getId())); + ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), + acl.getId())); } zkRootNodeUsername = HAUtil.getConfValueForRMInstance( @@ -227,6 +229,13 @@ public synchronized void initInternal(Configuration conf) throws Exception { conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); + appIdNodeSplitIndex = + conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); + if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 3) { + appIdNodeSplitIndex = NO_APPID_NODE_SPLIT; + } + if (HAUtil.isHAEnabled(conf)) { zkRetryInterval = zkSessionTimeout / numRetries; } else { @@ -550,37 +559,64 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception { } } + private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath, + String appIdStr) throws Exception { + byte[] appData = getDataWithRetries(appNodePath, true); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application from znode: " + appNodePath); + } + ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); + ApplicationStateDataPBImpl appState = + new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(appData)); + if (!appId.equals( + appState.getApplicationSubmissionContext().getApplicationId())) { + throw new YarnRuntimeException("The node name is different " + + "from the application id"); + } + rmState.appState.put(appId, appState); + loadApplicationAttemptState(appState, appNodePath); + } + private synchronized void loadRMAppState(RMState rmState) throws Exception { List childNodes = getChildrenWithRetries(rmAppRoot, false); for (String childNodeName : childNodes) { - String childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, false); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { - // application - if (LOG.isDebugEnabled()) { - LOG.debug("Loading application from znode: " + childNodeName); - } - ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appState = - new ApplicationStateDataPBImpl( - ApplicationStateDataProto.parseFrom(childData)); - if (!appId.equals( - appState.getApplicationSubmissionContext().getApplicationId())) { - throw new YarnRuntimeException("The child node name is different " + - "from the application id"); + int appIdLastSectionLen = childNodeName.substring( + childNodeName.lastIndexOf("_") + 1).length(); + if (appIdNodeSplitIndex == NO_APPID_NODE_SPLIT && + appIdLastSectionLen == 4) { + loadRMAppStateFromAppNode(rmState, + getNodePath(rmAppRoot, childNodeName), childNodeName); + } else { + // If AppId Node is partitioned as per config + String leafNodePath = getNodePath(rmAppRoot, childNodeName); + List leafNodes = getChildrenWithRetries(leafNodePath, true); + if (leafNodes.size() == 0) { + // There must always be leaf nodes + throw new YarnRuntimeException("Application ID node " + + "split is not as per config. Pls format the state store."); + } + for (String leafNodeName : leafNodes) { + if (leafNodeName.length() == appIdNodeSplitIndex) { + String appIdStr = childNodeName + leafNodeName; + loadRMAppStateFromAppNode(rmState, + getNodePath(leafNodePath, leafNodeName), appIdStr); + } else { + throw new YarnRuntimeException("Application ID node split is " + + "not as per config. Please format the state store."); + } + } } - rmState.appState.put(appId, appState); - loadApplicationAttemptState(appState, appId); } else { LOG.info("Unknown child node with name: " + childNodeName); } + LOG.debug("Done loading applications from ZK state store"); } } private void loadApplicationAttemptState(ApplicationStateData appState, - ApplicationId appId) - throws Exception { - String appPath = getNodePath(rmAppRoot, appId.toString()); + String appPath) throws Exception { List attempts = getChildrenWithRetries(appPath, false); for (String attemptIDStr : attempts) { if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { @@ -594,27 +630,25 @@ private void loadApplicationAttemptState(ApplicationStateData appState, appState.attempts.put(attemptState.getAttemptId(), attemptState); } } - LOG.debug("Done loading applications from ZK state store"); } @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); + String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), false); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); } byte[] appStateData = appStateDataPB.getProto().toByteArray(); createWithRetries(nodeCreatePath, appStateData, zkAcl, - CreateMode.PERSISTENT); - + CreateMode.PERSISTENT); } @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); + String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), true); if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for app: " + appId + " at: " @@ -637,8 +671,8 @@ public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, ApplicationAttemptStateData attemptStateDataPB) throws Exception { - String appDirPath = getNodePath(rmAppRoot, - appAttemptId.getApplicationId().toString()); + String appDirPath = + getLeafAppIdNodePath(appAttemptId.getApplicationId().toString()); String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); if (LOG.isDebugEnabled()) { @@ -647,7 +681,7 @@ public synchronized void storeApplicationAttemptStateInternal( } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); createWithRetries(nodeCreatePath, attemptStateData, zkAcl, - CreateMode.PERSISTENT); + CreateMode.PERSISTENT); } @Override @@ -657,7 +691,7 @@ public synchronized void updateApplicationAttemptStateInternal( throws Exception { String appIdStr = appAttemptId.getApplicationId().toString(); String appAttemptIdStr = appAttemptId.toString(); - String appDirPath = getNodePath(rmAppRoot, appIdStr); + String appDirPath = getLeafAppIdNodePath(appIdStr); String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr); if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for attempt: " + appAttemptIdStr @@ -671,7 +705,7 @@ public synchronized void updateApplicationAttemptStateInternal( createWithRetries(nodeUpdatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to" - + " update the application attempt state."); + + " update the application attempt state."); } } @@ -681,7 +715,19 @@ public synchronized void removeApplicationStateInternal( throws Exception { String appId = appState.getApplicationSubmissionContext().getApplicationId() .toString(); - String appIdRemovePath = getNodePath(rmAppRoot, appId); + String rootPath = rmAppRoot; + String appIdPath = appId; + String appIdPartRemovePath; + if (appIdNodeSplitIndex != NO_APPID_NODE_SPLIT) { + int lastSegLen = appId.length() - appId.lastIndexOf("_") - 1; + int splitIdx = lastSegLen - appIdNodeSplitIndex; + appIdPartRemovePath = getNodePath(rmAppRoot, appId.substring(0, + appId.lastIndexOf("_") + splitIdx + 1)); + rootPath = appIdPartRemovePath; + appIdPath = appId.substring( + appId.lastIndexOf("_") + splitIdx + 1); + } + String appIdRemovePath = getNodePath(rootPath, appIdPath); ArrayList opList = new ArrayList(); for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { @@ -694,6 +740,15 @@ public synchronized void removeApplicationStateInternal( LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath + " and its attempts."); } + + // Check if we should remove the root node as well.. Try to + // get this done within the same fencing. + if (appIdNodeSplitIndex != NO_APPID_NODE_SPLIT && + getChildrenWithRetries(rootPath, true).isEmpty()) { + opList.add(Op.delete(rootPath, -1)); + LOG.info("No leaf app node exists, removing parent node " + + rootPath); + } doMultiWithRetries(opList); } @@ -711,7 +766,7 @@ protected synchronized void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { ArrayList opList = new ArrayList(); String nodeRemovePath = - getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationToken_" @@ -903,6 +958,38 @@ public synchronized void processWatchEvent(ZooKeeper zk, } } + private String getLeafAppIdNodePath(String appId) { + String path = null; + try { + path = getLeafAppIdNodePath(appId, false); + } catch (Exception e) { + // This should not happen (since no ZK node is being created..) + throw new YarnRuntimeException("Unexpected Exception", e); + } + return path; + } + + private String getLeafAppIdNodePath(String appId, boolean createIfNotExists) + throws Exception { + String rootNode = rmAppRoot; + String nodeName = appId; + if (appIdNodeSplitIndex != NO_APPID_NODE_SPLIT) { + int lastSegLen = nodeName.length() - nodeName.lastIndexOf("_") - 1; + int splitIdx = lastSegLen - appIdNodeSplitIndex; + String partNodeCreatePath = getNodePath(rmAppRoot, nodeName.substring(0, + nodeName.lastIndexOf("_") + splitIdx + 1)); + if (!createIfNotExists || + existsWithRetries(partNodeCreatePath, true) == null) { + createWithRetries(partNodeCreatePath, null, zkAcl, + CreateMode.PERSISTENT); + } + rootNode = partNodeCreatePath; + nodeName = nodeName.substring( + nodeName.lastIndexOf("_") + splitIdx + 1); + } + return getNodePath(rootNode, nodeName); + } + @VisibleForTesting @Private @Unstable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 6b09d39..0507b36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -18,14 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.crypto.SecretKey; @@ -35,28 +41,37 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; @@ -102,8 +117,7 @@ public String getAppNode(String appId) { } } - public RMStateStore getRMStateStore() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); + private RMStateStore createStore(Configuration conf) throws Exception { workingZnode = "/jira/issue/3077/rmstore"; conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); @@ -112,6 +126,16 @@ public RMStateStore getRMStateStore() throws Exception { return this.store; } + public RMStateStore getRMStateStore(Configuration conf) throws Exception { + return createStore(conf); + } + + + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + return createStore(conf); + } + @Override public boolean isFinalStateValid() throws Exception { List nodes = client.getChildren(store.znodeWorkingPath, false); @@ -218,7 +242,6 @@ private Configuration createHARMConf( return conf; } - @SuppressWarnings("unchecked") @Test public void testFencing() throws Exception { StateChangeRequestInfo req = new StateChangeRequestInfo( @@ -370,4 +393,270 @@ public void testFencedState() throws Exception { store.close(); } + + private Configuration createConfForAppNodeSplit() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, 1); + return conf; + } + + @Test + public void testAppNodeSplit() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + Configuration conf = new YarnConfiguration(); + + RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit()); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + + AMRMTokenSecretManager appTokenMgr = + spy(new AMRMTokenSecretManager(conf, rmContext)); + + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + + ApplicationAttemptId attemptId1 = ConverterUtils + .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); + ApplicationId appId1 = attemptId1.getApplicationId(); + storeApp(store, appId1, submitTime, startTime); + + Token appAttemptToken1 = + generateAMRMToken(attemptId1, appTokenMgr); + SecretKey clientTokenKey1 = + clientToAMTokenMgr.createMasterKey(attemptId1); + + ContainerId containerId1 = storeAttempt(store, attemptId1, + "container_1352994193343_0001_01_000001", + appAttemptToken1, clientTokenKey1, dispatcher); + + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; + ApplicationAttemptId attemptId2 = + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + Token appAttemptToken2 = + generateAMRMToken(attemptId2, appTokenMgr); + SecretKey clientTokenKey2 = + clientToAMTokenMgr.createMasterKey(attemptId2); + + ContainerId containerId2 = storeAttempt(store, attemptId2, + "container_1352994193343_0001_02_000001", + appAttemptToken2, clientTokenKey2, dispatcher); + + ApplicationAttemptId attemptIdRemoved = ConverterUtils + .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); + ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); + storeApp(store, appIdRemoved, submitTime, startTime); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", null, null, dispatcher); + + RMApp mockRemovedApp = mock(RMApp.class); + RMAppAttemptMetrics mockRmAppAttemptMetrics = + mock(RMAppAttemptMetrics.class); + HashMap attempts = + new HashMap(); + ApplicationSubmissionContext context = + new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(appIdRemoved); + when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime); + when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context); + when(mockRemovedApp.getAppAttempts()).thenReturn(attempts); + when(mockRemovedApp.getUser()).thenReturn("user1"); + RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class); + when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved); + when(mockRemovedAttempt.getRMAppAttemptMetrics()) + .thenReturn(mockRmAppAttemptMetrics); + when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage()) + .thenReturn(new AggregateAppResourceUsage(0,0)); + attempts.put(attemptIdRemoved, mockRemovedAttempt); + store.removeApplication(mockRemovedApp); + + // Remove application + storeApp(store, appIdRemoved, submitTime, startTime); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", null, null, dispatcher); + store.removeApplication(mockRemovedApp); + + // Close state store + Thread.sleep(1000); + store.close(); + + // Load state store + store = zkTester.getRMStateStore(createConfForAppNodeSplit()); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + java.util.Map rmAppState = + state.getApplicationState(); + + ApplicationStateData appState = rmAppState.get(appId1); + // Check if app is loaded correctly + assertNotNull(appState); + assertEquals(submitTime, appState.getSubmitTime()); + assertEquals(startTime, appState.getStartTime()); + assertEquals(appId1, + appState.getApplicationSubmissionContext().getApplicationId()); + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); + // Check if attempt1 is loaded correctly + assertNotNull(attemptState); + assertEquals(attemptId1, attemptState.getAttemptId()); + assertEquals(-1000, attemptState.getAMContainerExitStatus()); + assertEquals(containerId1, attemptState.getMasterContainer().getId()); + assertArrayEquals( + clientTokenKey1.getEncoded(), + attemptState.getAppAttemptTokens() + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + + attemptState = appState.getAttempt(attemptId2); + // Check if attempt2 is loaded correctly + assertNotNull(attemptState); + assertEquals(attemptId2, attemptState.getAttemptId()); + assertEquals(containerId2, attemptState.getMasterContainer().getId()); + assertArrayEquals( + clientTokenKey2.getEncoded(), + attemptState.getAppAttemptTokens() + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + + // Update app/attempt state + ApplicationStateData appState2 = + ApplicationStateData.newInstance(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), + appState.getApplicationSubmissionContext(), RMAppState.FINISHED, + "appDiagnostics", 1234); + appState2.attempts.putAll(appState.attempts); + store.updateApplicationState(appState2); + + ApplicationAttemptStateData oldAttemptState = attemptState; + ApplicationAttemptStateData newAttemptState = + ApplicationAttemptStateData.newInstance( + oldAttemptState.getAttemptId(), + oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 100, + oldAttemptState.getFinishTime(), 0, 0); + store.updateApplicationAttemptState(newAttemptState); + + // Test updating app/attempt for app whose initial state is not saved + ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10); + ApplicationSubmissionContext dummyContext = + new ApplicationSubmissionContextPBImpl(); + dummyContext.setApplicationId(dummyAppId); + ApplicationStateData dummyApp = + ApplicationStateData.newInstance(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), dummyContext, + RMAppState.FINISHED, "appDiagnostics", 1234); + store.updateApplicationState(dummyApp); + + ApplicationAttemptId dummyAttemptId = + ApplicationAttemptId.newInstance(dummyAppId, 6); + ApplicationAttemptStateData dummyAttempt = + ApplicationAttemptStateData.newInstance(dummyAttemptId, + oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 111, + oldAttemptState.getFinishTime(), 0, 0); + store.updateApplicationAttemptState(dummyAttempt); + + // Close the store + Thread.sleep(1000); + store.close(); + + // Check updated application state. + store = zkTester.getRMStateStore(createConfForAppNodeSplit()); + store.setRMDispatcher(dispatcher); + RMState newRMState = store.loadState(); + Map newRMAppState = + newRMState.getApplicationState(); + assertNotNull(newRMAppState.get( + dummyApp.getApplicationSubmissionContext().getApplicationId())); + ApplicationStateData updatedAppState = newRMAppState.get(appId1); + assertEquals(appState.getApplicationSubmissionContext().getApplicationId(), + updatedAppState.getApplicationSubmissionContext().getApplicationId()); + assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime()); + assertEquals(appState.getStartTime(), updatedAppState.getStartTime()); + assertEquals(appState.getUser(), updatedAppState.getUser()); + assertEquals( RMAppState.FINISHED, updatedAppState.getState()); + assertEquals("appDiagnostics", updatedAppState.getDiagnostics()); + assertEquals(1234, updatedAppState.getFinishTime()); + + // Check updated attempt state + assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext + ().getApplicationId()).getAttempt(dummyAttemptId)); + ApplicationAttemptStateData updatedAttemptState = + updatedAppState.getAttempt(newAttemptState.getAttemptId()); + assertEquals(oldAttemptState.getAttemptId(), + updatedAttemptState.getAttemptId()); + assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId()); + assertArrayEquals( + clientTokenKey2.getEncoded(), + attemptState.getAppAttemptTokens() + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + // new attempt state fields + assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState()); + assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl()); + assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics()); + assertEquals(100, updatedAttemptState.getAMContainerExitStatus()); + assertEquals(FinalApplicationStatus.SUCCEEDED, + updatedAttemptState.getFinalApplicationStatus()); + + // assert store is in expected state after everything is cleaned + assertTrue(zkTester.isFinalStateValid()); + store.close(); + } + + @Test + public void testSplitConfigChangeException() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + + RMStateStore store = zkTester.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + + ApplicationId appId1 = + ConverterUtils.toApplicationId("application_1352994193343_111111"); + storeApp(store, appId1, submitTime, startTime); + + // Close state store + Thread.sleep(1000); + store.close(); + + // Load state store. Exception thrown because state store was not formatted. + store = zkTester.getRMStateStore(createConfForAppNodeSplit()); + try { + store.loadState(); + Assert.fail("Exception should be thrown" + + "warning user to format state store."); + } catch(YarnRuntimeException e) { + // Ignore + } + // Close state store + Thread.sleep(1000); + store.close(); + + // Delete state store and load the store again. + store = zkTester.getRMStateStore(createConfForAppNodeSplit()); + store.deleteStore(); + store.close(); + store = zkTester.getRMStateStore(createConfForAppNodeSplit()); + store.loadState(); + assertTrue(zkTester.isFinalStateValid()); + store.close(); + } }