diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 33e8a1f..cdea04a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2011,6 +2011,12 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
+ public static final String NM_NODE_LABELS_RESYNC_INTERVAL =
+ NM_NODE_LABELS_PREFIX + "resync-interval-ms";
+
+ public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL =
+ 2 * 60 * 1000;
+
// If -1 is configured then no timer task should be created
public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bcd64c3..054138d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2131,6 +2131,15 @@
+ Interval at which node labels syncs with RM from NM.Will send loaded labels
+ every x intervals configured along with heartbeat from NM to RM.
+
+ yarn.nodemanager.node-labels.resync-interval-ms
+ 120000
+
+
+
+
When node labels "yarn.nodemanager.node-labels.provider"
is of type "config" then ConfigurationNodeLabelsProvider fetches the
labels this parameter.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index aa51e5c..51c8503 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -138,6 +138,7 @@
Set pendingContainersToRemove = new HashSet();
private NMNodeLabelsHandler nodeLabelsHandler;
+ private NodeLabelsProvider nodeLabelsProvider;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -149,9 +150,9 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
- nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
this.context = context;
this.dispatcher = dispatcher;
+ this.nodeLabelsProvider = nodeLabelsProvider;
this.metrics = metrics;
this.recentlyStoppedContainers = new LinkedHashMap();
this.pendingCompletedContainers =
@@ -183,7 +184,8 @@ protected void serviceInit(Configuration conf) throws Exception {
this.minimumResourceManagerVersion = conf.get(
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
-
+
+ nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
@@ -825,7 +827,8 @@ private NMNodeLabelsHandler createNMNodeLabelsHandler(
if (nodeLabelsProvider == null) {
return new NMCentralizedNodeLabelsHandler();
} else {
- return new NMDistributedNodeLabelsHandler(nodeLabelsProvider);
+ return new NMDistributedNodeLabelsHandler(nodeLabelsProvider,
+ this.getConfig());
}
}
@@ -890,16 +893,18 @@ public String verifyRMRegistrationResponseForNodeLabels(
private static class NMDistributedNodeLabelsHandler
implements NMNodeLabelsHandler {
private NMDistributedNodeLabelsHandler(
- NodeLabelsProvider nodeLabelsProvider) {
+ NodeLabelsProvider nodeLabelsProvider, Configuration conf) {
this.nodeLabelsProvider = nodeLabelsProvider;
+ this.resyncInterval =
+ conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL,
+ YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL);
}
private final NodeLabelsProvider nodeLabelsProvider;
private Set previousNodeLabels;
private boolean updatedLabelsSentToRM;
- private long lastNodeLabelSendFailMills = 0L;
- // TODO : Need to check which conf to use.Currently setting as 1 min
- private static final long FAILEDLABELRESENDINTERVAL = 60000;
+ private long lastNodeLabelSendMills = 0L;
+ private long resyncInterval = 120000L;
@Override
public Set getNodeLabelsForRegistration() {
@@ -941,17 +946,22 @@ public String verifyRMRegistrationResponseForNodeLabels(
// take some action only on modification of labels
boolean areNodeLabelsUpdated =
nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
- || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat)
- || checkResendLabelOnFailure();
+ || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
updatedLabelsSentToRM = false;
- if (areNodeLabelsUpdated) {
+ // When nodelabels elapsed or resync time is elapsed will send again in
+ // heartbeat.
+ if (areNodeLabelsUpdated || isResyncIntervalElapsed()) {
previousNodeLabels = nodeLabelsForHeartbeat;
try {
- LOG.info("Modified labels from provider: "
- + StringUtils.join(",", previousNodeLabels));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Labels from provider: "
+ + StringUtils.join(",", previousNodeLabels));
+ }
validateNodeLabels(nodeLabelsForHeartbeat);
updatedLabelsSentToRM = true;
+ // Set last send time in heartbeat
+ lastNodeLabelSendMills = System.currentTimeMillis();
} catch (IOException e) {
// set previous node labels to invalid set, so that invalid
// labels are not verified for every HB, and send empty set
@@ -990,13 +1000,11 @@ private void validateNodeLabels(Set nodeLabelsForHeartbeat)
* In case of failure when RM doesnt accept labels need to resend Labels to
* RM. This method checks whether we need to resend
*/
- public boolean checkResendLabelOnFailure() {
- if (lastNodeLabelSendFailMills > 0L) {
- long lastFailTimePassed =
- System.currentTimeMillis() - lastNodeLabelSendFailMills;
- if (lastFailTimePassed > FAILEDLABELRESENDINTERVAL) {
- return true;
- }
+ public boolean isResyncIntervalElapsed() {
+ long elapsedTimeSinceLastSync =
+ System.currentTimeMillis() - lastNodeLabelSendMills;
+ if (elapsedTimeSinceLastSync > resyncInterval) {
+ return true;
}
return false;
}
@@ -1006,13 +1014,11 @@ public void verifyRMHeartbeatResponseForNodeLabels(
NodeHeartbeatResponse response) {
if (updatedLabelsSentToRM) {
if (response.getAreNodeLabelsAcceptedByRM()) {
- lastNodeLabelSendFailMills = 0L;
LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were Accepted by RM ");
} else {
// case where updated labels from NodeLabelsProvider is sent to RM and
// RM rejected the labels
- lastNodeLabelSendFailMills = System.currentTimeMillis();
LOG.error(
"NM node labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were not accepted by RM and message from RM : "
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
index 099e4b4..e24e4aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -250,6 +250,7 @@ protected void stopRMProxy() {
};
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
+ conf.setLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, 2000);
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
@@ -288,7 +289,30 @@ protected void stopRMProxy() {
assertTrue("If provider sends null then empty labels should be sent",
resourceTracker.labels.isEmpty());
resourceTracker.resetNMHeartbeatReceiveFlag();
-
+ // Since the resync interval is set to 2 sec in every alternate heartbeat
+ // the labels will be send along with heartbeat.In loop we sleep for 1 sec
+ // so that every sec 1 heartbeat is send.
+ int nullLabels = 0;
+ int nonNullLabels = 0;
+ for (int i = 0; i < 5; i++) {
+ dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1"));
+ nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+ resourceTracker.waitTillHeartbeat();
+ if (null == resourceTracker.labels) {
+ nullLabels++;
+ assertNull("Null resourcelabels are expected", resourceTracker.labels);
+ } else {
+ Assert.assertEquals("In heartbeat PI labels should be send",
+ toNodeLabelSet("P1"), resourceTracker.labels);
+ nonNullLabels++;
+ }
+ resourceTracker.resetNMHeartbeatReceiveFlag();
+ Thread.sleep(1000);
+ }
+ Assert.assertTrue("Null labels received should be greater than 1",
+ nullLabels > 1);
+ Assert.assertTrue("Non null labels received should be greater than 1",
+ nonNullLabels > 1);
nm.stop();
}