diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index eb4c626f952..2ad439189fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -513,6 +513,19 @@ public RMApp submitApp(int masterMemory) throws Exception {
     return submitApp(masterMemory, false);
   }
 
+  public RMApp submitApp(int masterMemory, Set<String> appTags)
+      throws Exception {
+    Resource resource = Resource.newInstance(masterMemory, 0);
+    ResourceRequest amResourceRequest = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY, resource, 1);
+    return submitApp(Collections.singletonList(amResourceRequest), "",
+        UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
+        null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+        false, false, null, 0, null, true, Priority.newInstance(0), null,
+        null, null, appTags);
+  }
+
   public RMApp submitApp(int masterMemory, Priority priority) throws Exception {
     Resource resource = Resource.newInstance(masterMemory, 0);
     return submitApp(resource, "", UserGroupInformation.getCurrentUser()
@@ -732,8 +745,23 @@ public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
       LogAggregationContext logAggregationContext,
       boolean cancelTokensWhenComplete, Priority priority, String amLabel,
       Map<ApplicationTimeoutType, Long> applicationTimeouts,
-      ByteBuffer tokensConf)
-      throws Exception {
+      ByteBuffer tokensConf) throws Exception {
+    return submitApp(amResourceRequests, name, user, acls, unmanaged, queue,
+        maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+        isAppIdProvided, applicationId, attemptFailuresValidityInterval,
+        logAggregationContext, cancelTokensWhenComplete, priority, amLabel,
+        applicationTimeouts, tokensConf, null);
+  }
+
+  public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
+      String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
+      String queue, int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+      ApplicationId applicationId, long attemptFailuresValidityInterval,
+      LogAggregationContext logAggregationContext,
+      boolean cancelTokensWhenComplete, Priority priority, String amLabel,
+      Map<ApplicationTimeoutType, Long> applicationTimeouts,
+      ByteBuffer tokensConf, Set<String> applicationTags) throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
     if (!isAppIdProvided) {
@@ -749,6 +777,9 @@ public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
     sub.setApplicationId(appId);
     sub.setApplicationName(name);
     sub.setMaxAppAttempts(maxAppAttempts);
+    if (applicationTags != null) {
+      sub.setApplicationTags(applicationTags);
+    }
     if (applicationTimeouts != null && applicationTimeouts.size() > 0) {
       sub.setApplicationTimeouts(applicationTimeouts);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
index f23fd8f06f3..bedc7ed98ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -21,12 +21,18 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -41,12 +47,29 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace.AppID;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace.AppTag;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace.NotSelf;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
 
 public class TestSchedulingRequestContainerAllocation {
   private final int GB = 1024;
@@ -435,8 +458,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
 
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
 
-    PlacementConstraint constraint = PlacementConstraints
-        .targetNotIn("node", allocationTag("t1"))
+    PlacementConstraint constraint = targetNotIn("node", allocationTag("t1"))
         .build();
     SchedulingRequest sc = SchedulingRequest
         .newInstance(0, Priority.newInstance(1),
@@ -477,4 +499,419 @@ public RMNodeLabelsManager createNodeLabelManager() {
 
     rm1.close();
   }
+
+  private void doNodeHeartbeat(MockNM... nms) throws Exception {
+    for (MockNM nm : nms) {
+      nm.nodeHeartbeat(true);
+    }
+  }
+
+  private List<Container> waitForAllocation(int allocNum, int timeout,
+      MockAM am, MockNM... nms) throws Exception {
+    final List<Container> result = new ArrayList<>();
+    GenericTestUtils.waitFor(() -> {
+      try {
+        AllocateResponse response = am.schedule();
+        List<Container> allocated = response.getAllocatedContainers();
+        System.out.println("Expecting allocation: " + allocNum
+            + ", actual allocation: " + allocated.size());
+        for (Container c : allocated) {
+          System.out.println("Container " + c.getId().toString()
+              + " is allocated on node: " + c.getNodeId().toString()
+              + ", allocation tags: "
+              + String.join(",", c.getAllocationTags()));
+        }
+        result.addAll(allocated);
+        if (result.size() == allocNum) {
+          return true;
+        }
+        doNodeHeartbeat(nms);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+      return false;
+    }, 500, timeout);
+    return result;
+  }
+
+  private static SchedulingRequest schedulingRequest(int requestId,
+      int containers, int cores, int mem, PlacementConstraint constraint,
+      String... tags) {
+    return schedulingRequest(1, requestId, containers, cores, mem,
+        ExecutionType.GUARANTEED, constraint, tags);
+  }
+
+  private static SchedulingRequest schedulingRequest(
+      int priority, long allocReqId, int containers, int cores, int mem,
+      ExecutionType execType, PlacementConstraint constraint, String... tags) {
+    return SchedulingRequest.newBuilder()
+        .priority(Priority.newInstance(priority))
+        .allocationRequestId(allocReqId)
+        .allocationTags(new HashSet<>(Arrays.asList(tags)))
+        .executionType(ExecutionTypeRequest.newInstance(execType, true))
+        .resourceSizing(
+            ResourceSizing.newInstance(containers,
+                Resource.newInstance(mem, cores)))
+        .placementConstraintExpression(constraint)
+        .build();
+  }
+
+  private int getContainerNodesNum(List<Container> containers) {
+    Set<NodeId> nodes = new HashSet<>();
+    if (containers != null) {
+      containers.forEach(c -> nodes.add(c.getNodeId()));
+    }
+    return nodes.size();
+  }
+
+  @Test(timeout = 30000L)
+  public void testInterAppCompositeConstraints() throws Exception {
+    // This test covers both intra- and inter-app constraints, including
+    // simple affinity, anti-affinity and cardinality constraints, and
+    // simple AND composite constraints.
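+    // The placement-processor handler enabled below is what evaluates the
+    // SchedulingRequests with placement constraints used in this test.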
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      MockNM nm1 = rm.registerNode("h1:1234", 100*GB, 100);
+      MockNM nm2 = rm.registerNode("h2:1234", 100*GB, 100);
+      MockNM nm3 = rm.registerNode("h3:1234", 100*GB, 100);
+      MockNM nm4 = rm.registerNode("h4:1234", 100*GB, 100);
+      MockNM nm5 = rm.registerNode("h5:1234", 100*GB, 100);
+
+      RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("hbase"));
+      // Allocate AM container on nm1
+      doNodeHeartbeat(nm1);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      am1.registerAppAttempt();
+
+      // App1 (hbase)
+      // h1: hbase-master(1)
+      // h2: hbase-master(1)
+      // h3:
+      // h4:
+      // h5:
+      PlacementConstraint pc = targetNotIn("node",
+          allocationTag("hbase-master")).build();
+      am1.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 2, 1, 2048, pc, "hbase-master")));
+      List<Container> allocated = waitForAllocation(2, 3000, am1, nm1, nm2);
+
+      // 2 containers allocated
+      Assert.assertEquals(2, allocated.size());
+      // containers should be distributed on 2 different nodes
+      Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+      // App1 (hbase)
+      // h1: hbase-rs(1), hbase-master(1)
+      // h2: hbase-rs(1), hbase-master(1)
+      // h3: hbase-rs(1)
+      // h4: hbase-rs(1)
+      // h5:
+      pc = targetNotIn("node", allocationTag("hbase-rs")).build();
+      am1.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(2, 4, 1, 1024, pc, "hbase-rs")));
+      allocated = waitForAllocation(4, 3000, am1, nm1, nm2, nm3, nm4, nm5);
+
+      Assert.assertEquals(4, allocated.size());
+      Assert.assertEquals(4, getContainerNodesNum(allocated));
+
+      // App2 (web-server)
+      // The web server has 2 instances; each instance must be co-allocated
+      // with an hbase-master, and no two instances can share a node.
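+      // The AND constraint below combines inter-app affinity (to
+      // "hbase-master" tags in the "all" namespace, i.e. from any
+      // application) with intra-app anti-affinity among "ws-inst" tags.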
+      RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("web-server"));
+      // Allocate AM container on nm2
+      doNodeHeartbeat(nm2);
+      RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
+      MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
+      am2.registerAppAttempt();
+
+      // App2 (web-server)
+      // h1: hbase-rs(1), hbase-master(1), ws-inst(1)
+      // h2: hbase-rs(1), hbase-master(1), ws-inst(1)
+      // h3: hbase-rs(1)
+      // h4: hbase-rs(1)
+      // h5:
+      pc = and(
+          targetIn("node", allocationTagWithNamespace(
+              new TargetApplicationsNamespace.All().toString(),
+              "hbase-master")),
+          targetNotIn("node", allocationTag("ws-inst"))).build();
+      am2.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 2, 1, 2048, pc, "ws-inst")));
+      allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
+      Assert.assertEquals(2, allocated.size());
+      Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+      ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
+      for (Container c : allocated) {
+        RMNode rmNode = rmNodes.get(c.getNodeId());
+        Assert.assertNotNull(rmNode);
+        Assert.assertTrue("A node that has a ws-inst container allocated"
+            + " should carry the ws-inst tag",
+            rmNode.getAllocationTagsWithCount().get("ws-inst") == 1);
+        Assert.assertTrue("ws-inst should be co-allocated to "
+            + "hbase-master nodes",
+            rmNode.getAllocationTagsWithCount().get("hbase-master") == 1);
+      }
+
+      // App3 (ws-servant)
+      // App3 has multiple instances that must be co-allocated
+      // with app2 server instances, and each node cannot have more than
+      // 3 instances.
+      RMApp app3 = rm.submitApp(1*GB, ImmutableSet.of("ws-servants"));
+      // Allocate AM container on nm3
+      doNodeHeartbeat(nm3);
+      RMAppAttempt app3attempt1 = app3.getCurrentAppAttempt();
+      MockAM am3 = rm.sendAMLaunched(app3attempt1.getAppAttemptId());
+      am3.registerAppAttempt();
+
+      // App3 (ws-servant)
+      // h1: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
+      // h2: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
+      // h3: hbase-rs(1)
+      // h4: hbase-rs(1)
+      // h5:
+      pc = and(
+          targetIn("node", allocationTagWithNamespace(
+              new AppTag("web-server").toString(),
+              "ws-inst")),
+          cardinality("node", 0, 2, "ws-servant")).build();
+      am3.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 10, 1, 512, pc, "ws-servant")));
+      // Only 6 of the 10 requested containers can be allocated due to the
+      // cardinality constraint; each round, 2 containers can be allocated.
+      allocated = waitForAllocation(6, 10000, am3, nm1, nm2, nm3, nm4, nm5);
+      Assert.assertEquals(6, allocated.size());
+      Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+      for (Container c : allocated) {
+        RMNode rmNode = rmNodes.get(c.getNodeId());
+        Assert.assertNotNull(rmNode);
+        Assert.assertTrue("A node that has ws-servant allocated"
+            + " must have 3 instances",
+            rmNode.getAllocationTagsWithCount().get("ws-servant") == 3);
+        Assert.assertTrue("Every ws-servant container should be co-allocated"
+            + " with ws-inst",
+            rmNode.getAllocationTagsWithCount().get("ws-inst") == 1);
+      }
+    } finally {
+      rm.stop();
+    }
+  }
+
+  @Test(timeout = 30000L)
+  public void testMultiAllocationTagsConstraints() throws Exception {
+    // This test simulates using placement constraints to avoid port
+    // conflicts.
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      MockNM nm1 = rm.registerNode("h1:1234", 10*GB, 10);
+      MockNM nm2 = rm.registerNode("h2:1234", 10*GB, 10);
+      MockNM nm3 = rm.registerNode("h3:1234", 10*GB, 10);
+      MockNM nm4 = rm.registerNode("h4:1234", 10*GB, 10);
+      MockNM nm5 = rm.registerNode("h5:1234", 10*GB, 10);
+
+      RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("server1"));
+      // Allocate AM container on nm1
+      doNodeHeartbeat(nm1);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      am1.registerAppAttempt();
+
+      // App1 uses ports: 6000, 7000 and 8000
+      String[] server1Ports =
+          new String[] {"port_6000", "port_7000", "port_8000"};
+      PlacementConstraint pc = targetNotIn("node",
+          allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(),
+              server1Ports))
+          .build();
+      am1.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 2, 1, 1024, pc, server1Ports)));
+      List<Container> allocated = waitForAllocation(2, 3000,
+          am1, nm1, nm2, nm3, nm4, nm5);
+
+      // 2 containers allocated
+      Assert.assertEquals(2, allocated.size());
+      // containers should be distributed on 2 different nodes
+      Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+      // App2 uses port: 6000
+      String[] server2Ports = new String[] {"port_6000"};
+      RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("server2"));
+      // Allocate AM container on nm2
+      doNodeHeartbeat(nm2);
+      RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
+      MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
+      am2.registerAppAttempt();
+
+      pc = targetNotIn("node",
+          allocationTagWithNamespace(
+              AllocationTagNamespaceType.ALL.toString(),
+              server2Ports))
+          .build();
+      am2.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 3, 1, 1024, pc, server2Ports)));
+      allocated = waitForAllocation(3, 3000, am2, nm1, nm2, nm3, nm4, nm5);
+      Assert.assertEquals(3, allocated.size());
+      Assert.assertEquals(3, getContainerNodesNum(allocated));
+
+      ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
+      for (Container c : allocated) {
+        RMNode rmNode = rmNodes.get(c.getNodeId());
+        Assert.assertNotNull(rmNode);
+        Assert.assertTrue("server2 should not be co-allocated with server1"
+            + " as they both need to use port 6000",
+            rmNode.getAllocationTagsWithCount().get("port_6000") == 1);
+        Assert.assertFalse(rmNode.getAllocationTagsWithCount()
+            .containsKey("port_7000"));
+        Assert.assertFalse(rmNode.getAllocationTagsWithCount()
+            .containsKey("port_8000"));
+      }
+    } finally {
+      rm.stop();
+    }
+  }
+
+  @Test(timeout = 30000L)
+  public void testInterAppConstraintsWithNamespaces() throws Exception {
+    // This test verifies inter-app constraints with the namespaces
+    // not-self/app-id/app-tag.
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      MockNM nm1 = rm.registerNode("h1:1234", 100*GB, 100);
+      MockNM nm2 = rm.registerNode("h2:1234", 100*GB, 100);
+      MockNM nm3 = rm.registerNode("h3:1234", 100*GB, 100);
+      MockNM nm4 = rm.registerNode("h4:1234", 100*GB, 100);
+      MockNM nm5 = rm.registerNode("h5:1234", 100*GB, 100);
+
+      ApplicationId app5Id = null;
+      Map<ApplicationId, List<Container>> allocMap = new HashMap<>();
+      // Submit 10 apps; all of their containers are tagged with "foo".
+      for (int i = 0; i < 10; i++) {
+        // Apps 1 ~ 5 are tagged "former5",
+        // apps 6 ~ 10 are tagged "latter5".
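+        // Each app requests 3 mutually anti-affine "foo" containers; the
+        // plain allocationTag("foo") target is resolved against the app's
+        // own (self) namespace, so each app spreads its own containers
+        // across 3 nodes while different apps may still share nodes.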
+        String applicationTag = i < 5 ? "former5" : "latter5";
+        RMApp app = rm.submitApp(1*GB, ImmutableSet.of(applicationTag));
+        // Allocate the AM container on one of the nodes
+        doNodeHeartbeat(nm1, nm2, nm3, nm4, nm5);
+        RMAppAttempt attempt = app.getCurrentAppAttempt();
+        MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+        am.registerAppAttempt();
+
+        PlacementConstraint pc = targetNotIn("node", allocationTag("foo"))
+            .build();
+        am.addSchedulingRequest(
+            ImmutableList.of(
+                schedulingRequest(1, 3, 1, 1024, pc, "foo")));
+        List<Container> allocated = waitForAllocation(3, 3000,
+            am, nm1, nm2, nm3, nm4, nm5);
+        // Memorize the app (app5) whose foo containers are targeted below
+        if (i == 5) {
+          app5Id = am.getApplicationAttemptId().getApplicationId();
+        }
+        allocMap.put(am.getApplicationAttemptId().getApplicationId(),
+            allocated);
+      }
+
+      Assert.assertNotNull(app5Id);
+      Assert.assertEquals(3, getContainerNodesNum(allocMap.get(app5Id)));
+
+      // *** app-id
+      // Submit another app, use an app-id constraint against app5
+      RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("xyz"));
+      // Allocate AM container on nm1
+      doNodeHeartbeat(nm1);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      am1.registerAppAttempt();
+
+      PlacementConstraint pc = targetIn("node",
+          allocationTagWithNamespace(new AppID(app5Id).toString(), "foo"))
+          .build();
+      am1.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 3, 1, 1024, pc, "foo")));
+      List<Container> allocated = waitForAllocation(3, 3000,
+          am1, nm1, nm2, nm3, nm4, nm5);
+
+      ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
+      List<Container> app5Alloc = allocMap.get(app5Id);
+      for (Container c : allocated) {
+        RMNode rmNode = rmNodes.get(c.getNodeId());
+        Assert.assertNotNull(rmNode);
+        Assert.assertTrue(
+            "This app has affinity with app-id/app5/foo containers",
+            app5Alloc.stream().anyMatch(
+                c5 -> c5.getNodeId().equals(c.getNodeId())));
+      }
+
+      // *** app-tag
+      RMApp app2 = rm.submitApp(1*GB);
+      // Allocate AM container on nm2
+      doNodeHeartbeat(nm2);
+      RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
+      MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
+      am2.registerAppAttempt();
+
+      pc = targetNotIn("node",
+          allocationTagWithNamespace(new AppTag("xyz").toString(), "foo"))
+          .build();
+      am2.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 2, 1, 1024, pc, "foo")));
+      allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
+      Assert.assertEquals(2, allocated.size());
+
+      // None of them can be allocated to nodes that have app5's foo
+      // containers
+      for (Container c : app5Alloc) {
+        Assert.assertNotEquals(c.getNodeId(),
+            allocated.iterator().next().getNodeId());
+      }
+
+      // *** not-self
+      RMApp app3 = rm.submitApp(1*GB);
+      // Allocate AM container on nm3
+      doNodeHeartbeat(nm3);
+      RMAppAttempt app3attempt1 = app3.getCurrentAppAttempt();
+      MockAM am3 = rm.sendAMLaunched(app3attempt1.getAppAttemptId());
+      am3.registerAppAttempt();
+
+      pc = cardinality("node", new NotSelf().toString(), 1, 1, "foo")
+          .build();
+      am3.addSchedulingRequest(
+          ImmutableList.of(
+              schedulingRequest(1, 1, 1, 1024, pc, "foo")));
+      allocated = waitForAllocation(1, 3000, am3, nm1, nm2, nm3, nm4, nm5);
+      Assert.assertEquals(1, allocated.size());
+      // The container must land on a node that already had exactly one foo
+      // container from other apps, so that node now has two foo tags
+      Assert.assertTrue(rmNodes.get(allocated.iterator().next().getNodeId())
+          .getAllocationTagsWithCount().get("foo") == 2);
+    } finally {
+      rm.stop();
+    }
+  }
 }