diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java index de5492ee25f..f30460025cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java @@ -26,7 +26,7 @@ SELF("self"), NOT_SELF("not-self"), APP_ID("app-id"), - APP_LABEL("app-label"), + APP_TAG("app-tag"), ALL("all"); private String typeKeyword; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java index 7b9f3bee19b..c37e08f17aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java @@ -31,7 +31,7 @@ import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_TAG; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; @@ -170,15 +170,23 @@ public All() { /** * Namespace to all applications in the cluster. */ - public static class AppLabel extends AllocationTagNamespace { + public static class AppTag extends AllocationTagNamespace { - public AppLabel() { - super(APP_LABEL); + private String applicationTag; + + public AppTag(String appTag) { + super(APP_TAG); + this.applicationTag = appTag; } @Override public void evaluate(TargetApplications target) { - // TODO Implement app-label namespace evaluation + setScopeIfNotNull(target.getApplicationIdsByTag(applicationTag)); + } + + @Override + public String toString() { + return APP_TAG.toString() + NAMESPACE_DELIMITER + this.applicationTag; } } @@ -238,8 +246,13 @@ public static AllocationTagNamespace parse(String namespaceStr) } String appIDStr = nsValues.get(1); return parseAppID(appIDStr); - case APP_LABEL: - return new AppLabel(); + case APP_TAG: + if (nsValues.size() != 2) { + throw new InvalidAllocationTagsQueryException( + "Missing the application tag in the namespace string: " + + namespaceStr); + } + return new AppTag(nsValues.get(1)); default: throw new InvalidAllocationTagsQueryException( "Invalid namespace string " + namespaceStr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index 830566a625a..1ecf4d73eac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,12 +31,14 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.log4j.Logger; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongBinaryOperator; @@ -642,10 +643,22 @@ public long getRackCardinalityByOp(String rack, AllocationTags tags, } /** - * @return all application IDs in a set that currently visible by - * the allocation tags manager. + * @return all applications that is known to the + * {@link AllocationTagsManager}, along with their application tags. + * The result is a map, where key is an application ID, and value is the + * application-tags attached to this application. If there is no + * application-tag exists for the application, the value is an empty set. */ - public Set getAllApplicationIds() { - return ImmutableSet.copyOf(perAppNodeMappings.keySet()); + public Map> getAllApplications() { + Map> result = new HashMap<>(); + ConcurrentMap allApps = rmContext.getRMApps(); + if (allApps != null) { + for (Map.Entry app : allApps.entrySet()) { + if (perAppNodeMappings.keySet().contains(app.getKey())) { + result.put(app.getKey(), app.getValue().getApplicationTags()); + } + } + } + return result; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 389fc5cc908..4c8e52e5082 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; @@ -70,18 +69,10 @@ private static AllocationTagNamespace getAllocationTagNamespace( throws InvalidAllocationTagsQueryException { // Parse to a valid namespace. AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey); - - // TODO Complete remove this check once we support app-label. - if (AllocationTagNamespaceType.APP_LABEL - .equals(namespace.getNamespaceType())) { - throw new InvalidAllocationTagsQueryException( - namespace.toString() + " is not supported yet!"); - } - // Evaluate the namespace according to the given target // before it can be consumed. TargetApplications ta = - new TargetApplications(currentAppId, atm.getAllApplicationIds()); + new TargetApplications(currentAppId, atm.getAllApplications()); namespace.evaluate(ta); return namespace; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java index 0de7c9ec6b1..59869cd34e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java @@ -18,8 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.ApplicationId; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -31,21 +36,59 @@ public class TargetApplications { private ApplicationId currentAppId; - private Set allAppIds; + private Map> allApps; public TargetApplications(ApplicationId currentApplicationId, Set allApplicationIds) { this.currentAppId = currentApplicationId; - this.allAppIds = allApplicationIds; + allApps = new HashMap<>(); + if (allApplicationIds != null) { + allApplicationIds.forEach(appId -> + allApps.put(appId, ImmutableSet.of())); + } + } + + public TargetApplications(ApplicationId currentApplicationId, + Map> allApplicationIds) { + this.currentAppId = currentApplicationId; + this.allApps = allApplicationIds; } public ApplicationId getCurrentApplicationId() { return this.currentAppId; } + public Set getAllApplicationIds() { + return this.allApps == null ? + ImmutableSet.of() : allApps.keySet(); + } + public Set getOtherApplicationIds() { - return allAppIds == null ? null : allAppIds.stream().filter(appId -> - !appId.equals(getCurrentApplicationId())) + if (getAllApplicationIds() == null + || getAllApplicationIds().isEmpty()) { + return ImmutableSet.of(); + } + return getAllApplicationIds() + .stream() + .filter(appId -> !appId.equals(getCurrentApplicationId())) .collect(Collectors.toSet()); } + + public Set getApplicationIdsByTag(String applicationTag) { + Set result = new HashSet<>(); + if (Strings.isNullOrEmpty(applicationTag) + || this.allApps == null) { + return result; + } + + for (Map.Entry> app + : this.allApps.entrySet()) { + if (app.getValue() != null + && app.getValue().contains(applicationTag)) { + result.add(app.getKey()); + } + } + + return result; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java index 1fce466d3ac..321e72b3623 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java @@ -163,4 +163,9 @@ public boolean allocationTagExistsOnNode(NodeId nodeId, throws InvalidAllocationTagsQueryException { return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag); } + + @Override + public Map> getAllApplications() { + return tagsManager.getAllApplications(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 9004110837e..1fc6badbe6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -23,8 +23,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -334,25 +332,6 @@ private void validateAndSetSchedulingRequest(SchedulingRequest targetAllocationTags = new HashSet<>( targetExpression.getTargetValues()); - - try { - AllocationTagNamespace tagNS = - AllocationTagNamespace.parse(targetExpression.getTargetKey()); - if (AllocationTagNamespaceType.APP_LABEL - .equals(tagNS.getNamespaceType())) { - throwExceptionWithMetaInfo( - "As of now, allocation tag namespace [" - + AllocationTagNamespaceType.APP_LABEL.toString() - + "] is not supported. Please make changes to placement " - + "constraints accordingly. If this is null, it will be " - + "set to " - + AllocationTagNamespaceType.SELF.toString() - + " by default."); - } - } catch (InvalidAllocationTagsQueryException e) { - throwExceptionWithMetaInfo( - "Invalid allocation tag namespace, message: " + e.getMessage()); - } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index c399368593a..664fae2d061 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -68,6 +68,7 @@ RMAppAttempt attempt; int maxAppAttempts = 1; List amReqs; + private Set applicationTags = null; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -82,6 +83,12 @@ public MockRMApp(int newid, long time, RMAppState newState, String userName) { user = userName; } + public MockRMApp(int newid, long time, RMAppState newState, + String userName, Set appTags) { + this(newid, time, newState, userName); + this.applicationTags = appTags; + } + public MockRMApp(int newid, long time, RMAppState newState, String userName, String diag) { this(newid, time, newState, userName); this.diagnostics = new StringBuilder(diag); @@ -248,7 +255,7 @@ public String getApplicationType() { @Override public Set getApplicationTags() { - return null; + return this.applicationTags; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java index d1ef331b195..823eda9215f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java @@ -16,11 +16,16 @@ * limitations under the License. */ import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + /** * Test class for {@link AllocationTagNamespace}. */ @@ -45,11 +50,23 @@ public void testNamespaceParse() throws InvalidAllocationTagsQueryException { Assert.assertEquals(AllocationTagNamespaceType.ALL, namespace.getNamespaceType()); - namespaceStr = "app-label"; + namespaceStr = "app-tag/spark-jobs"; namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertEquals(AllocationTagNamespaceType.APP_LABEL, + Assert.assertEquals(AllocationTagNamespaceType.APP_TAG, namespace.getNamespaceType()); + // Invalid app-tag namespace syntax + try { + namespaceStr = "app-tag/tag123/tag234"; + AllocationTagNamespace.parse(namespaceStr); + Assert.fail("Parsing should fail as the given namespace is invalid"); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); + Assert.assertTrue(e.getMessage().startsWith( + "Invalid namespace string")); + } + ApplicationId applicationId = ApplicationId.newInstance(12345, 1); namespaceStr = "app-id/" + applicationId.toString(); namespace = AllocationTagNamespace.parse(namespaceStr); @@ -145,5 +162,41 @@ public void testNamespaceEvaluation() throws namespace.evaluate(targetApplications); Assert.assertEquals(1, namespace.getNamespaceScope().size()); Assert.assertEquals(app2, namespace.getNamespaceScope().iterator().next()); + + /** + * App to Application Tags + * app1: A, B + * app2: A + * app3: + * app4: C + * app5: A, B, C + */ + Map> appsWithTags = new HashMap<>(); + appsWithTags.put(app1, ImmutableSet.of("A", "B")); + appsWithTags.put(app2, ImmutableSet.of("A")); + appsWithTags.put(app3, ImmutableSet.of()); + appsWithTags.put(app4, ImmutableSet.of("C")); + appsWithTags.put(app5, ImmutableSet.of("A", "B", "C")); + + namespaceStr = "app-tag/A"; + namespace = AllocationTagNamespace.parse(namespaceStr); + targetApplications = new TargetApplications(app1, appsWithTags); + namespace.evaluate(targetApplications); + Assert.assertEquals(3, namespace.getNamespaceScope().size()); + Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(), + ImmutableSet.of(app1, app2, app5)).isEmpty()); + + namespaceStr = "app-tag/B"; + namespace = AllocationTagNamespace.parse(namespaceStr); + namespace.evaluate(targetApplications); + Assert.assertEquals(2, namespace.getNamespaceScope().size()); + Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(), + ImmutableSet.of(app1, app5)).isEmpty()); + + // Not exist + namespaceStr = "app-tag/xyz"; + namespace = AllocationTagNamespace.parse(namespaceStr); + namespace.evaluate(targetApplications); + Assert.assertEquals(0, namespace.getNamespaceScope().size()); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index 48143219326..f47b0ddbd20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicLong; @@ -52,6 +54,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -63,6 +68,7 @@ import org.junit.Test; import com.google.common.collect.ImmutableSet; +import org.mockito.Mockito; /** * Test the PlacementConstraint Utility class functionality. @@ -648,17 +654,28 @@ public void testGlobalAppConstraints() @Test public void testNotSelfAppConstraints() throws InvalidAllocationTagsQueryException { - AllocationTagsManager tm = new AllocationTagsManager(rmContext); - PlacementConstraintManagerService pcm = - new MemoryPlacementConstraintManager(); - rmContext.setAllocationTagsManager(tm); - rmContext.setPlacementConstraintManager(pcm); - long ts = System.currentTimeMillis(); ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100); ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101); ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102); + ConcurrentMap allApps = new ConcurrentHashMap<>(); + allApps.put(application1, new MockRMApp(123, 1000, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + allApps.put(application2, new MockRMApp(124, 1001, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + allApps.put(application3, new MockRMApp(125, 1002, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + + RMContext mockedContext = Mockito.spy(rmContext); + when(mockedContext.getRMApps()).thenReturn(allApps); + + AllocationTagsManager tm = new AllocationTagsManager(mockedContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + mockedContext.setAllocationTagsManager(tm); + mockedContext.setPlacementConstraintManager(pcm); + // Register App1 with anti-affinity constraint map. RMNode n0r1 = rmNodes.get(0); RMNode n1r1 = rmNodes.get(1); @@ -872,6 +889,88 @@ public void testInterAppConstraintsByAppID() pcm.unregisterApplication(application3); } + @Test + public void testInterAppConstriantsByAppTag() + throws InvalidAllocationTagsQueryException { + + ApplicationId application1 = BuilderUtils.newApplicationId(1000, 123); + ApplicationId application2 = BuilderUtils.newApplicationId(1001, 124); + + // app1: test-tag + // app2: N/A + RMContext mockedContext = Mockito.spy(rmContext); + ConcurrentMap allApps = new ConcurrentHashMap<>(); + allApps.put(application1, new MockRMApp(123, 1000, + RMAppState.NEW, "userA", ImmutableSet.of("test-tag"))); + allApps.put(application2, new MockRMApp(124, 1001, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + when(mockedContext.getRMApps()).thenReturn(allApps); + + AllocationTagsManager tm = new AllocationTagsManager(mockedContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + mockedContext.setAllocationTagsManager(tm); + mockedContext.setPlacementConstraintManager(pcm); + + // Register App1 with anti-affinity constraint map. + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: app1/hbase-m(1) + * n1: "" + * n2: app1/hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + + SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + AllocationTagNamespace namespace = + new AllocationTagNamespace.AppTag("test-tag"); + Map, PlacementConstraint> constraintMap = new HashMap<>(); + PlacementConstraint constraint2 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(), + "hbase-m")) + .build(); + Set srcTags2 = ImmutableSet.of("app2"); + constraintMap.put(srcTags2, constraint2); + + pcm.registerApplication(application2, constraintMap); + + // Anti-affinity with app-tag/test-tag/hbase-m, + // app1 has tag "test-tag" so the constraint is equally to work on app1 + // onto n1 and n3 as they don't have "hbase-m" from app1. + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application1); + pcm.unregisterApplication(application2); + } + @Test public void testInvalidAllocationTagNamespace() { AllocationTagsManager tm = new AllocationTagsManager(rmContext);