diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 85d5a580362..54e8888f0d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2013,6 +2013,13 @@ public static boolean isAclEnabled(Configuration conf) {
NM_PREFIX + "health-checker.interval-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000;
+ /** Whether to run the node health script once before the NM starts up,
+ * ahead of its periodic runs. Disabled by default.*/
+ public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
+ NM_PREFIX + "health-checker.run-before-startup";
+ public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
+ false;
+
/** Health check time out period for all scripts.*/
public static final String NM_HEALTH_CHECK_TIMEOUT_MS =
NM_PREFIX + "health-checker.timeout-ms";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f09186ecf4c..2f97a7cce7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1668,6 +1668,13 @@
1200000
+  <property>
+    <description>Whether or not to run the node health script
+    before the NM starts up.</description>
+    <name>yarn.nodemanager.health-checker.run-before-startup</name>
+    <value>false</value>
+  </property>
+
Frequency of running node health scripts.
yarn.nodemanager.health-checker.interval-ms
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index acec16fd56b..54b39155c63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
public abstract class RegisterNodeManagerRequest {
@@ -53,14 +54,15 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
Resource physicalResource) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, nodeLabels, physicalResource,
- null);
+ null, null);
}
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List containerStatuses,
List runningApplications, Set nodeLabels,
- Resource physicalResource, Set nodeAttributes) {
+ Resource physicalResource, Set nodeAttributes,
+ NodeStatus nodeStatus) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@@ -72,6 +74,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
request.setNodeLabels(nodeLabels);
request.setPhysicalResource(physicalResource);
request.setNodeAttributes(nodeAttributes);
+ request.setNodeStatus(nodeStatus);
return request;
}
@@ -133,4 +136,16 @@ public abstract void setLogAggregationReportsForApps(
public abstract Set getNodeAttributes();
public abstract void setNodeAttributes(Set nodeAttributes);
+
+ /**
+ * Get the node status carried by this registration request.
+ * @return The status of the node, or {@code null} if none was set.
+ */
+ public abstract NodeStatus getNodeStatus();
+
+ /**
+ * Set the node status to send with this registration request.
+ * @param nodeStatus The status of the node; {@code null} clears it.
+ */
+ public abstract void setNodeStatus(NodeStatus nodeStatus);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index 317f8abd6f1..d91cff2531a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
@@ -51,7 +52,9 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
+
public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null;
@@ -68,6 +71,7 @@
/** Physical resources in the node. */
private Resource physicalResource = null;
+ private NodeStatus nodeStatus;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -121,6 +125,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto();
}
+ if (this.nodeStatus != null) {
+ builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
+ }
}
private void addLogAggregationStatusForAppsToProto() {
@@ -359,6 +366,28 @@ public synchronized void setPhysicalResource(Resource pPhysicalResource) {
this.physicalResource = pPhysicalResource;
}
+ @Override
+ public synchronized NodeStatus getNodeStatus() {
+   // Serve the cached record if present; else decode the proto field once.
+   if (this.nodeStatus == null) {
+     RegisterNodeManagerRequestProtoOrBuilder source =
+         viaProto ? proto : builder;
+     if (source.hasNodeStatus()) {
+       this.nodeStatus = convertFromProtoFormat(source.getNodeStatus());
+     }
+   }
+   return this.nodeStatus;
+ }
+
+ @Override
+ public synchronized void setNodeStatus(NodeStatus pNodeStatus) {
+   maybeInitBuilder();
+   this.nodeStatus = pNodeStatus;
+   if (this.nodeStatus == null) {
+     builder.clearNodeStatus(); // null also unsets the builder-side field
+   }
+ }
+
@Override
public int hashCode() {
return getProto().hashCode();
@@ -533,4 +562,12 @@ public synchronized void setLogAggregationReportsForApps(
}
this.logAggregationReportsForApps = logAggregationStatusForApps;
}
+
+ private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto status) {
+   return new NodeStatusPBImpl(status); // wrap the wire message in a record
+ }
+
+ private NodeStatusProto convertToProtoFormat(NodeStatus status) {
+   return ((NodeStatusPBImpl) status).getProto(); // assumes PB-backed record
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index ff7153eca8e..c643179888e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -74,6 +74,7 @@ message RegisterNodeManagerRequestProto {
optional ResourceProto physicalResource = 9;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
optional NodeAttributesProto nodeAttributes = 11;
+ optional NodeStatusProto nodeStatus = 12;
}
message RegisterNodeManagerResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 5e3693ae9c1..0725d423096 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -392,10 +392,11 @@ protected void registerWithRM()
// during RM recovery
synchronized (this.context) {
List containerReports = getNMContainerStatuses();
+ NodeStatus nodeStatus = getNodeStatus(0);
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(),
- nodeLabels, physicalResource, nodeAttributes);
+ nodeLabels, physicalResource, nodeAttributes, nodeStatus);
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java
index 1c9bd82bd46..af92b15e9c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java
@@ -60,8 +60,9 @@
"Node health script timed out";
private NodeHealthScriptRunner(String scriptName, long checkInterval,
- long timeout, String[] scriptArgs) {
- super(NodeHealthScriptRunner.class.getName(), checkInterval);
+ long timeout, String[] scriptArgs, boolean runBeforeStartup) {
+ super(NodeHealthScriptRunner.class.getName(), checkInterval,
+ runBeforeStartup);
this.nodeHealthScript = scriptName;
this.scriptTimeout = timeout;
setTimerTask(new NodeHealthMonitorExecutor(scriptArgs));
@@ -91,6 +92,10 @@ public static NodeHealthScriptRunner newInstance(String scriptName,
"interval-ms can not be set to a negative number.");
}
+ boolean runBeforeStartup = conf.getBoolean(
+ YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP,
+ YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP);
+
// Determine time out
String scriptTimeoutConfig = String.format(
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE,
@@ -113,7 +118,7 @@ public static NodeHealthScriptRunner newInstance(String scriptName,
String[] scriptArgs = conf.getStrings(scriptArgsConfig, new String[]{});
return new NodeHealthScriptRunner(nodeHealthScript,
- checkIntervalMs, scriptTimeout, scriptArgs);
+ checkIntervalMs, scriptTimeout, scriptArgs, runBeforeStartup);
}
private enum HealthCheckerExitStatus {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java
index a0c4d8b8ebd..e9ad5b00ae4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java
@@ -45,13 +45,16 @@
private Timer timer;
private TimerTask task;
private long intervalMs;
+ private boolean runBeforeStartup;
- TimedHealthReporterService(String name, long intervalMs) {
+ TimedHealthReporterService(String name, long intervalMs,
+ boolean runBeforeStartup) {
super(name);
this.isHealthy = true;
this.healthReport = "";
this.lastReportedTime = System.currentTimeMillis();
this.intervalMs = intervalMs;
+ this.runBeforeStartup = runBeforeStartup; // read by serviceStart()
}
@VisibleForTesting
@@ -73,7 +76,13 @@ public void serviceStart() throws Exception {
throw new Exception("Health reporting task hasn't been set!");
}
timer = new Timer("HealthReporterService-Timer", true);
- timer.scheduleAtFixedRate(task, 0, intervalMs);
+ long delay = 0;
+ if (runBeforeStartup) {
+ delay = intervalMs;
+ task.run();
+ }
+
+ timer.scheduleAtFixedRate(task, delay, intervalMs);
super.serviceStart();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 2c89ddd9e9b..7d6feea6f19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -335,6 +335,7 @@ public RegisterNodeManagerResponse registerNodeManager(
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
Resource physicalResource = request.getPhysicalResource();
+ NodeStatus nodeStatus = request.getNodeStatus();
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
@@ -426,7 +427,7 @@ public RegisterNodeManagerResponse registerNodeManager(
if (oldNode == null) {
RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
request.getNMContainerStatuses(),
- request.getRunningApplications());
+ request.getRunningApplications(), nodeStatus);
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
if (LOG.isDebugEnabled()) {
@@ -462,7 +463,7 @@ public RegisterNodeManagerResponse registerNodeManager(
this.rmContext.getRMNodes().put(nodeId, rmNode);
this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeStartedEvent(nodeId, null, null));
+ .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus));
} else {
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index a565fe75656..d9a748b3d0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -36,6 +36,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -208,7 +209,8 @@
RMNodeEventType,
RMNodeEvent>(NodeState.NEW)
//Transitions from NEW state
- .addTransition(NodeState.NEW, NodeState.RUNNING,
+ .addTransition(NodeState.NEW,
+ EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
@@ -850,10 +852,10 @@ private static NodeHealthStatus updateRMNodeFromStatusEvents(
}
public static class AddNodeTransition implements
- SingleArcTransition {
+ MultipleArcTransition {
@Override
- public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List containers = null;
@@ -889,17 +891,40 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
}
- rmNode.context.getDispatcher().getEventHandler()
- .handle(new NodeAddedSchedulerEvent(rmNode, containers));
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_USABLE, rmNode));
+ NodeStatus nodeStatus =
+ startEvent.getNodeStatus();
+ RMNodeStatusEvent rmNodeStatusEvent =
+ new RMNodeStatusEvent(nodeId, nodeStatus);
+
+ NodeHealthStatus nodeHealthStatus =
+ updateRMNodeFromStatusEvents(rmNode, rmNodeStatusEvent);
+
+ NodeState nodeState = null;
+ if (nodeHealthStatus.getIsNodeHealthy()) {
+ rmNode.context.getDispatcher().getEventHandler()
+ .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
+ nodeState = NodeState.RUNNING;
+ } else {
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_UNUSABLE, rmNode));
+ //Update the metrics
+ rmNode.updateMetricsForDeactivatedNode(NodeState.RUNNING,
+ NodeState.UNHEALTHY);
+ nodeState = NodeState.UNHEALTHY;
+ }
+
List logAggregationReportsForApps =
startEvent.getLogAggregationReportsForApps();
if (logAggregationReportsForApps != null
&& !logAggregationReportsForApps.isEmpty()) {
rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
}
+
+ return nodeState;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
index 397699453fb..2bf04d0fe76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
@@ -24,19 +24,23 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public class RMNodeStartedEvent extends RMNodeEvent {
+ private final NodeStatus nodeStatus;
private List containerStatuses;
private List runningApplications;
private List logAggregationReportsForApps;
public RMNodeStartedEvent(NodeId nodeId,
List containerReports,
- List runningApplications) {
+ List runningApplications,
+ NodeStatus nodeStatus) {
super(nodeId, RMNodeEventType.STARTED);
this.containerStatuses = containerReports;
this.runningApplications = runningApplications;
+ this.nodeStatus = nodeStatus; // NOTE(review): may be null - confirm users
}
public List getNMContainerStatuses() {
@@ -47,6 +51,10 @@ public RMNodeStartedEvent(NodeId nodeId,
return runningApplications;
}
+ public NodeStatus getNodeStatus() {
+   return this.nodeStatus; // exactly what the constructor received
+ }
+
public List getLogAggregationReportsForApps() {
return this.logAggregationReportsForApps;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 3543bc4707e..fe196f887fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -187,6 +187,22 @@ public RegisterNodeManagerResponse registerNode(
req.setNodeLabels(nodeLabels);
}
+ NodeStatus status = Records.newRecord(NodeStatus.class);
+ status.setResponseId(0);
+ status.setNodeId(nodeId);
+ ArrayList completedContainers = new ArrayList();
+ status.setContainersStatuses(
+ new ArrayList(containerStats.values()));
+ for (ContainerId cid : completedContainers) {
+ containerStats.remove(cid);
+ }
+ NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
+ healthStatus.setHealthReport("");
+ healthStatus.setIsNodeHealthy(true);
+ healthStatus.setLastHealthReportTime(1);
+ status.setNodeHealthStatus(healthStatus);
+ req.setNodeStatus(status);
+
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);
this.currentContainerTokenMasterKey =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index b3888c3cd66..495a695abfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.token.Token;
@@ -54,6 +57,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
@@ -543,7 +548,12 @@ public MockNM registerNode(String nodeIdStr, Resource nodeCapability)
public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());
- node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
+ NodeStatus mockNodeStatus = mock(NodeStatus.class); // stub for the event
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); // healthy
+ node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null,
+ mockNodeStatus));
drainEventsImplicitly();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 1f1e164cf5b..42732468305 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -216,8 +216,12 @@ private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
@Test (timeout = 5000)
public void testExpiredContainer() {
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
// Start the node
- node.handle(new RMNodeStartedEvent(null, null, null));
+ node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
@@ -280,12 +284,16 @@ public void testRecommissionNode() {
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
//Start the node
- node.handle(new RMNodeStartedEvent(null, null, null));
+ node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
- node2.handle(new RMNodeStartedEvent(null, null, null));
+ node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
@@ -341,8 +349,12 @@ public void testContainerUpdate() throws InterruptedException{
@Test (timeout = 5000)
public void testStatusChange(){
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
//Start the node
- node.handle(new RMNodeStartedEvent(null, null, null));
+ node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
//Add info to the queue first
node.setNextHeartBeat(false);
@@ -712,7 +724,12 @@ private RMNodeImpl getRunningNode(String nmVersion, int port) {
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
capability, nmVersion);
- node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+ mockNodeStatus));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
}
@@ -763,7 +780,13 @@ private RMNodeImpl getRebootedNode() {
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, capability, null);
- node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
+
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+ mockNodeStatus));
Assert.assertEquals(NodeState.RUNNING, node.getState());
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING));
Assert.assertEquals(NodeState.REBOOTED, node.getState());
@@ -779,7 +802,12 @@ public void testAdd() {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+ mockNodeStatus));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
@@ -1075,8 +1103,12 @@ public void testDisappearingContainer() {
@Test
public void testForHandlingDuplicatedCompltedContainers() {
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
// Start the node
- node.handle(new RMNodeStartedEvent(null, null, null));
+ node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
// Add info to the queue first
node.setNextHeartBeat(false);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
index 411b8482170..228e6ec5d11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
@@ -20,6 +20,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collection;
@@ -37,6 +39,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@@ -127,8 +131,14 @@ public void testResourceAllocation()
nm1.getNodeId());
RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
nm2.getNodeId());
- node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null));
- node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null));
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
+ node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null,
+ mockNodeStatus));
+ node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null,
+ mockNodeStatus));
// Submit an application
Application application = new Application("user1", resourceManager);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 8d31fe1a8ba..531ca985289 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -21,6 +21,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
@@ -139,13 +140,18 @@ public void testLogAggregationStatus() throws Exception {
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node1 =
new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
- node1.handle(new RMNodeStartedEvent(nodeId1, null, null));
+ NodeStatus mockNodeStatus = mock(NodeStatus.class);
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
+ node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus));
rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
RMNodeImpl node2 =
new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
- node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null));
+ node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null,
+ mockNodeStatus));
rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
// The initial log aggregation status for these two nodes
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index c3f41f62f6f..02a9c09b567 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -241,8 +241,13 @@ private RMNode getRunningRMNode(String host, int port, int memory) {
}
private void sendStartedEvent(RMNode node) {
+ NodeStatus mockNodeStatus = mock(NodeStatus.class); // stubbed below
+ NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+ when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+ when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); // healthy
((RMNodeImpl) node)
- .handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+ .handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+ mockNodeStatus)); // pass the stubbed healthy status
}
private void sendLostEvent(RMNode node) {