diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index e889de0..5ec240c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -99,7 +99,7 @@ new org.apache.hadoop.yarn.api.records.ResourceRequest .ResourceRequestComparator()); - final private Map nodes = new HashMap<>(); + final private Map nodes = new HashMap<>(); Resource used = recordFactory.newRecordInstance(Resource.class); @@ -208,11 +208,11 @@ public synchronized void addResourceRequestSpec( } public synchronized void addNodeManager(String host, - int containerManagerPort, NodeManager nodeManager) { + int containerManagerPort, MockNM nodeManager) { nodes.put(host + ":" + containerManagerPort, nodeManager); } - private synchronized NodeManager getNodeManager(String host) { + private synchronized MockNM getNodeManager(String host) { return nodes.get(host); } @@ -269,8 +269,8 @@ public synchronized void finishTask(Task task) throws IOException, "Finishing unknown task " + task.getTaskId() + " from application " + applicationId); } - - NodeManager nodeManager = task.getNodeManager(); + + MockNM nodeManager = task.getNodeManager(); ContainerId containerId = task.getContainerId(); task.stop(); List containerIds = new ArrayList(); @@ -283,7 +283,7 @@ public synchronized void finishTask(Task task) throws IOException, LOG.info("Finished task " + task.getTaskId() + " of application " + applicationId + - " on node " + nodeManager.getHostName() + + " on node " + nodeManager.getContainerManagerAddress() + ", currently using " + used + " resources"); } @@ -392,7 +392,7 @@ private synchronized void assign(SchedulerRequestKey schedulerKey, t.hasNext();) { Task task = t.next(); if (task.getState() == State.PENDING && task.canSchedule(type, host)) { - NodeManager nodeManager = getNodeManager(host); + MockNM nodeManager = getNodeManager(host); task.start(nodeManager, container.getId()); i.remove(); @@ -403,7 +403,7 @@ private synchronized void assign(SchedulerRequestKey schedulerKey, LOG.info("Assigned container (" + container + ") of type " + type + " to task " + task.getTaskId() + " at priority " + schedulerKey.getPriority() + - " on node " + nodeManager.getHostName() + + " on node " + nodeManager.getContainerManagerAddress() + ", currently using " + used + " resources"); // Update resource requests diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 4a8ff00..c25106c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,23 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.*; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -47,14 +44,29 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.apache.hadoop.yarn.util.resource.Resources; import org.eclipse.jetty.util.log.Log; +import org.junit.Assert; -public class MockNM { +public class MockNM implements ContainerManagementProtocol { + private static final org.apache.commons.logging.Log LOG = + LogFactory.getLog(MockNM.class); + private static final RecordFactory RECORDFACTORY = + RecordFactoryProvider.getRecordFactory(null); - private int responseId; + private String rackName; + private Resource capability; + private ResourceManager resourceManager; + private Resource available = RECORDFACTORY.newRecordInstance(Resource.class); + private Resource used = RECORDFACTORY.newRecordInstance(Resource.class); + + private final Map> containers = + new HashMap>(); + private final Map containerStatusMap = + new HashMap(); + + private int responseId = 0; private NodeId nodeId; - private long memory; - private int vCores; private ResourceTrackerService resourceTracker; private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; @@ -65,7 +77,8 @@ private Map registeringCollectors = new ConcurrentHashMap<>(); - public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { + public MockNM(String nodeIdStr, int memory, + ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory this(nodeIdStr, memory, Math.max(1, (memory * YarnConfiguration.DEFAULT_NM_VCORES) / @@ -75,19 +88,40 @@ public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTrack public MockNM(String nodeIdStr, int memory, int vcores, ResourceTrackerService resourceTracker) { - this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion()); + this(nodeIdStr, memory, vcores, resourceTracker, + YarnVersionInfo.getVersion()); } public MockNM(String nodeIdStr, int memory, int vcores, ResourceTrackerService resourceTracker, String version) { - this.memory = memory; - this.vCores = vcores; + this.capability = BuilderUtils.newResource(memory, vcores); this.resourceTracker = resourceTracker; this.version = version; String[] splits = nodeIdStr.split(":"); nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); } + public MockNM(String hostName, int containerManagerPort, int httpPort, + String rackName, Resource capability, ResourceManager resourceManager) + throws IOException, YarnException { + this.capability = capability; + this.resourceTracker = resourceManager.getResourceTrackerService(); + this.rackName = rackName; + this.httpPort = httpPort; + this.version = YarnVersionInfo.getVersion(); + Resources.addTo(available, capability); + this.nodeId = NodeId.newInstance(hostName, containerManagerPort); + RegisterNodeManagerRequest request = RECORDFACTORY + .newRecordInstance(RegisterNodeManagerRequest.class); + request.setHttpPort(httpPort); + request.setResource(capability); + request.setNodeId(this.nodeId); + request.setNMVersion(YarnVersionInfo.getVersion()); + resourceManager.getResourceTrackerService().registerNodeManager(request); + this.resourceManager = resourceManager; + resourceManager.getResourceScheduler().getNodeReport(this.nodeId); + } + public NodeId getNodeId() { return nodeId; } @@ -100,22 +134,25 @@ public void setHttpPort(int port) { httpPort = port; } - public void setResourceTrackerService(ResourceTrackerService resourceTracker) { - this.resourceTracker = resourceTracker; + public void setResourceTrackerService( + ResourceTrackerService resourceTrackerService) { + this.resourceTracker = resourceTrackerService; } - public void containerStatus(ContainerStatus containerStatus) throws Exception { + public void containerStatus(ContainerStatus containerStatus) + throws Exception { Map> conts = new HashMap>(); - conts.put(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(), - Arrays.asList(new ContainerStatus[] { containerStatus })); + conts.put(containerStatus.getContainerId().getApplicationAttemptId() + .getApplicationId(), Arrays.asList( + new ContainerStatus[] {containerStatus})); nodeHeartbeat(conts, true); } public void containerIncreaseStatus(Container container) throws Exception { ContainerStatus containerStatus = BuilderUtils.newContainerStatus( container.getId(), ContainerState.RUNNING, "Success", 0, - container.getResource()); + container.getResource()); List increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, true, ++responseId); @@ -146,8 +183,7 @@ public RegisterNodeManagerResponse registerNode( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); req.setHttpPort(httpPort); - Resource resource = BuilderUtils.newResource(memory, vCores); - req.setResource(resource); + req.setResource(capability); req.setContainerStatuses(containerReports); req.setNMVersion(version); req.setRunningApplications(runningApplications); @@ -158,8 +194,8 @@ public RegisterNodeManagerResponse registerNode( this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey(); Resource newResource = registrationResponse.getResource(); if (newResource != null) { - memory = (int) newResource.getMemorySize(); - vCores = newResource.getVirtualCores(); + capability.setMemorySize(newResource.getMemorySize()); + capability.setVirtualCores(newResource.getVirtualCores()); } containerStats.clear(); if (containerReports != null) { @@ -175,7 +211,30 @@ public RegisterNodeManagerResponse registerNode( return registrationResponse; } - public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { + private List getContainerStatuses(Map> containerList) { + List containerStatuses = new ArrayList(); + for (List appContainers : containerList.values()) { + for (Container container : appContainers) { + containerStatuses.add(containerStatusMap.get(container)); + } + } + return containerStatuses; + } + public void heartbeat() throws IOException, YarnException { + NodeStatus nodeStatus = + org.apache.hadoop.yarn.server.resourcemanager.MockNM.createNodeStatus( + nodeId, getContainerStatuses(containers)); + nodeStatus.setResponseId(responseId); + NodeHeartbeatRequest request = RECORDFACTORY + .newRecordInstance(NodeHeartbeatRequest.class); + request.setNodeStatus(nodeStatus); + NodeHeartbeatResponse response = resourceTracker.nodeHeartbeat(request); + responseId = response.getResponseId(); + } + + public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) + throws Exception { return nodeHeartbeat(Collections.emptyList(), Collections.emptyList(), isHealthy, ++responseId); } @@ -184,7 +243,7 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, long containerId, ContainerState containerState) throws Exception { ContainerStatus containerStatus = BuilderUtils.newContainerStatus( BuilderUtils.newContainerId(attemptId, containerId), containerState, - "Success", 0, BuilderUtils.newResource(memory, vCores)); + "Success", 0, capability); ArrayList containerStatusList = new ArrayList(1); containerStatusList.add(containerStatus); @@ -199,7 +258,8 @@ public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, int resId) throws Exception { + List> conts, boolean isHealthy, int resId) + throws Exception { ArrayList updatedStats = new ArrayList(); for (List stats : conts.values()) { updatedStats.addAll(stats); @@ -264,22 +324,252 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, Resource newResource = heartbeatResponse.getResource(); if (newResource != null) { - memory = newResource.getMemorySize(); - vCores = newResource.getVirtualCores(); + capability.setMemorySize(newResource.getMemorySize()); + capability.setVirtualCores(newResource.getVirtualCores()); } return heartbeatResponse; } + @Override + synchronized public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException { + + for (StartContainerRequest request : requests + .getStartContainerRequests()) { + Token containerToken = request.getContainerToken(); + ContainerTokenIdentifier tokenId = null; + + try { + tokenId = BuilderUtils.newContainerTokenIdentifier(containerToken); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + + ContainerId containerID = tokenId.getContainerID(); + ApplicationId applicationId = + containerID.getApplicationAttemptId().getApplicationId(); + + List applicationContainers = containers.get(applicationId); + if (applicationContainers == null) { + applicationContainers = new ArrayList(); + containers.put(applicationId, applicationContainers); + } + + // Sanity check + for (Container container : applicationContainers) { + if (container.getId().compareTo(containerID) == 0) { + throw new IllegalStateException("Container " + containerID + + " already setup on node " + getContainerManagerAddress()); + } + } + + Container container = BuilderUtils.newContainer(containerID, + this.nodeId, getNodeHttpAddress(), tokenId.getResource(), + null, null); + + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + container.getId(), ContainerState.NEW, "", + -1000, container.getResource()); + applicationContainers.add(container); + containerStatusMap.put(container, containerStatus); + Resources.subtractFrom(available, tokenId.getResource()); + Resources.addTo(used, tokenId.getResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug("startContainer:" + " node=" + getContainerManagerAddress() + + " application=" + applicationId + " container=" + container + + " available=" + available + " used=" + used); + } + + } + StartContainersResponse response = + StartContainersResponse.newInstance(null, null, null); + return response; + } + + synchronized public void checkResourceUsage() { + LOG.info("Checking resource usage for " + getContainerManagerAddress()); + Assert.assertEquals(available.getMemorySize(), resourceManager + .getResourceScheduler().getNodeReport(this.nodeId) + .getAvailableResource().getMemorySize()); + Assert.assertEquals(used.getMemorySize(), resourceManager + .getResourceScheduler().getNodeReport(this.nodeId) + .getUsedResource().getMemorySize()); + } + + @Override + synchronized public StopContainersResponse stopContainers( + StopContainersRequest request) throws YarnException { + for (ContainerId containerID : request.getContainerIds()) { + String applicationId = String.valueOf(containerID + .getApplicationAttemptId().getApplicationId().getId()); + // Mark the container as COMPLETE + List applicationContainers = containers.get(containerID + .getApplicationAttemptId().getApplicationId()); + for (Container c : applicationContainers) { + if (c.getId().compareTo(containerID) == 0) { + ContainerStatus containerStatus = containerStatusMap.get(c); + containerStatus.setState(ContainerState.COMPLETE); + containerStatusMap.put(c, containerStatus); + } + } + + // Send a heartbeat + try { + heartbeat(); + } catch (IOException ioe) { + throw RPCUtil.getRemoteException(ioe); + } + + // Remove container and update status + int ctr = 0; + Container container = null; + for (Iterator i = applicationContainers.iterator(); i + .hasNext();) { + container = i.next(); + if (container.getId().compareTo(containerID) == 0) { + i.remove(); + ++ctr; + } + } + + if (ctr != 1) { + throw new IllegalStateException("Container " + containerID + + " stopped " + ctr + " times!"); + } + + Resources.addTo(available, container.getResource()); + Resources.subtractFrom(used, container.getResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug("stopContainer:" + " node=" + getContainerManagerAddress() + + " application=" + applicationId + " container=" + containerID + + " available=" + available + " used=" + used); + } + } + return StopContainersResponse.newInstance(null, null); + } + + @Override + synchronized public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws YarnException { + List statuses = new ArrayList(); + for (ContainerId containerId : request.getContainerIds()) { + List appContainers = containers.get(containerId. + getApplicationAttemptId().getApplicationId()); + Container container = null; + for (Container c : appContainers) { + if (c.getId().equals(containerId)) { + container = c; + } + } + if (container != null + && containerStatusMap.get(container).getState() != null) { + statuses.add(containerStatusMap.get(container)); + } + } + return GetContainerStatusesResponse.newInstance(statuses, null); + } + + @Override + @Deprecated + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) + throws YarnException, IOException { + return null; + } + + @Override + public ContainerUpdateResponse updateContainer(ContainerUpdateRequest + request) throws YarnException, IOException { + return null; + } + + public static org.apache.hadoop.yarn.server.api.records.NodeStatus + createNodeStatus(NodeId nodeId, List containers) { + org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = + RECORDFACTORY.newRecordInstance(org.apache.hadoop.yarn.server.api + .records.NodeStatus.class); + nodeStatus.setNodeId(nodeId); + nodeStatus.setContainersStatuses(containers); + NodeHealthStatus nodeHealthStatus = + RECORDFACTORY.newRecordInstance(NodeHealthStatus.class); + nodeHealthStatus.setIsNodeHealthy(true); + nodeStatus.setNodeHealthStatus(nodeHealthStatus); + return nodeStatus; + } + + @Override + public synchronized SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + throw new YarnException("Not supported yet!"); + } + + @Override + public ResourceLocalizationResponse localize( + ResourceLocalizationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public ReInitializeContainerResponse reInitializeContainer( + ReInitializeContainerRequest request) throws YarnException, IOException { + return null; + } + + @Override + public RestartContainerResponse restartContainer(ContainerId containerId) + throws YarnException, IOException { + return null; + } + + @Override + public RollbackResponse rollbackLastReInitialization(ContainerId containerId) + throws YarnException, IOException { + return null; + } + + @Override + public CommitResponse commitLastReInitialization(ContainerId containerId) + throws YarnException, IOException { + return null; + } + public long getMemory() { - return memory; + return capability.getMemorySize(); } public int getvCores() { - return vCores; + return capability.getVirtualCores(); } public String getVersion() { return version; } + + public String getContainerManagerAddress() { + String containerManagerAddress = nodeId.getHost() + ":" + nodeId.getPort(); + return containerManagerAddress; + } + + public String getNodeHttpAddress(){ + String nodeHttpAddress = nodeId.getHost() + ":" + httpPort; + return nodeHttpAddress; + } + public String getRackName() { + return rackName; + } + + public Resource getCapability() { + return capability; + } + + public Resource getAvailable() { + return available; + } + + public Resource getUsed() { + return used; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java index 31b372e..01461d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java @@ -49,7 +49,7 @@ final private Set racks = new HashSet(); private ContainerId containerId; - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager; + private MockNM nodeManager; private State state; @@ -85,7 +85,7 @@ public SchedulerRequestKey getSchedulerKey() { return schedulerKey; } - public org.apache.hadoop.yarn.server.resourcemanager.NodeManager getNodeManager() { + public MockNM getNodeManager() { return nodeManager; } @@ -115,9 +115,9 @@ public boolean canSchedule(NodeType type, String hostName) { return true; } - public void start(NodeManager nodeManager, ContainerId containerId) { - this.nodeManager = nodeManager; - this.containerId = containerId; + public void start(MockNM mockNodeManager, ContainerId containerID) { + this.nodeManager = mockNodeManager; + this.containerId = containerID; setState(State.RUNNING); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index ad8c335..8876b3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -72,14 +72,11 @@ public void tearDown() throws Exception { resourceManager.stop(); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) throws IOException, - YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( - hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + private MockNM registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = new MockNM(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -97,15 +94,13 @@ public void testResourceAllocation() throws IOException, // Register node1 String host1 = "host1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = - registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, vcores)); + MockNM nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory, vcores)); // Register node2 String host2 = "host2"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = - registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory/2, vcores/2)); + MockNM nm2 = registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory/2, vcores/2)); // Submit an application Application application = new Application("user1", resourceManager); @@ -180,8 +175,7 @@ public void testResourceAllocation() throws IOException, LOG.info("--- END: testResourceAllocation ---"); } - private void nodeUpdate( - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1) { + private void nodeUpdate(MockNM nm1) { RMNode node = resourceManager.getRMContext().getRMNodes().get(nm1.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); @@ -192,9 +186,8 @@ private void nodeUpdate( public void testNodeHealthReportIsNotNull() throws Exception{ String host1 = "host1"; final int memory = 4 * 1024; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = - registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, 1)); + MockNM nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory, 1)); nm1.heartbeat(); nm1.heartbeat(); Collection values = resourceManager.getRMContext().getRMNodes().values(); @@ -203,9 +196,8 @@ public void testNodeHealthReportIsNotNull() throws Exception{ } } - private void checkResourceUsage( - org.apache.hadoop.yarn.server.resourcemanager.NodeManager... nodes ) { - for (org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager : nodes) { + private void checkResourceUsage(MockNM... nodes) { + for (MockNM nodeManager : nodes) { nodeManager.checkResourceUsage(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java index 83a354d..ea9d0e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -169,11 +169,11 @@ public void testResourceUpdate() { sh.getResourcesReleased()); } - private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, Resource capability) throws IOException, - YarnException { - NodeManager nm = - new NodeManager(hostName, containerManagerPort, httpPort, rackName, + private MockNM registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = + new MockNM(hostName, containerManagerPort, httpPort, rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() @@ -182,7 +182,7 @@ private NodeManager registerNode(String hostName, int containerManagerPort, return nm; } - private void nodeUpdate(NodeManager nm) { + private void nodeUpdate(MockNM nm) { RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler @@ -202,9 +202,8 @@ public void testCapacitySchedulerAllocation() throws Exception { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * 1024, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -215,7 +214,7 @@ public void testCapacitySchedulerAllocation() throws Exception { new Application("user_0", "default", resourceManager); application_0.submit(); - application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_0, 1234, nm0); Resource capability_0_0 = Resources.createResource(1024, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -234,7 +233,7 @@ public void testCapacitySchedulerAllocation() throws Exception { application_0.schedule(); // Send a heartbeat to kick the tires on the Scheduler - nodeUpdate(nm_0); + nodeUpdate(nm0); SchedulerHealth sh = ((CapacityScheduler) resourceManager.getResourceScheduler()) .getSchedulerHealth(); @@ -254,7 +253,7 @@ public void testCapacitySchedulerAllocation() throws Exception { application_0.addTask(task_0_2); application_0.schedule(); - nodeUpdate(nm_0); + nodeUpdate(nm0); Assert.assertEquals(1, sh.getAllocationCount().longValue()); Assert.assertEquals(Resource.newInstance(2 * 1024, 1), sh.getResourcesAllocated()); @@ -277,15 +276,13 @@ public void testCapacitySchedulerReservation() throws Exception { // Register nodes String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * 1024, 1)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * 1024, 1)); String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); - nodeUpdate(nm_0); - nodeUpdate(nm_1); + MockNM nm1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * 1024, 1)); + nodeUpdate(nm0); + nodeUpdate(nm1); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -296,8 +293,8 @@ public void testCapacitySchedulerReservation() throws Exception { new Application("user_0", "default", resourceManager); application_0.submit(); - application_0.addNodeManager(host_0, 1234, nm_0); - application_0.addNodeManager(host_1, 1234, nm_1); + application_0.addNodeManager(host_0, 1234, nm0); + application_0.addNodeManager(host_1, 1234, nm1); Resource capability_0_0 = Resources.createResource(1024, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -313,7 +310,7 @@ public void testCapacitySchedulerReservation() throws Exception { application_0.schedule(); // Send a heartbeat to kick the tires on the Scheduler - nodeUpdate(nm_0); + nodeUpdate(nm0); SchedulerHealth sh = ((CapacityScheduler) resourceManager.getResourceScheduler()) .getSchedulerHealth(); @@ -331,7 +328,7 @@ public void testCapacitySchedulerReservation() throws Exception { application_0.addTask(task_0_1); application_0.schedule(); - nodeUpdate(nm_0); + nodeUpdate(nm0); Assert.assertEquals(0, sh.getAllocationCount().longValue()); Assert.assertEquals(1, sh.getReservationCount().longValue()); Assert.assertEquals(Resource.newInstance(2 * 1024, 1), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 8440519..58db6cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -94,7 +94,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -227,10 +226,10 @@ public void tearDown() throws Exception { } } - private NodeManager registerNode(ResourceManager rm, String hostName, + private MockNM registerNode(ResourceManager rm, String hostName, int containerManagerPort, int httpPort, String rackName, Resource capability) throws IOException, YarnException { - NodeManager nm = new NodeManager(hostName, + MockNM nm = new MockNM(hostName, containerManagerPort, httpPort, rackName, capability, rm); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes() @@ -272,12 +271,12 @@ public void testConfValidation() throws Exception { } } - private NodeManager + private MockNM registerNode(String hostName, int containerManagerPort, int httpPort, String rackName, Resource capability) throws IOException, YarnException { - NodeManager nm = - new NodeManager( + MockNM nm = + new MockNM( hostName, containerManagerPort, httpPort, rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = @@ -294,15 +293,13 @@ public void testCapacityScheduler() throws Exception { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + MockNM nm1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -312,8 +309,8 @@ public void testCapacityScheduler() throws Exception { Application application_0 = new Application("user_0", "a1", resourceManager); application_0.submit(); - application_0.addNodeManager(host_0, 1234, nm_0); - application_0.addNodeManager(host_1, 1234, nm_1); + application_0.addNodeManager(host_0, 1234, nm0); + application_0.addNodeManager(host_1, 1234, nm1); Resource capability_0_0 = Resources.createResource(1 * GB, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -329,8 +326,8 @@ public void testCapacityScheduler() throws Exception { Application application_1 = new Application("user_1", "b2", resourceManager); application_1.submit(); - application_1.addNodeManager(host_0, 1234, nm_0); - application_1.addNodeManager(host_1, 1234, nm_1); + application_1.addNodeManager(host_0, 1234, nm0); + application_1.addNodeManager(host_1, 1234, nm1); Resource capability_1_0 = Resources.createResource(3 * GB, 1); application_1.addResourceRequestSpec(priority_1, capability_1_0); @@ -350,10 +347,10 @@ public void testCapacityScheduler() throws Exception { LOG.info("Kick!"); // task_0_0 and task_1_0 allocated, used=4G - nodeUpdate(nm_0); + nodeUpdate(nm0); // nothing allocated - nodeUpdate(nm_1); + nodeUpdate(nm1); // Get allocations from the scheduler application_0.schedule(); // task_0_0 @@ -362,8 +359,8 @@ public void testCapacityScheduler() throws Exception { application_1.schedule(); // task_1_0 checkApplicationResourceUsage(3 * GB, application_1); - checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G) - checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available + checkNodeResourceUsage(4*GB, nm0); // task_0_0 (1G) and task_1_0 (3G) + checkNodeResourceUsage(0*GB, nm1); // no tasks, 2G available LOG.info("Adding new tasks..."); @@ -380,13 +377,13 @@ public void testCapacityScheduler() throws Exception { application_0.schedule(); // Send a heartbeat to kick the tires on the Scheduler - LOG.info("Sending hb from " + nm_0.getHostName()); + LOG.info("Sending hb from " + nm0.getContainerManagerAddress()); // nothing new, used=4G - nodeUpdate(nm_0); + nodeUpdate(nm0); - LOG.info("Sending hb from " + nm_1.getHostName()); + LOG.info("Sending hb from " + nm1.getContainerManagerAddress()); // task_0_1 is prefer as locality, used=2G - nodeUpdate(nm_1); + nodeUpdate(nm1); // Get allocations from the scheduler LOG.info("Trying to allocate..."); @@ -396,11 +393,11 @@ public void testCapacityScheduler() throws Exception { application_1.schedule(); checkApplicationResourceUsage(5 * GB, application_1); - nodeUpdate(nm_0); - nodeUpdate(nm_1); + nodeUpdate(nm0); + nodeUpdate(nm1); - checkNodeResourceUsage(4*GB, nm_0); - checkNodeResourceUsage(2*GB, nm_1); + checkNodeResourceUsage(4*GB, nm0); + checkNodeResourceUsage(2*GB, nm1); LOG.info("--- END: testCapacityScheduler ---"); } @@ -434,9 +431,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { // Register node1 String host0 = "host_0"; - NodeManager nm0 = - registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + MockNM nm0 = registerNode(rm, host0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(10 * GB, 10)); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -536,9 +532,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { // Register node1 String host0 = "host_0"; - NodeManager nm0 = - registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + MockNM nm0 = registerNode(rm, host0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(10 * GB, 10)); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -607,14 +602,14 @@ protected RMNodeLabelsManager createNodeLabelManager() { LOG.info("--- START: testAssignMultiple ---"); } - private void nodeUpdate(ResourceManager rm, NodeManager nm) { + private void nodeUpdate(ResourceManager rm, MockNM nm) { RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); rm.getResourceScheduler().handle(nodeUpdate); } - private void nodeUpdate(NodeManager nm) { + private void nodeUpdate(MockNM nm) { RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); @@ -942,7 +937,7 @@ private void checkApplicationResourceUsage(int expected, Assert.assertEquals(expected, application.getUsedResources().getMemorySize()); } - private void checkNodeResourceUsage(int expected, NodeManager node) { + private void checkNodeResourceUsage(int expected, MockNM node) { Assert.assertEquals(expected, node.getUsed().getMemorySize()); node.checkResourceUsage(); } @@ -1880,15 +1875,13 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); // Register node2 String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + MockNM nm1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -1899,8 +1892,8 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { new Application("user_0", "a1", resourceManager); application_0.submit(); // app + app attempt event sent to scheduler - application_0.addNodeManager(host_0, 1234, nm_0); - application_0.addNodeManager(host_1, 1234, nm_1); + application_0.addNodeManager(host_0, 1234, nm0); + application_0.addNodeManager(host_1, 1234, nm1); Resource capability_0_0 = Resources.createResource(1 * GB, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -1917,8 +1910,8 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { new Application("user_1", "b2", resourceManager); application_1.submit(); // app + app attempt event sent to scheduler - application_1.addNodeManager(host_0, 1234, nm_0); - application_1.addNodeManager(host_1, 1234, nm_1); + application_1.addNodeManager(host_0, 1234, nm0); + application_1.addNodeManager(host_1, 1234, nm1); Resource capability_1_0 = Resources.createResource(1 * GB, 1); application_1.addResourceRequestSpec(priority_1, capability_1_0); @@ -1935,10 +1928,10 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { application_1.schedule(); // allocate // task_0_0 task_1_0 allocated, used=2G - nodeUpdate(nm_0); + nodeUpdate(nm0); // nothing allocated - nodeUpdate(nm_1); + nodeUpdate(nm1); // Get allocations from the scheduler application_0.schedule(); // task_0_0 @@ -1947,9 +1940,9 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { application_1.schedule(); // task_1_0 checkApplicationResourceUsage(1 * GB, application_1); - checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G + checkNodeResourceUsage(2 * GB, nm0); // task_0_0 (1G) and task_1_0 (1G) 2G // available - checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available + checkNodeResourceUsage(0 * GB, nm1); // no tasks, 2G available // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5% // total cap) @@ -1971,10 +1964,10 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { application_0.schedule(); // prev 2G used free 2G - nodeUpdate(nm_0); + nodeUpdate(nm0); // prev 0G used free 2G - nodeUpdate(nm_1); + nodeUpdate(nm1); // Get allocations from the scheduler application_1.schedule(); @@ -1984,8 +1977,8 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { application_0.schedule(); checkApplicationResourceUsage(3 * GB, application_0); - checkNodeResourceUsage(4 * GB, nm_0); - checkNodeResourceUsage(2 * GB, nm_1); + checkNodeResourceUsage(4 * GB, nm0); + checkNodeResourceUsage(2 * GB, nm1); } @@ -1996,15 +1989,13 @@ public void testMoveAppSuccess() throws Exception { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // Register node2 String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2015,8 +2006,8 @@ public void testMoveAppSuccess() throws Exception { new Application("user_0", "a1", resourceManager); application_0.submit(); // app + app attempt event sent to scheduler - application_0.addNodeManager(host_0, 1234, nm_0); - application_0.addNodeManager(host_1, 1234, nm_1); + application_0.addNodeManager(host_0, 1234, nm0); + application_0.addNodeManager(host_1, 1234, nm1); Resource capability_0_0 = Resources.createResource(3 * GB, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -2033,8 +2024,8 @@ public void testMoveAppSuccess() throws Exception { new Application("user_1", "b2", resourceManager); application_1.submit(); // app + app attempt event sent to scheduler - application_1.addNodeManager(host_0, 1234, nm_0); - application_1.addNodeManager(host_1, 1234, nm_1); + application_1.addNodeManager(host_0, 1234, nm0); + application_1.addNodeManager(host_1, 1234, nm1); Resource capability_1_0 = Resources.createResource(1 * GB, 1); application_1.addResourceRequestSpec(priority_1, capability_1_0); @@ -2053,9 +2044,9 @@ public void testMoveAppSuccess() throws Exception { // b2 can only run 1 app at a time scheduler.moveApplication(application_0.getApplicationId(), "b2"); - nodeUpdate(nm_0); + nodeUpdate(nm0); - nodeUpdate(nm_1); + nodeUpdate(nm1); // Get allocations from the scheduler application_0.schedule(); // task_0_0 @@ -2066,21 +2057,21 @@ public void testMoveAppSuccess() throws Exception { // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is // not scheduled - checkNodeResourceUsage(1 * GB, nm_0); - checkNodeResourceUsage(0 * GB, nm_1); + checkNodeResourceUsage(1 * GB, nm0); + checkNodeResourceUsage(0 * GB, nm1); // lets move application_0 to a queue where it can run scheduler.moveApplication(application_0.getApplicationId(), "a2"); application_0.schedule(); - nodeUpdate(nm_1); + nodeUpdate(nm1); // Get allocations from the scheduler application_0.schedule(); // task_0_0 checkApplicationResourceUsage(3 * GB, application_0); - checkNodeResourceUsage(1 * GB, nm_0); - checkNodeResourceUsage(3 * GB, nm_1); + checkNodeResourceUsage(1 * GB, nm0); + checkNodeResourceUsage(3 * GB, nm1); } @@ -2118,9 +2109,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(6 * GB, 1)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(6 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2131,7 +2121,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { new Application("user_0", "a1", resourceManager); application_0.submit(); // app + app attempt event sent to scheduler - application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_0, 1234, nm0); Resource capability_0_0 = Resources.createResource(3 * GB, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -2147,13 +2137,13 @@ protected RMNodeLabelsManager createNodeLabelManager() { application_0.schedule(); // allocate // task_0_0 allocated - nodeUpdate(nm_0); + nodeUpdate(nm0); // Get allocations from the scheduler application_0.schedule(); // task_0_0 checkApplicationResourceUsage(3 * GB, application_0); - checkNodeResourceUsage(3 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm0); // b2 queue contains 3GB consumption app, // add another 3GB will hit max capacity limit on queue b scheduler.moveApplication(application_0.getApplicationId(), "b1"); @@ -2166,15 +2156,13 @@ public void testMoveAppQueueMetricsCheck() throws Exception { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // Register node2 String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2185,8 +2173,8 @@ public void testMoveAppQueueMetricsCheck() throws Exception { new Application("user_0", "a1", resourceManager); application_0.submit(); // app + app attempt event sent to scheduler - application_0.addNodeManager(host_0, 1234, nm_0); - application_0.addNodeManager(host_1, 1234, nm_1); + application_0.addNodeManager(host_0, 1234, nm0); + application_0.addNodeManager(host_1, 1234, nm1); Resource capability_0_0 = Resources.createResource(3 * GB, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -2203,8 +2191,8 @@ public void testMoveAppQueueMetricsCheck() throws Exception { new Application("user_1", "b2", resourceManager); application_1.submit(); // app + app attempt event sent to scheduler - application_1.addNodeManager(host_0, 1234, nm_0); - application_1.addNodeManager(host_1, 1234, nm_1); + application_1.addNodeManager(host_0, 1234, nm0); + application_1.addNodeManager(host_1, 1234, nm1); Resource capability_1_0 = Resources.createResource(1 * GB, 1); application_1.addResourceRequestSpec(priority_1, capability_1_0); @@ -2220,9 +2208,9 @@ public void testMoveAppQueueMetricsCheck() throws Exception { application_0.schedule(); // allocate application_1.schedule(); // allocate - nodeUpdate(nm_0); + nodeUpdate(nm0); - nodeUpdate(nm_1); + nodeUpdate(nm1); CapacityScheduler cs = (CapacityScheduler) resourceManager.getResourceScheduler(); @@ -2301,8 +2289,8 @@ public void testMoveAppQueueMetricsCheck() throws Exception { // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is // not scheduled - checkNodeResourceUsage(4 * GB, nm_0); - checkNodeResourceUsage(0 * GB, nm_1); + checkNodeResourceUsage(4 * GB, nm0); + checkNodeResourceUsage(0 * GB, nm1); } @@ -4106,9 +4094,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -4117,7 +4104,7 @@ public void handle(Event event) { new Application("user_0", "a1", resourceManager); application_0.submit(); - application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_0, 1234, nm0); Resource capability_0_0 = Resources.createResource(1 * GB, 1); application_0.addResourceRequestSpec(priority_0, capability_0_0); @@ -4129,12 +4116,12 @@ public void handle(Event event) { // Send resource requests to the scheduler application_0.schedule(); - nodeUpdate(nm_0); + nodeUpdate(nm0); // Kick off another heartbeat with the node state mocked to decommissioning // This should update the schedulernodes to have 0 available resource RMNode spyNode = Mockito.spy(resourceManager.getRMContext().getRMNodes() - .get(nm_0.getNodeId())); + .get(nm0.getNodeId())); when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); resourceManager.getResourceScheduler().handle( new NodeUpdateSchedulerEvent(spyNode)); @@ -4143,10 +4130,10 @@ public void handle(Event event) { application_0.schedule(); // Check the used resource is 1 GB 1 core - Assert.assertEquals(1 * GB, nm_0.getUsed().getMemorySize()); + Assert.assertEquals(1 * GB, nm0.getUsed().getMemorySize()); Resource usedResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getAllocatedResource(); + .getSchedulerNode(nm0.getNodeId()).getAllocatedResource(); Assert.assertEquals("Used Resource Memory Size should be 1GB", 1 * GB, usedResource.getMemorySize()); Assert.assertEquals("Used Resource Virtual Cores should be 1", 1, @@ -4154,7 +4141,7 @@ public void handle(Event event) { // Check total resource of scheduler node is also changed to 1 GB 1 core Resource totalResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + .getSchedulerNode(nm0.getNodeId()).getTotalResource(); Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB, totalResource.getMemorySize()); Assert.assertEquals("Total Resource Virtual Cores should be 1", 1, @@ -4162,7 +4149,7 @@ public void handle(Event event) { // Check the available resource is 0/0 Resource availableResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getUnallocatedResource(); + .getSchedulerNode(nm0.getNodeId()).getUnallocatedResource(); Assert.assertEquals("Available Resource Memory Size should be 0", 0, availableResource.getMemorySize()); Assert.assertEquals("Available Resource Memory Size should be 0", 0, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index d5b1fcc..141194f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -4954,12 +4954,11 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); RMNode node = - resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); + resourceManager.getRMContext().getRMNodes().get(nm0.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); resourceManager.getResourceScheduler().handle(nodeUpdate); @@ -4968,7 +4967,7 @@ public void handle(Event event) { // This should update the schedulernodes to have 0 available resource RMNode spyNode = Mockito.spy(resourceManager.getRMContext().getRMNodes() - .get(nm_0.getNodeId())); + .get(nm0.getNodeId())); when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); resourceManager.getResourceScheduler().handle( new NodeUpdateSchedulerEvent(spyNode)); @@ -4977,30 +4976,28 @@ public void handle(Event event) { // Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory()); Resource usedResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getAllocatedResource(); + .getSchedulerNode(nm0.getNodeId()).getAllocatedResource(); Assert.assertEquals(usedResource.getMemorySize(), 0); Assert.assertEquals(usedResource.getVirtualCores(), 0); // Check total resource of scheduler node is also changed to 0 GB 0 core Resource totalResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + .getSchedulerNode(nm0.getNodeId()).getTotalResource(); Assert.assertEquals(totalResource.getMemorySize(), 0 * GB); Assert.assertEquals(totalResource.getVirtualCores(), 0); // Check the available resource is 0/0 Resource availableResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getUnallocatedResource(); + .getSchedulerNode(nm0.getNodeId()).getUnallocatedResource(); Assert.assertEquals(availableResource.getMemorySize(), 0); Assert.assertEquals(availableResource.getVirtualCores(), 0); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode( - String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, - resourceManager); + private MockNM registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = new MockNM(hostName, containerManagerPort, httpPort, rackName, + capability, resourceManager); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 3f97b59..4d4935a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -137,14 +137,11 @@ public void tearDown() throws Exception { resourceManager.stop(); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int nmHttpPort, - String rackName, Resource capability) throws IOException, - YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, nmHttpPort, rackName, capability, - resourceManager); + private MockNM registerNode(String hostName, int containerManagerPort, + int nmHttpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = new MockNM(hostName, containerManagerPort, nmHttpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -402,17 +399,15 @@ public void testFifoScheduler() throws Exception { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); - nm_0.heartbeat(); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); + nm0.heartbeat(); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); - nm_1.heartbeat(); + MockNM nm1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); + nm1.heartbeat(); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -422,8 +417,8 @@ public void testFifoScheduler() throws Exception { Application application_0 = new Application("user_0", resourceManager); application_0.submit(); - application_0.addNodeManager(host_0, 1234, nm_0); - application_0.addNodeManager(host_1, 1234, nm_1); + application_0.addNodeManager(host_0, 1234, nm0); + application_0.addNodeManager(host_1, 1234, nm1); Resource capability_0_0 = Resources.createResource(GB); application_0.addResourceRequestSpec(priority_1, capability_0_0); @@ -439,8 +434,8 @@ public void testFifoScheduler() throws Exception { Application application_1 = new Application("user_1", resourceManager); application_1.submit(); - application_1.addNodeManager(host_0, 1234, nm_0); - application_1.addNodeManager(host_1, 1234, nm_1); + application_1.addNodeManager(host_0, 1234, nm0); + application_1.addNodeManager(host_1, 1234, nm1); Resource capability_1_0 = Resources.createResource(3 * GB); application_1.addResourceRequestSpec(priority_1, capability_1_0); @@ -461,8 +456,8 @@ public void testFifoScheduler() throws Exception { LOG.info("Send a heartbeat to kick the tires on the Scheduler... " + "nm0 -> task_0_0 and task_1_0 allocated, used=4G " + "nm1 -> nothing allocated"); - nm_0.heartbeat(); // task_0_0 and task_1_0 allocated, used=4G - nm_1.heartbeat(); // nothing allocated + nm0.heartbeat(); // task_0_0 and task_1_0 allocated, used=4G + nm1.heartbeat(); // nothing allocated // Get allocations from the scheduler application_0.schedule(); // task_0_0 @@ -471,11 +466,11 @@ public void testFifoScheduler() throws Exception { application_1.schedule(); // task_1_0 checkApplicationResourceUsage(3 * GB, application_1); - nm_0.heartbeat(); - nm_1.heartbeat(); + nm0.heartbeat(); + nm1.heartbeat(); - checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G) - checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available + checkNodeResourceUsage(4*GB, nm0); // task_0_0 (1G) and task_1_0 (3G) + checkNodeResourceUsage(0*GB, nm1); // no tasks, 2G available LOG.info("Adding new tasks..."); @@ -508,11 +503,11 @@ public void testFifoScheduler() throws Exception { application_0.schedule(); // Send a heartbeat to kick the tires on the Scheduler - LOG.info("Sending hb from " + nm_0.getHostName()); - nm_0.heartbeat(); // nothing new, used=4G + LOG.info("Sending hb from " + nm0.getContainerManagerAddress()); + nm0.heartbeat(); // nothing new, used=4G - LOG.info("Sending hb from " + nm_1.getHostName()); - nm_1.heartbeat(); // task_0_3, used=2G + LOG.info("Sending hb from " + nm1.getContainerManagerAddress()); + nm1.heartbeat(); // task_0_3, used=2G // Get allocations from the scheduler LOG.info("Trying to allocate..."); @@ -520,51 +515,51 @@ public void testFifoScheduler() throws Exception { checkApplicationResourceUsage(3 * GB, application_0); application_1.schedule(); checkApplicationResourceUsage(3 * GB, application_1); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkNodeResourceUsage(4*GB, nm_0); - checkNodeResourceUsage(2*GB, nm_1); + nm0.heartbeat(); + nm1.heartbeat(); + checkNodeResourceUsage(4*GB, nm0); + checkNodeResourceUsage(2*GB, nm1); // Complete tasks LOG.info("Finishing up task_0_0"); application_0.finishTask(task_0_0); // Now task_0_1 application_0.schedule(); application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); + nm0.heartbeat(); + nm1.heartbeat(); checkApplicationResourceUsage(3 * GB, application_0); checkApplicationResourceUsage(3 * GB, application_1); - checkNodeResourceUsage(4*GB, nm_0); - checkNodeResourceUsage(2*GB, nm_1); + checkNodeResourceUsage(4*GB, nm0); + checkNodeResourceUsage(2*GB, nm1); LOG.info("Finishing up task_1_0"); application_1.finishTask(task_1_0); // Now task_0_2 application_0.schedule(); // final overcommit for app0 caused here application_1.schedule(); - nm_0.heartbeat(); // final overcommit for app0 occurs here - nm_1.heartbeat(); + nm0.heartbeat(); // final overcommit for app0 occurs here + nm1.heartbeat(); checkApplicationResourceUsage(4 * GB, application_0); checkApplicationResourceUsage(0 * GB, application_1); //checkNodeResourceUsage(1*GB, nm_0); // final over-commit -> rm.node->1G, test.node=2G - checkNodeResourceUsage(2*GB, nm_1); + checkNodeResourceUsage(2*GB, nm1); LOG.info("Finishing up task_0_3"); application_0.finishTask(task_0_3); // No more application_0.schedule(); application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); + nm0.heartbeat(); + nm1.heartbeat(); checkApplicationResourceUsage(2 * GB, application_0); checkApplicationResourceUsage(0 * GB, application_1); //checkNodeResourceUsage(2*GB, nm_0); // final over-commit, rm.node->1G, test.node->2G - checkNodeResourceUsage(0*GB, nm_1); + checkNodeResourceUsage(0*GB, nm1); LOG.info("Finishing up task_0_1"); application_0.finishTask(task_0_1); application_0.schedule(); application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); + nm0.heartbeat(); + nm1.heartbeat(); checkApplicationResourceUsage(1 * GB, application_0); checkApplicationResourceUsage(0 * GB, application_1); @@ -572,8 +567,8 @@ public void testFifoScheduler() throws Exception { application_0.finishTask(task_0_2); // now task_1_3 can go! application_0.schedule(); application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); + nm0.heartbeat(); + nm1.heartbeat(); checkApplicationResourceUsage(0 * GB, application_0); checkApplicationResourceUsage(4 * GB, application_1); @@ -581,8 +576,8 @@ public void testFifoScheduler() throws Exception { application_1.finishTask(task_1_3); // now task_1_1 application_0.schedule(); application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); + nm0.heartbeat(); + nm1.heartbeat(); checkApplicationResourceUsage(0 * GB, application_0); checkApplicationResourceUsage(3 * GB, application_1); @@ -590,8 +585,8 @@ public void testFifoScheduler() throws Exception { application_1.finishTask(task_1_1); application_0.schedule(); application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); + nm0.heartbeat(); + nm1.heartbeat(); checkApplicationResourceUsage(0 * GB, application_0); checkApplicationResourceUsage(3 * GB, application_1); @@ -1215,9 +1210,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + MockNM nm0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -1226,7 +1220,7 @@ public void handle(Event event) { new Application("user_0", "a1", resourceManager); application_0.submit(); - application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_0, 1234, nm0); Resource capability_0_0 = Resources.createResource(1 * GB, 1); application_0.addResourceRequestSpec(priority_0, capability_0_0); @@ -1239,7 +1233,7 @@ public void handle(Event event) { application_0.schedule(); RMNode node = - resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); + resourceManager.getRMContext().getRMNodes().get(nm0.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); resourceManager.getResourceScheduler().handle(nodeUpdate); @@ -1248,7 +1242,7 @@ public void handle(Event event) { // This should update the schedulernodes to have 0 available resource RMNode spyNode = Mockito.spy(resourceManager.getRMContext().getRMNodes() - .get(nm_0.getNodeId())); + .get(nm0.getNodeId())); when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); resourceManager.getResourceScheduler().handle( new NodeUpdateSchedulerEvent(spyNode)); @@ -1260,19 +1254,19 @@ public void handle(Event event) { // Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory()); Resource usedResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getAllocatedResource(); + .getSchedulerNode(nm0.getNodeId()).getAllocatedResource(); Assert.assertEquals(usedResource.getMemorySize(), 1 * GB); Assert.assertEquals(usedResource.getVirtualCores(), 1); // Check total resource of scheduler node is also changed to 1 GB 1 core Resource totalResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + .getSchedulerNode(nm0.getNodeId()).getTotalResource(); Assert.assertEquals(totalResource.getMemorySize(), 1 * GB); Assert.assertEquals(totalResource.getVirtualCores(), 1); // Check the available resource is 0/0 Resource availableResource = resourceManager.getResourceScheduler() - .getSchedulerNode(nm_0.getNodeId()).getUnallocatedResource(); + .getSchedulerNode(nm0.getNodeId()).getUnallocatedResource(); Assert.assertEquals(availableResource.getMemorySize(), 0); Assert.assertEquals(availableResource.getVirtualCores(), 0); } @@ -1282,8 +1276,7 @@ private void checkApplicationResourceUsage(int expected, Assert.assertEquals(expected, application.getUsedResources().getMemorySize()); } - private void checkNodeResourceUsage(int expected, - org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) { + private void checkNodeResourceUsage(int expected, MockNM node) { Assert.assertEquals(expected, node.getUsed().getMemorySize()); node.checkResourceUsage(); }