diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 05efc69..eab68cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -897,6 +897,9 @@ private NMDistributedNodeLabelsHandler( private final NodeLabelsProvider nodeLabelsProvider; private Set previousNodeLabels; private boolean updatedLabelsSentToRM; + private long lastNodeLabelSendFailMills = 0L; + // TODO : Need to check which conf to use.Currently setting as 1 min + private long failLabelResendInterval = 60000; @Override public Set getNodeLabelsForRegistration() { @@ -938,12 +941,15 @@ public String verifyRMRegistrationResponseForNodeLabels( // take some action only on modification of labels boolean areNodeLabelsUpdated = nodeLabelsForHeartbeat.size() != previousNodeLabels.size() - || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat); + || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat) + || checkResendLabelOnFailure(); updatedLabelsSentToRM = false; if (areNodeLabelsUpdated) { previousNodeLabels = nodeLabelsForHeartbeat; try { + LOG.info("Modified labels from provider: " + + StringUtils.join(",", previousNodeLabels)); validateNodeLabels(nodeLabelsForHeartbeat); updatedLabelsSentToRM = true; } catch (IOException e) { @@ -980,16 +986,33 @@ private void validateNodeLabels(Set nodeLabelsForHeartbeat) } } + /* + * In case of failure when RM doesnt accept labels need to resend Labels to + * RM. This method checks whether we need to resend + */ + public boolean checkResendLabelOnFailure() { + if (lastNodeLabelSendFailMills > 0L) { + long lastFailTimePassed = + System.currentTimeMillis() - lastNodeLabelSendFailMills; + if (lastFailTimePassed > failLabelResendInterval) { + return true; + } + } + return false; + } + @Override public void verifyRMHeartbeatResponseForNodeLabels( NodeHeartbeatResponse response) { if (updatedLabelsSentToRM) { if (response.getAreNodeLabelsAcceptedByRM()) { + lastNodeLabelSendFailMills = 0L; LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels) + "} were Accepted by RM "); } else { // case where updated labels from NodeLabelsProvider is sent to RM and // RM rejected the labels + lastNodeLabelSendFailMills = System.currentTimeMillis(); LOG.error( "NM node labels {" + StringUtils.join(",", previousNodeLabels) + "} were not accepted by RM and message from RM : " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java index bbc6710..dac0b09 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java @@ -30,8 +30,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import com.google.common.annotations.VisibleForTesting; - /** * Provides base implementation of NodeLabelsProvider with Timer and expects * subclass to provide TimerTask which can fetch NodeLabels @@ -55,8 +53,6 @@ protected Set nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET; - @VisibleForTesting - long startTime = 0; public AbstractNodeLabelsProvider(String name) { super(name); @@ -77,12 +73,13 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { timerTask = createTimerTask(); + timerTask.run(); if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) { nodeLabelsScheduler = new Timer("DistributedNodeLabelsRunner-Timer", true); // Start the timer task and then periodically at the configured interval // time. Illegal values for intervalTime is handled by timer api - nodeLabelsScheduler.scheduleAtFixedRate(timerTask, startTime, + nodeLabelsScheduler.scheduleAtFixedRate(timerTask, intervalTime, intervalTime); } super.serviceStart(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java index f549d1a..fc78de3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Date; import java.util.HashSet; import java.util.TimerTask; @@ -41,16 +40,6 @@ public ConfigurationNodeLabelsProvider() { super("Configuration Based NodeLabels Provider"); } - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - // In case timer is not configured avoid calling timertask.run thus avoiding - // unnecessary creation of YarnConfiguration Object - updateNodeLabelsFromConfig(conf); - if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) { - startTime = new Date().getTime() + intervalTime; - } - } private void updateNodeLabelsFromConfig(Configuration conf) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java index 27fd4cb..2c1a045 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java @@ -53,8 +53,8 @@ public void setup() { loader = new XMLPathClassLoader( TestConfigurationNodeLabelsProvider.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(loader); testRootDir.mkdirs(); - nodeLabelsProvider = new ConfigurationNodeLabelsProvider(); } @@ -69,16 +69,11 @@ public void tearDown() throws Exception { } } - private Configuration getConfForNodeLabels() { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, "A,B,CX"); - return conf; - } - @Test public void testNodeLabelsFromConfig() throws IOException, InterruptedException { - Configuration conf = getConfForNodeLabels(); + Configuration conf = new Configuration(); + modifyConf("A,B,CX"); nodeLabelsProvider.init(conf); // test for ensuring labels are set during initialization of the class nodeLabelsProvider.start(); @@ -89,17 +84,18 @@ public void testNodeLabelsFromConfig() throws IOException, // test for valid Modification TimerTask timerTask = nodeLabelsProvider.getTimerTask(); - modifyConfAndCallTimer(timerTask, "X,y,Z"); + modifyConf("X,y,Z"); + timerTask.run(); assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"), nodeLabelsProvider.getNodeLabels()); } @Test public void testConfigForNoTimer() throws Exception { - Configuration conf = getConfForNodeLabels(); + Configuration conf = new Configuration(); + modifyConf("A,B,CX"); conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER); - nodeLabelsProvider.init(conf); nodeLabelsProvider.start(); Assert @@ -112,18 +108,11 @@ public void testConfigForNoTimer() throws Exception { nodeLabelsProvider.getNodeLabels()); } - private static void modifyConfAndCallTimer(TimerTask timerTask, - String nodeLabels) throws FileNotFoundException, IOException { + private static void modifyConf(String nodeLabels) + throws FileNotFoundException, IOException { Configuration conf = new Configuration(); conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels); conf.writeXml(new FileOutputStream(nodeLabelsConfigFile)); - ClassLoader actualLoader = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(loader); - timerTask.run(); - } finally { - Thread.currentThread().setContextClassLoader(actualLoader); - } } private static class XMLPathClassLoader extends ClassLoader {