diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java index 25f876156bd..ae479f49174 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java @@ -28,7 +28,7 @@ import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_TAG; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString; @@ -123,11 +123,11 @@ public boolean isNotSelf() { /** * @return true if the namespace is effective to a group of applications - * identified by a application label, the namespace prefix should be - * "app-label"; false otherwise. + * identified by a application tag, the namespace prefix should be + * "app-tag"; false otherwise. */ - public boolean isAppLabel() { - return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType()); + public boolean isAppTag() { + return AllocationTagNamespaceType.APP_TAG.equals(getNamespaceType()); } @Override @@ -207,15 +207,23 @@ public void evaluate(TargetApplications target) { /** * Namespace to all applications in the cluster. */ - public static class AppLabel extends AllocationTagNamespace { + public static class AppTag extends AllocationTagNamespace { - public AppLabel() { - super(APP_LABEL); + private String applicationTag; + + public AppTag(String appTag) { + super(APP_TAG); + this.applicationTag = appTag; } @Override public void evaluate(TargetApplications target) { - // TODO Implement app-label namespace evaluation + setScopeIfNotNull(target.getApplicationIdsByTag(applicationTag)); + } + + @Override + public String toString() { + return APP_TAG.toString() + NAMESPACE_DELIMITER + this.applicationTag; } } @@ -279,8 +287,13 @@ public static AllocationTagNamespace parse(String namespaceStr) } String appIDStr = nsValues.get(1); return parseAppID(appIDStr); - case APP_LABEL: - return new AppLabel(); + case APP_TAG: + if (nsValues.size() != 2) { + throw new InvalidAllocationTagException( + "Missing the application tag in the namespace string: " + + namespaceStr); + } + return new AppTag(nsValues.get(1)); default: throw new InvalidAllocationTagException( "Invalid namespace string " + namespaceStr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java index 5e46cd0f3d9..5a0002e5bad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java @@ -32,7 +32,7 @@ SELF("self"), NOT_SELF("not-self"), APP_ID("app-id"), - APP_LABEL("app-label"), + APP_TAG("app-tag"), ALL("all"); private String typeKeyword; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java index d7a1ca0302d..5bf7b0f2a3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.api.records; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; + +import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -29,18 +34,19 @@ public class TargetApplications { private ApplicationId currentAppId; - private Set allAppIds; + private Map> allApps; private TargetApplications() {} public TargetApplications(ApplicationId currentApplicationId, - Set allApplicationIds) { + Map> allApplicationIds) { this.currentAppId = currentApplicationId; - this.allAppIds = allApplicationIds; + this.allApps = allApplicationIds; } public Set getAllApplicationIds() { - return this.allAppIds; + return this.allApps == null ? + ImmutableSet.of() : allApps.keySet(); } public ApplicationId getCurrentApplicationId() { @@ -48,12 +54,35 @@ public ApplicationId getCurrentApplicationId() { } public Set getOtherApplicationIds() { - return allAppIds == null ? null : allAppIds.stream().filter(appId -> - !appId.equals(getCurrentApplicationId())) + if (getAllApplicationIds() == null + || getAllApplicationIds().isEmpty()) { + return ImmutableSet.of(); + } + return getAllApplicationIds() + .stream() + .filter(appId -> !appId.equals(getCurrentApplicationId())) .collect(Collectors.toSet()); } public static TargetApplications emptyTarget() { return new TargetApplications(); } + + public Set getApplicationIdsByTag(String applicationTag) { + Set result = new HashSet<>(); + if (Strings.isNullOrEmpty(applicationTag) + || this.allApps == null) { + return result; + } + + for (Map.Entry> app + : this.allApps.entrySet()) { + if (app.getValue() != null + && app.getValue().contains(applicationTag)) { + result.add(app.getKey()); + } + } + + return result; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index 96bf75d71c9..1e9bc85a391 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; @@ -33,12 +34,15 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.log4j.Logger; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongBinaryOperator; @@ -689,11 +693,23 @@ public long getRackCardinalityByOp(String rack, AllocationTags tags, return globalNodeMapping.getTypeToTagsWithCount().get(nodeId); } + /** - * @return all application IDs in a set that currently visible by - * the allocation tags manager. + * @return all applications that has allocated allocation tags, + * along with their application tags. The result is a map, where + * key is an application ID, value is the application tags attached + * to this application. */ - public Set getAllApplicationIds() { - return ImmutableSet.copyOf(perAppNodeMappings.keySet()); + public Map> getAllApplications() { + Map> result = new HashMap<>(); + ConcurrentMap allApps = rmContext.getRMApps(); + if (allApps != null) { + for (Map.Entry app : allApps.entrySet()) { + if (perAppNodeMappings.keySet().contains(app.getKey())) { + result.put(app.getKey(), app.getValue().getApplicationTags()); + } + } + } + return result; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index afe40c51020..929e4d74458 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -73,17 +73,10 @@ private static AllocationTagNamespace getAllocationTagNamespace( throws InvalidAllocationTagException{ // Parse to a valid namespace. AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey); - - // TODO Complete remove this check once we support app-label. - if (namespace.isAppLabel()) { - throw new InvalidAllocationTagException( - namespace.toString() + " is not supported yet!"); - } - // Evaluate the namespace according to the given target // before it can be consumed. TargetApplications ta = new TargetApplications(currentAppId, - atm.getAllApplicationIds()); + atm.getAllApplications()); namespace.evaluate(ta); return namespace; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index e1da86959c4..c6f29db4a96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -339,7 +339,7 @@ private void validateAndSetSchedulingRequest(SchedulingRequest try { AllocationTagNamespace tagNS = AllocationTagNamespace.parse(targetExpression.getTargetKey()); - if (tagNS.isAppLabel()) { + if (tagNS.isAppTag()) { throwExceptionWithMetaInfo( tagNS.toString() + " is not supported! As of now, " + "the accepted target key for targetKey of allocation_tag" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index c399368593a..9d39fd300b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -68,6 +68,7 @@ RMAppAttempt attempt; int maxAppAttempts = 1; List amReqs; + Set applicationTags = null; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -82,6 +83,12 @@ public MockRMApp(int newid, long time, RMAppState newState, String userName) { user = userName; } + public MockRMApp(int newid, long time, RMAppState newState, + String userName, Set appTags) { + this(newid, time, newState, userName); + this.applicationTags = appTags; + } + public MockRMApp(int newid, long time, RMAppState newState, String userName, String diag) { this(newid, time, newState, userName); this.diagnostics = new StringBuilder(diag); @@ -248,7 +255,7 @@ public String getApplicationType() { @Override public Set getApplicationTags() { - return null; + return this.applicationTags; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index bc4703badf6..fb13829fb09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -37,7 +37,10 @@ import org.junit.Before; import org.junit.Test; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; /** * Test functionality of AllocationTagsManager. @@ -478,7 +481,7 @@ public void testNodeAllocationTagsAggregation() // Target applications, current app: app1 // all apps: app1, app2, app3 TargetApplications ta = new TargetApplications(app1, - ImmutableSet.of(app1, app2, app3)); + applicationsWithoutTags(app1, app2, app3)); //******************************** // 1) self (app1) @@ -558,4 +561,13 @@ public void testNodeAllocationTagsAggregation() Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::min)); Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::min)); } + + private Map> applicationsWithoutTags( + ApplicationId... applicationIds) { + Map> result = new HashMap<>(); + for (ApplicationId appId : applicationIds) { + result.put(appId, ImmutableSet.of()); + } + return result; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java index 67a3901e644..f676b037201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java @@ -16,6 +16,7 @@ * limitations under the License. */ import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.TargetApplications; @@ -23,6 +24,10 @@ import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + /** * Test class for {@link AllocationTagNamespace}. */ @@ -44,9 +49,21 @@ public void testNamespaceParse() throws InvalidAllocationTagException { namespace = AllocationTagNamespace.parse(namespaceStr); Assert.assertTrue(namespace.isGlobal()); - namespaceStr = "app-label"; + namespaceStr = "app-tag/spark-jobs"; namespace = AllocationTagNamespace.parse(namespaceStr); - Assert.assertTrue(namespace.isAppLabel()); + Assert.assertTrue(namespace.isAppTag()); + + // Invalid app-tag namespace syntax + try { + namespaceStr = "app-tag/tag123/tag234"; + AllocationTagNamespace.parse(namespaceStr); + Assert.fail("Parsing should fail as the given namespace is invalid"); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(e instanceof InvalidAllocationTagException); + Assert.assertTrue(e.getMessage().startsWith( + "Invalid namespace string")); + } ApplicationId applicationId = ApplicationId.newInstance(12345, 1); namespaceStr = "app-id/" + applicationId.toString(); @@ -112,36 +129,81 @@ public void testNamespaceEvaluation() throws InvalidAllocationTagException { namespaceStr = "self"; namespace = AllocationTagNamespace.parse(namespaceStr); - targetApplications = new TargetApplications(app1, ImmutableSet.of(app1)); + targetApplications = new TargetApplications(app1, applicationsWithoutTags(app1)); namespace.evaluate(targetApplications); Assert.assertEquals(1, namespace.getNamespaceScope().size()); Assert.assertEquals(app1, namespace.getNamespaceScope().iterator().next()); namespaceStr = "not-self"; namespace = AllocationTagNamespace.parse(namespaceStr); - targetApplications = new TargetApplications(app1, ImmutableSet.of(app1)); + targetApplications = new TargetApplications(app1, applicationsWithoutTags(app1)); namespace.evaluate(targetApplications); Assert.assertEquals(0, namespace.getNamespaceScope().size()); - targetApplications = new TargetApplications(app1, - ImmutableSet.of(app1, app2, app3)); + targetApplications = + new TargetApplications(app1, applicationsWithoutTags(app1, app2, app3)); namespace.evaluate(targetApplications); Assert.assertEquals(2, namespace.getNamespaceScope().size()); Assert.assertFalse(namespace.getNamespaceScope().contains(app1)); namespaceStr = "all"; namespace = AllocationTagNamespace.parse(namespaceStr); - targetApplications = new TargetApplications(null, - ImmutableSet.of(app1, app2)); + targetApplications = + new TargetApplications(null, applicationsWithoutTags(app1, app2)); namespace.evaluate(targetApplications); Assert.assertEquals(2, namespace.getNamespaceScope().size()); namespaceStr = "app-id/" + app2.toString(); namespace = AllocationTagNamespace.parse(namespaceStr); targetApplications = new TargetApplications(app1, - ImmutableSet.of(app1, app2, app3, app4, app5)); + applicationsWithoutTags(app1, app2, app3, app4, app5)); namespace.evaluate(targetApplications); Assert.assertEquals(1, namespace.getNamespaceScope().size()); Assert.assertEquals(app2, namespace.getNamespaceScope().iterator().next()); + + /** + * App to Application Tags + * app1: A, B + * app2: A + * app3: + * app4: C + * app5: A, B, C + */ + Map> appsWithTags = new HashMap<>(); + appsWithTags.put(app1, ImmutableSet.of("A", "B")); + appsWithTags.put(app2, ImmutableSet.of("A")); + appsWithTags.put(app3, ImmutableSet.of()); + appsWithTags.put(app4, ImmutableSet.of("C")); + appsWithTags.put(app5, ImmutableSet.of("A", "B", "C")); + + namespaceStr = "app-tag/A"; + namespace = AllocationTagNamespace.parse(namespaceStr); + targetApplications = new TargetApplications(app1, appsWithTags); + namespace.evaluate(targetApplications); + Assert.assertEquals(3, namespace.getNamespaceScope().size()); + Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(), + ImmutableSet.of(app1, app2, app5)).isEmpty()); + + namespaceStr = "app-tag/B"; + namespace = AllocationTagNamespace.parse(namespaceStr); + namespace.evaluate(targetApplications); + Assert.assertEquals(2, namespace.getNamespaceScope().size()); + Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(), + ImmutableSet.of(app1, app5)).isEmpty()); + + // Not exist + namespaceStr = "app-tag/xyz"; + namespace = AllocationTagNamespace.parse(namespaceStr); + namespace.evaluate(targetApplications); + Assert.assertEquals(0, namespace.getNamespaceScope().size()); + } + + private Map> applicationsWithoutTags( + ApplicationId... applicationIds) { + Map> result = new HashMap<>(); + for (ApplicationId appId : applicationIds) { + result.put(appId, ImmutableSet.of()); + } + return result; } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index a4d880c78cd..9da70be8d22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicLong; @@ -53,6 +55,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -64,6 +69,7 @@ import org.junit.Test; import com.google.common.collect.ImmutableSet; +import org.mockito.Mockito; /** * Test the PlacementConstraint Utility class functionality. @@ -873,6 +879,87 @@ public void testInterAppConstraintsByAppID() pcm.unregisterApplication(application3); } + @Test + public void testInterAppConstriantsByAppTag() + throws InvalidAllocationTagsQueryException { + + ApplicationId application1 = BuilderUtils.newApplicationId(1000, 123); + ApplicationId application2 = BuilderUtils.newApplicationId(1001, 124); + + RMContext mockedContext = Mockito.spy(rmContext); + ConcurrentMap allApps = new ConcurrentHashMap<>(); + allApps.put(application1, new MockRMApp(123, 1000, + RMAppState.NEW, "userA", ImmutableSet.of("test-tag"))); + allApps.put(application2, new MockRMApp(124, 1001, + RMAppState.NEW, "userA", ImmutableSet.of(""))); + when(mockedContext.getRMApps()).thenReturn(allApps); + + AllocationTagsManager tm = new AllocationTagsManager(mockedContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + mockedContext.setAllocationTagsManager(tm); + mockedContext.setPlacementConstraintManager(pcm); + + // Register App1 with anti-affinity constraint map. + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: app1/hbase-m(1) + * n1: "" + * n2: app1/hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + + SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + AllocationTagNamespace namespace = + new AllocationTagNamespace.AppTag("test-tag"); + Map, PlacementConstraint> constraintMap = new HashMap<>(); + PlacementConstraint constraint2 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(), + "hbase-m")) + .build(); + Set srcTags2 = new HashSet<>(); + srcTags2.add("app2"); + constraintMap.put(srcTags2, constraint2); + + pcm.registerApplication(application2, constraintMap); + + // Anti-affinity with app-tag/test-tag/hbase-m, + // app1 has tag "test-tag" so the constraint is equally to work on app1 + // onto n1 and n3 as they don't have "hbase-m" from app1. + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application1); + pcm.unregisterApplication(application2); + } + @Test public void testInvalidAllocationTagNamespace() { AllocationTagsManager tm = new AllocationTagsManager(rmContext);