diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 33e8a1f..cdea04a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2011,6 +2011,12 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { private static final String NM_NODE_LABELS_PROVIDER_PREFIX = NM_NODE_LABELS_PREFIX + "provider."; + public static final String NM_NODE_LABELS_RESYNC_INTERVAL = + NM_NODE_LABELS_PREFIX + "resync-interval-ms"; + + public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL = + 2 * 60 * 1000; + // If -1 is configured then no timer task should be created public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bcd64c3..054138d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2131,6 +2131,15 @@ + Interval, in milliseconds, at which the NM resyncs its node labels with the RM. + Once this interval has elapsed since the labels were last sent, the NM sends its loaded labels to the RM again along with the heartbeat. + + yarn.nodemanager.node-labels.resync-interval-ms + 120000 + + + + When node labels "yarn.nodemanager.node-labels.provider" is of type "config" then ConfigurationNodeLabelsProvider fetches the labels this parameter.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index aa51e5c..51c8503 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -138,6 +138,7 @@ Set pendingContainersToRemove = new HashSet(); private NMNodeLabelsHandler nodeLabelsHandler; + private NodeLabelsProvider nodeLabelsProvider; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -149,9 +150,9 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeLabelsProvider nodeLabelsProvider) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; - nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider); this.context = context; this.dispatcher = dispatcher; + this.nodeLabelsProvider = nodeLabelsProvider; this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); this.pendingCompletedContainers = @@ -183,7 +184,8 @@ protected void serviceInit(Configuration conf) throws Exception { this.minimumResourceManagerVersion = conf.get( YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION); - + + nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider); // Default duration to track stopped containers on nodemanager is 10Min. 
// This should not be assigned very large value as it will remember all the // containers stopped during that time. @@ -825,7 +827,8 @@ private NMNodeLabelsHandler createNMNodeLabelsHandler( if (nodeLabelsProvider == null) { return new NMCentralizedNodeLabelsHandler(); } else { - return new NMDistributedNodeLabelsHandler(nodeLabelsProvider); + return new NMDistributedNodeLabelsHandler(nodeLabelsProvider, + this.getConfig()); } } @@ -890,16 +893,18 @@ public String verifyRMRegistrationResponseForNodeLabels( private static class NMDistributedNodeLabelsHandler implements NMNodeLabelsHandler { private NMDistributedNodeLabelsHandler( - NodeLabelsProvider nodeLabelsProvider) { + NodeLabelsProvider nodeLabelsProvider, Configuration conf) { this.nodeLabelsProvider = nodeLabelsProvider; + this.resyncInterval = + conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, + YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL); } private final NodeLabelsProvider nodeLabelsProvider; private Set previousNodeLabels; private boolean updatedLabelsSentToRM; - private long lastNodeLabelSendFailMills = 0L; - // TODO : Need to check which conf to use.Currently setting as 1 min - private static final long FAILEDLABELRESENDINTERVAL = 60000; + private long lastNodeLabelSendMills = 0L; + private long resyncInterval = 120000L; @Override public Set getNodeLabelsForRegistration() { @@ -941,17 +946,22 @@ public String verifyRMRegistrationResponseForNodeLabels( // take some action only on modification of labels boolean areNodeLabelsUpdated = nodeLabelsForHeartbeat.size() != previousNodeLabels.size() - || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat) - || checkResendLabelOnFailure(); + || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat); updatedLabelsSentToRM = false; - if (areNodeLabelsUpdated) { + // When the node labels are updated or the resync interval has elapsed, + // send the labels again in the heartbeat.
+ if (areNodeLabelsUpdated || isResyncIntervalElapsed()) { previousNodeLabels = nodeLabelsForHeartbeat; try { - LOG.info("Modified labels from provider: " - + StringUtils.join(",", previousNodeLabels)); + if (LOG.isDebugEnabled()) { + LOG.debug("Labels from provider: " + + StringUtils.join(",", previousNodeLabels)); + } validateNodeLabels(nodeLabelsForHeartbeat); updatedLabelsSentToRM = true; + // Set last send time in heartbeat + lastNodeLabelSendMills = System.currentTimeMillis(); } catch (IOException e) { // set previous node labels to invalid set, so that invalid // labels are not verified for every HB, and send empty set @@ -990,13 +1000,11 @@ private void validateNodeLabels(Set nodeLabelsForHeartbeat) * In case of failure when RM doesnt accept labels need to resend Labels to * RM. This method checks whether we need to resend */ - public boolean checkResendLabelOnFailure() { - if (lastNodeLabelSendFailMills > 0L) { - long lastFailTimePassed = - System.currentTimeMillis() - lastNodeLabelSendFailMills; - if (lastFailTimePassed > FAILEDLABELRESENDINTERVAL) { - return true; - } + public boolean isResyncIntervalElapsed() { + long elapsedTimeSinceLastSync = + System.currentTimeMillis() - lastNodeLabelSendMills; + if (elapsedTimeSinceLastSync > resyncInterval) { + return true; } return false; } @@ -1006,13 +1014,11 @@ public void verifyRMHeartbeatResponseForNodeLabels( NodeHeartbeatResponse response) { if (updatedLabelsSentToRM) { if (response.getAreNodeLabelsAcceptedByRM()) { - lastNodeLabelSendFailMills = 0L; LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels) + "} were Accepted by RM "); } else { // case where updated labels from NodeLabelsProvider is sent to RM and // RM rejected the labels - lastNodeLabelSendFailMills = System.currentTimeMillis(); LOG.error( "NM node labels {" + StringUtils.join(",", previousNodeLabels) + "} were not accepted by RM and message from RM : " diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java index 099e4b4..e24e4aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -250,6 +250,7 @@ protected void stopRMProxy() { }; YarnConfiguration conf = createNMConfigForDistributeNodeLabels(); + conf.setLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, 2000); nm.init(conf); resourceTracker.resetNMHeartbeatReceiveFlag(); nm.start(); @@ -288,7 +289,30 @@ protected void stopRMProxy() { assertTrue("If provider sends null then empty labels should be sent", resourceTracker.labels.isEmpty()); resourceTracker.resetNMHeartbeatReceiveFlag(); - + // Since the resync interval is set to 2 sec, the labels will be sent along + // with every alternate heartbeat. In the loop we sleep for 1 sec so that + // one heartbeat is sent every second.
+ int nullLabels = 0; + int nonNullLabels = 0; + for (int i = 0; i < 5; i++) { + dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1")); + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + if (null == resourceTracker.labels) { + nullLabels++; + assertNull("Null resourcelabels are expected", resourceTracker.labels); + } else { + Assert.assertEquals("In heartbeat PI labels should be send", + toNodeLabelSet("P1"), resourceTracker.labels); + nonNullLabels++; + } + resourceTracker.resetNMHeartbeatReceiveFlag(); + Thread.sleep(1000); + } + Assert.assertTrue("Null labels received should be greater than 1", + nullLabels > 1); + Assert.assertTrue("Non null labels received should be greater than 1", + nonNullLabels > 1); nm.stop(); }