diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index bf34837..2d57982 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -220,10 +220,6 @@ protected void serviceInit(Configuration conf) throws Exception { isDistributedNodeLabelConfiguration = YarnConfiguration.isDistributedNodeLabelConfiguration(conf); - - if (nodeLabelsEnabled) { - initNodeLabelStore(conf); - } labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL)); } @@ -243,6 +239,10 @@ protected void startDispatcher() { @Override protected void serviceStart() throws Exception { + if (nodeLabelsEnabled) { + initNodeLabelStore(getConfig()); + } + // init dispatcher only when service start, because recover will happen in // service init, we don't want to trigger any event handling at that time. initDispatcher(getConfig()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java index fb60cd6..4b052c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java @@ -108,6 +108,7 @@ public void testRecoverWithMirror() throws Exception { mgr = new MockNodeLabelManager(); mgr.init(conf); + mgr.start(); // check variables Assert.assertEquals(3, mgr.getClusterNodeLabelNames().size()); @@ -127,6 +128,7 @@ public void testRecoverWithMirror() throws Exception { mgr.stop(); mgr = new MockNodeLabelManager(); mgr.init(conf); + mgr.start(); // check variables Assert.assertEquals(3, mgr.getClusterNodeLabelNames().size()); @@ -165,6 +167,7 @@ public void testRecoverWithDistributedNodeLabels() throws Exception { cf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE); mgr.init(cf); + mgr.start(); // check variables Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); @@ -205,6 +208,7 @@ public void testEditlogRecover() throws Exception { mgr = new MockNodeLabelManager(); mgr.init(conf); + mgr.start(); // check variables Assert.assertEquals(3, mgr.getClusterNodeLabelNames().size()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3469b0c..5180622 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -98,6 +97,8 @@ static final Logger LOG = Logger.getLogger(MockRM.class); static final String ENABLE_WEBAPP = "mockrm.webapp.enabled"; + + final private boolean useNullRMNodeLabelsManager; public MockRM() { this(new YarnConfiguration()); @@ -108,20 +109,31 @@ public MockRM(Configuration conf) { } public MockRM(Configuration conf, RMStateStore store) { + this(conf, store, true); + } + + public MockRM(Configuration conf, RMStateStore store, + boolean useNullRMNodeLabelsManager) { super(); init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); if(store != null) { setRMStateStore(store); } Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); + rootLogger.setLevel(Level.DEBUG); + this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; } @Override - protected RMNodeLabelsManager createNodeLabelManager() { - RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); - mgr.init(getConfig()); - return mgr; + protected RMNodeLabelsManager createNodeLabelManager() + throws InstantiationException, IllegalAccessException { + if (useNullRMNodeLabelsManager) { + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(getConfig()); + return mgr; + } else { + return super.createNodeLabelManager(); + } } public void waitForState(ApplicationId appId, RMAppState finalState) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 9f54de8..6f3666f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -106,8 +106,8 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm) } protected void startRMs() throws IOException { - rm1 = new MockRM(confForRM1); - rm2 = new MockRM(confForRM2); + rm1 = new MockRM(confForRM1, null, false); + rm2 = new MockRM(confForRM2, null, false); startRMs(rm1, confForRM1, rm2, confForRM2); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForNodeLabels.java new file mode 100644 index 0000000..25d9c56 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForNodeLabels.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; + +public class TestRMHAForNodeLabels extends RMHATestBase { + public static final Log LOG = LogFactory + .getLog(TestSubmitApplicationWithRMHA.class); + + @Before + @Override + public void setup() throws Exception { + super.setup(); + + // Create directory for node label store + File tempDir = File.createTempFile("nlb", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + + confForRM1.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + confForRM1.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); + + confForRM2.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + confForRM2.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); + } + + @Test + public void testRMHARecoverNodeLabels() throws Exception { + // start two RMs, and transit rm1 to active, rm2 to standby + startRMs(); + + // Add labels to rm1 + rm1.getRMContext() + .getNodeLabelManager() + .addToCluserNodeLabels( + Arrays.asList(NodeLabel.newInstance("a"), + NodeLabel.newInstance("b"), NodeLabel.newInstance("c"))); + + Map> nodeToLabels = new HashMap<>(); + nodeToLabels.put(NodeId.newInstance("host1", 0), ImmutableSet.of("a")); + nodeToLabels.put(NodeId.newInstance("host2", 0), ImmutableSet.of("b")); + + rm1.getRMContext().getNodeLabelManager().replaceLabelsOnNode(nodeToLabels); + + // Do the failover + explicitFailover(); + + // Check labels in rm2 + Assert + .assertTrue(rm2.getRMContext().getNodeLabelManager() + .getClusterNodeLabelNames() + .containsAll(ImmutableSet.of("a", "b", "c"))); + Assert.assertTrue(rm2.getRMContext().getNodeLabelManager() + .getNodeLabels().get(NodeId.newInstance("host1", 0)).contains("a")); + Assert.assertTrue(rm2.getRMContext().getNodeLabelManager() + .getNodeLabels().get(NodeId.newInstance("host2", 0)).contains("b")); + } +}