diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java index 2ae4872..bd5dccf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -169,4 +169,29 @@ public void subtractFrom(int pmem, int vmem, float cpu) { this.setVirtualMemory(this.getVirtualMemory() - vmem); this.setCPU(this.getCPU() - cpu); } + + /** + * Convert utilization into a resource allocation. It uses the physical + * memory and ceils the CPU to fit into container sizes. + * @return New resource for this utilization. + */ + @Public + @Unstable + public Resource toResource() { + long mem = this.getPhysicalMemory(); + long vcores = (long)Math.ceil(this.getCPU()); + Resource resource = Resource.newInstance(mem, vcores); + return resource; + } + + /** + * Make sure that utilization is not negative. 
+ */ + @Public + @Unstable + public void preventNegative() { + this.setPhysicalMemory(Math.max(this.getPhysicalMemory(), 0)); + this.setVirtualMemory(Math.max(this.getVirtualMemory(), 0)); + this.setCPU(Math.max(this.getCPU(), 0)); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3f84a23..4e9e766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1528,6 +1528,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE = false; + /** + * Whether the scheduler should consider the external load in this machine + * when scheduling containers + */ + public static final String NM_SCHEDULE_EXTERNAL_LOAD = + NM_PREFIX + "schedule-external-load"; + public static final boolean DEFAULT_NM_SCHEDULE_EXTERNAL_LOAD = false; // Configurations for applicaiton life time monitor feature public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 7798ba9..59a840e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -41,6 +41,15 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications, Set nodeLabels) { + return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, + containerStatuses, runningApplications, nodeLabels, false); + } + + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List containerStatuses, + List runningApplications, Set nodeLabels, + boolean scheduleExternal) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -50,6 +59,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); request.setNodeLabels(nodeLabels); + request.setScheduleExternal(scheduleExternal); return request; } @@ -60,7 +70,9 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract List getNMContainerStatuses(); public abstract Set getNodeLabels(); public abstract void setNodeLabels(Set nodeLabels); - + public abstract boolean getScheduleExternal(); + public abstract void setScheduleExternal(boolean scheduleExternal); + /** * We introduce this here because currently YARN RM doesn't persist nodes info * for application running. 
When RM restart happened, we cannot determinate if diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 2a1a268..28be1a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -329,6 +329,18 @@ private synchronized void initNodeLabels() { } } + @Override + public boolean getScheduleExternal() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasScheduleExternal()) ? 
p.getScheduleExternal() : false; + } + + @Override + public void setScheduleExternal(boolean scheduleExternal) { + maybeInitBuilder(); + builder.setScheduleExternal(scheduleExternal); + } + private static NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) { return new NodeLabelPBImpl(p); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d485e6b..473966a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message RegisterNodeManagerRequestProto { repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; optional NodeLabelsProto nodeLabels = 8; + optional bool scheduleExternal = 9; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f692bf1..01cd2dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -151,6 +151,8 @@ private NMNodeLabelsHandler nodeLabelsHandler; private final NodeLabelsProvider 
nodeLabelsProvider; + private boolean scheduleExternal; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { this(context, dispatcher, healthChecker, metrics, null); @@ -222,6 +224,10 @@ protected void serviceInit(Configuration conf) throws Exception { this.logAggregationEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + + this.scheduleExternal = conf.getBoolean( + YarnConfiguration.NM_SCHEDULE_EXTERNAL_LOAD, + YarnConfiguration.DEFAULT_NM_SCHEDULE_EXTERNAL_LOAD); } @Override @@ -343,7 +349,7 @@ protected void registerWithRM() RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels); + nodeLabels, scheduleExternal); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 51fc0bd..302aaee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -318,6 +318,7 @@ public RegisterNodeManagerResponse registerNodeManager( int httpPort = request.getHttpPort(); Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); + boolean 
scheduleExternal = request.getScheduleExternal(); RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -387,7 +388,7 @@ public RegisterNodeManagerResponse registerNodeManager( .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion); + resolve(host), capability, nodeManagerVersion, scheduleExternal); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 10e2afa..dcb5461 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -175,10 +175,17 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( long getUntrackedTimeStamp(); void setUntrackedTimeStamp(long timeStamp); - /* + + /** * Optional decommissioning timeout in second * (null indicates default timeout). * @return the decommissioning timeout in second. */ Integer getDecommissioningTimeout(); + + /** + * If the node must consider external utilization when scheduling. + * @return Consider external utilization when scheduling. 
+ */ + boolean isScheduleExternal(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 375b4cf..331e3ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -173,6 +173,9 @@ private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); + /** If the node requires scheduling its external load. */ + private boolean scheduleExternal; + private static final StateMachineFactory stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, - int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { + int cmPort, int httpPort, Node node, Resource capability, + String nodeManagerVersion) { + this(nodeId, context, hostName, cmPort, httpPort, node, capability, + nodeManagerVersion, false); + } + + public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, + int cmPort, int httpPort, Node node, Resource capability, + String nodeManagerVersion, boolean scheduleExternal) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -362,6 +373,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.healthReport = "Healthy"; this.lastHealthReportTime = System.currentTimeMillis(); this.nodeManagerVersion = nodeManagerVersion; + this.scheduleExternal = scheduleExternal; this.timeStamp = 0; 
this.latestNodeHeartBeatResponse.setResponseId(0); @@ -1495,4 +1507,9 @@ public void setUntrackedTimeStamp(long ts) { public Integer getDecommissioningTimeout() { return decommissioningTimeout; } + + @Override + public boolean isScheduleExternal() { + return this.scheduleExternal; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 2efdbd0..6e6f8da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -53,7 +53,6 @@ private static final Log LOG = LogFactory.getLog(SchedulerNode.class); - private Resource unallocatedResource = Resource.newInstance(0, 0); private Resource allocatedResource = Resource.newInstance(0, 0); private Resource totalResource; private RMContainer reservedContainer; @@ -75,7 +74,6 @@ public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; - this.unallocatedResource = Resources.clone(node.getTotalCapability()); this.totalResource = Resources.clone(node.getTotalCapability()); if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); @@ -99,8 +97,6 @@ public RMNode getRMNode() { */ public synchronized void updateTotalResource(Resource resource){ this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); } /** @@ -210,7 +206,45 @@ public synchronized void 
decreaseContainer(ContainerId containerId, * @return Unallocated resources on the node */ public synchronized Resource getUnallocatedResource() { - return this.unallocatedResource; + Resource unallocatedResource = Resources.subtract( + this.getTotalResource(), this.getAllocatedResource()); + + if (isScheduleExternal()) { + ResourceUtilization externalUtilization = getExternalUtilization(); + if (externalUtilization != null) { + Resource externalResource = externalUtilization.toResource(); + Resources.subtractFrom(unallocatedResource, externalResource); + } + } + + return unallocatedResource; + } + + /** + * Estimate the external utilization in the node. This should be the total + * node utilization minus the containers. + * @return Resource utilization in the node or null if not available. + */ + public ResourceUtilization getExternalUtilization() { + ResourceUtilization externalUtilization = null; + + ResourceUtilization nodeUtilization = rmNode.getNodeUtilization(); + ResourceUtilization containersUtilization = + rmNode.getAggregatedContainersUtilization(); + + if (nodeUtilization != null && containersUtilization != null) { + externalUtilization = ResourceUtilization.newInstance(nodeUtilization); + externalUtilization.subtractFrom( + containersUtilization.getPhysicalMemory(), + containersUtilization.getVirtualMemory(), + containersUtilization.getCPU()); + + // Prevent negative values in utilization as node and container + // utilization might be out of sync + externalUtilization.preventNegative(); + } + + return externalUtilization; } /** @@ -285,7 +319,6 @@ private synchronized void addUnallocatedResource(Resource resource) { + rmNode.getNodeAddress()); return; } - Resources.addTo(unallocatedResource, resource); Resources.subtractFrom(allocatedResource, resource); } @@ -300,7 +333,6 @@ private synchronized void deductUnallocatedResource(Resource resource) { + rmNode.getNodeAddress()); return; } - Resources.subtractFrom(unallocatedResource, resource); 
Resources.addTo(allocatedResource, resource); } @@ -432,4 +464,11 @@ public void setNodeUtilization(ResourceUtilization nodeUtilization) { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + /** + * Check if the node schedules external utilization. + */ + public boolean isScheduleExternal() { + return rmNode.isScheduleExternal(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 5a89e54..e4456fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -277,6 +277,11 @@ public void setUntrackedTimeStamp(long timeStamp) { public Integer getDecommissioningTimeout() { return null; } + + @Override + public boolean isScheduleExternal() { + return false; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java index a941302..cb4a5df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java @@ -24,10 +24,17 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -37,12 +44,17 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.junit.Before; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; public class TestMiniYarnClusterNodeUtilization { @@ -50,36 +62,57 @@ private static 
final int NUM_RM = 1; private static final int NUM_NM = 1; + // Size of the container to allocate + private static final int CONTAINER_SIZE_MEM = 4*1024; + private static final int CONTAINER_SIZE_CPU = 4; + // Values for the first round private static final int CONTAINER_PMEM_1 = 1024; - private static final int CONTAINER_VMEM_1 = 2048; - private static final float CONTAINER_CPU_1 = 11.0f; + private static final int CONTAINER_VMEM_1 = 2*1024; + private static final float CONTAINER_CPU_1 = 0.2f; - private static final int NODE_PMEM_1 = 10240; - private static final int NODE_VMEM_1 = 20480; - private static final float NODE_CPU_1 = 51.0f; + private static final int NODE_PMEM_1 = 10*1024; + private static final int NODE_VMEM_1 = 20*1024; + private static final float NODE_CPU_1 = 2.0f; // Values for the second round - private static final int CONTAINER_PMEM_2 = 2048; - private static final int CONTAINER_VMEM_2 = 4096; - private static final float CONTAINER_CPU_2 = 22.0f; + private static final int CONTAINER_PMEM_2 = 2*1024; + private static final int CONTAINER_VMEM_2 = 4*1024; + private static final float CONTAINER_CPU_2 = 1.5f; + + private static final int NODE_PMEM_2 = 20*1024; + private static final int NODE_VMEM_2 = 40*1024; + private static final float NODE_CPU_2 = 1.8f; - private static final int NODE_PMEM_2 = 20480; - private static final int NODE_VMEM_2 = 40960; - private static final float NODE_CPU_2 = 61.0f; + // Values for the third round + private static final int CONTAINER_PMEM_3 = 3*1024; + private static final int CONTAINER_VMEM_3 = 6*1024; + private static final float CONTAINER_CPU_3 = 2.5f; - private MiniYARNCluster cluster; - private CustomNodeManager nm; + private static final int NODE_PMEM_3 = 21*1024; + private static final int NODE_VMEM_3 = 42*1024; + private static final float NODE_CPU_3 = 3.8f; - private Configuration conf; + private static Configuration conf; - private NodeStatus nodeStatus; + private static MiniYARNCluster cluster; + private 
static CustomNodeManager nm; + private static RMContext rmContext; - @Before - public void setup() { + private static NodeStatus nodeStatus; + + @BeforeClass + public static void setup() throws YarnException, InterruptedException { conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0"); conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + + // Node Manager with 16 cores and 64GB + conf.setInt(YarnConfiguration.NM_VCORES, 16); + conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 64*1024); + // Consider the load of the machine when scheduling + conf.setBoolean(YarnConfiguration.NM_SCHEDULE_EXTERNAL_LOAD, true); + String name = TestMiniYarnClusterNodeUtilization.class.getName(); cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1); cluster.init(conf); @@ -92,6 +125,40 @@ public void setup() { CONTAINER_PMEM_1, CONTAINER_VMEM_1, CONTAINER_CPU_1, NODE_PMEM_1, NODE_VMEM_1, NODE_CPU_1); nm.setNodeStatus(nodeStatus); + + assertTrue("NMs fail to connect to the RM", + cluster.waitForNodeManagersToConnect(10000)); + + // Get the Resource Manager + ResourceManager rm = cluster.getResourceManager(0); + rmContext = rm.getRMContext(); + + // Allocate the testing container + NMContext nmContext = (NMContext)nm.getNMContext(); + NodeId nodeId = nmContext.getNodeId(); + SchedulerNode schedulerNode = + rmContext.getScheduler().getSchedulerNode(nodeId); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(123456, 1), 1); + ContainerId containerId = ContainerId.newContainerId(attemptId, 1); + Resource resource = Resource.newInstance( + CONTAINER_SIZE_MEM, CONTAINER_SIZE_CPU); + Container container = Container.newInstance( + containerId, null, "", resource , null, null); + RMContainer rmContainer = new RMContainerImpl( + container, attemptId, nodeId, "", rmContext); + + NodeStatusUpdater updater = nm.getNodeStatusUpdater(); + updater.sendOutofBandHeartBeat(); + + waitHeartbeat(); 
+ + schedulerNode.allocateContainer(rmContainer); + } + + @AfterClass + public static void tearDown() throws IOException { + cluster.close(); } /** @@ -102,8 +169,6 @@ public void setup() { @Test(timeout=60000) public void testUpdateNodeUtilization() throws InterruptedException, IOException, YarnException { - assertTrue("NMs fail to connect to the RM", - cluster.waitForNodeManagersToConnect(10000)); // Simulate heartbeat using NodeStatus fixture NodeHeartbeatRequest request = @@ -135,8 +200,6 @@ public void testUpdateNodeUtilization() @Test(timeout=60000) public void testMockNodeStatusHeartbeat() throws InterruptedException, YarnException { - assertTrue("NMs fail to connect to the RM", - cluster.waitForNodeManagersToConnect(10000)); NodeStatusUpdater updater = nm.getNodeStatusUpdater(); updater.sendOutofBandHeartBeat(); @@ -156,6 +219,51 @@ public void testMockNodeStatusHeartbeat() } /** + * Check the unallocated resources reported when scheduling external load in + * the node. + */ + @Test(timeout=60000) + public void testScheduleExternal() + throws InterruptedException, YarnException { + + // Alter utilization + int responseId = 30; + nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId, + CONTAINER_PMEM_3, CONTAINER_VMEM_3, CONTAINER_CPU_3, + NODE_PMEM_3, NODE_VMEM_3, NODE_CPU_3); + nm.setNodeStatus(nodeStatus); + NodeStatusUpdater updater = nm.getNodeStatusUpdater(); + updater.sendOutofBandHeartBeat(); + + waitHeartbeat(); + + // Check the unallocated resources considering external load at scheduling + NMContext nmContext = (NMContext)nm.getNMContext(); + NodeId nodeId = nmContext.getNodeId(); + SchedulerNode schedulerNode = + rmContext.getScheduler().getSchedulerNode(nodeId); + + // The external utilization of the node should be nodes - containers + ResourceUtilization externalUtilization = + schedulerNode.getExternalUtilization(); + assertEquals(NODE_CPU_3-CONTAINER_CPU_3, + externalUtilization.getCPU(), 0); + 
assertEquals(NODE_PMEM_3-CONTAINER_PMEM_3, + externalUtilization.getPhysicalMemory()); + + // TODO + // We should be able to schedule 10 cores and 42GB + // Node: 16 cores, 64 GB + // Allocated: 4 cores, 4 GB + // External: 2 cores, 18GB + /* + Resource unallocatedResource = schedulerNode.getUnallocatedResource(); + assertEquals(10, unallocatedResource.getVirtualCores()); + assertEquals(42*1024, unallocatedResource.getMemorySize()); + */ + } + + /** * Create a NodeStatus test vector. * @param nodeId Node identifier. * @param responseId Response identifier. @@ -166,7 +274,7 @@ public void testMockNodeStatusHeartbeat() * @param nodeVMem Virtual memory of the node. * @param nodeCPU CPU percentage of the node. */ - private NodeStatus createNodeStatus( + private static NodeStatus createNodeStatus( NodeId nodeId, int responseId, int containerPMem, @@ -182,10 +290,16 @@ private NodeStatus createNodeStatus( containerCPU); ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(nodePMem, nodeVMem, nodeCPU); + + List containers = new ArrayList(); + containers.add(ContainerStatus.newInstance( + BuilderUtils.newContainerId(1, 1, 1, 1), + ContainerState.RUNNING, "", 0)); + NodeStatus status = NodeStatus.newInstance( nodeId, responseId, - new ArrayList(), + containers, null, NodeHealthStatus.newInstance(true, null, 0), containersUtilization, @@ -200,9 +314,6 @@ private NodeStatus createNodeStatus( * fixture utilization data. 
*/ private void verifySimulatedUtilization() throws InterruptedException { - ResourceManager rm = cluster.getResourceManager(0); - RMContext rmContext = rm.getRMContext(); - ResourceUtilization containersUtilization = nodeStatus.getContainersUtilization(); ResourceUtilization nodeUtilization = @@ -210,16 +321,7 @@ private void verifySimulatedUtilization() throws InterruptedException { // Give the heartbeat time to propagate to the RM (max 10 seconds) // We check if the nodeUtilization is up to date - for (int i=0; i<100; i++) { - for (RMNode ni : rmContext.getRMNodes().values()) { - if (ni.getNodeUtilization() != null) { - if (ni.getNodeUtilization().equals(nodeUtilization)) { - break; - } - } - } - Thread.sleep(100); - } + waitHeartbeat(); // Verify the data is readable from the RM and scheduler nodes for (RMNode ni : rmContext.getRMNodes().values()) { @@ -242,4 +344,24 @@ private void verifySimulatedUtilization() throws InterruptedException { nodeUtilization, nu); } } + + /** + * Wait for the NM heartbeat to propagate to the RM. It polls the RM nodes + * every 100 ms (at most 100 times) and returns as soon as one of them + * reports the node utilization of the current nodeStatus fixture. + * @throws InterruptedException If interrupted while sleeping. + */ + private static void waitHeartbeat() throws InterruptedException { + ResourceUtilization nodeUtilization = nodeStatus.getNodeUtilization(); + for (int i=0; i<100; i++) { + for (RMNode ni : rmContext.getRMNodes().values()) { + if (ni.getNodeUtilization() != null) { + if (ni.getNodeUtilization().equals(nodeUtilization)) { + return; + } + } + } + Thread.sleep(100); + } + } }