diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 7798ba9..fc30a80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -41,6 +41,15 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications, Set nodeLabels) { + return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, + containerStatuses, runningApplications, nodeLabels, null); + } + + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List containerStatuses, + List runningApplications, Set nodeLabels, + Resource physicalResource) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -50,6 +59,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); request.setNodeLabels(nodeLabels); + request.setPhysicalResource(physicalResource); return request; } @@ -88,4 +98,18 @@ public abstract void setContainerStatuses( */ public abstract void setRunningApplications( List runningApplications); + + /** + * Get the physical resources in the node to properly estimate resource + * utilization. + * @return Physical resources in the node. + */ + public abstract Resource getPhysicalResource(); + + /** + * Set the physical resources in the node to properly estimate resource + * utilization. + * @param physicalResource Physical resources in the node. + */ + public abstract void setPhysicalResource(Resource physicalResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 2a1a268..97d112e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -56,6 +56,9 @@ private List runningApplications = null; private Set labels = null; + /** Physical resources in the node. */ + private Resource physicalResource = null; + public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); } @@ -269,7 +272,29 @@ public synchronized void setContainerStatuses( initContainerRecoveryReports(); this.containerStatuses.addAll(containerReports); } - + + @Override + public synchronized Resource getPhysicalResource() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.physicalResource != null) { + return this.physicalResource; + } + if (!p.hasPhysicalResource()) { + return null; + } + this.physicalResource = convertFromProtoFormat(p.getPhysicalResource()); + return this.physicalResource; + } + + @Override + public synchronized void setPhysicalResource(Resource pPhysicalResource) { + maybeInitBuilder(); + if (pPhysicalResource == null) { + builder.clearPhysicalResource(); + } + this.physicalResource = pPhysicalResource; + } + @Override public int hashCode() { return getProto().hashCode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 3660252..c6d0b2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message RegisterNodeManagerRequestProto { repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; optional NodeLabelsProto nodeLabels = 8; + optional ResourceProto physicalResource = 9; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f692bf1..eef1d07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -113,6 +114,7 @@ private long nextHeartBeatInterval; private ResourceTracker resourceTracker; private Resource totalResource; + private Resource physicalResource; private int httpPort; private String nodeManagerVersionId; private String minimumResourceManagerVersion; @@ -187,6 +189,15 @@ protected void serviceInit(Configuration conf) throws Exception { this.totalResource = Resource.newInstance(memoryMb, virtualCores); metrics.addResource(totalResource); + + // Get actual node physical resources + ResourceCalculatorPlugin rcp = + ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf); + int physicalMemoryMb = (int) rcp.getPhysicalMemorySize() / (1024 * 1024); + int physicalCores = rcp.getNumProcessors(); + this.physicalResource = + Resource.newInstance(physicalMemoryMb, physicalCores); + this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf); this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, @@ -343,7 +354,7 @@ protected void registerWithRM() RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels); + nodeLabels, physicalResource); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 9d480f3..f0280a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -314,6 +314,7 @@ public RegisterNodeManagerResponse registerNodeManager( int httpPort = request.getHttpPort(); Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); + Resource physicalResource = request.getPhysicalResource(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -383,7 +384,7 @@ public RegisterNodeManagerResponse registerNodeManager( .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion); + resolve(host), capability, nodeManagerVersion, physicalResource); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 3a9cf54..52402c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -114,6 +114,12 @@ public ResourceUtilization getNodeUtilization(); /** + * the physical resources in the node. + * @return the physical resources in the node. + */ + Resource getPhysicalResource(); + + /** * The rack name for this node manager. * @return the rack name. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a3a6b30..ae226d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -131,6 +131,9 @@ /* Resource utilization for the node. */ private ResourceUtilization nodeUtilization; + /** Physical resources in the node. */ + private Resource physicalResource; + /* Container Queue Information for the node.. Used by Distributed Scheduler */ private QueuedContainersStatus queuedContainersStatus; @@ -345,7 +348,15 @@ RMNodeEvent> stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, - int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { + int cmPort, int httpPort, Node node, Resource capability, + String nodeManagerVersion) { + this(nodeId, context, hostName, cmPort, httpPort, node, capability, + nodeManagerVersion, null); + } + + public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, + int cmPort, int httpPort, Node node, Resource capability, + String nodeManagerVersion, Resource physResource) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -359,6 +370,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.lastHealthReportTime = System.currentTimeMillis(); this.nodeManagerVersion = nodeManagerVersion; this.timeStamp = 0; + this.physicalResource = physResource; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -519,6 +531,27 @@ public void setNodeUtilization(ResourceUtilization nodeUtilization) { } @Override + public Resource getPhysicalResource() { + this.readLock.lock(); + + try { + return this.physicalResource; + } finally { + this.readLock.unlock(); + } + } + + public void setPhysicalResource(Resource physicalResource) { + this.writeLock.lock(); + + try { + this.physicalResource = physicalResource; + } finally { + this.writeLock.unlock(); + } + } + + @Override public NodeState getState() { this.readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 22aa0ee..d1a4f40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -112,12 +112,13 @@ public static Resource newAvailResource(Resource total, Resource used) { private Set labels; private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; + private Resource physicalResource; public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, Resource pPhysicalResource) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; this.httpAddress = httpAddress; @@ -131,6 +132,7 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, this.labels = labels; this.containersUtilization = containersUtilization; this.nodeUtilization = nodeUtilization; + this.physicalResource = pPhysicalResource; } @Override @@ -272,6 +274,14 @@ public long getUntrackedTimeStamp() { @Override public void setUntrackedTimeStamp(long timeStamp) { } + + public Resource getPhysicalResource() { + return this.physicalResource; + } + + public void setPhysicalResource(Resource pPhysicalResource) { + this.physicalResource = pPhysicalResource; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, @@ -282,19 +292,19 @@ private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, Set labels) { return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123, - labels, null, null); + labels, null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port) { return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, - null, null, null); + null, null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, Resource physicalResource) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = hostName + ":" + nid; @@ -307,7 +317,7 @@ private static RMNode buildRMNode(int rack, final Resource perNode, String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, healthReport, 0, nid, hostName, state, labels, - containersUtilization, nodeUtilization); + containersUtilization, nodeUtilization, physicalResource); } public static RMNode nodeInfo(int rack, final Resource perNode,