diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 85d5a580362..54e8888f0d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2013,6 +2013,13 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "health-checker.interval-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000; + /** Whether or not to run the node health script before the NM + * starts up.*/ + public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + NM_PREFIX + "health-checker.run-before-startup"; + public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = + false; + /** Health check time out period for all scripts.*/ public static final String NM_HEALTH_CHECK_TIMEOUT_MS = NM_PREFIX + "health-checker.timeout-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f09186ecf4c..2f97a7cce7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1668,6 +1668,13 @@ 1200000 + + Whether or not to run the node health script + before the NM starts up. + yarn.nodemanager.health-checker.run-before-startup + false + + Frequency of running node health scripts. yarn.nodemanager.health-checker.interval-ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index acec16fd56b..54b39155c63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; public abstract class RegisterNodeManagerRequest { @@ -53,14 +54,15 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, Resource physicalResource) { return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, containerStatuses, runningApplications, nodeLabels, physicalResource, - null); + null, null); } public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications, Set nodeLabels, - Resource physicalResource, Set nodeAttributes) { + Resource physicalResource, Set nodeAttributes, + NodeStatus nodeStatus) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -72,6 +74,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); request.setNodeAttributes(nodeAttributes); + request.setNodeStatus(nodeStatus); return request; } @@ -133,4 +136,16 @@ public abstract void setLogAggregationReportsForApps( public abstract Set getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + /** + * Get the status of the node. + * @return The status of the node. + */ + public abstract NodeStatus getNodeStatus(); + + /** + * Set the status of the node. + * @param nodeStatus The status of the node. + */ + public abstract void setNodeStatus(NodeStatus nodeStatus); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 317f8abd6f1..d91cff2531a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; @@ -51,7 +52,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; - +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; + public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto.Builder builder = null; @@ -68,6 +71,7 @@ /** Physical resources in the node. */ private Resource physicalResource = null; + private NodeStatus nodeStatus; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -121,6 +125,9 @@ private synchronized void mergeLocalToBuilder() { if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } + if (this.nodeStatus != null) { + builder.setNodeStatus(convertToProtoFormat(this.nodeStatus)); + } } private void addLogAggregationStatusForAppsToProto() { @@ -359,6 +366,28 @@ public synchronized void setPhysicalResource(Resource pPhysicalResource) { this.physicalResource = pPhysicalResource; } + @Override + public synchronized NodeStatus getNodeStatus() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.nodeStatus != null) { + return this.nodeStatus; + } + if (!p.hasNodeStatus()) { + return null; + } + this.nodeStatus = convertFromProtoFormat(p.getNodeStatus()); + return this.nodeStatus; + } + + @Override + public synchronized void setNodeStatus(NodeStatus pNodeStatus) { + maybeInitBuilder(); + if (pNodeStatus == null) { + builder.clearNodeStatus(); + } + this.nodeStatus = pNodeStatus; + } + @Override public int hashCode() { return getProto().hashCode(); @@ -533,4 +562,12 @@ public synchronized void setLogAggregationReportsForApps( } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto s) { + return new NodeStatusPBImpl(s); + } + + private NodeStatusProto convertToProtoFormat(NodeStatus s) { + return ((NodeStatusPBImpl)s).getProto(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index ff7153eca8e..c643179888e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -74,6 +74,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto physicalResource = 9; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; optional NodeAttributesProto nodeAttributes = 11; + optional NodeStatusProto nodeStatus = 12; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5e3693ae9c1..0725d423096 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -392,10 +392,11 @@ protected void registerWithRM() // during RM recovery synchronized (this.context) { List containerReports = getNMContainerStatuses(); + NodeStatus nodeStatus = getNodeStatus(0); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource, nodeAttributes); + nodeLabels, physicalResource, nodeAttributes, nodeStatus); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java index 1c9bd82bd46..af92b15e9c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/NodeHealthScriptRunner.java @@ -60,8 +60,9 @@ "Node health script timed out"; private NodeHealthScriptRunner(String scriptName, long checkInterval, - long timeout, String[] scriptArgs) { - super(NodeHealthScriptRunner.class.getName(), checkInterval); + long timeout, String[] scriptArgs, boolean runBeforeStartup) { + super(NodeHealthScriptRunner.class.getName(), checkInterval, + runBeforeStartup); this.nodeHealthScript = scriptName; this.scriptTimeout = timeout; setTimerTask(new NodeHealthMonitorExecutor(scriptArgs)); @@ -91,6 +92,10 @@ public static NodeHealthScriptRunner newInstance(String scriptName, "interval-ms can not be set to a negative number."); } + boolean runBeforeStartup = conf.getBoolean( + YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP, + YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP); + // Determine time out String scriptTimeoutConfig = String.format( YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE, @@ -113,7 +118,7 @@ public static NodeHealthScriptRunner newInstance(String scriptName, String[] scriptArgs = conf.getStrings(scriptArgsConfig, new String[]{}); return new NodeHealthScriptRunner(nodeHealthScript, - checkIntervalMs, scriptTimeout, scriptArgs); + checkIntervalMs, scriptTimeout, scriptArgs, runBeforeStartup); } private enum HealthCheckerExitStatus { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java index a0c4d8b8ebd..e9ad5b00ae4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/health/TimedHealthReporterService.java @@ -45,13 +45,16 @@ private Timer timer; private TimerTask task; private long intervalMs; + private boolean runBeforeStartup; - TimedHealthReporterService(String name, long intervalMs) { + TimedHealthReporterService(String name, long intervalMs, + boolean runBeforeStartup) { super(name); this.isHealthy = true; this.healthReport = ""; this.lastReportedTime = System.currentTimeMillis(); this.intervalMs = intervalMs; + this.runBeforeStartup = runBeforeStartup; } @VisibleForTesting @@ -73,7 +76,13 @@ public void serviceStart() throws Exception { throw new Exception("Health reporting task hasn't been set!"); } timer = new Timer("HealthReporterService-Timer", true); - timer.scheduleAtFixedRate(task, 0, intervalMs); + long delay = 0; + if (runBeforeStartup) { + delay = intervalMs; + task.run(); + } + + timer.scheduleAtFixedRate(task, delay, intervalMs); super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 2c89ddd9e9b..7d6feea6f19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -335,6 +335,7 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); Resource physicalResource = request.getPhysicalResource(); + NodeStatus nodeStatus = request.getNodeStatus(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -426,7 +427,7 @@ public RegisterNodeManagerResponse registerNodeManager( if (oldNode == null) { RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), - request.getRunningApplications()); + request.getRunningApplications(), nodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { if (LOG.isDebugEnabled()) { @@ -462,7 +463,7 @@ public RegisterNodeManagerResponse registerNodeManager( this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeStartedEvent(nodeId, null, null)); + .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus)); } else { // Reset heartbeat ID since node just restarted. oldNode.resetLastNodeHeartBeatResponse(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a565fe75656..d9a748b3d0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.collections.keyvalue.DefaultMapEntry; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -208,7 +209,8 @@ RMNodeEventType, RMNodeEvent>(NodeState.NEW) //Transitions from NEW state - .addTransition(NodeState.NEW, NodeState.RUNNING, + .addTransition(NodeState.NEW, + EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STARTED, new AddNodeTransition()) .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, @@ -850,10 +852,10 @@ private static NodeHealthStatus updateRMNodeFromStatusEvents( } public static class AddNodeTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List containers = null; @@ -889,17 +891,40 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } - rmNode.context.getDispatcher().getEventHandler() - .handle(new NodeAddedSchedulerEvent(rmNode, containers)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); + NodeStatus nodeStatus = + startEvent.getNodeStatus(); + RMNodeStatusEvent rmNodeStatusEvent = + new RMNodeStatusEvent(nodeId, nodeStatus); + + NodeHealthStatus nodeHealthStatus = + updateRMNodeFromStatusEvents(rmNode, rmNodeStatusEvent); + + NodeState nodeState = null; + if (nodeHealthStatus.getIsNodeHealthy()) { + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + nodeState = NodeState.RUNNING; + } else { + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_UNUSABLE, rmNode)); + //Update the metrics + rmNode.updateMetricsForDeactivatedNode(NodeState.RUNNING, + NodeState.UNHEALTHY); + nodeState = NodeState.UNHEALTHY; + } + List logAggregationReportsForApps = startEvent.getLogAggregationReportsForApps(); if (logAggregationReportsForApps != null && !logAggregationReportsForApps.isEmpty()) { rmNode.handleLogAggregationStatus(logAggregationReportsForApps); } + + return nodeState; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java index 397699453fb..2bf04d0fe76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -24,19 +24,23 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; public class RMNodeStartedEvent extends RMNodeEvent { + private final NodeStatus nodeStatus; private List containerStatuses; private List runningApplications; private List logAggregationReportsForApps; public RMNodeStartedEvent(NodeId nodeId, List containerReports, - List runningApplications) { + List runningApplications, + NodeStatus nodeStatus) { super(nodeId, RMNodeEventType.STARTED); this.containerStatuses = containerReports; this.runningApplications = runningApplications; + this.nodeStatus = nodeStatus; } public List getNMContainerStatuses() { @@ -47,6 +51,10 @@ public RMNodeStartedEvent(NodeId nodeId, return runningApplications; } + public NodeStatus getNodeStatus() { + return nodeStatus; + } + public List getLogAggregationReportsForApps() { return this.logAggregationReportsForApps; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 3543bc4707e..fe196f887fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -187,6 +187,22 @@ public RegisterNodeManagerResponse registerNode( req.setNodeLabels(nodeLabels); } + NodeStatus status = Records.newRecord(NodeStatus.class); + status.setResponseId(0); + status.setNodeId(nodeId); + ArrayList completedContainers = new ArrayList(); + status.setContainersStatuses( + new ArrayList(containerStats.values())); + for (ContainerId cid : completedContainers) { + containerStats.remove(cid); + } + NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); + healthStatus.setHealthReport(""); + healthStatus.setIsNodeHealthy(true); + healthStatus.setLastHealthReportTime(1); + status.setNodeHealthStatus(healthStatus); + req.setNodeStatus(status); + RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index b3888c3cd66..495a695abfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.token.Token; @@ -54,6 +57,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -543,7 +548,12 @@ public MockNM registerNode(String nodeIdStr, Resource nodeCapability) public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null, + mockNodeStatus)); drainEventsImplicitly(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 1f1e164cf5b..42732468305 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -216,8 +216,12 @@ private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { @Test (timeout = 5000) public void testExpiredContainer() { + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -280,12 +284,16 @@ public void testRecommissionNode() { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeStartedEvent(null, null, null)); + node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); ApplicationId app1 = BuilderUtils.newApplicationId(1, 1); @@ -341,8 +349,12 @@ public void testContainerUpdate() throws InterruptedException{ @Test (timeout = 5000) public void testStatusChange(){ + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); //Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); //Add info to the queue first node.setNextHeartBeat(false); @@ -712,7 +724,12 @@ private RMNodeImpl getRunningNode(String nmVersion, int port) { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, capability, nmVersion); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -763,7 +780,13 @@ private RMNodeImpl getRebootedNode() { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, capability, null); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals(NodeState.RUNNING, node.getState()); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING)); Assert.assertEquals(NodeState.REBOOTED, node.getState()); @@ -779,7 +802,12 @@ public void testAdd() { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -1075,8 +1103,12 @@ public void testDisappearingContainer() { @Test public void testForHandlingDuplicatedCompltedContainers() { + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); // Start the node - node.handle(new RMNodeStartedEvent(null, null, null)); + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); // Add info to the queue first node.setNextHeartBeat(false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 411b8482170..228e6ec5d11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Collection; @@ -37,6 +39,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -127,8 +131,14 @@ public void testResourceAllocation() nm1.getNodeId()); RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( nm2.getNodeId()); - node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null)); - node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null)); + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null, + mockNodeStatus)); + node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null, + mockNodeStatus)); // Submit an application Application application = new Application("user1", resourceManager); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 8d31fe1a8ba..531ca985289 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; @@ -139,13 +140,18 @@ public void testLogAggregationStatus() throws Exception { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node1 = new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null); - node1.handle(new RMNodeStartedEvent(nodeId1, null, null)); + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); + node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1)); NodeId nodeId2 = NodeId.newInstance("localhost", 2345); RMNodeImpl node2 = new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null); - node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null)); + node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null, + mockNodeStatus)); rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2)); // The initial log aggregation status for these two nodes diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index c3f41f62f6f..02a9c09b567 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -241,8 +241,13 @@ private RMNode getRunningRMNode(String host, int port, int memory) { } private void sendStartedEvent(RMNode node) { + NodeStatus mockNodeStatus = mock(NodeStatus.class); + NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class); + when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus); + when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true); ((RMNodeImpl) node) - .handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); + .handle(new RMNodeStartedEvent(node.getNodeID(), null, null, + mockNodeStatus)); } private void sendLostEvent(RMNode node) {