diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index d3db96f..7814b84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; @@ -409,15 +410,19 @@ private Resource createIncrContainerResource() { private List convertToRemoteNodes(List nodeIds) { ArrayList retNodes = new ArrayList<>(); for (NodeId nId : nodeIds) { - retNodes.add(convertToRemoteNode(nId)); + RemoteNode remoteNode = convertToRemoteNode(nId); + if (null != remoteNode) { + retNodes.add(remoteNode); + } } return retNodes; } private RemoteNode convertToRemoteNode(NodeId nodeId) { - return RemoteNode.newInstance(nodeId, - ((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId) - .getHttpAddress()); + SchedulerNode node = + ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId); + return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress()) + : null; } private Resource createMaxContainerResource() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 232b4ad..dec55ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -165,9 +165,11 @@ public void initThresholdCalculator(float sigma, int limitMin, int limitMax) { } @Override - public void addNode(List containerStatuses, RMNode - rmNode) { - LOG.debug("Node added event from: " + rmNode.getNode().getName()); + public void addNode(List containerStatuses, + RMNode rmNode) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node added event from: " + rmNode.getNode().getName()); + } // Ignoring this currently : at least one NODE_UPDATE heartbeat is // required to ensure node eligibility. } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestOpportunisticContainerAllocatorAMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestOpportunisticContainerAllocatorAMService.java new file mode 100644 index 0000000..3b4fb43 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestOpportunisticContainerAllocatorAMService.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import java.util.Arrays; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.OpportunisticContainerAllocatorAMService; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Unit tests for OpportunisticContainerAllocatorAMService. + */ +public class TestOpportunisticContainerAllocatorAMService { + + private static final int GB = 1024; + + @Test + public void testNodeRemovalDuringAllocate() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); + MockRM rm = new MockRM(conf); + rm.start(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + // Both node 1 and node 2 will be applicable for scheduling. + for (int i = 0; i < 10; i++) { + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2)), + null); + if (ctxt.getNodeMap().size() == 2) { + break; + } + Thread.sleep(50); + } + Assert.assertEquals(2, ctxt.getNodeMap().size()); + // Remove node from scheduler but not from AM Service. + scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1)); + // After removal of node 1, only 1 node will be applicable for scheduling. + for (int i = 0; i < 10; i++) { + try { + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2)), + null); + } catch (Exception e) { + Assert.fail("Allocate request should be handled on node removal"); + } + if (ctxt.getNodeMap().size() == 1) { + break; + } + Thread.sleep(50); + } + Assert.assertEquals(1, ctxt.getNodeMap().size()); + } + + private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime, + int queueLength) { + OpportunisticContainersStatus status1 = + Mockito.mock(OpportunisticContainersStatus.class); + Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime); + Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength); + return status1; + } +} \ No newline at end of file