From e15f9c99b1583c3c186126f66abd9799f49277c6 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Sat, 18 Aug 2018 00:29:13 +0530 Subject: [PATCH] YARN-7863 --- .../yarn/api/resource/PlacementConstraint.java | 9 + .../util/constraint/PlacementConstraintParser.java | 206 +++++++++++++++++++-- .../resource/TestPlacementConstraintParser.java | 59 +++++- .../api/resource/TestPlacementConstraints.java | 4 +- .../distributedshell/ApplicationMaster.java | 49 +++-- .../yarn/applications/distributedshell/Client.java | 6 +- .../distributedshell/PlacementSpec.java | 25 ++- .../server/resourcemanager/ResourceManager.java | 2 + .../nodelabels/NodeAttributesManagerImpl.java | 25 ++- .../resourcemanager/scheduler/SchedulerNode.java | 11 ++ .../scheduler/capacity/CapacityScheduler.java | 36 +++- .../allocator/RegularContainerAllocator.java | 3 + .../constraint/PlacementConstraintsUtil.java | 101 ++++++++-- .../event/NodeAttributesUpdateSchedulerEvent.java | 39 ++++ .../scheduler/event/SchedulerEventType.java | 1 + .../placement/LocalityAppPlacementAllocator.java | 1 + .../SingleConstraintAppPlacementAllocator.java | 41 +++- .../TestSingleConstraintAppPlacementAllocator.java | 19 ++ 18 files changed, 573 insertions(+), 64 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java index 0fe8273e6d7..94330768f47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java @@ -127,10 +127,19 @@ public int hashCode() { * classes. */ public abstract static class AbstractConstraint implements Visitable { + protected String constraintKey; public PlacementConstraint build() { return new PlacementConstraint(this); } + public String getConstraintKey() { + return constraintKey; + } + + public void setConstraintKey(String name) { + constraintKey = name; + } + @Override public String toString() { return super.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java index 2926c9d1de8..2254dba0931 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java @@ -44,9 +44,9 @@ @InterfaceStability.Unstable public final class PlacementConstraintParser { + public static final char EXPRESSION_VAL_DELIM = ','; private static final char EXPRESSION_DELIM = ':'; private static final char KV_SPLIT_DELIM = '='; - private static final char EXPRESSION_VAL_DELIM = ','; private static final char BRACKET_START = '('; private static final char BRACKET_END = ')'; private static final String IN = "in"; @@ -57,6 +57,10 @@ private static final String SCOPE_NODE = PlacementConstraints.NODE; private static final String SCOPE_RACK = PlacementConstraints.RACK; + public enum TargetExprType { + NODE_ATTRIBUTE, ALLOCATION_TAG + } + private PlacementConstraintParser() { // Private constructor for this utility class. } @@ -349,6 +353,81 @@ public String nextElement() { } } + /** + * Constraint parser used to parse a given target expression. + */ + public static class NodeConstraintParser extends ConstraintParser { + + public NodeConstraintParser(String expression) { + super(new BaseStringTokenizer(expression, + String.valueOf(EXPRESSION_VAL_DELIM))); + } + + @Override + public AbstractConstraint parse() + throws PlacementConstraintParseException { + PlacementConstraint.AbstractConstraint placementConstraints = null; + String op = nextToken(); + String attributeName = ""; + String attributeNameSpace = "rm.yarn.io/"; + if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) { + String scope = SCOPE_NODE; + + Set constraintEntities = new TreeSet<>(); + while (hasMoreTokens()) { + String tag = nextToken(); + StringBuilder finalTag = new StringBuilder(); + StringTokenizer attributeKV = new StringTokenizer(tag, + String.valueOf(KV_SPLIT_DELIM)); + // Usually there will be only one k=v pair. However in case when multiple + // values are present for same attribute, it will also be coming as next token. + if (attributeKV.countTokens() > 1) { + attributeName = getConstraintNameWithNameSpace(attributeNameSpace, + attributeKV); + tag = attributeName + '=' + attributeKV.nextToken(); + } else { + finalTag.append(attributeName); + finalTag.append("="); + } + finalTag.append(tag); + constraintEntities.add(finalTag.toString()); + } + PlacementConstraint.TargetExpression target = null; + if (!constraintEntities.isEmpty()) { + target = PlacementConstraints.PlacementTargets + .nodeAttribute(attributeName, + constraintEntities + .toArray(new String[constraintEntities.size()])); + } + if (op.equalsIgnoreCase(IN)) { + placementConstraints = PlacementConstraints + .targetIn(scope, target); + } else if (op.equalsIgnoreCase(NOT_IN)) { + placementConstraints = PlacementConstraints + .targetNotIn(scope, target); + } + } else { + throw new PlacementConstraintParseException( + "expecting " + IN + " or " + NOT_IN + ", but get " + op); + } + placementConstraints.setConstraintKey(attributeName); + return placementConstraints; + } + + private String getConstraintNameWithNameSpace(String attributeNameSpace, + StringTokenizer attributeKV) { + StringBuilder attributeName = new StringBuilder(); + String name = attributeKV.nextToken(); + if (name.indexOf("rm.yarn.io/") == -1 + && name.indexOf("nm.yarn.io/") == -1) { + attributeName.append(attributeNameSpace).append(name); + } else { + attributeName.append(name); + } + return attributeName.toString(); + } + } + /** * Constraint parser used to parse a given target expression, such as * "NOTIN, NODE, foo, bar". @@ -363,20 +442,23 @@ public TargetConstraintParser(String expression) { @Override public AbstractConstraint parse() throws PlacementConstraintParseException { - PlacementConstraint.AbstractConstraint placementConstraints; + PlacementConstraint.AbstractConstraint placementConstraints = null; String op = nextToken(); if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) { String scope = nextToken(); scope = parseScope(scope); - Set allocationTags = new TreeSet<>(); + Set constraintEntities = new TreeSet<>(); while(hasMoreTokens()) { String tag = nextToken(); - allocationTags.add(tag); + constraintEntities.add(tag); + } + PlacementConstraint.TargetExpression target = null; + if(!constraintEntities.isEmpty()) { + target = PlacementConstraints.PlacementTargets.allocationTag( + constraintEntities + .toArray(new String[constraintEntities.size()])); } - PlacementConstraint.TargetExpression target = - PlacementConstraints.PlacementTargets.allocationTag( - allocationTags.toArray(new String[allocationTags.size()])); if (op.equalsIgnoreCase(IN)) { placementConstraints = PlacementConstraints .targetIn(scope, target); @@ -490,11 +572,11 @@ public AbstractConstraint parse() throws PlacementConstraintParseException { * A helper class to encapsulate source tags and allocations in the * placement specification. */ - public static final class SourceTags { + public static final class PlacementKey { private String tag; private int num; - private SourceTags(String sourceTag, int number) { + private PlacementKey(String sourceTag, int number) { this.tag = sourceTag; this.num = number; } @@ -510,10 +592,10 @@ public int getNumOfAllocations() { /** * Parses source tags from expression "sourceTags=numOfAllocations". * @param expr - * @return source tags, see {@link SourceTags} + * @return source tags, see {@link PlacementKey} * @throws PlacementConstraintParseException */ - public static SourceTags parseFrom(String expr) + public static PlacementKey parseFrom(String expr) throws PlacementConstraintParseException { SourceTagsTokenizer stt = new SourceTagsTokenizer(expr); stt.validate(); @@ -521,7 +603,7 @@ public static SourceTags parseFrom(String expr) // During validation we already checked the number of parsed elements. String allocTag = stt.nextElement(); int allocNum = Integer.parseInt(stt.nextElement()); - return new SourceTags(allocTag, allocNum); + return new PlacementKey(allocTag, allocNum); } } @@ -558,6 +640,39 @@ public static AbstractConstraint parseExpression(String constraintStr) return constraintOptional.get(); } + /** + * Parses a given constraint expression to a {@link AbstractConstraint}, + * this expression can be any valid form of constraint expressions. + * + * @param constraintStr expression string + * @return a parsed {@link AbstractConstraint} + * @throws PlacementConstraintParseException when given expression + * is malformed + */ + public static AbstractConstraint parseNodeAttributeExpression(String constraintStr) + throws PlacementConstraintParseException { + // Try parse given expression with all allowed constraint parsers, + // fails if no one could parse it. + NodeConstraintParser tp = new NodeConstraintParser(constraintStr); + Optional constraintOptional = + Optional.ofNullable(tp.tryParse()); + if (!constraintOptional.isPresent()) { + CardinalityConstraintParser cp = + new CardinalityConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(cp.tryParse()); + if (!constraintOptional.isPresent()) { + ConjunctionConstraintParser jp = + new ConjunctionConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(jp.tryParse()); + } + if (!constraintOptional.isPresent()) { + throw new PlacementConstraintParseException( + "Invalid constraint expression " + constraintStr); + } + } + return constraintOptional.get(); + } + /** * Parses a placement constraint specification. A placement constraint spec * is a composite expression which is composed by multiple sub constraint @@ -582,14 +697,21 @@ public static AbstractConstraint parseExpression(String constraintStr) * @return a map of source tags to placement constraint mapping. * @throws PlacementConstraintParseException */ - public static Map parsePlacementSpec( + public static Map parsePlacementSpec( String expression) throws PlacementConstraintParseException { + // Check whether the incoming expression is node-attribute + boolean isNodeAttributeSpec = isValidOp(expression); + if (isNodeAttributeSpec) { + return parsePlacementSpecForAttributes(expression); + } + + // Continue handling for application tag based constraint otherwise. // Respect insertion order. - Map result = new LinkedHashMap<>(); + Map result = new LinkedHashMap<>(); PlacementConstraintParser.ConstraintTokenizer tokenizer = new PlacementConstraintParser.MultipleConstraintsTokenizer(expression); tokenizer.validate(); - while(tokenizer.hasMoreElements()) { + while (tokenizer.hasMoreElements()) { String specStr = tokenizer.nextElement(); // each spec starts with sourceAllocationTag=numOfContainers and // followed by a constraint expression. @@ -602,7 +724,7 @@ public static AbstractConstraint parseExpression(String constraintStr) } String tagAlloc = splitted[0]; - SourceTags st = SourceTags.parseFrom(tagAlloc); + PlacementKey st = PlacementKey.parseFrom(tagAlloc); String exprs = splitted[1]; AbstractConstraint constraint = PlacementConstraintParser.parseExpression(exprs); @@ -612,4 +734,56 @@ public static AbstractConstraint parseExpression(String constraintStr) return result; } + + /** + * Parses a placement constraint specification for a node attributes. + * A node attribute constraint spec is a composite expression which is + * composed by multiple sub constraint expressions delimited by ":". + * With following syntax: + * + *

Op,Attribute1 P1:Op,Attribute2 P2:...

+ * + * where Attribute1 indicate the attribute name + * and where Pn can be any form of a valid constraint expression, + * such as: + * + *
    + *
  • python=true OR java!=1.8
  • + *
  • os=centos
  • + *
+ * @param expression expression string. + * @return a map of attributes to placement constraint mapping. + * @throws PlacementConstraintParseException + */ + public static Map parsePlacementSpecForAttributes( + String expression) throws PlacementConstraintParseException { + // Respect insertion order. + Map result = new LinkedHashMap<>(); + PlacementConstraintParser.ConstraintTokenizer tokenizer = + new PlacementConstraintParser.MultipleConstraintsTokenizer(expression); + tokenizer.validate(); + while (tokenizer.hasMoreElements()) { + String specStr = tokenizer.nextElement(); + // each spec starts with op code followed by a constraint value. + AbstractConstraint constraint = + PlacementConstraintParser + .parseNodeAttributeExpression(specStr); + + // TODO: Given we have an application tag here, instead of constraint + // name, app tag could be used. + String attributeName = constraint.getConstraintKey(); + PlacementKey placementKey = new PlacementKey(attributeName, 0); + result.put(placementKey, constraint.build()); + } + + return result; + } + + private static boolean isValidOp(String op) { + if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN) + || op.equalsIgnoreCase(CARDINALITY)) { + return true; + } + return false; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java index a69571c5c80..7a3cc81fc89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; -import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.PlacementKey; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.TargetConstraintParser; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintParser; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.CardinalityConstraintParser; @@ -334,10 +334,10 @@ void verify() @Test public void testParsePlacementSpec() throws PlacementConstraintParseException { - Map result; + Map result; PlacementConstraint expectedPc1, expectedPc2; PlacementConstraint actualPc1, actualPc2; - SourceTags tag1, tag2; + PlacementKey tag1, tag2; // A single anti-affinity constraint result = PlacementConstraintParser @@ -375,7 +375,7 @@ public void testParsePlacementSpec() result = PlacementConstraintParser .parsePlacementSpec("foo=3,notin,node,foo:bar=2,in,node,foo"); Assert.assertEquals(2, result.size()); - Iterator keyIt = result.keySet().iterator(); + Iterator keyIt = result.keySet().iterator(); tag1 = keyIt.next(); Assert.assertEquals("foo", tag1.getTag()); Assert.assertEquals(3, tag1.getNumOfAllocations()); @@ -443,4 +443,55 @@ private void verifyConstraintToString(String inputExpr, + constrainExpr + ", caused by: " + e.getMessage()); } } + + @Test + public void testParseNodeAttributeSpec() + throws PlacementConstraintParseException { + Map result; + PlacementConstraint.AbstractConstraint expectedPc1, expectedPc2; + PlacementConstraint actualPc1, actualPc2; + String attributeName1, attributeName2; + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpecForAttributes("in,foo=true"); + Assert.assertEquals(1, result.size()); + TargetExpression target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=true"); + expectedPc1 = targetIn("node", target); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpecForAttributes("NOTIN,foo=abc"); + Assert.assertEquals(1, result.size()); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=abc"); + expectedPc1 = targetNotIn("node", target); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpecForAttributes("NOTIN,foo=abc:IN,bar=true"); + Assert.assertEquals(2, result.size()); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=abc"); + expectedPc1 = targetNotIn("node", target); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/bar", "rm.yarn.io/bar=true"); + expectedPc2 = targetIn("node", target); + + Iterator valueIt = result.values().iterator(); + actualPc1 = valueIt.next(); + actualPc2 = valueIt.next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java index 516ecfcd0c3..38ceaa0b34f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java @@ -69,7 +69,7 @@ public void testNodeAffinityToTag() { @Test public void testNodeAntiAffinityToAttribute() { AbstractConstraint constraintExpr = - targetNotIn(NODE, nodeAttribute("java", "1.8")); + targetNotIn(NODE, nodeAttribute("java", "java=1.8")); SingleConstraint sConstraint = (SingleConstraint) constraintExpr; Assert.assertEquals(NODE, sConstraint.getScope()); @@ -82,7 +82,7 @@ public void testNodeAntiAffinityToAttribute() { Assert.assertEquals("java", tExpr.getTargetKey()); Assert.assertEquals(TargetType.NODE_ATTRIBUTE, tExpr.getTargetType()); Assert.assertEquals(1, tExpr.getTargetValues().size()); - Assert.assertEquals("1.8", tExpr.getTargetValues().iterator().next()); + Assert.assertEquals("java=1.8", tExpr.getTargetValues().iterator().next()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 76fa38f922a..85993a146d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.BoundedAppender; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -523,9 +524,15 @@ public boolean init(String[] args) throws ParseException, IOException { if (cliParser.hasOption("placement_spec")) { String placementSpec = cliParser.getOptionValue("placement_spec"); - LOG.info("Placement Spec received [{}]", placementSpec); - parsePlacementSpecs(placementSpec); + String decodedSpec = getDecodedPlacementSpec(placementSpec); + LOG.info("Placement Spec received [{}]", decodedSpec); + + int totalContainers = Integer.parseInt(cliParser.getOptionValue( + "num_containers", "1")); + this.numTotalContainers = 0; + parsePlacementSpecs(decodedSpec, totalContainers); LOG.info("Total num containers requested [{}]", numTotalContainers); + if (numTotalContainers == 0) { throw new IllegalArgumentException( "Cannot run distributed shell with no containers"); @@ -694,23 +701,28 @@ public boolean init(String[] args) throws ParseException, IOException { return true; } - private void parsePlacementSpecs(String placementSpecifications) { - // Client sends placement spec in encoded format - Base64.Decoder decoder = Base64.getDecoder(); - byte[] decodedBytes = decoder.decode( - placementSpecifications.getBytes(StandardCharsets.UTF_8)); - String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8); - LOG.info("Decode placement spec: " + decodedSpec); + private void parsePlacementSpecs(String decodedSpec, int totalContainers) { Map pSpecs = PlacementSpec.parse(decodedSpec); this.placementSpecs = new HashMap<>(); - this.numTotalContainers = 0; for (PlacementSpec pSpec : pSpecs.values()) { - this.numTotalContainers += pSpec.numContainers; + this.numTotalContainers += pSpec.getNumContainers(); this.placementSpecs.put(pSpec.sourceTag, pSpec); + if(pSpec.getNumContainers() == 0) { + pSpec.setNumContainers(totalContainers); + } } } + private String getDecodedPlacementSpec(String placementSpecifications) { + Base64.Decoder decoder = Base64.getDecoder(); + byte[] decodedBytes = decoder.decode( + placementSpecifications.getBytes(StandardCharsets.UTF_8)); + String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8); + LOG.info("Decode placement spec: " + decodedSpec); + return decodedSpec; + } + /** * Helper function to print usage * @@ -798,6 +810,7 @@ public void run() throws YarnException, IOException, InterruptedException { } } } + RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl, placementConstraintMap); @@ -845,16 +858,22 @@ public void run() throws YarnException, IOException, InterruptedException { // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). if (this.placementSpecs == null) { + LOG.info("placementSpecs null"); for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); } } else { + LOG.info("placementSpecs to create req:" + placementSpecs); List schedReqs = new ArrayList<>(); - for (PlacementSpec pSpec : this.placementSpecs.values()) { - for (int i = 0; i < pSpec.numContainers; i++) { - SchedulingRequest sr = setupSchedulingRequest(pSpec); - schedReqs.add(sr); + if (this.placementSpecs != null) { + for (PlacementSpec pSpec : this.placementSpecs.values()) { + LOG.info("placementSpec :" + pSpec + ", container:" + pSpec + .getNumContainers()); + for (int i = 0; i < pSpec.getNumContainers(); i++) { + SchedulingRequest sr = setupSchedulingRequest(pSpec); + schedReqs.add(sr); + } } } amRMClient.addSchedulingRequests(schedReqs); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index c8a71b320c0..eb21b925f6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.DockerClientConfigHandler; import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -103,7 +104,7 @@ * the provided shell command on a set of containers.

* *

This client is meant to act as an example on how to write yarn-based applications.

- * + * *

To submit an application, a client first needs to connect to the ResourceManager * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} * provides a way for the client to get access to cluster information and to request for a @@ -192,6 +193,8 @@ // Placement specification private String placementSpec = ""; + // Node Attribute specification + private String nodeAttributeSpec = ""; // log4j.properties file // if available, add to local resources and set into classpath private String log4jPropFile = ""; @@ -448,6 +451,7 @@ public boolean init(String[] args) throws ParseException { // Check if it is parsable PlacementSpec.parse(this.placementSpec); } + appName = cliParser.getOptionValue("appname", "DistributedShell"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java index 290925980a5..f81237ed5f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java @@ -20,7 +20,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; -import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.PlacementKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +37,8 @@ LoggerFactory.getLogger(PlacementSpec.class); public final String sourceTag; - public final int numContainers; public final PlacementConstraint constraint; + private int numContainers; public PlacementSpec(String sourceTag, int numContainers, PlacementConstraint constraint) { @@ -47,6 +47,22 @@ public PlacementSpec(String sourceTag, int numContainers, this.constraint = constraint; } + /** + * Get teh number of container for this spec + * @return container count + */ + public int getNumContainers() { + return numContainers; + } + + /** + * Set number of containers for this spec. + * @param numContainers number of containers. + */ + public void setNumContainers(int numContainers) { + this.numContainers = numContainers; + } + // Placement specification should be of the form: // PlacementSpec => ""|KeyVal;PlacementSpec // KeyVal => SourceTag=Constraint @@ -71,11 +87,12 @@ public PlacementSpec(String sourceTag, int numContainers, public static Map parse(String specs) throws IllegalArgumentException { LOG.info("Parsing Placement Specs: [{}]", specs); + Map pSpecs = new HashMap<>(); - Map parsed; + Map parsed; try { parsed = PlacementConstraintParser.parsePlacementSpec(specs); - for (Map.Entry entry : + for (Map.Entry entry : parsed.entrySet()) { LOG.info("Parsed source tag: {}, number of allocations: {}", entry.getKey().getTag(), entry.getKey().getNumOfAllocations()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 2a869bddefa..0be76b0e7df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -650,6 +650,8 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setNodeLabelManager(nlm); NodeAttributesManager nam = createNodeAttributesManager(); + NodeAttributesManagerImpl namImpl = (NodeAttributesManagerImpl) nam; + namImpl.setRMContext(rmContext); addService(nam); rmContext.setNodeAttributesManager(nam); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index bf9de15a3bf..0778cf689c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.nodelabels.StringAttributeValue; import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import com.google.common.base.Strings; @@ -91,6 +93,7 @@ private final ReadLock readLock; private final WriteLock writeLock; + private RMContext rmContext = null; public NodeAttributesManagerImpl() { super("NodeAttributesManagerImpl"); @@ -130,7 +133,7 @@ protected void serviceStart() throws Exception { } protected void initNodeAttributeStore(Configuration conf) throws Exception { - this.store =getAttributeStoreClass(conf); + this.store = getAttributeStoreClass(conf); this.store.init(conf, this); this.store.recover(); } @@ -199,12 +202,28 @@ private void internalUpdateAttributesOnNodes( LOG.debug(logMsg); } + LOG.info(logMsg); if (null != dispatcher && NodeAttribute.PREFIX_CENTRALIZED .equals(attributePrefix)) { dispatcher.getEventHandler() .handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op)); } + // Map used to notify RM + Map> newNodeToAttributesMap = + new HashMap>(); + nodeCollections.forEach((k, v) -> { + newNodeToAttributesMap.put(k, v.attributes.keySet()); + }); + + LOG.info("newNodeToAttributesMap:" + newNodeToAttributesMap.values()); + // Notify RM + if (rmContext != null && rmContext.getDispatcher() != null) { + LOG.info("event fire to RM for attributes"); + rmContext.getDispatcher().getEventHandler().handle( + new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); + } + } finally { writeLock.unlock(); } @@ -702,4 +721,8 @@ protected void serviceStop() throws Exception { store.close(); } } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 59771fdef78..20f554844ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; @@ -79,6 +80,8 @@ private volatile Set labels = null; + private volatile Set nodeAttributes = null; + // Last updated time private volatile long lastHeartbeatMonotonicTime; @@ -503,6 +506,14 @@ public int hashCode() { return getNodeID().hashCode(); } + public Set getNodeAttributes() { + return nodeAttributes; + } + + public void updateNodeAttributes(Set nodeAttributes) { + this.nodeAttributes = nodeAttributes; + } + private static class ContainerInfo { private final RMContainer container; private boolean launchedOnNode; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 37f56deb119..220135a7cc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -136,6 +137,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1728,6 +1730,14 @@ public void handle(SchedulerEvent event) { updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; + case NODE_ATTRIBUTES_UPDATE: + { + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent = + (NodeAttributesUpdateSchedulerEvent) event; + + updateNodeAttributes(attributeUpdateEvent); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; @@ -1861,6 +1871,30 @@ public void handle(SchedulerEvent event) { } } + private void updateNodeAttributes( + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { + try { + writeLock.lock(); + for (Entry> entry : attributeUpdateEvent + .getUpdatedNodeToAttributes().entrySet()) { + String hostname = entry.getKey(); + Set attributes = entry.getValue(); + List nodeIds = nodeTracker.getNodeIdsByResourceName(hostname); + updateAttributesOnNode(nodeIds, attributes); + } + } finally { + writeLock.unlock(); + } + } + + private void updateAttributesOnNode(List nodeIds, + Set attributes) { + nodeIds.forEach((k) -> { + SchedulerNode node = nodeTracker.getNode(k); + node.updateNodeAttributes(attributes); + }); + } + /** * Process node labels update. */ @@ -2708,7 +2742,7 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, schedulingRequest, schedulerNode, rmContext.getPlacementConstraintManager(), rmContext.getAllocationTagsManager())) { - LOG.debug("Failed to allocate container for application " + LOG.info("Failed to allocate container for application " + appAttempt.getApplicationId() + " on node " + schedulerNode.getNodeName() + " because this allocation violates the" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index a843002cdb5..76bace6939b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -141,15 +141,18 @@ private ContainerAllocation preCheckForNodeCandidateSet( } } + LOG.info("appInfo.precheckNode start"); // Is the nodePartition of pending request matches the node's partition // If not match, jump to next priority. if (!appInfo.precheckNode(schedulerKey, node, schedulingMode)) { + LOG.info("appInfo.precheckNode skipped"); ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, ActivityDiagnosticConstant. PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST); return ContainerAllocation.PRIORITY_SKIPPED; } + LOG.info("appInfo.precheckNode continued"); if (!application.getCSLeafQueue().getReservationContinueLooking()) { if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index f47e1d4889d..f55b5b4402f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; @@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeAttributesManagerImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; @@ -114,19 +117,66 @@ private static boolean canSatisfySingleConstraintExpression( || maxScopeCardinality <= desiredMaxCardinality); } - private static boolean canSatisfyNodePartitionConstraintExpresssion( + private static boolean canSatisfyNodeConstraintExpresssion( TargetExpression targetExpression, SchedulerNode schedulerNode) { Set values = targetExpression.getTargetValues(); - if (values == null || values.isEmpty()) { - return schedulerNode.getPartition().equals( - RMNodeLabelsManager.NO_LABEL); - } else{ - String nodePartition = values.iterator().next(); - if (!nodePartition.equals(schedulerNode.getPartition())) { + + if (targetExpression.getTargetKey().equals(NODE_PARTITION)) { + if (values == null || values.isEmpty()) { + return schedulerNode.getPartition() + .equals(RMNodeLabelsManager.NO_LABEL); + } else { + String nodePartition = values.iterator().next(); + if (!nodePartition.equals(schedulerNode.getPartition())) { + return false; + } + } + } else { + // compare attributes. + String inputAttribute = values.iterator().next(); + NodeAttribute requestAttribute = getNodeConstraintFromRequest(inputAttribute); + if (requestAttribute == null) { + return true; + } + + if (schedulerNode.getNodeAttributes() == null || + !schedulerNode.getNodeAttributes().contains(requestAttribute)) { + if(LOG.isDebugEnabled()) { + LOG.debug("Incoming requestAttribute:" + requestAttribute + + "is not present in " + schedulerNode.getNodeID()); + } + return false; + } + boolean found = false; + for (Iterator it = schedulerNode.getNodeAttributes() + .iterator(); it.hasNext(); ) { + NodeAttribute nodeAttribute = it.next(); + if(LOG.isDebugEnabled()) { + LOG.debug("Starting to compare Incoming requestAttribute :" + + requestAttribute + + " with requestAttribute value= " + requestAttribute + .getAttributeValue() + + ", stored nodeAttribute value=" + nodeAttribute + .getAttributeValue()); + } + if (requestAttribute.equals(nodeAttribute)) { + if (requestAttribute.getAttributeValue() + .equals(nodeAttribute.getAttributeValue())) { + if(LOG.isDebugEnabled()) { + LOG.debug( + "Incoming requestAttribute:" + requestAttribute + + " matches with node:" + schedulerNode.getNodeID()); + } + found = true; + return found; + } + } + } + if (!found) { + LOG.info("skip this node for requestAttribute:" + requestAttribute); return false; } } - return true; } @@ -146,10 +196,11 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId, singleConstraint, currentExp, schedulerNode, tagsManager)) { return false; } - } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE) - && currentExp.getTargetKey().equals(NODE_PARTITION)) { - // This is a node partition expression, check it. - canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode); + } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) { + // This is a node attribute expression, check it. + if(!canSatisfyNodeConstraintExpresssion(currentExp, schedulerNode)) { + return false; + } } } // return true if all targetExpressions are satisfied @@ -203,6 +254,11 @@ private static boolean canSatisfyConstraints(ApplicationId appId, AllocationTagsManager atm) throws InvalidAllocationTagsQueryException { if (constraint == null) { + if(LOG.isDebugEnabled()) { + LOG.debug( + "Constraint is found empty during constraint validation for app:" + + appId); + } return true; } @@ -263,4 +319,25 @@ public static boolean canSatisfyConstraints(ApplicationId applicationId, pcm.getMultilevelConstraint(applicationId, sourceTags, pc), schedulerNode, atm); } + + private static NodeAttribute getNodeConstraintFromRequest(String attrString) { + NodeAttribute nodeAttribute = null; + LOG.info("Incoming node attribute: " + attrString); + // Input node attribute could be like rm.yarn.io/java=1.8 + String[] splits = attrString.split("="); + if (splits == null) { + return null; + } + + String[] name = splits[0].split("/"); + if(name == null || name.length == 1) { + nodeAttribute = NodeAttribute + .newInstance(splits[0], NodeAttributeType.STRING, splits[1]); + } else { + nodeAttribute = NodeAttribute + .newInstance(name[0], name[1], NodeAttributeType.STRING, splits[1]); + } + + return nodeAttribute; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java new file mode 100644 index 00000000000..2a3effbde8f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent { + private Map> nodeToAttributes; + + public NodeAttributesUpdateSchedulerEvent( + Map> newNodeToAttributesMap) { + super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE); + this.nodeToAttributes = newNodeToAttributesMap; + } + + public Map> getUpdatedNodeToAttributes() { + return nodeToAttributes; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index b107cf4ee61..869bf0ed9e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -26,6 +26,7 @@ NODE_UPDATE, NODE_RESOURCE_UPDATE, NODE_LABELS_UPDATE, + NODE_ATTRIBUTES_UPDATE, // Source: RMApp APP_ADDED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index e1239a9db34..85c94b39356 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -366,6 +366,7 @@ public boolean precheckNode(SchedulerNode schedulerNode, SchedulingMode schedulingMode) { // We will only look at node label = nodeLabelToLookAt according to // schedulingMode and partition of node. + LOG.info("Locality App Allocator for " + schedulerNode.getNodeID() + "," + schedulingMode); String nodePartitionToLookAt; if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { nodePartitionToLookAt = schedulerNode.getPartition(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 2b610f2fe76..e5fe7cd7aaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -70,6 +71,7 @@ private SchedulingRequest schedulingRequest = null; private String targetNodePartition; + private Set targetNodeAttributes; private Set targetAllocationTags; private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; @@ -223,6 +225,9 @@ private String throwExceptionWithMetaInfo(String message) { private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest) throws SchedulerInvalidResoureRequestException { + // Set Node Attributes + Set nodeAttributes = null; + // Check sizing exists if (newSchedulingRequest.getResourceSizing() == null || newSchedulingRequest.getResourceSizing().getResources() == null) { @@ -293,9 +298,9 @@ private void validateAndSetSchedulingRequest(SchedulingRequest // For node attribute target, we only support Partition now. And once // YARN-3409 is merged, we will support node attribute. if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) { - throwExceptionWithMetaInfo("When TargetType=" - + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE - + " only " + NODE_PARTITION + " is accepted as TargetKey."); + nodeAttributes = handleNodeAttributeFromSchedulingRequest( + targetExpression); + continue; } if (nodePartition != null) { @@ -338,7 +343,7 @@ private void validateAndSetSchedulingRequest(SchedulingRequest } } - if (targetAllocationTags == null) { + if (targetAllocationTags == null && nodeAttributes == null) { // That means we don't have ALLOCATION_TAG specified throwExceptionWithMetaInfo( "Couldn't find target expression with type == ALLOCATION_TAG," @@ -360,14 +365,29 @@ private void validateAndSetSchedulingRequest(SchedulingRequest // Validation is done. set local results: this.targetNodePartition = nodePartition; this.targetAllocationTags = targetAllocationTags; + this.targetNodeAttributes = nodeAttributes; this.schedulingRequest = new SchedulingRequestPBImpl( ((SchedulingRequestPBImpl) newSchedulingRequest).getProto()); - LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo - .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils - .join(",", targetAllocationTags) + "]. nodePartition=" - + targetNodePartition); + LOG.info("Successfully added SchedulingRequest to app=" + + appSchedulingInfo.getApplicationAttemptId() + + " targetAllocationTags=[" + + StringUtils.join(",", targetAllocationTags) + "]. nodePartition=" + + targetNodePartition + " targetNodeAttributes=[" + + StringUtils.join(",", targetNodeAttributes) + "]"); + } + + private Set handleNodeAttributeFromSchedulingRequest( + TargetExpression targetExpression) { + Set nodeAttributes = new HashSet(); + Set values = targetExpression.getTargetValues(); + if (values == null || values.isEmpty()) { + return nodeAttributes; + } + + nodeAttributes.addAll(values); + return nodeAttributes; } @Override @@ -520,6 +540,11 @@ String getTargetNodePartition() { return targetAllocationTags; } + @VisibleForTesting + Set getTargetNodeAttributes() { + return targetNodeAttributes; + } + @Override public void initialize(AppSchedulingInfo appSchedulingInfo, SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java index ccf428143dd..7603fd971ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java @@ -404,4 +404,23 @@ public void testFunctionality() throws InvalidAllocationTagsQueryException { Assert.assertFalse(allocator .precheckNode(node2, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); } + + @Test + public void testSchedulingRequestValidationForAttribute() { + // Valid + assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetNotIn(PlacementConstraints.NODE, + PlacementConstraints.PlacementTargets + .nodeAttribute("java", "java=1.8"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build()); + Assert.assertEquals(ImmutableSet.of("java=1.8"), + allocator.getTargetNodeAttributes()); + Assert.assertEquals("", allocator.getTargetNodePartition()); + } } -- 2.15.2 (Apple Git-101.1)