updatedStats,
req.setNodeStatus(status);
req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
-
req.setRegisteringCollectors(this.registeringCollectors);
req.setTokenSequenceNo(this.tokenSequenceNo);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java
new file mode 100644
index 00000000000..f8795158a10
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestCentralizedOpportunisticContainerAllocator.java
@@ -0,0 +1,1008 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for Centralized Opportunistic Container Allocator.
+ */
+public class TestCentralizedOpportunisticContainerAllocator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestCentralizedOpportunisticContainerAllocator.class);
+ private static final int GB = 1024;
+ private CentralizedOpportunisticContainerAllocator allocator = null;
+ private OpportunisticContainerContext oppCntxt = null;
+ private static final Priority PRIORITY_NORMAL = Priority.newInstance(1);
+ private static final Resource CAPABILITY_1GB =
+ Resources.createResource(GB);
+ private static final ExecutionTypeRequest OPPORTUNISTIC_REQ =
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
+
+ @Before
+ public void setup() {
+ SecurityUtil.setTokenServiceUseIp(false);
+ final MasterKey mKey = new MasterKey() {
+ @Override
+ public int getKeyId() {
+ return 1;
+ }
+ @Override
+ public void setKeyId(int keyId) {}
+ @Override
+ public ByteBuffer getBytes() {
+ return ByteBuffer.allocate(8);
+ }
+ @Override
+ public void setBytes(ByteBuffer bytes) {}
+ };
+ BaseContainerTokenSecretManager secMan =
+ new BaseContainerTokenSecretManager(new Configuration()) {
+ @Override
+ public MasterKey getCurrentKey() {
+ return mKey;
+ }
+
+ @Override
+ public byte[] createPassword(ContainerTokenIdentifier identifier) {
+ return new byte[]{1, 2};
+ }
+ };
+
+ allocator = new CentralizedOpportunisticContainerAllocator(secMan);
+ oppCntxt = new OpportunisticContainerContext();
+ oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
+ oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
+ oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10));
+ }
+
+ /**
+ * Tests allocation of an Opportunistic container from single application.
+ * @throws Exception
+ */
+ @Test
+ public void testSimpleAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Collections.singletonList(ResourceRequest.newInstance(PRIORITY_NORMAL,
+ "*", CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1, 2, 100);
+ selector.addNode(null, h1);
+ selector.updateNode(h1);
+ selector.computeTask.run();
+
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ Assert.assertEquals(1, containers.size());
+ Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+ }
+
+ /**
+ * Tests Opportunistic container should not be allocated on blacklisted
+ * nodes.
+ * @throws Exception
+ */
+ @Test
+ public void testBlacklistRejection() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ Arrays.asList("h1", "h2"), new ArrayList<>());
+ List reqs =
+ Collections.singletonList(ResourceRequest.newInstance(PRIORITY_NORMAL,
+ "*", CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1, 2, 100);
+ RMNode h2 = createRMNode("h2", 1, 2, 100);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ Assert.assertEquals(0, containers.size());
+ Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+ }
+
+ /**
+ * Tests that allocation of Opportunistic containers should be spread out.
+ * @throws Exception
+ */
+ @Test
+ public void testRoundRobinSimpleAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newBuilder().allocationRequestId(1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(3)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1, 2, 3);
+ RMNode h2 = createRMNode("h2", 1, 2, 3);
+ RMNode h3 = createRMNode("h3", 1, 2, 3);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ LOG.info("Containers: {}", containers);
+ Set allocatedNodes = new HashSet<>();
+ for (Container c : containers) {
+ allocatedNodes.add(c.getNodeId().toString());
+ }
+ Assert.assertTrue(allocatedNodes.contains("h1:1"));
+ Assert.assertTrue(allocatedNodes.contains("h2:1"));
+ Assert.assertTrue(allocatedNodes.contains("h3:1"));
+ Assert.assertEquals(3, containers.size());
+ }
+
+ /**
+ * Tests allocation of node local Opportunistic container requests.
+ * @throws Exception
+ */
+ @Test
+ public void testNodeLocalAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newBuilder().allocationRequestId(1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("/r1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(3)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("/r1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(3)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(3)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, 2, 5);
+ RMNode h2 = createRMNode("h2", 1234, 2, 5);
+ RMNode h3 = createRMNode("h3", 1234, 2, 5);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ LOG.info("Containers: {}", containers);
+ // all 3 containers should be allocated.
+ Assert.assertEquals(3, containers.size());
+ // container with allocation id 2 and 3 should be allocated on node h1
+ for (Container c : containers) {
+ if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) {
+ Assert.assertEquals("h1:1234", c.getNodeId().toString());
+ }
+ }
+ }
+
+ /**
+ * Tests node local allocation of Opportunistic container requests with
+ * same allocation request id.
+ * @throws Exception
+ */
+ @Test
+ public void testNodeLocalAllocationSameSchedulerKey() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .numContainers(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("/r1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .numContainers(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .numContainers(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, 2, 5);
+ RMNode h2 = createRMNode("h2", 1234, 2, 5);
+ RMNode h3 = createRMNode("h3", 1234, 2, 5);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ LOG.info("Containers: {}", containers);
+ Set allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeId().toString());
+ }
+ Assert.assertEquals(2, containers.size());
+ Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h2:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+ }
+
+ /**
+ * Tests rack local allocation of Opportunistic container requests.
+ * @throws Exception
+ */
+ @Test
+ public void testSimpleRackLocalAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
+ CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "h4",
+ CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1",
+ CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r2", 2, 5);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 2, 5);
+ RMNode h3 = createRMNode("h3", 1234, "/r3", 2, 5);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ Set allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeId().toString());
+ }
+ Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h4:1234"));
+ Assert.assertEquals(1, containers.size());
+ }
+
+ /**
+ * Tests that allocation of rack local Opportunistic container requests
+ * should be spread out.
+ * @throws Exception
+ */
+ @Test
+ public void testRoundRobinRackLocalAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newBuilder().allocationRequestId(1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("/r1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h5")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("/r1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h5")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(PRIORITY_NORMAL)
+ .resourceName(ResourceRequest.ANY)
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r2", 4, 5);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 4, 5);
+ RMNode h3 = createRMNode("h3", 1234, "/r3", 4, 5);
+ RMNode h4 = createRMNode("h4", 1234, "/r1", 4, 5);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ Set allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeId().toString());
+ }
+ LOG.info("Containers: {}", containers);
+ Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+ Assert.assertTrue(allocatedHosts.contains("h4:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h1:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+ Assert.assertEquals(2, containers.size());
+ }
+
+ /**
+ * Tests that allocation of rack local Opportunistic container requests
+ * with same allocation request id should be spread out.
+ * @throws Exception
+ */
+ @Test
+ public void testRoundRobinRackLocalAllocationSameSchedulerKey()
+ throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "h5",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r2", 4, 5);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 4, 5);
+ RMNode h3 = createRMNode("h3", 1234, "/r3", 4, 5);
+ RMNode h4 = createRMNode("h4", 1234, "/r1", 4, 5);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ Set allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeId().toString());
+ }
+ LOG.info("Containers: {}", containers);
+ Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+ Assert.assertTrue(allocatedHosts.contains("h4:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h1:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+ Assert.assertEquals(2, containers.size());
+ }
+
+ /**
+ * Tests off switch allocation of Opportunistic containers.
+ * @throws Exception
+ */
+ @Test
+ public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "h6",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r2", 4, 5);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 4, 5);
+ RMNode h3 = createRMNode("h3", 1234, "/r2", 4, 5);
+ RMNode h4 = createRMNode("h4", 1234, "/r1", 4, 5);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ LOG.info("Containers: {}", containers);
+ Assert.assertEquals(2, containers.size());
+ }
+
+ /**
+ * Tests allocation of rack local Opportunistic containers with same
+ * scheduler key.
+ * @throws Exception
+ */
+ @Test
+ public void testLotsOfContainersRackLocalAllocationSameSchedulerKey()
+ throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
+ CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "h1",
+ CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1",
+ CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r1", 0, 500);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 0, 500);
+ RMNode h3 = createRMNode("h3", 1234, "/r1", 0, 500);
+ RMNode h4 = createRMNode("h4", 1234, "/r2", 0, 300);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+
+ Map hostsToNumContainerMap = new HashMap<>();
+ for (Container c : containers) {
+ String host = c.getNodeId().toString();
+ int numContainers = 0;
+ if (hostsToNumContainerMap.containsKey(host)) {
+ numContainers = hostsToNumContainerMap.get(host);
+ }
+ hostsToNumContainerMap.put(host, numContainers + 1);
+ }
+ Assert.assertEquals(1000, containers.size());
+ Assert.assertEquals(500, hostsToNumContainerMap.get("h1:1234").intValue());
+ Assert.assertFalse(hostsToNumContainerMap.containsKey("h4:1234"));
+ }
+
+ /**
+ * Tests scheduling of many rack local Opportunistic container requests.
+ * @throws Exception
+ */
+ @Test
+ public void testLotsOfContainersRackLocalAllocation()
+ throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List reqs = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("*")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h5")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("/r1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ }
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r1", 0, 500);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 0, 500);
+ RMNode h3 = createRMNode("h3", 1234, "/r1", 0, 500);
+ RMNode h4 = createRMNode("h4", 1234, "/r2", 0, 300);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = new ArrayList<>();
+ for (int i = 0; i < 25; i++) {
+ containers.addAll(allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user"));
+ }
+ Assert.assertEquals(100, containers.size());
+ }
+
+ /**
+ * Tests maximum number of opportunistic containers that can be allocated in
+ * AM heartbeat.
+ * @throws Exception
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeat() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(2);
+ List reqs = Arrays.asList(
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "*", CAPABILITY_1GB, 3,
+ true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "h6", CAPABILITY_1GB, 3,
+ true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3", CAPABILITY_1GB, 3,
+ true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r1", 0, 500);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 0, 500);
+ RMNode h3 = createRMNode("h3", 1234, "/r1", 0, 500);
+ RMNode h4 = createRMNode("h4", 1234, "/r2", 0, 300);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // Although capacity is present, but only 2 containers should be allocated
+ // as max allocation per AM heartbeat is set to 2.
+ Assert.assertEquals(2, containers.size());
+ containers = allocator.allocateContainers(
+ blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // Remaining 1 container should be allocated.
+ Assert.assertEquals(1, containers.size());
+ }
+
+ /**
+ * Tests maximum opportunistic container allocation per AM heartbeat for
+ * allocation requests with different scheduler key.
+ * @throws Exception
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey()
+ throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(2);
+ final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true);
+ List reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(Priority.newInstance(2), "h6",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(Priority.newInstance(3), "/r3",
+ CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r1", 0, 500);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 0, 500);
+ RMNode h3 = createRMNode("h3", 1234, "/r1", 0, 500);
+ RMNode h4 = createRMNode("h4", 1234, "/r2", 0, 300);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // Although capacity is present, but only 2 containers should be allocated
+ // as max allocation per AM heartbeat is set to 2.
+ Assert.assertEquals(2, containers.size());
+ containers = allocator.allocateContainers(
+ blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // 2 more containers should be allocated from pending allocation requests.
+ Assert.assertEquals(2, containers.size());
+ containers = allocator.allocateContainers(
+ blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // Remaining 1 container should be allocated.
+ Assert.assertEquals(1, containers.size());
+ }
+
+ /**
+ * Tests maximum opportunistic container allocation per AM heartbeat when
+ * limit is set to -1.
+ * @throws Exception
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeatWithNoLimit() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(-1);
+
+ List reqs = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ }
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r1", 0, 500);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 0, 500);
+ RMNode h3 = createRMNode("h3", 1234, "/r1", 0, 500);
+ RMNode h4 = createRMNode("h4", 1234, "/r2", 0, 300);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+
+ // all containers should be allocated in single heartbeat.
+ Assert.assertEquals(20, containers.size());
+ }
+
+ /**
+ * Tests maximum opportunistic container allocation per AM heartbeat when
+ * limit is set to higher value.
+ * @throws Exception
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeatWithHighLimit()
+ throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(100);
+
+ List reqs = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+ .priority(PRIORITY_NORMAL)
+ .resourceName("h1")
+ .capability(CAPABILITY_1GB)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ }
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r1", 0, 500);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 0, 500);
+ RMNode h3 = createRMNode("h3", 1234, "/r1", 0, 500);
+ RMNode h4 = createRMNode("h4", 1234, "/r2", 0, 300);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+
+ // all containers should be allocated in single heartbeat.
+ Assert.assertEquals(20, containers.size());
+ }
+
+ /**
+ * Test opportunistic container allocation latency metrics.
+ * @throws Exception
+ */
+ @Test
+ public void testAllocationLatencyMetrics() throws Exception {
+ oppCntxt = spy(oppCntxt);
+ OpportunisticSchedulerMetrics metrics =
+ mock(OpportunisticSchedulerMetrics.class);
+ when(oppCntxt.getOppSchedulerMetrics()).thenReturn(metrics);
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ Collections.emptyList(), Collections.emptyList());
+ List reqs = Arrays.asList(
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "*", CAPABILITY_1GB, 2,
+ true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "h6", CAPABILITY_1GB, 2,
+ true, null, OPPORTUNISTIC_REQ),
+ ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3", CAPABILITY_1GB, 2,
+ true, null, OPPORTUNISTIC_REQ));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+ RMNode h1 = createRMNode("h1", 1234, "/r1", 0, 500);
+ RMNode h2 = createRMNode("h2", 1234, "/r1", 0, 500);
+ RMNode h3 = createRMNode("h3", 1234, "/r1", 0, 500);
+ RMNode h4 = createRMNode("h4", 1234, "/r2", 0, 300);
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+ selector.addNode(null, h4);
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+ selector.updateNode(h4);
+ selector.computeTask.run();
+ allocator.setNodeQueueLoadMonitor(selector);
+
+ List containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user");
+ LOG.info("Containers: {}", containers);
+ Assert.assertEquals(2, containers.size());
+ // for each allocated container, latency should be added.
+ verify(metrics, times(2)).addAllocateOLatencyEntry(anyLong());
+ }
+
+ private RMNode createRMNode(String host, int port, int queueLength,
+ int queueCapacity) {
+ return createRMNode(host, port, "default", queueLength,
+ queueCapacity);
+ }
+
+ private RMNode createRMNode(String host, int port, String rack,
+ int queueLength, int queueCapacity) {
+ RMNode node1 = Mockito.mock(RMNode.class);
+ NodeId nID1 = new TestNodeQueueLoadMonitor.FakeNodeId(host, port);
+ Mockito.when(node1.getHostName()).thenReturn(host);
+ Mockito.when(node1.getRackName()).thenReturn(rack);
+ Mockito.when(node1.getNodeID()).thenReturn(nID1);
+ Mockito.when(node1.getState()).thenReturn(NodeState.RUNNING);
+ OpportunisticContainersStatus status1 =
+ Mockito.mock(OpportunisticContainersStatus.class);
+ Mockito.when(status1.getEstimatedQueueWaitTime())
+ .thenReturn(-1);
+ Mockito.when(status1.getWaitQueueLength())
+ .thenReturn(queueLength);
+ Mockito.when(status1.getOpportQueueCapacity())
+ .thenReturn(queueCapacity);
+ Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
+ return node1;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
index bbc0086c375..eedd9de93ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -27,7 +27,9 @@
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* Unit tests for NodeQueueLoadMonitor.
@@ -228,6 +230,127 @@ public void testContainerQueuingLimit() {
}
+ /**
+ * Tests selection of local node from NodeQueueLoadMonitor. This test covers
+ * selection of node based on queue limit and blacklisted nodes.
+ */
+ @Test
+ public void testSelectLocalNode() {
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+ RMNode h1 = createRMNode("h1", 1, -1, 2, 5);
+ RMNode h2 = createRMNode("h2", 2, -1, 5, 5);
+ RMNode h3 = createRMNode("h3", 3, -1, 4, 5);
+
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+
+ // basic test for selecting node which has queue length less
+ // than queue capacity.
+ Set blacklist = new HashSet<>();
+ RMNode node = selector.selectLocalNode("h1", blacklist);
+ Assert.assertEquals("h1", node.getHostName());
+
+ // if node has been added to blacklist
+ blacklist.add("h1");
+ node = selector.selectLocalNode("h1", blacklist);
+ Assert.assertNull(node);
+
+ node = selector.selectLocalNode("h2", blacklist);
+ Assert.assertNull(node);
+
+ node = selector.selectLocalNode("h3", blacklist);
+ Assert.assertEquals("h3", node.getHostName());
+ }
+
+ /**
+ * Tests selection of rack local node from NodeQueueLoadMonitor. This test
+ * covers selection of node based on queue limit and blacklisted nodes.
+ */
+ @Test
+ public void testSelectRackLocalNode() {
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+ RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+ RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+ RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 5);
+
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+
+ // basic test for selecting node which has queue length less
+ // than queue capacity.
+ Set blacklist = new HashSet<>();
+ RMNode node = selector.selectRackLocalNode("rack1", blacklist);
+ Assert.assertEquals("h1", node.getHostName());
+
+ // if node has been added to blacklist
+ blacklist.add("h1");
+ node = selector.selectRackLocalNode("rack1", blacklist);
+ Assert.assertNull(node);
+
+ node = selector.selectRackLocalNode("rack2", blacklist);
+ Assert.assertEquals("h3", node.getHostName());
+
+ blacklist.add("h3");
+ node = selector.selectRackLocalNode("rack2", blacklist);
+ Assert.assertNull(node);
+ }
+
+ /**
+ * Tests selection of any node from NodeQueueLoadMonitor. This test
+ * covers selection of node based on queue limit and blacklisted nodes.
+ */
+ @Test
+ public void testSelectAnyNode() {
+ NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+ NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+ RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+ RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+ RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 10);
+
+ selector.addNode(null, h1);
+ selector.addNode(null, h2);
+ selector.addNode(null, h3);
+
+ selector.updateNode(h1);
+ selector.updateNode(h2);
+ selector.updateNode(h3);
+
+ selector.computeTask.run();
+
+ Assert.assertEquals(2, selector.getSortedNodes().size());
+
+ // basic test for selecting node which has queue length
+ // less than queue capacity.
+ Set blacklist = new HashSet<>();
+ RMNode node = selector.selectAnyNode(blacklist);
+ Assert.assertTrue(node.getHostName().equals("h1") ||
+ node.getHostName().equals("h3"));
+
+ // if node has been added to blacklist
+ blacklist.add("h1");
+ node = selector.selectAnyNode(blacklist);
+ Assert.assertEquals("h3", node.getHostName());
+
+ blacklist.add("h3");
+ node = selector.selectAnyNode(blacklist);
+ Assert.assertNull(node);
+ }
+
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength) {
return createRMNode(host, port, waitTime, queueLength,
@@ -236,20 +359,28 @@ private RMNode createRMNode(String host, int port,
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, NodeState state) {
- return createRMNode(host, port, waitTime, queueLength,
+ return createRMNode(host, port, "default", waitTime, queueLength,
DEFAULT_MAX_QUEUE_LENGTH, state);
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, int queueCapacity) {
- return createRMNode(host, port, waitTime, queueLength, queueCapacity,
+ return createRMNode(host, port, "default", waitTime, queueLength,
+ queueCapacity, NodeState.RUNNING);
+ }
+
+ private RMNode createRMNode(String host, int port, String rack,
+ int waitTime, int queueLength, int queueCapacity) {
+ return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity,
NodeState.RUNNING);
}
- private RMNode createRMNode(String host, int port,
+ private RMNode createRMNode(String host, int port, String rack,
int waitTime, int queueLength, int queueCapacity, NodeState state) {
RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port);
+ Mockito.when(node1.getHostName()).thenReturn(host);
+ Mockito.when(node1.getRackName()).thenReturn(rack);
Mockito.when(node1.getNodeID()).thenReturn(nID1);
Mockito.when(node1.getState()).thenReturn(state);
OpportunisticContainersStatus status1 =