diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
index c1549c54db4..f6759cd4c51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
@@ -53,6 +53,8 @@ private PlacementConstraints() {
private static final String APPLICATION_LABEL_PREFIX =
"yarn_application_label/";
+ public static final String ALLOCATION_TAG_NAMESPACE_PREFIX = "ns:";
+
@InterfaceAudience.Private
public static final String APPLICATION_LABEL_INTRA_APPLICATION =
APPLICATION_LABEL_PREFIX + "%intra_app%";
@@ -223,6 +225,20 @@ public static TargetExpression allocationTag(String... allocationTags) {
allocationTags);
}
+ /**
+ * Constructs a target expression on a set of allocation tags under
+ * a certain namespace.
+ *
+ * @param namespace namespace of the allocation tags
+ * @param allocationTags allocation tags
+ * @return a target expression
+ */
+ public static TargetExpression allocationTagWithNamespace(String namespace,
+ String... allocationTags) {
+ return new TargetExpression(TargetType.ALLOCATION_TAG,
+ namespace, allocationTags);
+ }
+
/**
* Constructs a target expression on an allocation tag. It is satisfied if
* there are allocations with one of the given tags. Comparing to
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index ab0bbd7f779..a86373f6041 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.Set;
+import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -38,7 +39,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AllocationTagNamespace;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AllocationTagNamespace.NamespaceToAppID;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AllocationTags;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.ALLOCATION_TAG_NAMESPACE_PREFIX;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
/**
@@ -56,6 +61,32 @@
private PlacementConstraintsUtil() {
}
+ /**
+ * Try to get application IDs from a target key, if the target key is
+ * malformed or it doesn't contain any valid application ID, null is returned.
+ *
+ * @param targetKey
+ * @return an application ID set or null.
+ */
+ private static AllocationTagNamespace getAllocationTagNamespace(
+ String targetKey) throws InvalidAllocationTagsQueryException {
+ // If target key is null or a empty string, indicating this is a tag
+ // against to this application itself.
+ if (Strings.isNullOrEmpty(targetKey)) {
+ // default
+ return new AllocationTagNamespace.Self();
+ }
+ // If target key is given, but not start with correct prefix,
+ if (!targetKey.startsWith(ALLOCATION_TAG_NAMESPACE_PREFIX)) {
+ throw new InvalidAllocationTagsQueryException("Invalid target key: "
+ + targetKey + " Expecting namespace started with prefix: "
+ + ALLOCATION_TAG_NAMESPACE_PREFIX);
+ }
+
+ // Parse to a valid namespace.
+ return AllocationTagNamespace.parse(targetKey);
+ }
+
/**
* Returns true if single placement constraint with associated
* allocationTags and scope is satisfied by a specific scheduler Node.
@@ -74,6 +105,28 @@ private static boolean canSatisfySingleConstraintExpression(
ApplicationId targetApplicationId, SingleConstraint sc,
TargetExpression te, SchedulerNode node, AllocationTagsManager tm)
throws InvalidAllocationTagsQueryException {
+ AllocationTagNamespace namespace =
+ getAllocationTagNamespace(te.getTargetKey());
+ AllocationTags allocationTags = new AllocationTags(namespace,
+ te.getTargetValues());
+
+ // Currently only support ns:none, ns:self and ns:app-id
+ // TODO support other forms of namespaces
+ ApplicationId effectiveAppID;
+ if (allocationTags.isIntraApp()) {
+ // default or ns:self
+ effectiveAppID = targetApplicationId;
+ } else if (allocationTags.isSingleInterApp()) {
+ // ns:app-id
+ NamespaceToAppID appIDNs = (NamespaceToAppID) allocationTags
+ .getNamespace();
+ effectiveAppID = appIDNs.getTargetApplicationID();
+ } else {
+ throw new InvalidAllocationTagsQueryException(
+ allocationTags.getNamespace().getNamespaceType().toString()
+ + " is not supported yet.");
+ }
+
long minScopeCardinality = 0;
long maxScopeCardinality = 0;
@@ -86,20 +139,20 @@ private static boolean canSatisfySingleConstraintExpression(
if (sc.getScope().equals(PlacementConstraints.NODE)) {
if (checkMinCardinality) {
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
- targetApplicationId, te.getTargetValues(), Long::max);
+ effectiveAppID, te.getTargetValues(), Long::max);
}
if (checkMaxCardinality) {
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
- targetApplicationId, te.getTargetValues(), Long::min);
+ effectiveAppID, te.getTargetValues(), Long::min);
}
} else if (sc.getScope().equals(PlacementConstraints.RACK)) {
if (checkMinCardinality) {
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
- targetApplicationId, te.getTargetValues(), Long::max);
+ effectiveAppID, te.getTargetValues(), Long::max);
}
if (checkMaxCardinality) {
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
- targetApplicationId, te.getTargetValues(), Long::min);
+ effectiveAppID, te.getTargetValues(), Long::min);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AllocationTagNamespace.java
new file mode 100644
index 00000000000..fe71cf0e713
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AllocationTagNamespace.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class to describe the namespace of an allocation tag.
+ */
+public abstract class AllocationTagNamespace {
+
+ public final static String NAMESPACE_DELIMITER = "/";
+
+ private NamespaceType nsType;
+
+ public AllocationTagNamespace(NamespaceType namespaceType) {
+ this.nsType = namespaceType;
+ }
+
+ /**
+ * Get the type of the namespace.
+ * @return namespace type.
+ */
+ public NamespaceType getNamespaceType() {
+ return nsType;
+ }
+
+ @Override
+ public String toString() {
+ return this.nsType.toString();
+ }
+
+ /**
+ * Default namespace.
+ */
+ public static class None extends AllocationTagNamespace {
+
+ public None() {
+ super(NamespaceType.NONE);
+ }
+ }
+
+ /**
+ * Namespace within application itself.
+ */
+ public static class Self extends AllocationTagNamespace {
+
+ public Self() {
+ super(NamespaceType.SELF);
+ }
+ }
+
+ /**
+ * Namespace defined by a certain application ID.
+ */
+ public static class NamespaceToAppID extends AllocationTagNamespace {
+
+ private ApplicationId appId;
+ // app-id namespace requires an extra value of an application id.
+ public NamespaceToAppID(ApplicationId applicationId) {
+ super(NamespaceType.APP_ID);
+ this.appId = applicationId;
+ }
+
+ public ApplicationId getTargetApplicationID() {
+ return this.appId;
+ }
+
+ @Override
+ public String toString() {
+ return NamespaceType.APP_ID.toString()
+ + NAMESPACE_DELIMITER + appId.toString();
+ }
+ }
+
+ // TODO implement ns:not-self, ns:all
+
+ /**
+ * Parse namespace from a string. The string must be in legal format
+ * defined by each {@link NamespaceType}.
+ *
+ * @param namespaceStr namespace string.
+ * @return an instance of {@link AllocationTagNamespace}.
+ * @throws InvalidAllocationTagsQueryException
+ * if given string is not in valid format
+ */
+ public static AllocationTagNamespace parse(String namespaceStr)
+ throws InvalidAllocationTagsQueryException {
+ // Return the default namespace if no valid string is given.
+ if (Strings.isNullOrEmpty(namespaceStr)) {
+ return new None();
+ }
+
+ // Normalize the input, escape additional chars.
+ List nsValues = normalize(namespaceStr);
+ // The first string should be the prefix.
+ String nsPrefix = nsValues.get(0);
+ NamespaceType namespaceType = NamespaceType.fromString(nsPrefix);
+ switch (namespaceType) {
+ case APP_ID:
+ if (nsValues.size() != 2) {
+ throw new InvalidAllocationTagsQueryException(
+ "Missing an application ID in the namespace string: "
+ + namespaceStr);
+ }
+ String appIDStr = nsValues.get(1);
+ return parseAppID(appIDStr);
+ case NONE:
+ return new None();
+ case SELF:
+ return new Self();
+ default:
+ throw new InvalidAllocationTagsQueryException(
+ "Invalid namespace string " + namespaceStr);
+ }
+ }
+
+ private static AllocationTagNamespace parseAppID(String appIDStr)
+ throws InvalidAllocationTagsQueryException {
+ try {
+ ApplicationId applicationId = ApplicationId.fromString(appIDStr);
+ return new NamespaceToAppID(applicationId);
+ } catch (IllegalArgumentException e) {
+ throw new InvalidAllocationTagsQueryException(
+ "Invalid application ID for "
+ + NamespaceType.APP_ID.getTypeKeyword());
+ }
+ }
+
+ /**
+ * Valid given namespace string and parse it to a list of sub-strings
+ * that can be consumed by the parser according to the type of the
+ * namespace. Currently the size of return list should be either 1 or 2.
+ * Extra slash is escaped during the normalization.
+ *
+ * @param namespaceStr namespace string.
+ * @return a list of parsed strings.
+ * @throws InvalidAllocationTagsQueryException
+ * if namespace format is unexpected.
+ */
+ private static List normalize(String namespaceStr)
+ throws InvalidAllocationTagsQueryException {
+ List result = new ArrayList<>();
+ if (namespaceStr == null) {
+ return result;
+ }
+
+ String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER);
+ for (String str : nsValues) {
+ if (!Strings.isNullOrEmpty(str)) {
+ result.add(str);
+ }
+ }
+
+ // Currently we only allow 1 or 2 values for a namespace string
+ if (result.size() == 0 || result.size() > 2) {
+ throw new InvalidAllocationTagsQueryException("Invalid namespace string: "
+ + namespaceStr + ", the syntax is or"
+ + " /");
+ }
+
+ return result;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AllocationTags.java
new file mode 100644
index 00000000000..8375d8e41f9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AllocationTags.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import java.util.Set;
+
+/**
+ * Allocation tags under same namespace.
+ */
+public class AllocationTags {
+
+ private AllocationTagNamespace ns;
+ private Set tags;
+
+ public AllocationTags(AllocationTagNamespace namespace,
+ Set allocationTags) {
+ this.ns = namespace;
+ this.tags = allocationTags;
+ }
+
+ /**
+ * @return the namespace of these tags.
+ */
+ public AllocationTagNamespace getNamespace() {
+ return this.ns;
+ }
+
+ /**
+ * @return the allocation tags.
+ */
+ public Set getTags() {
+ return this.tags;
+ }
+
+ /**
+ * @return true if the allocation tags is effective in all applications
+ * in this cluster. Specifically the namespace prefix should be ns:all.
+ */
+ public boolean isGlobal() {
+ return NamespaceType.ALL.equals(ns.getNamespaceType());
+ }
+
+ /**
+ * @return true if allocation tags are associated with a single application
+ * by its application ID. The namespace prefix should be ns:app-id,
+ * false otherwise.
+ */
+ public boolean isSingleInterApp() {
+ return NamespaceType.APP_ID.equals(ns.getNamespaceType());
+ }
+
+ /**
+ * @return true the allocation tags are restricted to the application itself.
+ * The namespace prefix should be ns:none or ns:self, false otherwise.
+ */
+ public boolean isIntraApp() {
+ return NamespaceType.SELF.equals(ns.getNamespaceType()) ||
+ NamespaceType.NONE.equals(ns.getNamespaceType());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NamespaceType.java
new file mode 100644
index 00000000000..3b8ba598fe2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NamespaceType.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Class to describe all supported forms of namespaces for an allocation tag.
+ */
+public enum NamespaceType {
+
+ SELF("ns:self"),
+ NOT_SELF("ns:not-self"),
+ APP_ID("ns:app-id"),
+ APP_LABEL("ns:app-label"),
+ ALL("ns:all"),
+ NONE("");
+
+ private String typeKeyword;
+ NamespaceType(String keyword) {
+ this.typeKeyword = keyword;
+ }
+
+ public String getTypeKeyword() {
+ return this.typeKeyword;
+ }
+
+ /**
+ * Parses the namespace type from a given string.
+ * @param prefix namespace prefix.
+ * @return namespace type.
+ * @throws InvalidAllocationTagsQueryException
+ */
+ public static NamespaceType fromString(String prefix) throws
+ InvalidAllocationTagsQueryException {
+ for (NamespaceType type : NamespaceType.values()) {
+ if(type.getTypeKeyword().equals(prefix)) {
+ return type;
+ }
+ }
+
+ Set values = Arrays.stream(NamespaceType.values())
+ .map(NamespaceType::toString)
+ .collect(Collectors.toSet());
+ throw new InvalidAllocationTagsQueryException(
+ "Invalid namespace prefix: " + prefix
+ + ", valid values are: " + String.join(",", values));
+ }
+
+ @Override
+ public String toString() {
+ return this.getTypeKeyword();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
index 5135f636dc2..0dec140c788 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -25,11 +25,13 @@
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.AbstractMap;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -55,6 +57,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AllocationTagNamespace.NamespaceToAppID;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AllocationTagNamespace;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -508,4 +512,157 @@ public void testANDConstraintAssignment()
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
}
+
+ @Test
+ public void testInterAppConstraintsByAppID()
+ throws InvalidAllocationTagsQueryException {
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ rmContext.setAllocationTagsManager(tm);
+ rmContext.setPlacementConstraintManager(pcm);
+
+ long ts = System.currentTimeMillis();
+ ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
+
+ // Register App1 with anti-affinity constraint map.
+ RMNode n0r1 = rmNodes.get(0);
+ RMNode n1r1 = rmNodes.get(1);
+ RMNode n2r2 = rmNodes.get(2);
+ RMNode n3r2 = rmNodes.get(3);
+
+ /**
+ * Place container:
+ * n0: app1/hbase-m(1)
+ * n1: ""
+ * n2: app1/hbase-m(1)
+ * n3: ""
+ */
+ tm.addContainer(n0r1.getNodeID(),
+ newContainerId(application1), ImmutableSet.of("hbase-m"));
+ tm.addContainer(n2r2.getNodeID(),
+ newContainerId(application1), ImmutableSet.of("hbase-m"));
+ Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
+ .get("hbase-m").longValue());
+ Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
+ .get("hbase-m").longValue());
+
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
+ n0r1.getRackName(), n0r1.getNodeID());
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
+ n1r1.getRackName(), n1r1.getNodeID());
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
+ n2r2.getRackName(), n2r2.getNodeID());
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
+ n3r2.getRackName(), n3r2.getNodeID());
+
+
+ AllocationTagNamespace namespace = new NamespaceToAppID(application1);
+ Map, PlacementConstraint> constraintMap = new HashMap<>();
+ PlacementConstraint constraint2 = PlacementConstraints
+ .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(),
+ "hbase-m"))
+ .build();
+ Set srcTags2 = new HashSet<>();
+ srcTags2.add("app2");
+ constraintMap.put(srcTags2, constraint2);
+
+ ts = System.currentTimeMillis();
+ ApplicationId application2 = BuilderUtils.newApplicationId(ts, 124);
+ pcm.registerApplication(application2, constraintMap);
+
+ // Anti-affinity with app1/hbase-m so it should not be able to be placed
+ // onto n0 and n2 as they already have hbase-m allocated.
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+ application2, createSchedulingRequest(srcTags2),
+ schedulerNode0, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+ application2, createSchedulingRequest(srcTags2),
+ schedulerNode1, pcm, tm));
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+ application2, createSchedulingRequest(srcTags2),
+ schedulerNode2, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+ application2, createSchedulingRequest(srcTags2),
+ schedulerNode3, pcm, tm));
+
+ // Intra-app constraint
+ // Test with default and empty namespace
+ AllocationTagNamespace none = new AllocationTagNamespace.None();
+ AllocationTagNamespace self = new AllocationTagNamespace.Self();
+
+ for (AllocationTagNamespace intraAppTag :
+ new AllocationTagNamespace[]{none, self}) {
+ PlacementConstraint constraint3 = PlacementConstraints
+ .targetNotIn(NODE, allocationTagWithNamespace(intraAppTag.toString(),
+ "hbase-m"))
+ .build();
+ Set srcTags3 = new HashSet<>();
+ srcTags3.add("app3");
+ constraintMap.put(srcTags3, constraint3);
+
+ ts = System.currentTimeMillis();
+ ApplicationId application3 = BuilderUtils.newApplicationId(ts, 124);
+ pcm.registerApplication(application3, constraintMap);
+
+ /**
+ * Place container:
+ * n0: app1/hbase-m(1), app3/hbase-m
+ * n1: ""
+ * n2: app1/hbase-m(1)
+ * n3: ""
+ */
+ tm.addContainer(n0r1.getNodeID(),
+ newContainerId(application3), ImmutableSet.of("hbase-m"));
+
+ // Anti-affinity to self/hbase-m
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+ application3, createSchedulingRequest(srcTags3),
+ schedulerNode0, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+ application3, createSchedulingRequest(srcTags3),
+ schedulerNode1, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+ application3, createSchedulingRequest(srcTags3),
+ schedulerNode2, pcm, tm));
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+ application3, createSchedulingRequest(srcTags3),
+ schedulerNode3, pcm, tm));
+
+ pcm.unregisterApplication(application3);
+ }
+ }
+
+ @Test
+ public void testInvalidAllocationTagNamespace() {
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+ PlacementConstraintManagerService pcm =
+ new MemoryPlacementConstraintManager();
+ rmContext.setAllocationTagsManager(tm);
+ rmContext.setPlacementConstraintManager(pcm);
+
+ long ts = System.currentTimeMillis();
+ ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
+ RMNode n0r1 = rmNodes.get(0);
+ SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
+ n0r1.getRackName(), n0r1.getNodeID());
+
+ PlacementConstraint constraint1 = PlacementConstraints
+ .targetNotIn(NODE, allocationTagWithNamespace("ns:unknown_namespace",
+ "hbase-m"))
+ .build();
+ Set srcTags1 = new HashSet<>();
+ srcTags1.add("app1");
+
+ try {
+ PlacementConstraintsUtil.canSatisfyConstraints(application1,
+ createSchedulingRequest(srcTags1, constraint1), schedulerNode0,
+ pcm, tm);
+ Assert.fail("This should fail because we gave an invalid namespace");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
+ Assert.assertTrue(e.getMessage()
+ .contains("Invalid namespace prefix: ns:unknown_namespace"));
+ }
+ }
}
\ No newline at end of file