From 7ca4983b74e70afdacb6c5490808c2e993e8ef10 Mon Sep 17 00:00:00 2001
From: Sunil G
Date: Fri, 17 Aug 2018 10:42:29 +0530
Subject: [PATCH] YARN-7863
---
.../yarn/api/resource/PlacementConstraint.java | 9 +
.../util/constraint/PlacementConstraintParser.java | 201 +++++++++++++++++++--
.../resource/TestPlacementConstraintParser.java | 59 +++++-
.../api/resource/TestPlacementConstraints.java | 4 +-
.../distributedshell/ApplicationMaster.java | 68 +++++--
.../yarn/applications/distributedshell/Client.java | 8 +-
.../distributedshell/PlacementSpec.java | 27 ++-
.../server/resourcemanager/ResourceManager.java | 2 +
.../nodelabels/NodeAttributesManagerImpl.java | 25 ++-
.../resourcemanager/scheduler/SchedulerNode.java | 11 ++
.../scheduler/capacity/CapacityScheduler.java | 36 +++-
.../allocator/RegularContainerAllocator.java | 3 +
.../constraint/PlacementConstraintsUtil.java | 101 +++++++++--
.../event/NodeAttributesUpdateSchedulerEvent.java | 39 ++++
.../scheduler/event/SchedulerEventType.java | 1 +
.../placement/LocalityAppPlacementAllocator.java | 1 +
.../SingleConstraintAppPlacementAllocator.java | 36 +++-
17 files changed, 563 insertions(+), 68 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
index 0fe8273e6d7..bd4339a857f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
@@ -127,10 +127,19 @@ public int hashCode() {
* classes.
*/
public abstract static class AbstractConstraint implements Visitable {
+ protected String constraintName;
public PlacementConstraint build() {
return new PlacementConstraint(this);
}
+ public String getConstraintName() {
+ return constraintName;
+ }
+
+ public void setConstraintName(String name) {
+ constraintName = name;
+ }
+
@Override
public String toString() {
return super.toString();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
index 2926c9d1de8..3342ca61a1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
@@ -44,9 +44,9 @@
@InterfaceStability.Unstable
public final class PlacementConstraintParser {
+ public static final char EXPRESSION_VAL_DELIM = ',';
private static final char EXPRESSION_DELIM = ':';
private static final char KV_SPLIT_DELIM = '=';
- private static final char EXPRESSION_VAL_DELIM = ',';
private static final char BRACKET_START = '(';
private static final char BRACKET_END = ')';
private static final String IN = "in";
@@ -57,6 +57,10 @@
private static final String SCOPE_NODE = PlacementConstraints.NODE;
private static final String SCOPE_RACK = PlacementConstraints.RACK;
+ public enum TargetExprType {
+ NODE_ATTRIBUTE, ALLOCATION_TAG
+ }
+
private PlacementConstraintParser() {
// Private constructor for this utility class.
}
@@ -349,6 +353,81 @@ public String nextElement() {
}
}
+ /**
+ * Constraint parser used to parse a node-attribute constraint expression.
+ */
+ public static class NodeConstraintParser extends ConstraintParser {
+
+ public NodeConstraintParser(String expression) {
+ super(new BaseStringTokenizer(expression,
+ String.valueOf(EXPRESSION_VAL_DELIM)));
+ }
+
+ @Override
+ public AbstractConstraint parse()
+ throws PlacementConstraintParseException {
+ PlacementConstraint.AbstractConstraint placementConstraints = null;
+ String op = nextToken();
+ String attributeName = "";
+ String attributeNameSpace = "rm.yarn.io/";
+ if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
+ String scope = SCOPE_NODE;
+
+ Set constraintEntities = new TreeSet<>();
+ while (hasMoreTokens()) {
+ String tag = nextToken();
+ StringBuilder finalTag = new StringBuilder();
+ StringTokenizer attributeKV = new StringTokenizer(tag,
+ String.valueOf(KV_SPLIT_DELIM));
+ // Usually there is only one k=v pair. However, when multiple values are
+ // present for the same attribute, each value arrives as a separate token.
+ if (attributeKV.countTokens() > 1) {
+ attributeName = getConstraintNameWithNameSpace(attributeNameSpace,
+ attributeKV);
+ tag = attributeName + '=' + attributeKV.nextToken();
+ } else {
+ finalTag.append(attributeName);
+ finalTag.append("=");
+ }
+ finalTag.append(tag);
+ constraintEntities.add(finalTag.toString());
+ }
+ PlacementConstraint.TargetExpression target = null;
+ if (!constraintEntities.isEmpty()) {
+ target = PlacementConstraints.PlacementTargets
+ .nodeAttribute(attributeName,
+ constraintEntities
+ .toArray(new String[constraintEntities.size()]));
+ }
+ if (op.equalsIgnoreCase(IN)) {
+ placementConstraints = PlacementConstraints
+ .targetIn(scope, target);
+ } else if (op.equalsIgnoreCase(NOT_IN)) {
+ placementConstraints = PlacementConstraints
+ .targetNotIn(scope, target);
+ }
+ } else {
+ throw new PlacementConstraintParseException(
+ "expecting " + IN + " or " + NOT_IN + ", but get " + op);
+ }
+ placementConstraints.setConstraintName(attributeName);
+ return placementConstraints;
+ }
+
+ private String getConstraintNameWithNameSpace(String attributeNameSpace,
+ StringTokenizer attributeKV) {
+ StringBuilder attributeName = new StringBuilder();
+ String name = attributeKV.nextToken();
+ if (name.indexOf("rm.yarn.io/") == -1
+ && name.indexOf("nm.yarn.io/") == -1) {
+ attributeName.append(attributeNameSpace).append(name);
+ } else {
+ attributeName.append(name);
+ }
+ return attributeName.toString();
+ }
+ }
+
/**
* Constraint parser used to parse a given target expression, such as
* "NOTIN, NODE, foo, bar".
@@ -363,20 +442,23 @@ public TargetConstraintParser(String expression) {
@Override
public AbstractConstraint parse()
throws PlacementConstraintParseException {
- PlacementConstraint.AbstractConstraint placementConstraints;
+ PlacementConstraint.AbstractConstraint placementConstraints = null;
String op = nextToken();
if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
String scope = nextToken();
scope = parseScope(scope);
- Set allocationTags = new TreeSet<>();
+ Set constraintEntities = new TreeSet<>();
while(hasMoreTokens()) {
String tag = nextToken();
- allocationTags.add(tag);
+ constraintEntities.add(tag);
+ }
+ PlacementConstraint.TargetExpression target = null;
+ if(!constraintEntities.isEmpty()) {
+ target = PlacementConstraints.PlacementTargets.allocationTag(
+ constraintEntities
+ .toArray(new String[constraintEntities.size()]));
}
- PlacementConstraint.TargetExpression target =
- PlacementConstraints.PlacementTargets.allocationTag(
- allocationTags.toArray(new String[allocationTags.size()]));
if (op.equalsIgnoreCase(IN)) {
placementConstraints = PlacementConstraints
.targetIn(scope, target);
@@ -490,11 +572,11 @@ public AbstractConstraint parse() throws PlacementConstraintParseException {
* A helper class to encapsulate source tags and allocations in the
* placement specification.
*/
- public static final class SourceTags {
+ public static final class PlacementKey {
private String tag;
private int num;
- private SourceTags(String sourceTag, int number) {
+ private PlacementKey(String sourceTag, int number) {
this.tag = sourceTag;
this.num = number;
}
@@ -510,10 +592,10 @@ public int getNumOfAllocations() {
/**
* Parses source tags from expression "sourceTags=numOfAllocations".
* @param expr
- * @return source tags, see {@link SourceTags}
+ * @return source tags, see {@link PlacementKey}
* @throws PlacementConstraintParseException
*/
- public static SourceTags parseFrom(String expr)
+ public static PlacementKey parseFrom(String expr)
throws PlacementConstraintParseException {
SourceTagsTokenizer stt = new SourceTagsTokenizer(expr);
stt.validate();
@@ -521,7 +603,7 @@ public static SourceTags parseFrom(String expr)
// During validation we already checked the number of parsed elements.
String allocTag = stt.nextElement();
int allocNum = Integer.parseInt(stt.nextElement());
- return new SourceTags(allocTag, allocNum);
+ return new PlacementKey(allocTag, allocNum);
}
}
@@ -558,6 +640,39 @@ public static AbstractConstraint parseExpression(String constraintStr)
return constraintOptional.get();
}
+ /**
+ * Parses a given node-attribute constraint expression to a
+ * {@link AbstractConstraint}; falls back to cardinality/conjunction parsers.
+ *
+ * @param constraintStr expression string
+ * @return a parsed {@link AbstractConstraint}
+ * @throws PlacementConstraintParseException when given expression
+ * is malformed
+ */
+ public static AbstractConstraint parseNodeAttributeExpression(String constraintStr)
+ throws PlacementConstraintParseException {
+ // Try parse given expression with all allowed constraint parsers,
+ // fails if no one could parse it.
+ NodeConstraintParser tp = new NodeConstraintParser(constraintStr);
+ Optional constraintOptional =
+ Optional.ofNullable(tp.tryParse());
+ if (!constraintOptional.isPresent()) {
+ CardinalityConstraintParser cp =
+ new CardinalityConstraintParser(constraintStr);
+ constraintOptional = Optional.ofNullable(cp.tryParse());
+ if (!constraintOptional.isPresent()) {
+ ConjunctionConstraintParser jp =
+ new ConjunctionConstraintParser(constraintStr);
+ constraintOptional = Optional.ofNullable(jp.tryParse());
+ }
+ if (!constraintOptional.isPresent()) {
+ throw new PlacementConstraintParseException(
+ "Invalid constraint expression " + constraintStr);
+ }
+ }
+ return constraintOptional.get();
+ }
+
/**
* Parses a placement constraint specification. A placement constraint spec
* is a composite expression which is composed by multiple sub constraint
@@ -582,10 +697,10 @@ public static AbstractConstraint parseExpression(String constraintStr)
* @return a map of source tags to placement constraint mapping.
* @throws PlacementConstraintParseException
*/
- public static Map parsePlacementSpec(
+ public static Map parsePlacementSpec(
String expression) throws PlacementConstraintParseException {
// Respect insertion order.
- Map result = new LinkedHashMap<>();
+ Map result = new LinkedHashMap<>();
PlacementConstraintParser.ConstraintTokenizer tokenizer =
new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
tokenizer.validate();
@@ -602,7 +717,7 @@ public static AbstractConstraint parseExpression(String constraintStr)
}
String tagAlloc = splitted[0];
- SourceTags st = SourceTags.parseFrom(tagAlloc);
+ PlacementKey st = PlacementKey.parseFrom(tagAlloc);
String exprs = splitted[1];
AbstractConstraint constraint =
PlacementConstraintParser.parseExpression(exprs);
@@ -612,4 +727,60 @@ public static AbstractConstraint parseExpression(String constraintStr)
return result;
}
+
+ /**
+ * Parses a placement constraint specification for node attributes.
+ * A node attribute constraint spec is a composite expression which is
+ * composed by multiple sub constraint expressions delimited by ":".
+ * With following syntax:
+ *
+ * Op,Attribute1 P1:Op,Attribute2 P2:...
+ *
+ * where Attribute1 indicates the attribute name
+ * and where Pn can be any form of a valid constraint expression,
+ * such as:
+ *
+ *
+ * - python=true OR java!=1.8
+ * - os=centos
+ *
+ * @param expression expression string.
+ * @return a map of attributes to placement constraint mapping.
+ * @throws PlacementConstraintParseException
+ */
+ public static Map parsePlacementSpecForAttributes(
+ String expression) throws PlacementConstraintParseException {
+ // Respect insertion order.
+ Map result = new LinkedHashMap<>();
+ PlacementConstraintParser.ConstraintTokenizer tokenizer =
+ new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
+ tokenizer.validate();
+ while (tokenizer.hasMoreElements()) {
+ String specStr = tokenizer.nextElement();
+ // each spec starts with op code followed by a constraint value.
+ AbstractConstraint constraint =
+ PlacementConstraintParser
+ .parseNodeAttributeExpression(specStr);
+
+ // TODO: attributeName is a placeholder for now here.
+ String attributeName = constraint.getConstraintName();
+ PlacementKey placementKey = new PlacementKey(attributeName, 0);
+ result.put(placementKey, constraint.build());
+ }
+
+ return result;
+ }
+
+ /**
+ * Checks whether the given string is a valid constraint op code.
+ * @param op op code string
+ * @return true if the op code is IN, NOTIN or CARDINALITY; false otherwise.
+ */
+ public static boolean isValidOp(String op) {
+ if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)
+ || op.equalsIgnoreCase(CARDINALITY)) {
+ return true;
+ }
+ return false;
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
index a69571c5c80..7a3cc81fc89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
@@ -29,7 +29,7 @@
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
-import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.PlacementKey;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.TargetConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.CardinalityConstraintParser;
@@ -334,10 +334,10 @@ void verify()
@Test
public void testParsePlacementSpec()
throws PlacementConstraintParseException {
- Map result;
+ Map result;
PlacementConstraint expectedPc1, expectedPc2;
PlacementConstraint actualPc1, actualPc2;
- SourceTags tag1, tag2;
+ PlacementKey tag1, tag2;
// A single anti-affinity constraint
result = PlacementConstraintParser
@@ -375,7 +375,7 @@ public void testParsePlacementSpec()
result = PlacementConstraintParser
.parsePlacementSpec("foo=3,notin,node,foo:bar=2,in,node,foo");
Assert.assertEquals(2, result.size());
- Iterator keyIt = result.keySet().iterator();
+ Iterator keyIt = result.keySet().iterator();
tag1 = keyIt.next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(3, tag1.getNumOfAllocations());
@@ -443,4 +443,55 @@ private void verifyConstraintToString(String inputExpr,
+ constrainExpr + ", caused by: " + e.getMessage());
}
}
+
+ @Test
+ public void testParseNodeAttributeSpec()
+ throws PlacementConstraintParseException {
+ Map result;
+ PlacementConstraint.AbstractConstraint expectedPc1, expectedPc2;
+ PlacementConstraint actualPc1, actualPc2;
+ String attributeName1, attributeName2;
+
+ // A single node attribute constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpecForAttributes("in,foo=true");
+ Assert.assertEquals(1, result.size());
+ TargetExpression target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=true");
+ expectedPc1 = targetIn("node", target);
+
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+ // A single node attribute constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpecForAttributes("NOTIN,foo=abc");
+ Assert.assertEquals(1, result.size());
+ target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=abc");
+ expectedPc1 = targetNotIn("node", target);
+
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+ // A single node attribute constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpecForAttributes("NOTIN,foo=abc:IN,bar=true");
+ Assert.assertEquals(2, result.size());
+ target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/foo", "rm.yarn.io/foo=abc");
+ expectedPc1 = targetNotIn("node", target);
+ target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/bar", "rm.yarn.io/bar=true");
+ expectedPc2 = targetIn("node", target);
+
+ Iterator valueIt = result.values().iterator();
+ actualPc1 = valueIt.next();
+ actualPc2 = valueIt.next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+ Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java
index 516ecfcd0c3..38ceaa0b34f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraints.java
@@ -69,7 +69,7 @@ public void testNodeAffinityToTag() {
@Test
public void testNodeAntiAffinityToAttribute() {
AbstractConstraint constraintExpr =
- targetNotIn(NODE, nodeAttribute("java", "1.8"));
+ targetNotIn(NODE, nodeAttribute("java", "java=1.8"));
SingleConstraint sConstraint = (SingleConstraint) constraintExpr;
Assert.assertEquals(NODE, sConstraint.getScope());
@@ -82,7 +82,7 @@ public void testNodeAntiAffinityToAttribute() {
Assert.assertEquals("java", tExpr.getTargetKey());
Assert.assertEquals(TargetType.NODE_ATTRIBUTE, tExpr.getTargetType());
Assert.assertEquals(1, tExpr.getTargetValues().size());
- Assert.assertEquals("1.8", tExpr.getTargetValues().iterator().next());
+ Assert.assertEquals("java=1.8", tExpr.getTargetValues().iterator().next());
}
@Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 76fa38f922a..e21710fed0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -118,6 +118,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.BoundedAppender;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -523,9 +524,27 @@ public boolean init(String[] args) throws ParseException, IOException {
if (cliParser.hasOption("placement_spec")) {
String placementSpec = cliParser.getOptionValue("placement_spec");
- LOG.info("Placement Spec received [{}]", placementSpec);
- parsePlacementSpecs(placementSpec);
+ String decodedSpec = getDecodedPlacementSpec(placementSpec);
+ LOG.info("Placement Spec received [{}]", decodedSpec);
+
+ String[] splitted = decodedSpec.split(
+ String.valueOf(PlacementConstraintParser.EXPRESSION_VAL_DELIM), 2);
+
+ // If the first part of the spec starts with an op code such as IN or
+ // NOTIN, we can assume it is a node_attribute spec.
+ boolean isNodeAttributeSpec = PlacementConstraintParser
+ .isValidOp(splitted[0]);
+
+ this.numTotalContainers = 0;
+ if(isNodeAttributeSpec) {
+ numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
+ "num_containers", "1"));
+ }
+
+ parsePlacementSpecs(decodedSpec);
+
LOG.info("Total num containers requested [{}]", numTotalContainers);
+
if (numTotalContainers == 0) {
throw new IllegalArgumentException(
"Cannot run distributed shell with no containers");
@@ -694,23 +713,34 @@ public boolean init(String[] args) throws ParseException, IOException {
return true;
}
- private void parsePlacementSpecs(String placementSpecifications) {
- // Client sends placement spec in encoded format
- Base64.Decoder decoder = Base64.getDecoder();
- byte[] decodedBytes = decoder.decode(
- placementSpecifications.getBytes(StandardCharsets.UTF_8));
- String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
- LOG.info("Decode placement spec: " + decodedSpec);
+ private void parsePlacementSpecs(String decodedSpec) {
+ String[] splitted = decodedSpec.split(
+ String.valueOf(PlacementConstraintParser.EXPRESSION_VAL_DELIM), 2);
+
+ // If the first part of the spec starts with an op code such as IN or
+ // NOTIN, we can assume it is a node_attribute spec.
+ boolean isNodeAttributeSpec = PlacementConstraintParser
+ .isValidOp(splitted[0]);
Map pSpecs =
- PlacementSpec.parse(decodedSpec);
+ PlacementSpec.parse(decodedSpec, this.numTotalContainers, isNodeAttributeSpec);
this.placementSpecs = new HashMap<>();
- this.numTotalContainers = 0;
for (PlacementSpec pSpec : pSpecs.values()) {
- this.numTotalContainers += pSpec.numContainers;
+ if(!isNodeAttributeSpec) {
+ this.numTotalContainers += pSpec.numContainers;
+ }
this.placementSpecs.put(pSpec.sourceTag, pSpec);
}
}
+ private String getDecodedPlacementSpec(String placementSpecifications) {
+ Base64.Decoder decoder = Base64.getDecoder();
+ byte[] decodedBytes = decoder.decode(
+ placementSpecifications.getBytes(StandardCharsets.UTF_8));
+ String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
+ LOG.info("Decode placement spec: " + decodedSpec);
+ return decodedSpec;
+ }
+
/**
* Helper function to print usage
*
@@ -798,6 +828,7 @@ public void run() throws YarnException, IOException, InterruptedException {
}
}
}
+
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl, placementConstraintMap);
@@ -845,16 +876,21 @@ public void run() throws YarnException, IOException, InterruptedException {
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
if (this.placementSpecs == null) {
+ LOG.info("placementSpecs null");
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
} else {
+ LOG.info("placementSpecs to create req:" + placementSpecs);
List schedReqs = new ArrayList<>();
- for (PlacementSpec pSpec : this.placementSpecs.values()) {
- for (int i = 0; i < pSpec.numContainers; i++) {
- SchedulingRequest sr = setupSchedulingRequest(pSpec);
- schedReqs.add(sr);
+ if (this.placementSpecs != null) {
+ for (PlacementSpec pSpec : this.placementSpecs.values()) {
+ LOG.info("placementSpec :" + pSpec + ", container:"+ pSpec.numContainers);
+ for (int i = 0; i < pSpec.numContainers; i++) {
+ SchedulingRequest sr = setupSchedulingRequest(pSpec);
+ schedReqs.add(sr);
+ }
}
}
amRMClient.addSchedulingRequests(schedReqs);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index c8a71b320c0..0e880a3d26b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -90,6 +90,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -103,7 +104,7 @@
* the provided shell command on a set of containers.
*
* This client is meant to act as an example on how to write yarn-based applications.
- *
+ *
* To submit an application, a client first needs to connect to the ResourceManager
* aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol}
* provides a way for the client to get access to cluster information and to request for a
@@ -192,6 +193,8 @@
// Placement specification
private String placementSpec = "";
+ // Node Attribute specification
+ private String nodeAttributeSpec = "";
// log4j.properties file
// if available, add to local resources and set into classpath
private String log4jPropFile = "";
@@ -446,8 +449,9 @@ public boolean init(String[] args) throws ParseException {
if (cliParser.hasOption("placement_spec")) {
placementSpec = cliParser.getOptionValue("placement_spec");
// Check if it is parsable
- PlacementSpec.parse(this.placementSpec);
+ PlacementSpec.parse(this.placementSpec, this.numContainers);
}
+
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
index 290925980a5..59cfeb241f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -20,7 +20,7 @@
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
-import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.PlacementKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,24 +66,37 @@ public PlacementSpec(String sourceTag, int numContainers,
* from source tag to Placement Constraint.
*
* @param specs Placement spec.
+ * @param totalContainers total number of containers to request
+ * @param isNodeAttributeSpec true if the spec describes node attributes.
* @return Mapping from source tag to placement constraint.
*/
- public static Map parse(String specs)
+ public static Map parse(String specs,
+ int totalContainers, boolean isNodeAttributeSpec)
throws IllegalArgumentException {
LOG.info("Parsing Placement Specs: [{}]", specs);
+
Map pSpecs = new HashMap<>();
- Map parsed;
+ Map parsed;
try {
- parsed = PlacementConstraintParser.parsePlacementSpec(specs);
- for (Map.Entry entry :
+ if(!isNodeAttributeSpec) {
+ parsed = PlacementConstraintParser.parsePlacementSpec(specs);
+ } else {
+ parsed = PlacementConstraintParser.parsePlacementSpecForAttributes(specs);
+ }
+ for (Map.Entry entry :
parsed.entrySet()) {
+ int numContainers = entry.getKey().getNumOfAllocations();
+ // In case of attributes, use --num_containers.
+ if(isNodeAttributeSpec ) {
+ numContainers = totalContainers;
+ }
LOG.info("Parsed source tag: {}, number of allocations: {}",
- entry.getKey().getTag(), entry.getKey().getNumOfAllocations());
+ entry.getKey().getTag(), numContainers);
LOG.info("Parsed constraint: {}", entry.getValue()
.getConstraintExpr().getClass().getSimpleName());
pSpecs.put(entry.getKey().getTag(), new PlacementSpec(
entry.getKey().getTag(),
- entry.getKey().getNumOfAllocations(),
+ numContainers,
entry.getValue()));
}
return pSpecs;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 2a869bddefa..0be76b0e7df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -650,6 +650,8 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setNodeLabelManager(nlm);
NodeAttributesManager nam = createNodeAttributesManager();
+ NodeAttributesManagerImpl namImpl = (NodeAttributesManagerImpl) nam;
+ namImpl.setRMContext(rmContext);
addService(nam);
rmContext.setNodeAttributesManager(nam);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index bf9de15a3bf..0778cf689c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
import com.google.common.base.Strings;
@@ -91,6 +93,7 @@
private final ReadLock readLock;
private final WriteLock writeLock;
+ private RMContext rmContext = null;
public NodeAttributesManagerImpl() {
super("NodeAttributesManagerImpl");
@@ -130,7 +133,7 @@ protected void serviceStart() throws Exception {
}
protected void initNodeAttributeStore(Configuration conf) throws Exception {
- this.store =getAttributeStoreClass(conf);
+ this.store = getAttributeStoreClass(conf);
this.store.init(conf, this);
this.store.recover();
}
@@ -199,12 +202,28 @@ private void internalUpdateAttributesOnNodes(
LOG.debug(logMsg);
}
+ LOG.info(logMsg);
if (null != dispatcher && NodeAttribute.PREFIX_CENTRALIZED
.equals(attributePrefix)) {
dispatcher.getEventHandler()
.handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op));
}
+ // Map used to notify RM
+ Map<String, Set<NodeAttribute>> newNodeToAttributesMap =
+ new HashMap<String, Set<NodeAttribute>>();
+ nodeCollections.forEach((k, v) -> {
+ newNodeToAttributesMap.put(k, v.attributes.keySet());
+ });
+
+ LOG.debug("newNodeToAttributesMap:" + newNodeToAttributesMap.values());
+ // Notify RM
+ if (rmContext != null && rmContext.getDispatcher() != null) {
+ LOG.debug("event fire to RM for attributes");
+ rmContext.getDispatcher().getEventHandler().handle(
+ new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
+ }
+
} finally {
writeLock.unlock();
}
@@ -702,4 +721,8 @@ protected void serviceStop() throws Exception {
store.close();
}
}
+
+ public void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 59771fdef78..20f554844ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
@@ -79,6 +80,8 @@
private volatile Set<String> labels = null;
+ private volatile Set<NodeAttribute> nodeAttributes = null;
+
// Last updated time
private volatile long lastHeartbeatMonotonicTime;
@@ -503,6 +506,14 @@ public int hashCode() {
return getNodeID().hashCode();
}
+ public Set<NodeAttribute> getNodeAttributes() {
+ return nodeAttributes;
+ }
+
+ public void updateNodeAttributes(Set<NodeAttribute> nodeAttributes) {
+ this.nodeAttributes = nodeAttributes;
+ }
+
private static class ContainerInfo {
private final RMContainer container;
private boolean launchedOnNode;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 37f56deb119..220135a7cc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -136,6 +137,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1728,6 +1730,14 @@ public void handle(SchedulerEvent event) {
updateNodeLabelsAndQueueResource(labelUpdateEvent);
}
break;
+ case NODE_ATTRIBUTES_UPDATE:
+ {
+ NodeAttributesUpdateSchedulerEvent attributeUpdateEvent =
+ (NodeAttributesUpdateSchedulerEvent) event;
+
+ updateNodeAttributes(attributeUpdateEvent);
+ }
+ break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@@ -1861,6 +1871,30 @@ public void handle(SchedulerEvent event) {
}
}
+ private void updateNodeAttributes(
+ NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
+ try {
+ writeLock.lock();
+ for (Entry<String, Set<NodeAttribute>> entry : attributeUpdateEvent
+ .getUpdatedNodeToAttributes().entrySet()) {
+ String hostname = entry.getKey();
+ Set<NodeAttribute> attributes = entry.getValue();
+ List<NodeId> nodeIds = nodeTracker.getNodeIdsByResourceName(hostname);
+ updateAttributesOnNode(nodeIds, attributes);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void updateAttributesOnNode(List<NodeId> nodeIds,
+ Set<NodeAttribute> attributes) {
+ nodeIds.forEach((k) -> {
+ SchedulerNode node = nodeTracker.getNode(k);
+ node.updateNodeAttributes(attributes);
+ });
+ }
+
/**
* Process node labels update.
*/
@@ -2708,7 +2742,7 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
schedulingRequest, schedulerNode,
rmContext.getPlacementConstraintManager(),
rmContext.getAllocationTagsManager())) {
- LOG.debug("Failed to allocate container for application "
+ LOG.debug("Failed to allocate container for application "
+ appAttempt.getApplicationId() + " on node "
+ schedulerNode.getNodeName()
+ " because this allocation violates the"
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index a843002cdb5..76bace6939b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -141,15 +141,18 @@ private ContainerAllocation preCheckForNodeCandidateSet(
}
}
+ LOG.debug("appInfo.precheckNode start");
// Is the nodePartition of pending request matches the node's partition
// If not match, jump to next priority.
if (!appInfo.precheckNode(schedulerKey, node, schedulingMode)) {
+ LOG.debug("appInfo.precheckNode skipped");
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
+ LOG.debug("appInfo.precheckNode continued");
if (!application.getCSLeafQueue().getReservationContinueLooking()) {
if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index f47e1d4889d..f55b5b4402f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -25,6 +25,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@@ -35,6 +37,7 @@
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeAttributesManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
@@ -114,19 +117,66 @@ private static boolean canSatisfySingleConstraintExpression(
|| maxScopeCardinality <= desiredMaxCardinality);
}
- private static boolean canSatisfyNodePartitionConstraintExpresssion(
+ private static boolean canSatisfyNodeConstraintExpresssion(
TargetExpression targetExpression, SchedulerNode schedulerNode) {
Set<String> values = targetExpression.getTargetValues();
- if (values == null || values.isEmpty()) {
- return schedulerNode.getPartition().equals(
- RMNodeLabelsManager.NO_LABEL);
- } else{
- String nodePartition = values.iterator().next();
- if (!nodePartition.equals(schedulerNode.getPartition())) {
+
+ if (targetExpression.getTargetKey().equals(NODE_PARTITION)) {
+ if (values == null || values.isEmpty()) {
+ return schedulerNode.getPartition()
+ .equals(RMNodeLabelsManager.NO_LABEL);
+ } else {
+ String nodePartition = values.iterator().next();
+ if (!nodePartition.equals(schedulerNode.getPartition())) {
+ return false;
+ }
+ }
+ } else {
+ // compare attributes.
+ String inputAttribute = values.iterator().next();
+ NodeAttribute requestAttribute = getNodeConstraintFromRequest(inputAttribute);
+ if (requestAttribute == null) {
+ return true;
+ }
+
+ if (schedulerNode.getNodeAttributes() == null ||
+ !schedulerNode.getNodeAttributes().contains(requestAttribute)) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Incoming requestAttribute:" + requestAttribute
+ + " is not present in " + schedulerNode.getNodeID());
+ }
+ return false;
+ }
+ boolean found = false;
+ for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
+ .iterator(); it.hasNext(); ) {
+ NodeAttribute nodeAttribute = it.next();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Starting to compare Incoming requestAttribute :"
+ + requestAttribute
+ + " with requestAttribute value= " + requestAttribute
+ .getAttributeValue()
+ + ", stored nodeAttribute value=" + nodeAttribute
+ .getAttributeValue());
+ }
+ if (requestAttribute.equals(nodeAttribute)) {
+ if (requestAttribute.getAttributeValue()
+ .equals(nodeAttribute.getAttributeValue())) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Incoming requestAttribute:" + requestAttribute
+ + " matches with node:" + schedulerNode.getNodeID());
+ }
+ found = true;
+ return found;
+ }
+ }
+ }
+ if (!found) {
+ LOG.info("skip this node for requestAttribute:" + requestAttribute);
return false;
}
}
-
return true;
}
@@ -146,10 +196,11 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
singleConstraint, currentExp, schedulerNode, tagsManager)) {
return false;
}
- } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)
- && currentExp.getTargetKey().equals(NODE_PARTITION)) {
- // This is a node partition expression, check it.
- canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode);
+ } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
+ // This is a node attribute expression, check it.
+ if(!canSatisfyNodeConstraintExpresssion(currentExp, schedulerNode)) {
+ return false;
+ }
}
}
// return true if all targetExpressions are satisfied
@@ -203,6 +254,11 @@ private static boolean canSatisfyConstraints(ApplicationId appId,
AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException {
if (constraint == null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Constraint is found empty during constraint validation for app:"
+ + appId);
+ }
return true;
}
@@ -263,4 +319,25 @@ public static boolean canSatisfyConstraints(ApplicationId applicationId,
pcm.getMultilevelConstraint(applicationId, sourceTags, pc),
schedulerNode, atm);
}
+
+ private static NodeAttribute getNodeConstraintFromRequest(String attrString) {
+ NodeAttribute nodeAttribute = null;
+ LOG.debug("Incoming node attribute: " + attrString);
+ // Input node attribute could be like rm.yarn.io/java=1.8
+ String[] splits = (attrString == null) ? null : attrString.split("=");
+ if (splits == null || splits.length < 2) {
+ return null;
+ }
+
+ String[] name = splits[0].split("/");
+ if (name.length == 1) {
+ nodeAttribute = NodeAttribute
+ .newInstance(splits[0], NodeAttributeType.STRING, splits[1]);
+ } else {
+ nodeAttribute = NodeAttribute
+ .newInstance(name[0], name[1], NodeAttributeType.STRING, splits[1]);
+ }
+
+ return nodeAttribute;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
new file mode 100644
index 00000000000..2a3effbde8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent {
+ private Map<String, Set<NodeAttribute>> nodeToAttributes;
+
+ public NodeAttributesUpdateSchedulerEvent(
+ Map<String, Set<NodeAttribute>> newNodeToAttributesMap) {
+ super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE);
+ this.nodeToAttributes = newNodeToAttributesMap;
+ }
+
+ public Map<String, Set<NodeAttribute>> getUpdatedNodeToAttributes() {
+ return nodeToAttributes;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index b107cf4ee61..869bf0ed9e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -26,6 +26,7 @@
NODE_UPDATE,
NODE_RESOURCE_UPDATE,
NODE_LABELS_UPDATE,
+ NODE_ATTRIBUTES_UPDATE,
// Source: RMApp
APP_ADDED,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index e1239a9db34..85c94b39356 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -366,6 +366,7 @@ public boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode) {
// We will only look at node label = nodeLabelToLookAt according to
// schedulingMode and partition of node.
+ LOG.info("Locality App Allocator for " + schedulerNode.getNodeID() + "," + schedulingMode);
String nodePartitionToLookAt;
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
nodePartitionToLookAt = schedulerNode.getPartition();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
index 2b610f2fe76..9b19a409ca6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -70,6 +71,7 @@
private SchedulingRequest schedulingRequest = null;
private String targetNodePartition;
+ private Set<String> targetNodeAttributes;
private Set<String> targetAllocationTags;
private AllocationTagsManager allocationTagsManager;
private PlacementConstraintManager placementConstraintManager;
@@ -223,6 +225,9 @@ private String throwExceptionWithMetaInfo(String message) {
private void validateAndSetSchedulingRequest(SchedulingRequest
newSchedulingRequest)
throws SchedulerInvalidResoureRequestException {
+ // Set Node Attributes
+ Set<String> nodeAttributes = null;
+
// Check sizing exists
if (newSchedulingRequest.getResourceSizing() == null
|| newSchedulingRequest.getResourceSizing().getResources() == null) {
@@ -293,9 +298,9 @@ private void validateAndSetSchedulingRequest(SchedulingRequest
// For node attribute target, we only support Partition now. And once
// YARN-3409 is merged, we will support node attribute.
if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
- throwExceptionWithMetaInfo("When TargetType="
- + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
- + " only " + NODE_PARTITION + " is accepted as TargetKey.");
+ nodeAttributes = handleNodeAttributeFromSchedulingRequest(
+ targetExpression);
+ continue;
}
if (nodePartition != null) {
@@ -338,7 +343,7 @@ private void validateAndSetSchedulingRequest(SchedulingRequest
}
}
- if (targetAllocationTags == null) {
+ if (targetAllocationTags == null && nodeAttributes == null) {
// That means we don't have ALLOCATION_TAG specified
throwExceptionWithMetaInfo(
"Couldn't find target expression with type == ALLOCATION_TAG,"
@@ -360,14 +365,29 @@ private void validateAndSetSchedulingRequest(SchedulingRequest
// Validation is done. set local results:
this.targetNodePartition = nodePartition;
this.targetAllocationTags = targetAllocationTags;
+ this.targetNodeAttributes = nodeAttributes;
this.schedulingRequest = new SchedulingRequestPBImpl(
((SchedulingRequestPBImpl) newSchedulingRequest).getProto());
- LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo
- .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils
- .join(",", targetAllocationTags) + "]. nodePartition="
- + targetNodePartition);
+ LOG.info("Successfully added SchedulingRequest to app="
+ + appSchedulingInfo.getApplicationAttemptId()
+ + " targetAllocationTags=["
+ + StringUtils.join(",", targetAllocationTags) + "]. nodePartition="
+ + targetNodePartition + " targetNodeAttributes=["
+ + StringUtils.join(",", targetNodeAttributes) + "]");
+ }
+
+ private Set<String> handleNodeAttributeFromSchedulingRequest(
+ TargetExpression targetExpression) {
+ Set<String> nodeAttributes = new HashSet<String>();
+ Set<String> values = targetExpression.getTargetValues();
+ if (values == null || values.isEmpty()) {
+ return nodeAttributes;
+ }
+
+ nodeAttributes.addAll(values);
+ return nodeAttributes;
}
@Override
--
2.15.2 (Apple Git-101.1)