diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index aa5caf9..0d8f3e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -100,7 +100,10 @@ public static final Log LOG = LogFactory.getLog(RMStateStore.class); - private enum RMStateStoreState { + /** + * The enum defines state of RMStateStore. + */ + public enum RMStateStoreState { ACTIVE, FENCED }; @@ -1016,7 +1019,6 @@ protected void notifyStoreOperationFailed(Exception failureCause) { LOG.error("State store operation failed ", failureCause); if (HAUtil.isHAEnabled(getConfig())) { LOG.warn("State-store fenced ! Transitioning RM to standby"); - updateFencedState(); Thread standByTransitionThread = new Thread(new StandByTransitionThread()); standByTransitionThread.setName("StandByTransitionThread Handler"); @@ -1087,6 +1089,15 @@ public void setResourceManager(ResourceManager rm) { private class StandByTransitionThread implements Runnable { @Override public void run() { + writeLock.lock(); + try { + if (isFencedState()) { + return; + } + updateFencedState(); + } finally { + writeLock.unlock(); + } LOG.info("RMStateStore has been fenced"); resourceManager.handleTransitionToStandBy(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java new file mode 100644 index 0000000..5351233 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestMemoryRMStateStore { + + @Test + public void testNotifyStoreOperationFailed() throws Exception { + RMStateStore store = new MemoryRMStateStore() { + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + throw new Exception("testNotifyStoreOperationFailed"); + } + }; + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + store.init(conf); + ResourceManager mockRM = mock(ResourceManager.class); + store.setResourceManager(mockRM); + RMDelegationTokenIdentifier mockTokenId = + mock(RMDelegationTokenIdentifier.class); + store.removeRMDelegationToken(mockTokenId); + int i = 0; + while (store.getRMStateStoreState() != + RMStateStore.RMStateStoreState.FENCED) { + Thread.sleep(200); + if (++i > 15) { + break; + } + } + assertTrue("RMStateStore should have been in fenced state", + store.isFencedState()); + } +}