diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java new file mode 100644 index 00000000000..5371d13d1ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import com.google.common.base.Strings; +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.*; + +/** + * Class to describe the namespace of an allocation tag. + */ +public abstract class AllocationTagNamespace { + + public final static String NAMESPACE_DELIMITER = "/"; + + private AllocationTagNamespaceType nsType; + + public AllocationTagNamespace(AllocationTagNamespaceType + allocationTagNamespaceType) { + this.nsType = allocationTagNamespaceType; + } + + /** + * Get the type of the namespace. + * @return namespace type. + */ + public AllocationTagNamespaceType getNamespaceType() { + return nsType; + } + + @Override + public String toString() { + return this.nsType.toString(); + } + + /** + * Namespace within application itself. + */ + public static class Self extends AllocationTagNamespace { + + public Self() { + super(SELF); + } + } + + /** + * Namespace to all applications except itself. + */ + public static class NotSelf extends AllocationTagNamespace { + + public NotSelf() { + super(NOT_SELF); + } + } + + /** + * Namespace to all applications in the cluster. + */ + public static class All extends AllocationTagNamespace { + + public All() { + super(ALL); + } + } + + /** + * Namespace to all applications in the cluster. + */ + public static class AppLabel extends AllocationTagNamespace { + + public AppLabel() { + super(APP_LABEL); + } + } + + /** + * Namespace defined by a certain application ID. + */ + public static class NamespaceToAppID extends AllocationTagNamespace { + + private ApplicationId appId; + // app-id namespace requires an extra value of an application id. + public NamespaceToAppID(ApplicationId applicationId) { + super(APP_ID); + this.appId = applicationId; + } + + public ApplicationId getTargetApplicationID() { + return this.appId; + } + + @Override + public String toString() { + return APP_ID.toString() + + NAMESPACE_DELIMITER + appId.toString(); + } + } + + /** + * Parse namespace from a string. The string must be in legal format + * defined by each {@link AllocationTagNamespaceType}. + * + * @param namespaceStr namespace string. + * @return an instance of {@link AllocationTagNamespace}. + * @throws InvalidAllocationTagException + * if given string is not in valid format + */ + public static AllocationTagNamespace parse(String namespaceStr) + throws InvalidAllocationTagException { + // Return the default namespace if no valid string is given. + if (Strings.isNullOrEmpty(namespaceStr)) { + return new Self(); + } + + // Normalize the input, escape additional chars. + List nsValues = normalize(namespaceStr); + // The first string should be the prefix. + String nsPrefix = nsValues.get(0); + AllocationTagNamespaceType allocationTagNamespaceType = + fromString(nsPrefix); + switch (allocationTagNamespaceType) { + case SELF: + return new Self(); + case NOT_SELF: + return new NotSelf(); + case ALL: + return new All(); + case APP_ID: + if (nsValues.size() != 2) { + throw new InvalidAllocationTagException( + "Missing an application ID in the namespace string: " + + namespaceStr); + } + String appIDStr = nsValues.get(1); + return parseAppID(appIDStr); + case APP_LABEL: + return new AppLabel(); + default: + throw new InvalidAllocationTagException( + "Invalid namespace string " + namespaceStr); + } + } + + private static AllocationTagNamespace parseAppID(String appIDStr) + throws InvalidAllocationTagException { + try { + ApplicationId applicationId = ApplicationId.fromString(appIDStr); + return new NamespaceToAppID(applicationId); + } catch (IllegalArgumentException e) { + throw new InvalidAllocationTagException( + "Invalid application ID for " + + APP_ID.getTypeKeyword()); + } + } + + /** + * Valid given namespace string and parse it to a list of sub-strings + * that can be consumed by the parser according to the type of the + * namespace. Currently the size of return list should be either 1 or 2. + * Extra slash is escaped during the normalization. + * + * @param namespaceStr namespace string. + * @return a list of parsed strings. + * @throws InvalidAllocationTagException + * if namespace format is unexpected. + */ + private static List normalize(String namespaceStr) + throws InvalidAllocationTagException { + List result = new ArrayList<>(); + if (namespaceStr == null) { + return result; + } + + String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER); + for (String str : nsValues) { + if (!Strings.isNullOrEmpty(str)) { + result.add(str); + } + } + + // Currently we only allow 1 or 2 values for a namespace string + if (result.size() == 0 || result.size() > 2) { + throw new InvalidAllocationTagException("Invalid namespace string: " + + namespaceStr + ", the syntax is or" + + " /"); + } + + return result; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java new file mode 100644 index 00000000000..5b28a9823c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Class to describe all supported forms of namespaces for an allocation tag. + */ +public enum AllocationTagNamespaceType { + + SELF("self"), + NOT_SELF("not-self"), + APP_ID("app-id"), + APP_LABEL("app-label"), + ALL("all"); + + private String typeKeyword; + AllocationTagNamespaceType(String keyword) { + this.typeKeyword = keyword; + } + + public String getTypeKeyword() { + return this.typeKeyword; + } + + /** + * Parses the namespace type from a given string. + * @param prefix namespace prefix. + * @return namespace type. + * @throws InvalidAllocationTagException + */ + public static AllocationTagNamespaceType fromString(String prefix) throws + InvalidAllocationTagException { + for (AllocationTagNamespaceType type : AllocationTagNamespaceType.values()) { + if(type.getTypeKeyword().equals(prefix)) { + return type; + } + } + + Set values = Arrays.stream(AllocationTagNamespaceType.values()) + .map(AllocationTagNamespaceType::toString) + .collect(Collectors.toSet()); + throw new InvalidAllocationTagException( + "Invalid namespace prefix: " + prefix + + ", valid values are: " + String.join(",", values)); + } + + @Override + public String toString() { + return this.getTypeKeyword(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java new file mode 100644 index 00000000000..59f0de3ba13 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.util.Set; + +/** + * Allocation tags under same namespace. + */ +public class AllocationTags { + + private AllocationTagNamespace ns; + private Set tags; + + public AllocationTags(AllocationTagNamespace namespace, + Set allocationTags) { + this.ns = namespace; + this.tags = allocationTags; + } + + /** + * @return the namespace of these tags. + */ + public AllocationTagNamespace getNamespace() { + return this.ns; + } + + /** + * @return the allocation tags. + */ + public Set getTags() { + return this.tags; + } + + /** + * @return true if the allocation tags is effective in all applications + * in this cluster. Specifically the namespace prefix should be + * "all". + */ + public boolean isGlobal() { + return AllocationTagNamespaceType.ALL.equals(ns.getNamespaceType()); + } + + /** + * @return true if allocation tags are associated with a single application + * by its application ID. The namespace prefix should be "app-id", + * false otherwise. + */ + public boolean isSingleInterApp() { + return AllocationTagNamespaceType.APP_ID.equals(ns.getNamespaceType()); + } + + /** + * @return true the allocation tags are restricted to the application itself. + * The namespace prefix should be "none" or "self", false otherwise. + */ + public boolean isIntraApp() { + return AllocationTagNamespaceType.SELF.equals(ns.getNamespaceType()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java index c1549c54db4..d34d5429cbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java @@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr; @@ -50,13 +52,6 @@ private PlacementConstraints() { public static final String RACK = PlacementConstraint.RACK_SCOPE; public static final String NODE_PARTITION = "yarn_node_partition/"; - private static final String APPLICATION_LABEL_PREFIX = - "yarn_application_label/"; - - @InterfaceAudience.Private - public static final String APPLICATION_LABEL_INTRA_APPLICATION = - APPLICATION_LABEL_PREFIX + "%intra_app%"; - /** * Creates a constraint that requires allocations to be placed on nodes that * satisfy all target expressions within the given scope (e.g., node or rack). @@ -223,6 +218,20 @@ public static TargetExpression allocationTag(String... allocationTags) { allocationTags); } + /** + * Constructs a target expression on a set of allocation tags under + * a certain namespace. + * + * @param namespace namespace of the allocation tags + * @param allocationTags allocation tags + * @return a target expression + */ + public static TargetExpression allocationTagWithNamespace(String namespace, + String... allocationTags) { + return new TargetExpression(TargetType.ALLOCATION_TAG, + namespace, allocationTags); + } + /** * Constructs a target expression on an allocation tag. It is satisfied if * there are allocations with one of the given tags. Comparing to @@ -235,8 +244,9 @@ public static TargetExpression allocationTag(String... allocationTags) { */ public static TargetExpression allocationTagToIntraApp( String... allocationTags) { + AllocationTagNamespace selfNs = new AllocationTagNamespace.Self(); return new TargetExpression(TargetType.ALLOCATION_TAG, - APPLICATION_LABEL_INTRA_APPLICATION, allocationTags); + selfNs.toString(), allocationTags); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java new file mode 100644 index 00000000000..e1e3cbdb015 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.yarn.exceptions; + +/** + * This exception is thrown by + * {@link org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)} + * when it fails to parse a namespace. + */ +public class InvalidAllocationTagException extends YarnException { + + private static final long serialVersionUID = 1L; + + public InvalidAllocationTagException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java index 29483a2a0a6..d2bc4d87ae4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java @@ -32,4 +32,8 @@ public InvalidAllocationTagsQueryException(String msg) { super(msg); } + + public InvalidAllocationTagsQueryException(YarnException e) { + super(e); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index ab0bbd7f779..ea047fc7f14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Set; +import com.google.common.base.Strings; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -35,9 +36,13 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace.NamespaceToAppID; +import org.apache.hadoop.yarn.api.records.AllocationTags; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION; @@ -56,6 +61,26 @@ private PlacementConstraintsUtil() { } + /** + * Try to get application IDs from a target key, if the target key is + * malformed or it doesn't contain any valid application ID, null is returned. + * + * @param targetKey + * @return an application ID set or null. + */ + private static AllocationTagNamespace getAllocationTagNamespace( + String targetKey) throws InvalidAllocationTagException{ + // If target key is null or a empty string, indicating this is a tag + // against to this application itself. + if (Strings.isNullOrEmpty(targetKey)) { + // default + return new AllocationTagNamespace.Self(); + } + + // Parse to a valid namespace. + return AllocationTagNamespace.parse(targetKey); + } + /** * Returns true if single placement constraint with associated * allocationTags and scope is satisfied by a specific scheduler Node. @@ -74,6 +99,32 @@ private static boolean canSatisfySingleConstraintExpression( ApplicationId targetApplicationId, SingleConstraint sc, TargetExpression te, SchedulerNode node, AllocationTagsManager tm) throws InvalidAllocationTagsQueryException { + AllocationTagNamespace namespace; + try { + namespace = getAllocationTagNamespace(te.getTargetKey()); + } catch (InvalidAllocationTagException e) { + throw new InvalidAllocationTagsQueryException(e); + } + AllocationTags allocationTags = + new AllocationTags(namespace, te.getTargetValues()); + + // Currently only support none, self and app-id + // TODO support other forms of namespaces + ApplicationId effectiveAppID; + if (allocationTags.isIntraApp()) { + // default or self + effectiveAppID = targetApplicationId; + } else if (allocationTags.isSingleInterApp()) { + // app-id + NamespaceToAppID appIDNs = (NamespaceToAppID) allocationTags + .getNamespace(); + effectiveAppID = appIDNs.getTargetApplicationID(); + } else { + throw new InvalidAllocationTagsQueryException( + allocationTags.getNamespace().getNamespaceType().toString() + + " is not supported yet."); + } + long minScopeCardinality = 0; long maxScopeCardinality = 0; @@ -86,20 +137,20 @@ private static boolean canSatisfySingleConstraintExpression( if (sc.getScope().equals(PlacementConstraints.NODE)) { if (checkMinCardinality) { minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - targetApplicationId, te.getTargetValues(), Long::max); + effectiveAppID, te.getTargetValues(), Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - targetApplicationId, te.getTargetValues(), Long::min); + effectiveAppID, te.getTargetValues(), Long::min); } } else if (sc.getScope().equals(PlacementConstraints.RACK)) { if (checkMinCardinality) { minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - targetApplicationId, te.getTargetValues(), Long::max); + effectiveAppID, te.getTargetValues(), Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - targetApplicationId, te.getTargetValues(), Long::min); + effectiveAppID, te.getTargetValues(), Long::min); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index ed0734503fa..7acd286b27e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -23,6 +23,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -53,7 +56,6 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.APPLICATION_LABEL_INTRA_APPLICATION; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION; /** @@ -333,15 +335,23 @@ private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequ targetAllocationTags = new HashSet<>( targetExpression.getTargetValues()); - if (targetExpression.getTargetKey() != null && !targetExpression - .getTargetKey().equals(APPLICATION_LABEL_INTRA_APPLICATION)) { + try { + AllocationTagNamespace tagNS = + AllocationTagNamespace.parse(targetExpression.getTargetKey()); + if (!AllocationTagNamespaceType.SELF + .equals(tagNS.getNamespaceType())) { + throwExceptionWithMetaInfo( + "As of now, the only accepted target key for targetKey of " + + "allocation_tag target expression is: [" + + AllocationTagNamespaceType.SELF.toString() + + "]. Please make changes to placement constraints " + + "accordingly. If this is null, it will be set to " + + AllocationTagNamespaceType.SELF.toString() + + " by default."); + } + } catch (InvalidAllocationTagException e) { throwExceptionWithMetaInfo( - "As of now, the only accepted target key for targetKey of " - + "allocation_tag target expression is: [" - + APPLICATION_LABEL_INTRA_APPLICATION - + "]. Please make changes to placement constraints " - + "accordingly. If this is null, it will be set to " - + APPLICATION_LABEL_INTRA_APPLICATION + " by default."); + "Invalid allocation tag namespace, message: " + e.getMessage()); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index 5135f636dc2..933998b896a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -25,11 +25,13 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.AbstractMap; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -55,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace.NamespaceToAppID; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; @@ -508,4 +512,152 @@ public void testANDConstraintAssignment() Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); } + + @Test + public void testInterAppConstraintsByAppID() + throws InvalidAllocationTagsQueryException { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + rmContext.setAllocationTagsManager(tm); + rmContext.setPlacementConstraintManager(pcm); + + long ts = System.currentTimeMillis(); + ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123); + + // Register App1 with anti-affinity constraint map. + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: app1/hbase-m(1) + * n1: "" + * n2: app1/hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID()) + .get("hbase-m").longValue()); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID()) + .get("hbase-m").longValue()); + + SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + + AllocationTagNamespace namespace = new NamespaceToAppID(application1); + Map, PlacementConstraint> constraintMap = new HashMap<>(); + PlacementConstraint constraint2 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(), + "hbase-m")) + .build(); + Set srcTags2 = new HashSet<>(); + srcTags2.add("app2"); + constraintMap.put(srcTags2, constraint2); + + ts = System.currentTimeMillis(); + ApplicationId application2 = BuilderUtils.newApplicationId(ts, 124); + pcm.registerApplication(application2, constraintMap); + + // Anti-affinity with app1/hbase-m so it should not be able to be placed + // onto n0 and n2 as they already have hbase-m allocated. + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode3, pcm, tm)); + + // Intra-app constraint + // Test with default and empty namespace + AllocationTagNamespace self = new AllocationTagNamespace.Self(); + PlacementConstraint constraint3 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace(self.toString(), + "hbase-m")) + .build(); + Set srcTags3 = new HashSet<>(); + srcTags3.add("app3"); + constraintMap.put(srcTags3, constraint3); + + ts = System.currentTimeMillis(); + ApplicationId application3 = BuilderUtils.newApplicationId(ts, 124); + pcm.registerApplication(application3, constraintMap); + + /** + * Place container: + * n0: app1/hbase-m(1), app3/hbase-m + * n1: "" + * n2: app1/hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("hbase-m")); + + // Anti-affinity to self/hbase-m + Assert.assertFalse(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application3); + } + + @Test + public void testInvalidAllocationTagNamespace() { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + rmContext.setAllocationTagsManager(tm); + rmContext.setPlacementConstraintManager(pcm); + + long ts = System.currentTimeMillis(); + ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123); + RMNode n0r1 = rmNodes.get(0); + SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + + PlacementConstraint constraint1 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace("unknown_namespace", + "hbase-m")) + .build(); + Set srcTags1 = new HashSet<>(); + srcTags1.add("app1"); + + try { + PlacementConstraintsUtil.canSatisfyConstraints(application1, + createSchedulingRequest(srcTags1, constraint1), schedulerNode0, + pcm, tm); + Assert.fail("This should fail because we gave an invalid namespace"); + } catch (Exception e) { + Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); + Assert.assertTrue(e.getMessage() + .contains("Invalid namespace prefix: unknown_namespace")); + } + } } \ No newline at end of file