diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index e889de0..9d1ee83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; +import org.mockito.Mock; @Private public class Application { @@ -99,7 +100,7 @@ new org.apache.hadoop.yarn.api.records.ResourceRequest .ResourceRequestComparator()); - final private Map nodes = new HashMap<>(); + final private Map nodes = new HashMap<>(); Resource used = recordFactory.newRecordInstance(Resource.class); @@ -208,11 +209,11 @@ public synchronized void addResourceRequestSpec( } public synchronized void addNodeManager(String host, - int containerManagerPort, NodeManager nodeManager) { + int containerManagerPort, MockNM nodeManager) { nodes.put(host + ":" + containerManagerPort, nodeManager); } - private synchronized NodeManager getNodeManager(String host) { + private synchronized MockNM getNodeManager(String host) { return nodes.get(host); } @@ -269,8 +270,8 @@ public synchronized void finishTask(Task task) throws IOException, "Finishing unknown task " + task.getTaskId() + " from application " + applicationId); } - - NodeManager nodeManager = task.getNodeManager(); + + MockNM nodeManager = task.getNodeManager(); ContainerId containerId = task.getContainerId(); task.stop(); List containerIds = new ArrayList(); @@ -283,7 +284,7 @@ public synchronized void finishTask(Task task) throws IOException, LOG.info("Finished task " + task.getTaskId() + " of application " + applicationId + - " on node " + nodeManager.getHostName() + + " on node " + nodeManager.getContainerManagerAddress() + ", currently using " + used + " resources"); } @@ -392,7 +393,7 @@ private synchronized void assign(SchedulerRequestKey schedulerKey, t.hasNext();) { Task task = t.next(); if (task.getState() == State.PENDING && task.canSchedule(type, host)) { - NodeManager nodeManager = getNodeManager(host); + MockNM nodeManager = getNodeManager(host); task.start(nodeManager, container.getId()); i.remove(); @@ -403,7 +404,7 @@ private synchronized void assign(SchedulerRequestKey schedulerKey, LOG.info("Assigned container (" + container + ") of type " + type + " to task " + task.getTaskId() + " at priority " + schedulerKey.getPriority() + - " on node " + nodeManager.getHostName() + + " on node " + nodeManager.getContainerManagerAddress() + ", currently using " + used + " resources"); // Update resource requests diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 4a8ff00..834afad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,23 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.*; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -47,14 +44,29 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.apache.hadoop.yarn.util.resource.Resources; import org.eclipse.jetty.util.log.Log; +import org.junit.Assert; -public class MockNM { +public class MockNM implements ContainerManagementProtocol { + private static final org.apache.commons.logging.Log LOG = + LogFactory.getLog(MockNM.class); + private static final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); - private int responseId; + private String rackName; + private Resource capability; + private ResourceManager resourceManager; + Resource available = recordFactory.newRecordInstance(Resource.class); + Resource used = recordFactory.newRecordInstance(Resource.class); + + final Map> containers = + new HashMap>(); + final Map containerStatusMap = + new HashMap(); + + private int responseId = 0; private NodeId nodeId; - private long memory; - private int vCores; private ResourceTrackerService resourceTracker; private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; @@ -65,7 +77,8 @@ private Map registeringCollectors = new ConcurrentHashMap<>(); - public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { + public MockNM(String nodeIdStr, int memory, + ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory this(nodeIdStr, memory, Math.max(1, (memory * YarnConfiguration.DEFAULT_NM_VCORES) / @@ -75,19 +88,40 @@ public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTrack public MockNM(String nodeIdStr, int memory, int vcores, ResourceTrackerService resourceTracker) { - this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion()); + this(nodeIdStr, memory, vcores, resourceTracker, + YarnVersionInfo.getVersion()); } public MockNM(String nodeIdStr, int memory, int vcores, ResourceTrackerService resourceTracker, String version) { - this.memory = memory; - this.vCores = vcores; + this.capability = BuilderUtils.newResource(memory, vcores); this.resourceTracker = resourceTracker; this.version = version; String[] splits = nodeIdStr.split(":"); nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); } + public MockNM(String hostName, int containerManagerPort, int httpPort, + String rackName, Resource capability, ResourceManager resourceManager) + throws IOException, YarnException { + this.capability = capability; + this.resourceTracker = resourceManager.getResourceTrackerService(); + this.rackName = rackName; + this.httpPort = httpPort; + this.version = YarnVersionInfo.getVersion(); + Resources.addTo(available, capability); + this.nodeId = NodeId.newInstance(hostName, containerManagerPort); + RegisterNodeManagerRequest request = recordFactory + .newRecordInstance(RegisterNodeManagerRequest.class); + request.setHttpPort(httpPort); + request.setResource(capability); + request.setNodeId(this.nodeId); + request.setNMVersion(YarnVersionInfo.getVersion()); + resourceManager.getResourceTrackerService().registerNodeManager(request); + this.resourceManager = resourceManager; + resourceManager.getResourceScheduler().getNodeReport(this.nodeId); + } + public NodeId getNodeId() { return nodeId; } @@ -100,22 +134,25 @@ public void setHttpPort(int port) { httpPort = port; } - public void setResourceTrackerService(ResourceTrackerService resourceTracker) { + public void setResourceTrackerService( + ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; } - public void containerStatus(ContainerStatus containerStatus) throws Exception { + public void containerStatus(ContainerStatus containerStatus) + throws Exception { Map> conts = new HashMap>(); - conts.put(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(), - Arrays.asList(new ContainerStatus[] { containerStatus })); + conts.put(containerStatus.getContainerId().getApplicationAttemptId() + .getApplicationId(), Arrays.asList( + new ContainerStatus[] { containerStatus })); nodeHeartbeat(conts, true); } public void containerIncreaseStatus(Container container) throws Exception { ContainerStatus containerStatus = BuilderUtils.newContainerStatus( container.getId(), ContainerState.RUNNING, "Success", 0, - container.getResource()); + container.getResource()); List increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, true, ++responseId); @@ -146,8 +183,7 @@ public RegisterNodeManagerResponse registerNode( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); req.setHttpPort(httpPort); - Resource resource = BuilderUtils.newResource(memory, vCores); - req.setResource(resource); + req.setResource(capability); req.setContainerStatuses(containerReports); req.setNMVersion(version); req.setRunningApplications(runningApplications); @@ -158,8 +194,8 @@ public RegisterNodeManagerResponse registerNode( this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey(); Resource newResource = registrationResponse.getResource(); if (newResource != null) { - memory = (int) newResource.getMemorySize(); - vCores = newResource.getVirtualCores(); + capability.setMemorySize(newResource.getMemorySize()); + capability.setVirtualCores(newResource.getVirtualCores()); } containerStats.clear(); if (containerReports != null) { @@ -175,7 +211,30 @@ public RegisterNodeManagerResponse registerNode( return registrationResponse; } - public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { + private List getContainerStatuses(Map> containers) { + List containerStatuses = new ArrayList(); + for (List appContainers : containers.values()) { + for (Container container : appContainers) { + containerStatuses.add(containerStatusMap.get(container)); + } + } + return containerStatuses; + } + public void heartbeat() throws IOException, YarnException { + NodeStatus nodeStatus = + org.apache.hadoop.yarn.server.resourcemanager.MockNM.createNodeStatus( + nodeId, getContainerStatuses(containers)); + nodeStatus.setResponseId(responseId); + NodeHeartbeatRequest request = recordFactory + .newRecordInstance(NodeHeartbeatRequest.class); + request.setNodeStatus(nodeStatus); + NodeHeartbeatResponse response = resourceTracker.nodeHeartbeat(request); + responseId = response.getResponseId(); + } + + public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) + throws Exception { return nodeHeartbeat(Collections.emptyList(), Collections.emptyList(), isHealthy, ++responseId); } @@ -184,7 +243,7 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, long containerId, ContainerState containerState) throws Exception { ContainerStatus containerStatus = BuilderUtils.newContainerStatus( BuilderUtils.newContainerId(attemptId, containerId), containerState, - "Success", 0, BuilderUtils.newResource(memory, vCores)); + "Success", 0, capability); ArrayList containerStatusList = new ArrayList(1); containerStatusList.add(containerStatus); @@ -199,7 +258,8 @@ public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, int resId) throws Exception { + List> conts, boolean isHealthy, int resId) + throws Exception { ArrayList updatedStats = new ArrayList(); for (List stats : conts.values()) { updatedStats.addAll(stats); @@ -264,22 +324,253 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, Resource newResource = heartbeatResponse.getResource(); if (newResource != null) { - memory = newResource.getMemorySize(); - vCores = newResource.getVirtualCores(); + capability.setMemorySize(newResource.getMemorySize()); + capability.setVirtualCores(newResource.getVirtualCores()); } return heartbeatResponse; } + @Override + synchronized public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException { + + for (StartContainerRequest request : requests + .getStartContainerRequests()) { + Token containerToken = request.getContainerToken(); + ContainerTokenIdentifier tokenId = null; + + try { + tokenId = BuilderUtils.newContainerTokenIdentifier(containerToken); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + + ContainerId containerID = tokenId.getContainerID(); + ApplicationId applicationId = + containerID.getApplicationAttemptId().getApplicationId(); + + List applicationContainers = containers.get(applicationId); + if (applicationContainers == null) { + applicationContainers = new ArrayList(); + containers.put(applicationId, applicationContainers); + } + + // Sanity check + for (Container container : applicationContainers) { + if (container.getId().compareTo(containerID) == 0) { + throw new IllegalStateException("Container " + containerID + + " already setup on node " + getContainerManagerAddress()); + } + } + + Container container = BuilderUtils.newContainer(containerID, + this.nodeId, getNodeHttpAddress(), tokenId.getResource(), + null, null); + + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + container.getId(), ContainerState.NEW, "", + -1000, container.getResource()); + applicationContainers.add(container); + containerStatusMap.put(container, containerStatus); + Resources.subtractFrom(available, tokenId.getResource()); + Resources.addTo(used, tokenId.getResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug("startContainer:" + " node=" + getContainerManagerAddress() + + " application=" + applicationId + " container=" + container + + " available=" + available + " used=" + used); + } + + } + StartContainersResponse response = + StartContainersResponse.newInstance(null, null, null); + return response; + } + + synchronized public void checkResourceUsage() { + LOG.info("Checking resource usage for " + getContainerManagerAddress()); + Assert.assertEquals(available.getMemorySize(), resourceManager + .getResourceScheduler().getNodeReport(this.nodeId) + .getAvailableResource().getMemorySize()); + Assert.assertEquals(used.getMemorySize(), resourceManager + .getResourceScheduler().getNodeReport(this.nodeId) + .getUsedResource().getMemorySize()); + } + + @Override + synchronized public StopContainersResponse stopContainers( + StopContainersRequest request) throws YarnException { + for (ContainerId containerID : request.getContainerIds()) { + String applicationId = String.valueOf(containerID + .getApplicationAttemptId().getApplicationId().getId()); + // Mark the container as COMPLETE + List applicationContainers = containers.get(containerID + .getApplicationAttemptId().getApplicationId()); + for (Container c : applicationContainers) { + if (c.getId().compareTo(containerID) == 0) { + ContainerStatus containerStatus = containerStatusMap.get(c); + containerStatus.setState(ContainerState.COMPLETE); + containerStatusMap.put(c, containerStatus); + } + } + + // Send a heartbeat + try { + heartbeat(); + } catch (IOException ioe) { + throw RPCUtil.getRemoteException(ioe); + } + + // Remove container and update status + int ctr = 0; + Container container = null; + for (Iterator i = applicationContainers.iterator(); i + .hasNext();) { + container = i.next(); + if (container.getId().compareTo(containerID) == 0) { + i.remove(); + ++ctr; + } + } + + if (ctr != 1) { + throw new IllegalStateException("Container " + containerID + + " stopped " + ctr + " times!"); + } + + Resources.addTo(available, container.getResource()); + Resources.subtractFrom(used, container.getResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug("stopContainer:" + " node=" + getContainerManagerAddress() + + " application=" + applicationId + " container=" + containerID + + " available=" + available + " used=" + used); + } + } + return StopContainersResponse.newInstance(null,null); + } + + @Override + synchronized public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws YarnException { + List statuses = new ArrayList(); + for (ContainerId containerId : request.getContainerIds()) { + List appContainers = containers.get(containerId. + getApplicationAttemptId().getApplicationId()); + Container container = null; + for (Container c : appContainers) { + if (c.getId().equals(containerId)) { + container = c; + } + } + if (container != null + && containerStatusMap.get(container).getState() != null) { + statuses.add(containerStatusMap.get(container)); + } + } + return GetContainerStatusesResponse.newInstance(statuses, null); + } + + @Override + @Deprecated + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) + throws YarnException, IOException { + return null; + } + + @Override + public ContainerUpdateResponse updateContainer(ContainerUpdateRequest + request) throws YarnException, IOException { + return null; + } + + public static org.apache.hadoop.yarn.server.api.records.NodeStatus + createNodeStatus (NodeId nodeId, List containers) { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = + recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api + .records.NodeStatus.class); + nodeStatus.setNodeId(nodeId); + nodeStatus.setContainersStatuses(containers); + NodeHealthStatus nodeHealthStatus = + recordFactory.newRecordInstance(NodeHealthStatus.class); + nodeHealthStatus.setIsNodeHealthy(true); + nodeStatus.setNodeHealthStatus(nodeHealthStatus); + return nodeStatus; + } + + @Override + public synchronized SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + throw new YarnException("Not supported yet!"); + } + + @Override + public ResourceLocalizationResponse localize( + ResourceLocalizationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public ReInitializeContainerResponse reInitializeContainer( + ReInitializeContainerRequest request) throws YarnException, IOException { + return null; + } + + @Override + public RestartContainerResponse restartContainer(ContainerId containerId) + throws YarnException, IOException { + return null; + } + + @Override + public RollbackResponse rollbackLastReInitialization(ContainerId containerId) + throws YarnException, IOException { + return null; + } + + @Override + public CommitResponse commitLastReInitialization(ContainerId containerId) + throws YarnException, IOException { + return null; + } + public long getMemory() { - return memory; + return capability.getMemorySize(); } public int getvCores() { - return vCores; + return capability.getVirtualCores(); } public String getVersion() { return version; } + + public String getContainerManagerAddress() { + String containerManagerAddress = nodeId.getHost() + ":" + nodeId.getPort(); + return containerManagerAddress; + } + + public String getNodeHttpAddress(){ + String nodeHttpAddress = nodeId.getHost() + ":" + httpPort; + return nodeHttpAddress; + } + public String getRackName() { + return rackName; + } + + public Resource getCapability() { + return capability; + } + + public Resource getAvailable() { + return available; + } + + public Resource getUsed() { + return used; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java index 31b372e..31c667f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java @@ -49,7 +49,7 @@ final private Set racks = new HashSet(); private ContainerId containerId; - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager; + private MockNM nodeManager; private State state; @@ -85,7 +85,7 @@ public SchedulerRequestKey getSchedulerKey() { return schedulerKey; } - public org.apache.hadoop.yarn.server.resourcemanager.NodeManager getNodeManager() { + public MockNM getNodeManager() { return nodeManager; } @@ -115,7 +115,7 @@ public boolean canSchedule(NodeType type, String hostName) { return true; } - public void start(NodeManager nodeManager, ContainerId containerId) { + public void start(MockNM nodeManager, ContainerId containerId) { this.nodeManager = nodeManager; this.containerId = containerId; setState(State.RUNNING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index ad8c335..386022a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -72,14 +72,11 @@ public void tearDown() throws Exception { resourceManager.stop(); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) throws IOException, - YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( - hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + private MockNM registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = new MockNM(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -97,15 +94,13 @@ public void testResourceAllocation() throws IOException, // Register node1 String host1 = "host1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = - registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, vcores)); + MockNM nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory, vcores)); // Register node2 String host2 = "host2"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = - registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory/2, vcores/2)); + MockNM nm2 = registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory/2, vcores/2)); // Submit an application Application application = new Application("user1", resourceManager); @@ -180,8 +175,7 @@ public void testResourceAllocation() throws IOException, LOG.info("--- END: testResourceAllocation ---"); } - private void nodeUpdate( - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1) { + private void nodeUpdate(MockNM nm1) { RMNode node = resourceManager.getRMContext().getRMNodes().get(nm1.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); @@ -192,9 +186,8 @@ private void nodeUpdate( public void testNodeHealthReportIsNotNull() throws Exception{ String host1 = "host1"; final int memory = 4 * 1024; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = - registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(memory, 1)); + MockNM nm1 = registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(memory, 1)); nm1.heartbeat(); nm1.heartbeat(); Collection values = resourceManager.getRMContext().getRMNodes().values(); @@ -203,9 +196,8 @@ public void testNodeHealthReportIsNotNull() throws Exception{ } } - private void checkResourceUsage( - org.apache.hadoop.yarn.server.resourcemanager.NodeManager... nodes ) { - for (org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager : nodes) { + private void checkResourceUsage(MockNM... nodes ) { + for (MockNM nodeManager : nodes) { nodeManager.checkResourceUsage(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java index 83a354d..763e754 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -169,11 +169,11 @@ public void testResourceUpdate() { sh.getResourcesReleased()); } - private NodeManager registerNode(String hostName, int containerManagerPort, - int httpPort, String rackName, Resource capability) throws IOException, - YarnException { - NodeManager nm = - new NodeManager(hostName, containerManagerPort, httpPort, rackName, + private MockNM registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = + new MockNM(hostName, containerManagerPort, httpPort, rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() @@ -182,7 +182,7 @@ private NodeManager registerNode(String hostName, int containerManagerPort, return nm; } - private void nodeUpdate(NodeManager nm) { + private void nodeUpdate(MockNM nm) { RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler @@ -202,9 +202,8 @@ public void testCapacitySchedulerAllocation() throws Exception { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * 1024, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -277,13 +276,11 @@ public void testCapacitySchedulerReservation() throws Exception { // Register nodes String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * 1024, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * 1024, 1)); String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * 1024, 1)); + MockNM nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * 1024, 1)); nodeUpdate(nm_0); nodeUpdate(nm_1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 8440519..b6a0a62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -94,7 +94,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -227,10 +226,10 @@ public void tearDown() throws Exception { } } - private NodeManager registerNode(ResourceManager rm, String hostName, + private MockNM registerNode(ResourceManager rm, String hostName, int containerManagerPort, int httpPort, String rackName, Resource capability) throws IOException, YarnException { - NodeManager nm = new NodeManager(hostName, + MockNM nm = new MockNM(hostName, containerManagerPort, httpPort, rackName, capability, rm); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes() @@ -272,12 +271,12 @@ public void testConfValidation() throws Exception { } } - private NodeManager + private MockNM registerNode(String hostName, int containerManagerPort, int httpPort, String rackName, Resource capability) throws IOException, YarnException { - NodeManager nm = - new NodeManager( + MockNM nm = + new MockNM( hostName, containerManagerPort, httpPort, rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = @@ -294,15 +293,13 @@ public void testCapacityScheduler() throws Exception { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + MockNM nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -380,11 +377,11 @@ public void testCapacityScheduler() throws Exception { application_0.schedule(); // Send a heartbeat to kick the tires on the Scheduler - LOG.info("Sending hb from " + nm_0.getHostName()); + LOG.info("Sending hb from " + nm_0.getContainerManagerAddress()); // nothing new, used=4G nodeUpdate(nm_0); - LOG.info("Sending hb from " + nm_1.getHostName()); + LOG.info("Sending hb from " + nm_1.getContainerManagerAddress()); // task_0_1 is prefer as locality, used=2G nodeUpdate(nm_1); @@ -434,9 +431,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { // Register node1 String host0 = "host_0"; - NodeManager nm0 = - registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + MockNM nm0 = registerNode(rm, host0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(10 * GB, 10)); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -536,9 +532,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { // Register node1 String host0 = "host_0"; - NodeManager nm0 = - registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(10 * GB, 10)); + MockNM nm0 = registerNode(rm, host0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(10 * GB, 10)); // ResourceRequest priorities Priority priority0 = Priority.newInstance(0); @@ -607,14 +602,14 @@ protected RMNodeLabelsManager createNodeLabelManager() { LOG.info("--- START: testAssignMultiple ---"); } - private void nodeUpdate(ResourceManager rm, NodeManager nm) { + private void nodeUpdate(ResourceManager rm, MockNM nm) { RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); rm.getResourceScheduler().handle(nodeUpdate); } - private void nodeUpdate(NodeManager nm) { + private void nodeUpdate(MockNM nm) { RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); // Send a heartbeat to kick the tires on the Scheduler NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); @@ -942,7 +937,7 @@ private void checkApplicationResourceUsage(int expected, Assert.assertEquals(expected, application.getUsedResources().getMemorySize()); } - private void checkNodeResourceUsage(int expected, NodeManager node) { + private void checkNodeResourceUsage(int expected, MockNM node) { Assert.assertEquals(expected, node.getUsed().getMemorySize()); node.checkResourceUsage(); } @@ -1880,15 +1875,13 @@ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { ResourceScheduler scheduler = resourceManager.getResourceScheduler(); // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); // Register node2 String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + MockNM nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -1996,15 +1989,13 @@ public void testMoveAppSuccess() throws Exception { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // Register node2 String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2118,9 +2109,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(6 * GB, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(6 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -2166,15 +2156,13 @@ public void testMoveAppQueueMetricsCheck() throws Exception { // Register node1 String host_0 = "host_0"; - NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // Register node2 String host_1 = "host_1"; - NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(5 * GB, 1)); + MockNM nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(5 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -4106,9 +4094,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index d5b1fcc..db4940f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -4954,9 +4954,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); @@ -4994,13 +4993,11 @@ public void handle(Event event) { Assert.assertEquals(availableResource.getVirtualCores(), 0); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode( - String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, - resourceManager); + private MockNM registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = new MockNM(hostName, containerManagerPort, httpPort, rackName, + capability, resourceManager); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 3f97b59..6a8d5fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -137,14 +137,11 @@ public void tearDown() throws Exception { resourceManager.stop(); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int nmHttpPort, - String rackName, Resource capability) throws IOException, - YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, nmHttpPort, rackName, capability, - resourceManager); + private MockNM registerNode(String hostName, int containerManagerPort, + int nmHttpPort, String rackName, Resource capability) + throws IOException, YarnException { + MockNM nm = new MockNM(hostName, containerManagerPort, nmHttpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -402,16 +399,14 @@ public void testFifoScheduler() throws Exception { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); nm_0.heartbeat(); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + MockNM nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); nm_1.heartbeat(); // ResourceRequest priorities @@ -508,10 +503,10 @@ public void testFifoScheduler() throws Exception { application_0.schedule(); // Send a heartbeat to kick the tires on the Scheduler - LOG.info("Sending hb from " + nm_0.getHostName()); + LOG.info("Sending hb from " + nm_0.getContainerManagerAddress()); nm_0.heartbeat(); // nothing new, used=4G - LOG.info("Sending hb from " + nm_1.getHostName()); + LOG.info("Sending hb from " + nm_1.getContainerManagerAddress()); nm_1.heartbeat(); // task_0_3, used=2G // Get allocations from the scheduler @@ -1215,9 +1210,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + MockNM nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -1282,8 +1276,7 @@ private void checkApplicationResourceUsage(int expected, Assert.assertEquals(expected, application.getUsedResources().getMemorySize()); } - private void checkNodeResourceUsage(int expected, - org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) { + private void checkNodeResourceUsage(int expected, MockNM node) { Assert.assertEquals(expected, node.getUsed().getMemorySize()); node.checkResourceUsage(); }