From c8aeb91f8e8d254d89e0803bfe8ae0257eb42e77 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 2 Aug 2018 22:52:49 +0530 Subject: [PATCH] YARN-7863-YARN-3409 --- .../yarn/api/resource/PlacementConstraint.java | 9 + .../util/constraint/PlacementConstraintParser.java | 184 ++++++++++++++++++++- .../resource/TestPlacementConstraintParser.java | 51 ++++++ .../api/resource/TestPlacementConstraints.java | 4 +- .../distributedshell/ApplicationMaster.java | 100 +++++++++-- .../distributedshell/AttributeSpec.java | 86 ++++++++++ .../yarn/applications/distributedshell/Client.java | 19 ++- .../server/resourcemanager/ResourceManager.java | 2 + .../nodelabels/NodeAttributesManagerImpl.java | 20 +++ .../resourcemanager/scheduler/SchedulerNode.java | 11 ++ .../scheduler/capacity/CapacityScheduler.java | 34 ++++ .../constraint/PlacementConstraintsUtil.java | 78 +++++++-- .../event/NodeAttributesUpdateSchedulerEvent.java | 39 +++++ .../scheduler/event/SchedulerEventType.java | 1 + .../SingleConstraintAppPlacementAllocator.java | 34 +++- 15 files changed, 624 insertions(+), 48 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/AttributeSpec.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java index 0fe8273e6d7..bd4339a857f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java @@ -127,10 +127,19 @@ public int hashCode() { * classes. */ public abstract static class AbstractConstraint implements Visitable { + protected String constraintName; public PlacementConstraint build() { return new PlacementConstraint(this); } + public String getConstraintName() { + return constraintName; + } + + public void setConstraintName(String name) { + constraintName = name; + } + @Override public String toString() { return super.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java index 2926c9d1de8..c115f01437a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java @@ -44,9 +44,9 @@ @InterfaceStability.Unstable public final class PlacementConstraintParser { + public static final char EXPRESSION_VAL_DELIM = ','; private static final char EXPRESSION_DELIM = ':'; private static final char KV_SPLIT_DELIM = '='; - private static final char EXPRESSION_VAL_DELIM = ','; private static final char BRACKET_START = '('; private static final char BRACKET_END = ')'; private static final String IN = "in"; @@ -57,6 +57,10 @@ private static final String SCOPE_NODE = PlacementConstraints.NODE; private static final String SCOPE_RACK = PlacementConstraints.RACK; + public enum TargetExprType { + NODE_ATTRIBUTE, ALLOCATION_TAG + } + private PlacementConstraintParser() { // Private constructor for this utility class. 
} @@ -349,6 +353,81 @@ public String nextElement() { } } + /** + * Constraint parser used to parse a given target expression. + */ + public static class NodeConstraintParser extends ConstraintParser { + + public NodeConstraintParser(String expression) { + super(new BaseStringTokenizer(expression, + String.valueOf(EXPRESSION_VAL_DELIM))); + } + + @Override + public AbstractConstraint parse() + throws PlacementConstraintParseException { + PlacementConstraint.AbstractConstraint placementConstraints = null; + String op = nextToken(); + String attributeName = ""; + String attributeNameSpace = "rm.yarn.io/"; + if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) { + String scope = SCOPE_NODE; + + Set constraintEntities = new TreeSet<>(); + while (hasMoreTokens()) { + String tag = nextToken(); + StringBuilder finalTag = new StringBuilder(); + StringTokenizer attributeKV = new StringTokenizer(tag, + String.valueOf(KV_SPLIT_DELIM)); + // Usually there will be only one k=v pair. However in case when multiple + // values are present for same attribute, it will also be coming as next token. 
+ if (attributeKV.countTokens() > 1) { + attributeName = getConstraintNameWithNameSpace(attributeNameSpace, + attributeKV); + tag = attributeName + '=' + attributeKV.nextToken(); + } else { + finalTag.append(attributeName); + finalTag.append("="); + } + finalTag.append(tag); + constraintEntities.add(finalTag.toString()); + } + PlacementConstraint.TargetExpression target = null; + if (!constraintEntities.isEmpty()) { + target = PlacementConstraints.PlacementTargets + .nodeAttribute(attributeName, + constraintEntities + .toArray(new String[constraintEntities.size()])); + } + if (op.equalsIgnoreCase(IN)) { + placementConstraints = PlacementConstraints + .targetIn(scope, target); + } else if (op.equalsIgnoreCase(NOT_IN)) { + placementConstraints = PlacementConstraints + .targetNotIn(scope, target); + } + } else { + throw new PlacementConstraintParseException( + "expecting " + IN + " or " + NOT_IN + ", but get " + op); + } + placementConstraints.setConstraintName(attributeName); + return placementConstraints; + } + + private String getConstraintNameWithNameSpace(String attributeNameSpace, + StringTokenizer attributeKV) { + StringBuilder attributeName = new StringBuilder(); + String name = attributeKV.nextToken(); + if (name.indexOf("rm.yarn.io/") == -1 + && name.indexOf("nm.yarn.io/") == -1) { + attributeName.append(attributeNameSpace).append(name); + } else { + attributeName.append(name); + } + return attributeName.toString(); + } + } + /** * Constraint parser used to parse a given target expression, such as * "NOTIN, NODE, foo, bar". 
@@ -363,20 +442,23 @@ public TargetConstraintParser(String expression) { @Override public AbstractConstraint parse() throws PlacementConstraintParseException { - PlacementConstraint.AbstractConstraint placementConstraints; + PlacementConstraint.AbstractConstraint placementConstraints = null; String op = nextToken(); if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) { String scope = nextToken(); scope = parseScope(scope); - Set allocationTags = new TreeSet<>(); + Set constraintEntities = new TreeSet<>(); while(hasMoreTokens()) { String tag = nextToken(); - allocationTags.add(tag); + constraintEntities.add(tag); + } + PlacementConstraint.TargetExpression target = null; + if(!constraintEntities.isEmpty()) { + target = PlacementConstraints.PlacementTargets.allocationTag( + constraintEntities + .toArray(new String[constraintEntities.size()])); } - PlacementConstraint.TargetExpression target = - PlacementConstraints.PlacementTargets.allocationTag( - allocationTags.toArray(new String[allocationTags.size()])); if (op.equalsIgnoreCase(IN)) { placementConstraints = PlacementConstraints .targetIn(scope, target); @@ -558,6 +640,39 @@ public static AbstractConstraint parseExpression(String constraintStr) return constraintOptional.get(); } + /** + * Parses a given constraint expression to a {@link AbstractConstraint}, + * this expression can be any valid form of constraint expressions. + * + * @param constraintStr expression string + * @return a parsed {@link AbstractConstraint} + * @throws PlacementConstraintParseException when given expression + * is malformed + */ + public static AbstractConstraint parseNodeAttributeExpression(String constraintStr) + throws PlacementConstraintParseException { + // Try parse given expression with all allowed constraint parsers, + // fails if no one could parse it. 
+ NodeConstraintParser tp = new NodeConstraintParser(constraintStr); + Optional constraintOptional = + Optional.ofNullable(tp.tryParse()); + if (!constraintOptional.isPresent()) { + CardinalityConstraintParser cp = + new CardinalityConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(cp.tryParse()); + if (!constraintOptional.isPresent()) { + ConjunctionConstraintParser jp = + new ConjunctionConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(jp.tryParse()); + } + if (!constraintOptional.isPresent()) { + throw new PlacementConstraintParseException( + "Invalid constraint expression " + constraintStr); + } + } + return constraintOptional.get(); + } + /** * Parses a placement constraint specification. A placement constraint spec * is a composite expression which is composed by multiple sub constraint @@ -612,4 +727,59 @@ public static AbstractConstraint parseExpression(String constraintStr) return result; } + + /** + * Parses a placement constraint specification for a node attributes. + * A node attribute constraint spec is a composite expression which is + * composed by multiple sub constraint expressions delimited by ":". + * With following syntax: + * + *

Op,Attribute1 P1:Op,Attribute2 P2:...

+ * + * where Attribute1 indicate the attribute name + * and where Pn can be any form of a valid constraint expression, + * such as: + * + *
    + *
  • python=true OR java!=1.8
  • + *
  • os=centos
  • + *
+ * @param expression expression string. + * @return a map of attributes to placement constraint mapping. + * @throws PlacementConstraintParseException + */ + public static Map parsePlacementSpecForAttributes( + String expression) throws PlacementConstraintParseException { + // Respect insertion order. + Map result = new LinkedHashMap<>(); + PlacementConstraintParser.ConstraintTokenizer tokenizer = + new PlacementConstraintParser.MultipleConstraintsTokenizer(expression); + tokenizer.validate(); + while (tokenizer.hasMoreElements()) { + String specStr = tokenizer.nextElement(); + // each spec starts with op code followed by a constraint value. + AbstractConstraint constraint = + PlacementConstraintParser + .parseNodeAttributeExpression(specStr); + + // TODO: attributeName is a placeholder for now here. + String attributeName = constraint.getConstraintName(); + result.put(attributeName, constraint.build()); + } + + return result; + } + + /** + * isValidOp code or not. + * @param op op code string + * @return true/false based on opcode validation. 
+ */ + public static boolean isValidOp(String op) { + if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN) + || op.equalsIgnoreCase(CARDINALITY)) { + return true; + } + return false; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java index a69571c5c80..9639c551c8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java @@ -443,4 +443,55 @@ private void verifyConstraintToString(String inputExpr, + constrainExpr + ", caused by: " + e.getMessage()); } } + + @Test + public void testParseNodeAttributeSpec() + throws PlacementConstraintParseException { + Map result; + PlacementConstraint.AbstractConstraint expectedPc1, expectedPc2; + PlacementConstraint actualPc1, actualPc2; + String attributeName1, attributeName2; + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpecForAttributes("in,foo=true"); + Assert.assertEquals(1, result.size()); + TargetExpression target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=true"); + expectedPc1 = targetIn("node", target); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpecForAttributes("NOTIN,foo=abc"); + Assert.assertEquals(1, result.size()); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=abc"); + expectedPc1 = targetNotIn("node", target); + + actualPc1 = 
result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpecForAttributes("NOTIN,foo=abc:IN,bar=true"); + Assert.assertEquals(2, result.size()); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=abc"); + expectedPc1 = targetNotIn("node", target); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/bar", "rm.yarn.io/bar=true"); + expectedPc2 = targetIn("node", target); + + Iterator valueIt = result.values().iterator(); + actualPc1 = valueIt.next(); + actualPc2 = valueIt.next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java index 516ecfcd0c3..38ceaa0b34f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java @@ -69,7 +69,7 @@ public void testNodeAffinityToTag() { @Test public void testNodeAntiAffinityToAttribute() { AbstractConstraint constraintExpr = - targetNotIn(NODE, nodeAttribute("java", "1.8")); + targetNotIn(NODE, nodeAttribute("java", "java=1.8")); SingleConstraint sConstraint = (SingleConstraint) constraintExpr; Assert.assertEquals(NODE, sConstraint.getScope()); @@ -82,7 +82,7 @@ public void testNodeAntiAffinityToAttribute() { Assert.assertEquals("java", tExpr.getTargetKey()); 
Assert.assertEquals(TargetType.NODE_ATTRIBUTE, tExpr.getTargetType()); Assert.assertEquals(1, tExpr.getTargetValues().size()); - Assert.assertEquals("1.8", tExpr.getTargetValues().iterator().next()); + Assert.assertEquals("java=1.8", tExpr.getTargetValues().iterator().next()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 76fa38f922a..37d346cf5d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.BoundedAppender; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -307,6 +308,9 @@ // Placement Specifications private Map placementSpecs = null; + // Placement Specifications + private Map attributeSpecs = null; + // Container retry options private ContainerRetryPolicy containerRetryPolicy = ContainerRetryPolicy.NEVER_RETRY; @@ -524,11 +528,24 @@ public boolean init(String[] args) throws ParseException, IOException { if (cliParser.hasOption("placement_spec")) { String placementSpec = 
cliParser.getOptionValue("placement_spec"); LOG.info("Placement Spec received [{}]", placementSpec); - parsePlacementSpecs(placementSpec); - LOG.info("Total num containers requested [{}]", numTotalContainers); - if (numTotalContainers == 0) { - throw new IllegalArgumentException( - "Cannot run distributed shell with no containers"); + String decodedSpec = getDecodedPlacementSpec(placementSpec); + + String[] splitted = decodedSpec.split( + String.valueOf(PlacementConstraintParser.EXPRESSION_VAL_DELIM), 2); + + // Given first part of the spec starts with OP_CODE like IN,NOTIN etc + // we could assume its a node_attribute spec. + boolean isNodeAttributeSpec = PlacementConstraintParser + .isValidOp(splitted[0]); + if(!isNodeAttributeSpec) { + parsePlacementSpecs(placementSpec); + LOG.info("Total num containers requested [{}]", numTotalContainers); + if (numTotalContainers == 0) { + throw new IllegalArgumentException( + "Cannot run distributed shell with no containers"); + } + } else { + parseNodeAttributeSpecs(placementSpec); } } @@ -694,13 +711,7 @@ public boolean init(String[] args) throws ParseException, IOException { return true; } - private void parsePlacementSpecs(String placementSpecifications) { - // Client sends placement spec in encoded format - Base64.Decoder decoder = Base64.getDecoder(); - byte[] decodedBytes = decoder.decode( - placementSpecifications.getBytes(StandardCharsets.UTF_8)); - String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8); - LOG.info("Decode placement spec: " + decodedSpec); + private void parsePlacementSpecs(String decodedSpec) { Map pSpecs = PlacementSpec.parse(decodedSpec); this.placementSpecs = new HashMap<>(); @@ -711,6 +722,24 @@ private void parsePlacementSpecs(String placementSpecifications) { } } + private String getDecodedPlacementSpec(String placementSpecifications) { + Base64.Decoder decoder = Base64.getDecoder(); + byte[] decodedBytes = decoder.decode( + 
placementSpecifications.getBytes(StandardCharsets.UTF_8)); + String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8); + LOG.info("Decode placement spec: " + decodedSpec); + return decodedSpec; + } + + private void parseNodeAttributeSpecs(String decodedSpec) { + Map pSpecs = + AttributeSpec.parse(decodedSpec); + this.attributeSpecs = new HashMap<>(); + for (AttributeSpec pSpec : pSpecs.values()) { + this.attributeSpecs.put(pSpec.nodeAttribute, pSpec); + } + } + /** * Helper function to print usage * @@ -798,6 +827,18 @@ public void run() throws YarnException, IOException, InterruptedException { } } } + + if (this.attributeSpecs != null) { + if(placementConstraintMap == null) { + placementConstraintMap = new HashMap<>(); + } + for (AttributeSpec spec : this.attributeSpecs.values()) { + if (spec.constraint != null) { + placementConstraintMap.put( + Collections.singleton(spec.nodeAttribute), spec.constraint); + } + } + } RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl, placementConstraintMap); @@ -844,17 +885,27 @@ public void run() throws YarnException, IOException, InterruptedException { // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). 
- if (this.placementSpecs == null) { + if (this.placementSpecs == null && this.attributeSpecs == null) { for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); } } else { List schedReqs = new ArrayList<>(); - for (PlacementSpec pSpec : this.placementSpecs.values()) { - for (int i = 0; i < pSpec.numContainers; i++) { - SchedulingRequest sr = setupSchedulingRequest(pSpec); - schedReqs.add(sr); + if (this.placementSpecs != null) { + for (PlacementSpec pSpec : this.placementSpecs.values()) { + for (int i = 0; i < pSpec.numContainers; i++) { + SchedulingRequest sr = setupSchedulingRequest(pSpec); + schedReqs.add(sr); + } + } + } + if (this.attributeSpecs != null) { + for (AttributeSpec pSpec : this.attributeSpecs.values()) { + for (int i = 0; i < numTotalContainersToRequest; i++) { + SchedulingRequest sr = setupSchedulingRequest(pSpec); + schedReqs.add(sr); + } } } amRMClient.addSchedulingRequests(schedReqs); @@ -1062,7 +1113,7 @@ public void onContainersCompleted(List completedContainers) { numRequestedContainers.addAndGet(askCount); // Dont bother re-asking if we are using placementSpecs - if (placementSpecs == null) { + if (placementSpecs == null && attributeSpecs == null) { if (askCount > 0) { for (int i = 0; i < askCount; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); @@ -1498,6 +1549,19 @@ private SchedulingRequest setupSchedulingRequest(PlacementSpec spec) { return sReq; } + private SchedulingRequest setupSchedulingRequest(AttributeSpec spec) { + long allocId = allocIdCounter.incrementAndGet(); + SchedulingRequest sReq = SchedulingRequest.newInstance( + allocId, Priority.newInstance(requestPriority), + ExecutionTypeRequest.newInstance(), + Collections.singleton(spec.nodeAttribute), + ResourceSizing.newInstance( + getTaskResourceCapability()), null); + sReq.setPlacementConstraint(spec.constraint); + LOG.info("Scheduling Request made: " + 
sReq.toString()); + return sReq; + } + private boolean fileExist(String filePath) { return new File(filePath).exists(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/AttributeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/AttributeSpec.java new file mode 100644 index 00000000000..8899560ae3c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/AttributeSpec.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.applications.distributedshell; + +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Class encapsulating a nodeAttribute and a Placement + * Constraint. + */ +public class AttributeSpec { + + private static final Logger LOG = + LoggerFactory.getLogger(AttributeSpec.class); + + public final String nodeAttribute; + public final PlacementConstraint constraint; + + public AttributeSpec(String nodeAttribute, + PlacementConstraint constraint) { + this.nodeAttribute = nodeAttribute; + this.constraint = constraint; + } + + // Attribute specification should be of the form: + // AttributeSpec => ""|KeyVal;PlacementSpec + // KeyVal => Attribute op Constraint + // op => in, notin + // Attribute => String + // Constraint => + // "=", AttributeValue| + // "!=", AttributeValue| + // AttributeValue => String (Value of Node Attribute) + + /** + * Parser to convert a string representation of an attribute spec to mapping + * from attribute to Placement Constraint. + * + * @param specs Attribute spec. + * @return Mapping from source tag to placement constraint. 
+ */ + public static Map parse(String specs) + throws IllegalArgumentException { + LOG.info("Parsing Attribute Specs: [{}]", specs); + Map pSpecs = new HashMap<>(); + Map parsed; + try { + parsed = PlacementConstraintParser.parsePlacementSpecForAttributes(specs); + for (Map.Entry entry : + parsed.entrySet()) { + LOG.info("Parsed constraint: {}", entry.getValue() + .getConstraintExpr().getClass().getSimpleName()); + pSpecs.put(entry.getKey(), new AttributeSpec( + entry.getKey(), + entry.getValue())); + } + return pSpecs; + } catch (PlacementConstraintParseException e) { + throw new IllegalArgumentException( + "Invalid placement spec: " + specs, e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index c8a71b320c0..09135332d5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.DockerClientConfigHandler; import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -103,7 +104,7 @@ * the provided shell command on a set of containers.

* *

This client is meant to act as an example on how to write yarn-based applications.

- * + * *

To submit an application, a client first needs to connect to the ResourceManager * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} * provides a way for the client to get access to cluster information and to request for a @@ -192,6 +193,8 @@ // Placement specification private String placementSpec = ""; + // Node Attribute specification + private String nodeAttributeSpec = ""; // log4j.properties file // if available, add to local resources and set into classpath private String log4jPropFile = ""; @@ -445,9 +448,21 @@ public boolean init(String[] args) throws ParseException { if (cliParser.hasOption("placement_spec")) { placementSpec = cliParser.getOptionValue("placement_spec"); + String[] splitted = placementSpec.split( + String.valueOf(PlacementConstraintParser.EXPRESSION_VAL_DELIM), 2); + + // Given first part of the spec starts with OP_CODE like IN,NOTIN etc + // we could assume its a node_attribute spec. + boolean isNodeAttributeSpec = PlacementConstraintParser + .isValidOp(splitted[0]); // Check if it is parsable - PlacementSpec.parse(this.placementSpec); + if (!isNodeAttributeSpec) { + PlacementSpec.parse(this.placementSpec); + } else { + AttributeSpec.parse(this.nodeAttributeSpec); + } } + appName = cliParser.getOptionValue("appname", "DistributedShell"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 2a869bddefa..0be76b0e7df 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -650,6 +650,8 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setNodeLabelManager(nlm); NodeAttributesManager nam = createNodeAttributesManager(); + NodeAttributesManagerImpl namImpl = (NodeAttributesManagerImpl) nam; + namImpl.setRMContext(rmContext); addService(nam); rmContext.setNodeAttributesManager(nam); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index bf9de15a3bf..7b378b06c5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.nodelabels.StringAttributeValue; import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import com.google.common.base.Strings; @@ -91,6 +93,7 @@ private final 
ReadLock readLock; private final WriteLock writeLock; + private RMContext rmContext = null; public NodeAttributesManagerImpl() { super("NodeAttributesManagerImpl"); @@ -205,6 +208,19 @@ private void internalUpdateAttributesOnNodes( .handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op)); } + // Map used to notify RM + Map> newNodeToAttributesMap = + new HashMap>(); + nodeCollections.forEach((k, v) -> { + newNodeToAttributesMap.put(k, v.attributes.keySet()); + }); + + // Notify RM + if (rmContext != null && rmContext.getDispatcher() != null) { + rmContext.getDispatcher().getEventHandler().handle( + new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); + } + } finally { writeLock.unlock(); } @@ -702,4 +718,8 @@ protected void serviceStop() throws Exception { store.close(); } } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 59771fdef78..20f554844ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; @@ -79,6 +80,8 @@ private volatile Set<String> labels = null; + private volatile Set<NodeAttribute> nodeAttributes = null; + // Last updated time private volatile long lastHeartbeatMonotonicTime; @@ -503,6 +506,14 @@ public int hashCode() { return getNodeID().hashCode(); } + public Set<NodeAttribute> getNodeAttributes() { + return nodeAttributes; + } + + public void updateNodeAttributes(Set<NodeAttribute> nodeAttributes) { + this.nodeAttributes = nodeAttributes; + } + private static class ContainerInfo { private final RMContainer container; private boolean launchedOnNode; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 37f56deb119..71dacea1ef7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -136,6 +137,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1728,6 +1730,14 @@ public void handle(SchedulerEvent event) { updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; + case NODE_ATTRIBUTES_UPDATE: + { + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent = + (NodeAttributesUpdateSchedulerEvent) event; + + updateNodeAttributes(attributeUpdateEvent); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; @@ -1861,6 +1871,30 @@ public void handle(SchedulerEvent event) { } } + private void updateNodeAttributes( + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { + try { + writeLock.lock(); + for (Entry<String, Set<NodeAttribute>> entry : attributeUpdateEvent + .getUpdatedNodeToAttributes().entrySet()) { + String hostname = entry.getKey(); + Set<NodeAttribute> attributes = entry.getValue(); + List<NodeId> nodeIds = nodeTracker.getNodeIdsByResourceName(hostname); + updateAttributesOnNode(nodeIds, attributes); + } + } finally { + writeLock.unlock(); + } + } + + private void updateAttributesOnNode(List<NodeId> nodeIds, + Set<NodeAttribute> attributes) { + nodeIds.forEach((k) -> { + SchedulerNode node = nodeTracker.getNode(k); + node.updateNodeAttributes(attributes); + }); + } + /** * Process node labels update.
*/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index f47e1d4889d..39f3f8172e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; @@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeAttributesManagerImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; @@ -114,19 +117,58 @@ private static boolean canSatisfySingleConstraintExpression( || maxScopeCardinality <= desiredMaxCardinality); } - private static boolean canSatisfyNodePartitionConstraintExpresssion( + private static boolean canSatisfyNodeConstraintExpresssion( TargetExpression targetExpression, SchedulerNode schedulerNode) { Set<String> values = targetExpression.getTargetValues(); - if (values == null || values.isEmpty()) { - return schedulerNode.getPartition().equals( - RMNodeLabelsManager.NO_LABEL); - } else{ - String nodePartition = values.iterator().next(); - if (!nodePartition.equals(schedulerNode.getPartition())) { + + if (targetExpression.getTargetKey().equals(NODE_PARTITION)) { + if (values == null || values.isEmpty()) { + return schedulerNode.getPartition() + .equals(RMNodeLabelsManager.NO_LABEL); + } else { + String nodePartition = values.iterator().next(); + if (!nodePartition.equals(schedulerNode.getPartition())) { + return false; + } + } + } else { + // compare attributes.
+ String inputAttribute = values.iterator().next(); + NodeAttribute requestAttribute = getNodeConstraintFromRequest( + targetExpression.getTargetKey(), inputAttribute); + LOG.info("Incoming requestAttribute:" + requestAttribute); + if (requestAttribute == null) { + return true; + } + if (!schedulerNode.getNodeAttributes().contains(requestAttribute)) { + LOG.info("Incoming requestAttribute:" + requestAttribute + + "is not present in " + schedulerNode.getNodeID()); + return false; + } + boolean found = false; + for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes() + .iterator(); it.hasNext(); ) { + NodeAttribute nodeAttribute = it.next(); + LOG.info("Incoming requestAttribute :" + requestAttribute + + " verifies with requestAttribute value= " + requestAttribute + .getAttributeValue() + + ", stored nodeAttribute value=" + nodeAttribute + .getAttributeValue()); + if (requestAttribute.equals(nodeAttribute)) { + if (requestAttribute.getAttributeValue() + .equals(nodeAttribute.getAttributeValue())) { + LOG.info( + "Incoming requestAttribute:" + requestAttribute + " matches."); + found = true; + return found; + } + } + } + if (!found) { + LOG.info("skip this node for requestAttribute:" + requestAttribute); return false; } } - return true; } @@ -146,10 +188,9 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId, singleConstraint, currentExp, schedulerNode, tagsManager)) { return false; } - } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE) - && currentExp.getTargetKey().equals(NODE_PARTITION)) { - // This is a node partition expression, check it. - canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode); + } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) { + // This is a node attribute expression, check it.
+ canSatisfyNodeConstraintExpresssion(currentExp, schedulerNode); } } // return true if all targetExpressions are satisfied @@ -263,4 +304,17 @@ public static boolean canSatisfyConstraints(ApplicationId applicationId, pcm.getMultilevelConstraint(applicationId, sourceTags, pc), schedulerNode, atm); } + + private static NodeAttribute getNodeConstraintFromRequest(String namespace, String attrString) { + // Input node attribute could be like rm.yarn.io/javaversion=1.8 + String[] splits = attrString.split("="); + if (splits == null) { + return null; + } + StringBuilder sb = new StringBuilder(); + sb.append(namespace).append('/').append(splits[0]); + NodeAttribute nodeAttribute = NodeAttribute + .newInstance(sb.toString(), NodeAttributeType.STRING, splits[1]); + return nodeAttribute; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java new file mode 100644 index 00000000000..2a3effbde8f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent { + private Map<String, Set<NodeAttribute>> nodeToAttributes; + + public NodeAttributesUpdateSchedulerEvent( + Map<String, Set<NodeAttribute>> newNodeToAttributesMap) { + super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE); + this.nodeToAttributes = newNodeToAttributesMap; + } + + public Map<String, Set<NodeAttribute>> getUpdatedNodeToAttributes() { + return nodeToAttributes; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index b107cf4ee61..869bf0ed9e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -26,6 +26,7 @@ NODE_UPDATE, NODE_RESOURCE_UPDATE, NODE_LABELS_UPDATE, + NODE_ATTRIBUTES_UPDATE, // Source: RMApp APP_ADDED, diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 2b610f2fe76..41a94c7e67b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -70,6 +71,7 @@ private SchedulingRequest schedulingRequest = null; private String targetNodePartition; + private Set<String> targetNodeAttributes; private Set<String> targetAllocationTags; private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; @@ -223,6 +225,9 @@ private String throwExceptionWithMetaInfo(String message) { private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest) throws SchedulerInvalidResoureRequestException { + // Set Node Attributes + Set<String> nodeAttributes = null; + // Check sizing exists if
(newSchedulingRequest.getResourceSizing() == null || newSchedulingRequest.getResourceSizing().getResources() == null) { @@ -293,9 +298,9 @@ private void validateAndSetSchedulingRequest(SchedulingRequest // For node attribute target, we only support Partition now. And once // YARN-3409 is merged, we will support node attribute. if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) { - throwExceptionWithMetaInfo("When TargetType=" - + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE - + " only " + NODE_PARTITION + " is accepted as TargetKey."); + nodeAttributes = handleNodeAttributeFromSchedulingRequest( + targetExpression); + continue; } if (nodePartition != null) { @@ -360,14 +365,29 @@ private void validateAndSetSchedulingRequest(SchedulingRequest // Validation is done. set local results: this.targetNodePartition = nodePartition; this.targetAllocationTags = targetAllocationTags; + this.targetNodeAttributes = nodeAttributes; this.schedulingRequest = new SchedulingRequestPBImpl( ((SchedulingRequestPBImpl) newSchedulingRequest).getProto()); - LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo - .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils - .join(",", targetAllocationTags) + "]. nodePartition=" - + targetNodePartition); + LOG.info("Successfully added SchedulingRequest to app=" + + appSchedulingInfo.getApplicationAttemptId() + + " targetAllocationTags=[" + + StringUtils.join(",", targetAllocationTags) + "]. nodePartition=" + + targetNodePartition + " targetNodeAttributes=[" + + StringUtils.join(",", targetNodeAttributes) + "]"); + } + + private Set<String> handleNodeAttributeFromSchedulingRequest( + TargetExpression targetExpression) { + Set<String> nodeAttributes = new HashSet<>(); + Set<String> values = targetExpression.getTargetValues(); + if (values == null || values.isEmpty()) { + return nodeAttributes; + } + + nodeAttributes.addAll(values); + return nodeAttributes; } @Override -- 2.15.2 (Apple Git-101.1)