From 2270fd4204aa29a5fc9d0b5c23c9b4245496ecfe Mon Sep 17 00:00:00 2001
From: Sunil G
Date: Sat, 25 Aug 2018 20:59:27 +0530
Subject: [PATCH] YARN-7863-YARN-3409
---
.../yarn/api/records/NodeAttributeOpCode.java | 39 +++++++
.../yarn/api/resource/PlacementConstraint.java | 37 ++++++-
.../yarn/api/resource/PlacementConstraints.java | 19 ++++
.../util/constraint/PlacementConstraintParser.java | 114 ++++++++++++++++++--
.../src/main/proto/yarn_protos.proto | 6 ++
.../resource/TestPlacementConstraintParser.java | 53 +++++++++
.../distributedshell/ApplicationMaster.java | 36 ++++---
.../yarn/applications/distributedshell/Client.java | 6 +-
.../distributedshell/PlacementSpec.java | 19 +++-
.../pb/PlacementConstraintFromProtoConverter.java | 10 +-
.../pb/PlacementConstraintToProtoConverter.java | 11 ++
.../server/resourcemanager/ResourceManager.java | 7 +-
.../nodelabels/NodeAttributesManagerImpl.java | 27 ++++-
.../resourcemanager/scheduler/SchedulerNode.java | 11 ++
.../scheduler/capacity/CapacityScheduler.java | 36 ++++++-
.../constraint/PlacementConstraintsUtil.java | 118 ++++++++++++++++++---
.../event/NodeAttributesUpdateSchedulerEvent.java | 39 +++++++
.../scheduler/event/SchedulerEventType.java | 1 +
.../placement/LocalityAppPlacementAllocator.java | 4 +
19 files changed, 549 insertions(+), 44 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java
new file mode 100644
index 00000000000..5964cb4b5ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Enumeration of various node attribute op codes.
+ */
+@Public
+@Evolving
+public enum NodeAttributeOpCode {
+ /**
+ * EQUALS op code for Attribute.
+ */
+ EQ,
+
+ /**
+ * NOT EQUALS op code for Attribute.
+ */
+ NE
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
index 0fe8273e6d7..20a65f6b21d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
/**
* {@code PlacementConstraint} represents a placement constraint for a resource
@@ -155,13 +156,21 @@ public String toString() {
private int minCardinality;
private int maxCardinality;
private Set targetExpressions;
+ private NodeAttributeOpCode attributeOpCode;
public SingleConstraint(String scope, int minCardinality,
- int maxCardinality, Set targetExpressions) {
+ int maxCardinality, NodeAttributeOpCode opCode,
+ Set targetExpressions) {
this.scope = scope;
this.minCardinality = minCardinality;
this.maxCardinality = maxCardinality;
this.targetExpressions = targetExpressions;
+ this.attributeOpCode = opCode;
+ }
+
+ public SingleConstraint(String scope, int minCardinality,
+ int maxCardinality, Set targetExpressions) {
+ this(scope, minCardinality, maxCardinality, null, targetExpressions);
}
public SingleConstraint(String scope, int minC, int maxC,
@@ -169,6 +178,13 @@ public SingleConstraint(String scope, int minC, int maxC,
this(scope, minC, maxC, new HashSet<>(Arrays.asList(targetExpressions)));
}
+ public SingleConstraint(String scope, int minC, int maxC,
+ NodeAttributeOpCode opCode,
+ TargetExpression... targetExpressions) {
+ this(scope, minC, maxC, opCode,
+ new HashSet<>(Arrays.asList(targetExpressions)));
+ }
+
/**
* Get the scope of the constraint.
*
@@ -205,6 +221,15 @@ public int getMaxCardinality() {
return targetExpressions;
}
+ /**
+ * Get the NodeAttributeOpCode of the constraint.
+ *
+ * @return nodeAttribute Op Code
+ */
+ public NodeAttributeOpCode getNodeAttributeOpCode() {
+ return attributeOpCode;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -225,6 +250,9 @@ public boolean equals(Object o) {
if (!getScope().equals(that.getScope())) {
return false;
}
+ if (!getNodeAttributeOpCode().equals(that.getNodeAttributeOpCode())) {
+ return false;
+ }
return getTargetExpressions().equals(that.getTargetExpressions());
}
@@ -259,6 +287,13 @@ public String toString() {
.append(getScope()).append(",")
.append(targetExpr)
.toString());
+ } else if (min == -1 && max == -1) {
+ // node attribute
+ targetConstraints.add(new StringBuilder()
+ .append(getScope()).append(",")
+ .append(getNodeAttributeOpCode()).append(",")
+ .append(targetExpr)
+ .toString());
} else {
// cardinality
targetConstraints.add(new StringBuilder()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
index d22a6bd90c0..73fa328833f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@@ -85,6 +86,24 @@ public static AbstractConstraint targetNotIn(String scope,
return new SingleConstraint(scope, 0, 0, targetExpressions);
}
+ /**
+ * Creates a constraint that requires allocations to be placed on nodes that
+ * belong to a scope (e.g., node or rack) that satisfy any of the
+ * target expressions based on node attribute op code.
+ *
+ * @param scope the scope within which the target expressions should not be
+ * true
+ * @param opCode Node Attribute code which could be equals, not equals.
+ * @param targetExpressions the expressions that need to not be true within
+ * the scope
+ * @return the resulting placement constraint
+ */
+ public static AbstractConstraint targetNodeAttribute(String scope,
+ NodeAttributeOpCode opCode,
+ TargetExpression... targetExpressions) {
+ return new SingleConstraint(scope, -1, -1, opCode, targetExpressions);
+ }
+
/**
* Creates a constraint that restricts the number of allocations within a
* given scope (e.g., node or rack).
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
index 2926c9d1de8..28b374341ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
@@ -19,6 +19,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
@@ -44,11 +46,12 @@
@InterfaceStability.Unstable
public final class PlacementConstraintParser {
+ public static final char EXPRESSION_VAL_DELIM = ',';
private static final char EXPRESSION_DELIM = ':';
private static final char KV_SPLIT_DELIM = '=';
- private static final char EXPRESSION_VAL_DELIM = ',';
private static final char BRACKET_START = '(';
private static final char BRACKET_END = ')';
+ private static final String KV_NE_DELIM = "!=";
private static final String IN = "in";
private static final String NOT_IN = "notin";
private static final String AND = "and";
@@ -57,6 +60,10 @@
private static final String SCOPE_NODE = PlacementConstraints.NODE;
private static final String SCOPE_RACK = PlacementConstraints.RACK;
+ public enum TargetExprType {
+ NODE_ATTRIBUTE, ALLOCATION_TAG
+ }
+
private PlacementConstraintParser() {
// Private constructor for this utility class.
}
@@ -349,6 +356,88 @@ public String nextElement() {
}
}
+ /**
+ * Constraint parser used to parse a given target expression.
+ */
+ public static class NodeConstraintParser extends ConstraintParser {
+
+ public NodeConstraintParser(String expression) {
+ super(new BaseStringTokenizer(expression,
+ String.valueOf(EXPRESSION_VAL_DELIM)));
+ }
+
+ @Override
+ public AbstractConstraint parse()
+ throws PlacementConstraintParseException {
+ PlacementConstraint.AbstractConstraint placementConstraints = null;
+ String attributeName = "";
+ NodeAttributeOpCode opCode = NodeAttributeOpCode.EQ;
+ String scope = SCOPE_NODE;
+
+ Set constraintEntities = new TreeSet<>();
+ while (hasMoreTokens()) {
+ String currentTag = nextToken();
+ StringTokenizer attributeKV = getAttributeOpCodeTokenizer(currentTag);
+
+ // Usually there will be only one k=v pair. However in case when multiple
+ // values are present for same attribute, it will also be coming as
+ // next token. for example, java=1.8,1.9 or python!=2
+ if (attributeKV.countTokens() > 1) {
+ opCode = getAttributeOpCode(currentTag);
+ attributeName = attributeKV.nextToken();
+ currentTag = attributeKV.nextToken();
+ }
+ constraintEntities.add(currentTag);
+ }
+
+ if(attributeName.isEmpty()) {
+ throw new PlacementConstraintParseException(
+ "expecting valid expression like k=v or k!=v, but get "
+ + constraintEntities);
+ }
+
+ PlacementConstraint.TargetExpression target = null;
+ if (!constraintEntities.isEmpty()) {
+ target = PlacementConstraints.PlacementTargets
+ .nodeAttribute(attributeName,
+ constraintEntities
+ .toArray(new String[constraintEntities.size()]));
+ }
+
+ placementConstraints = PlacementConstraints
+ .targetNodeAttribute(scope, opCode, target);
+ return placementConstraints;
+ }
+
+ private StringTokenizer getAttributeOpCodeTokenizer(String currentTag) {
+ StringTokenizer attributeKV = new StringTokenizer(currentTag,
+ KV_NE_DELIM);
+
+ // Try with '!=' delim as well.
+ if (attributeKV.countTokens() < 2) {
+ attributeKV = new StringTokenizer(currentTag,
+ String.valueOf(KV_SPLIT_DELIM));
+ }
+ return attributeKV;
+ }
+
+ /**
+ * Below conditions are validated.
+ * java=8 : OpCode = EQUALS
+ * java!=8 : OpCode = NEQUALS
+ * @param currentTag tag
+ * @return Attribute op code.
+ */
+ private NodeAttributeOpCode getAttributeOpCode(String currentTag) {
+ if (currentTag.contains(KV_NE_DELIM)) {
+ return NodeAttributeOpCode.NE;
+ } else if (currentTag.contains(String.valueOf(KV_SPLIT_DELIM))) {
+ return NodeAttributeOpCode.EQ;
+ }
+ return NodeAttributeOpCode.EQ;
+ }
+ }
+
/**
* Constraint parser used to parse a given target expression, such as
* "NOTIN, NODE, foo, bar".
@@ -363,20 +452,23 @@ public TargetConstraintParser(String expression) {
@Override
public AbstractConstraint parse()
throws PlacementConstraintParseException {
- PlacementConstraint.AbstractConstraint placementConstraints;
+ PlacementConstraint.AbstractConstraint placementConstraints = null;
String op = nextToken();
if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
String scope = nextToken();
scope = parseScope(scope);
- Set allocationTags = new TreeSet<>();
+ Set constraintEntities = new TreeSet<>();
while(hasMoreTokens()) {
String tag = nextToken();
- allocationTags.add(tag);
+ constraintEntities.add(tag);
+ }
+ PlacementConstraint.TargetExpression target = null;
+ if(!constraintEntities.isEmpty()) {
+ target = PlacementConstraints.PlacementTargets.allocationTag(
+ constraintEntities
+ .toArray(new String[constraintEntities.size()]));
}
- PlacementConstraint.TargetExpression target =
- PlacementConstraints.PlacementTargets.allocationTag(
- allocationTags.toArray(new String[allocationTags.size()]));
if (op.equalsIgnoreCase(IN)) {
placementConstraints = PlacementConstraints
.targetIn(scope, target);
@@ -550,6 +642,11 @@ public static AbstractConstraint parseExpression(String constraintStr)
new ConjunctionConstraintParser(constraintStr);
constraintOptional = Optional.ofNullable(jp.tryParse());
}
+ if (!constraintOptional.isPresent()) {
+ NodeConstraintParser np =
+ new NodeConstraintParser(constraintStr);
+ constraintOptional = Optional.ofNullable(np.tryParse());
+ }
if (!constraintOptional.isPresent()) {
throw new PlacementConstraintParseException(
"Invalid constraint expression " + constraintStr);
@@ -584,12 +681,13 @@ public static AbstractConstraint parseExpression(String constraintStr)
*/
public static Map parsePlacementSpec(
String expression) throws PlacementConstraintParseException {
+ // Continue handling for application tag based constraint otherwise.
// Respect insertion order.
Map result = new LinkedHashMap<>();
PlacementConstraintParser.ConstraintTokenizer tokenizer =
new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
tokenizer.validate();
- while(tokenizer.hasMoreElements()) {
+ while (tokenizer.hasMoreElements()) {
String specStr = tokenizer.nextElement();
// each spec starts with sourceAllocationTag=numOfContainers and
// followed by a constraint expression.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 10b36c75f59..f9d691c97b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -646,11 +646,17 @@ message PlacementConstraintProto {
optional CompositePlacementConstraintProto compositeConstraint = 2;
}
+enum NodeAttributeOpCodeProto {
+ EQ = 1;
+ NE = 2;
+}
+
message SimplePlacementConstraintProto {
required string scope = 1;
repeated PlacementConstraintTargetProto targetExpressions = 2;
optional int32 minCardinality = 3;
optional int32 maxCardinality = 4;
+ optional NodeAttributeOpCodeProto attributeOpCode = 5;
}
message PlacementConstraintTargetProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
index a69571c5c80..32037686d08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
@@ -22,6 +22,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or;
@@ -443,4 +445,55 @@ private void verifyConstraintToString(String inputExpr,
+ constrainExpr + ", caused by: " + e.getMessage());
}
}
+
+ @Test
+ public void testParseNodeAttributeSpec()
+ throws PlacementConstraintParseException {
+ Map result;
+ PlacementConstraint.AbstractConstraint expectedPc1, expectedPc2;
+ PlacementConstraint actualPc1, actualPc2;
+
+ // A single node attribute constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpec("xyz=4,rm.yarn.io/foo=true");
+ Assert.assertEquals(1, result.size());
+ TargetExpression target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/foo", "true");
+ expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
+
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+ // A single node attribute constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpec("xyz=3,rm.yarn.io/foo!=abc");
+ Assert.assertEquals(1, result.size());
+ target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/foo", "abc");
+ expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target);
+
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+ // A single node attribute constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpec(
+ "xyz=1,rm.yarn.io/foo!=abc:zxy=1,rm.yarn.io/bar=true");
+ Assert.assertEquals(2, result.size());
+ target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/foo", "abc");
+ expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target);
+ target = PlacementTargets
+ .nodeAttribute("rm.yarn.io/bar", "true");
+ expectedPc2 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
+
+ Iterator valueIt = result.values().iterator();
+ actualPc1 = valueIt.next();
+ actualPc2 = valueIt.next();
+ Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+ Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 76fa38f922a..21685854b22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -118,6 +118,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.BoundedAppender;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -523,9 +524,13 @@ public boolean init(String[] args) throws ParseException, IOException {
if (cliParser.hasOption("placement_spec")) {
String placementSpec = cliParser.getOptionValue("placement_spec");
- LOG.info("Placement Spec received [{}]", placementSpec);
- parsePlacementSpecs(placementSpec);
+ String decodedSpec = getDecodedPlacementSpec(placementSpec);
+ LOG.info("Placement Spec received [{}]", decodedSpec);
+
+ this.numTotalContainers = 0;
+ parsePlacementSpecs(decodedSpec);
LOG.info("Total num containers requested [{}]", numTotalContainers);
+
if (numTotalContainers == 0) {
throw new IllegalArgumentException(
"Cannot run distributed shell with no containers");
@@ -694,23 +699,25 @@ public boolean init(String[] args) throws ParseException, IOException {
return true;
}
- private void parsePlacementSpecs(String placementSpecifications) {
- // Client sends placement spec in encoded format
- Base64.Decoder decoder = Base64.getDecoder();
- byte[] decodedBytes = decoder.decode(
- placementSpecifications.getBytes(StandardCharsets.UTF_8));
- String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
- LOG.info("Decode placement spec: " + decodedSpec);
+ private void parsePlacementSpecs(String decodedSpec) {
Map pSpecs =
PlacementSpec.parse(decodedSpec);
this.placementSpecs = new HashMap<>();
- this.numTotalContainers = 0;
for (PlacementSpec pSpec : pSpecs.values()) {
- this.numTotalContainers += pSpec.numContainers;
+ this.numTotalContainers += pSpec.getNumContainers();
this.placementSpecs.put(pSpec.sourceTag, pSpec);
}
}
+ private String getDecodedPlacementSpec(String placementSpecifications) {
+ Base64.Decoder decoder = Base64.getDecoder();
+ byte[] decodedBytes = decoder.decode(
+ placementSpecifications.getBytes(StandardCharsets.UTF_8));
+ String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
+ LOG.info("Decode placement spec: " + decodedSpec);
+ return decodedSpec;
+ }
+
/**
* Helper function to print usage
*
@@ -798,6 +805,7 @@ public void run() throws YarnException, IOException, InterruptedException {
}
}
}
+
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl, placementConstraintMap);
@@ -845,14 +853,18 @@ public void run() throws YarnException, IOException, InterruptedException {
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
if (this.placementSpecs == null) {
+ LOG.info("placementSpecs null");
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
} else {
+ LOG.info("placementSpecs to create req:" + placementSpecs);
List schedReqs = new ArrayList<>();
for (PlacementSpec pSpec : this.placementSpecs.values()) {
- for (int i = 0; i < pSpec.numContainers; i++) {
+ LOG.info("placementSpec :" + pSpec + ", container:" + pSpec
+ .getNumContainers());
+ for (int i = 0; i < pSpec.getNumContainers(); i++) {
SchedulingRequest sr = setupSchedulingRequest(pSpec);
schedReqs.add(sr);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index c8a71b320c0..eb21b925f6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -90,6 +90,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -103,7 +104,7 @@
* the provided shell command on a set of containers.
*
* This client is meant to act as an example on how to write yarn-based applications.
- *
+ *
* To submit an application, a client first needs to connect to the ResourceManager
* aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol}
* provides a way for the client to get access to cluster information and to request for a
@@ -192,6 +193,8 @@
// Placement specification
private String placementSpec = "";
+ // Node Attribute specification
+ private String nodeAttributeSpec = "";
// log4j.properties file
// if available, add to local resources and set into classpath
private String log4jPropFile = "";
@@ -448,6 +451,7 @@ public boolean init(String[] args) throws ParseException {
// Check if it is parsable
PlacementSpec.parse(this.placementSpec);
}
+
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
index 290925980a5..81824a6976f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -37,8 +37,8 @@
LoggerFactory.getLogger(PlacementSpec.class);
public final String sourceTag;
- public final int numContainers;
public final PlacementConstraint constraint;
+ private int numContainers;
public PlacementSpec(String sourceTag, int numContainers,
PlacementConstraint constraint) {
@@ -47,6 +47,22 @@ public PlacementSpec(String sourceTag, int numContainers,
this.constraint = constraint;
}
+ /**
+ * Get teh number of container for this spec
+ * @return container count
+ */
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ /**
+ * Set number of containers for this spec.
+ * @param numContainers number of containers.
+ */
+ public void setNumContainers(int numContainers) {
+ this.numContainers = numContainers;
+ }
+
// Placement specification should be of the form:
// PlacementSpec => ""|KeyVal;PlacementSpec
// KeyVal => SourceTag=Constraint
@@ -71,6 +87,7 @@ public PlacementSpec(String sourceTag, int numContainers,
public static Map parse(String specs)
throws IllegalArgumentException {
LOG.info("Parsing Placement Specs: [{}]", specs);
+
Map pSpecs = new HashMap<>();
Map parsed;
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java
index 926b6fa2793..447905e2b3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java
@@ -26,6 +26,7 @@
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@@ -37,6 +38,7 @@
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto;
@@ -73,7 +75,8 @@ private SingleConstraint convert(SimplePlacementConstraintProto proto) {
}
return new SingleConstraint(proto.getScope(), proto.getMinCardinality(),
- proto.getMaxCardinality(), targets);
+ proto.getMaxCardinality(),
+ convertFromProtoFormat(proto.getAttributeOpCode()), targets);
}
private TargetExpression convert(PlacementConstraintTargetProto proto) {
@@ -113,4 +116,9 @@ private TimedPlacementConstraint convert(
return new TimedPlacementConstraint(pConstraint, proto.getSchedulingDelay(),
ProtoUtils.convertFromProtoFormat(proto.getDelayUnit()));
}
+
+ private static NodeAttributeOpCode convertFromProtoFormat(
+ NodeAttributeOpCodeProto p) {
+ return NodeAttributeOpCode.valueOf(p.name());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java
index 7816e181dd2..30f774136dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@@ -34,6 +35,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto.CompositeType;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto;
@@ -72,6 +74,10 @@ public GeneratedMessage visit(SingleConstraint constraint) {
}
sb.setMinCardinality(constraint.getMinCardinality());
sb.setMaxCardinality(constraint.getMaxCardinality());
+ if (constraint.getNodeAttributeOpCode() != null) {
+ sb.setAttributeOpCode(
+ convertToProtoFormat(constraint.getNodeAttributeOpCode()));
+ }
if (constraint.getTargetExpressions() != null) {
for (TargetExpression target : constraint.getTargetExpressions()) {
sb.addTargetExpressions(
@@ -171,4 +177,9 @@ public GeneratedMessage visit(TimedPlacementConstraint constraint) {
return tb.build();
}
+
+ private static NodeAttributeOpCodeProto convertToProtoFormat(
+ NodeAttributeOpCode p) {
+ return NodeAttributeOpCodeProto.valueOf(p.name());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 2a869bddefa..831a1275c05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -509,9 +509,10 @@ protected RMNodeLabelsManager createNodeLabelManager()
return new RMNodeLabelsManager();
}
- protected NodeAttributesManager createNodeAttributesManager()
- throws InstantiationException, IllegalAccessException {
- return new NodeAttributesManagerImpl();
+ protected NodeAttributesManager createNodeAttributesManager() {
+ NodeAttributesManagerImpl namImpl = new NodeAttributesManagerImpl();
+ namImpl.setRMContext(rmContext);
+ return namImpl;
}
protected AllocationTagsManager createAllocationTagsManager() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index bf9de15a3bf..608e42dd523 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
import com.google.common.base.Strings;
@@ -91,6 +93,7 @@
private final ReadLock readLock;
private final WriteLock writeLock;
+ private RMContext rmContext = null;
public NodeAttributesManagerImpl() {
super("NodeAttributesManagerImpl");
@@ -130,7 +133,7 @@ protected void serviceStart() throws Exception {
}
protected void initNodeAttributeStore(Configuration conf) throws Exception {
- this.store =getAttributeStoreClass(conf);
+ this.store = getAttributeStoreClass(conf);
this.store.init(conf, this);
this.store.recover();
}
@@ -199,12 +202,30 @@ private void internalUpdateAttributesOnNodes(
LOG.debug(logMsg);
}
+ LOG.info(logMsg);
if (null != dispatcher && NodeAttribute.PREFIX_CENTRALIZED
.equals(attributePrefix)) {
dispatcher.getEventHandler()
.handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op));
}
+ // Map used to notify RM
+ Map> newNodeToAttributesMap =
+ new HashMap>();
+ nodeCollections.forEach((k, v) -> {
+ newNodeToAttributesMap.put(k, v.attributes.keySet());
+ });
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Publish events to RM for changed attributes:"
+ + newNodeToAttributesMap.values());
+ }
+ // Notify RM
+ if (rmContext != null && rmContext.getDispatcher() != null) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
+ }
+
} finally {
writeLock.unlock();
}
@@ -702,4 +723,8 @@ protected void serviceStop() throws Exception {
store.close();
}
}
+
+ public void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 59771fdef78..20f554844ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
@@ -79,6 +80,8 @@
private volatile Set labels = null;
+ private volatile Set nodeAttributes = null;
+
// Last updated time
private volatile long lastHeartbeatMonotonicTime;
@@ -503,6 +506,14 @@ public int hashCode() {
return getNodeID().hashCode();
}
+ public Set getNodeAttributes() {
+ return nodeAttributes;
+ }
+
+ public void updateNodeAttributes(Set nodeAttributes) {
+ this.nodeAttributes = nodeAttributes;
+ }
+
private static class ContainerInfo {
private final RMContainer container;
private boolean launchedOnNode;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 37f56deb119..220135a7cc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -136,6 +137,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1728,6 +1730,14 @@ public void handle(SchedulerEvent event) {
updateNodeLabelsAndQueueResource(labelUpdateEvent);
}
break;
+ case NODE_ATTRIBUTES_UPDATE:
+ {
+ NodeAttributesUpdateSchedulerEvent attributeUpdateEvent =
+ (NodeAttributesUpdateSchedulerEvent) event;
+
+ updateNodeAttributes(attributeUpdateEvent);
+ }
+ break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@@ -1861,6 +1871,30 @@ public void handle(SchedulerEvent event) {
}
}
+ private void updateNodeAttributes(
+ NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
+ try {
+ writeLock.lock();
+ for (Entry> entry : attributeUpdateEvent
+ .getUpdatedNodeToAttributes().entrySet()) {
+ String hostname = entry.getKey();
+ Set attributes = entry.getValue();
+ List nodeIds = nodeTracker.getNodeIdsByResourceName(hostname);
+ updateAttributesOnNode(nodeIds, attributes);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void updateAttributesOnNode(List nodeIds,
+ Set attributes) {
+ nodeIds.forEach((k) -> {
+ SchedulerNode node = nodeTracker.getNode(k);
+ node.updateNodeAttributes(attributes);
+ });
+ }
+
/**
* Process node labels update.
*/
@@ -2708,7 +2742,7 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
schedulingRequest, schedulerNode,
rmContext.getPlacementConstraintManager(),
rmContext.getAllocationTagsManager())) {
- LOG.debug("Failed to allocate container for application "
+ LOG.info("Failed to allocate container for application "
+ appAttempt.getApplicationId() + " on node "
+ schedulerNode.getNodeName()
+ " because this allocation violates the"
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index f47e1d4889d..0d2b78f11c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -24,8 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
@@ -114,22 +113,89 @@ private static boolean canSatisfySingleConstraintExpression(
|| maxScopeCardinality <= desiredMaxCardinality);
}
- private static boolean canSatisfyNodePartitionConstraintExpresssion(
- TargetExpression targetExpression, SchedulerNode schedulerNode) {
+ private static boolean canSatisfyNodeConstraintExpresssion(
+ SingleConstraint sc, TargetExpression targetExpression,
+ SchedulerNode schedulerNode) {
Set values = targetExpression.getTargetValues();
- if (values == null || values.isEmpty()) {
- return schedulerNode.getPartition().equals(
- RMNodeLabelsManager.NO_LABEL);
- } else{
- String nodePartition = values.iterator().next();
- if (!nodePartition.equals(schedulerNode.getPartition())) {
+
+ if (targetExpression.getTargetKey().equals(NODE_PARTITION)) {
+ if (values == null || values.isEmpty()) {
+ return schedulerNode.getPartition()
+ .equals(RMNodeLabelsManager.NO_LABEL);
+ } else {
+ String nodePartition = values.iterator().next();
+ if (!nodePartition.equals(schedulerNode.getPartition())) {
+ return false;
+ }
+ }
+ } else {
+ NodeAttributeOpCode opCode = sc.getNodeAttributeOpCode();
+ // compare attributes.
+ String inputAttribute = values.iterator().next();
+ NodeAttribute requestAttribute = getNodeConstraintFromRequest(
+ targetExpression.getTargetKey(), inputAttribute);
+ if (requestAttribute == null) {
+ return true;
+ }
+
+ if (schedulerNode.getNodeAttributes() == null ||
+ !schedulerNode.getNodeAttributes().contains(requestAttribute)) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Incoming requestAttribute:" + requestAttribute
+ + "is not present in " + schedulerNode.getNodeID());
+ }
+ return false;
+ }
+ boolean found = false;
+ for (Iterator it = schedulerNode.getNodeAttributes()
+ .iterator(); it.hasNext(); ) {
+ NodeAttribute nodeAttribute = it.next();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Starting to compare Incoming requestAttribute :"
+ + requestAttribute
+ + " with requestAttribute value= " + requestAttribute
+ .getAttributeValue()
+ + ", stored nodeAttribute value=" + nodeAttribute
+ .getAttributeValue());
+ }
+ if (requestAttribute.equals(nodeAttribute)) {
+ if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Incoming requestAttribute:" + requestAttribute
+ + " matches with node:" + schedulerNode.getNodeID());
+ }
+ found = true;
+ return found;
+ }
+ }
+ }
+ if (!found) {
+ LOG.info("skip this node for requestAttribute:" + requestAttribute);
return false;
}
}
-
return true;
}
+ private static boolean isOpCodeMatches(NodeAttribute requestAttribute,
+ NodeAttribute nodeAttribute, NodeAttributeOpCode opCode) {
+ boolean retCode = false;
+ switch (opCode) {
+ case EQ:
+ retCode = requestAttribute.getAttributeValue()
+ .equals(nodeAttribute.getAttributeValue());
+ break;
+ case NE:
+ retCode = !(requestAttribute.getAttributeValue()
+ .equals(nodeAttribute.getAttributeValue()));
+ break;
+ default:
+ break;
+ }
+ return retCode;
+ }
+
private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
SingleConstraint singleConstraint, SchedulerNode schedulerNode,
AllocationTagsManager tagsManager)
@@ -146,10 +212,11 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
singleConstraint, currentExp, schedulerNode, tagsManager)) {
return false;
}
- } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)
- && currentExp.getTargetKey().equals(NODE_PARTITION)) {
- // This is a node partition expression, check it.
- canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode);
+ } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
+ // This is a node attribute expression, check it.
+ if(!canSatisfyNodeConstraintExpresssion(singleConstraint, currentExp, schedulerNode)) {
+ return false;
+ }
}
}
// return true if all targetExpressions are satisfied
@@ -203,6 +270,11 @@ private static boolean canSatisfyConstraints(ApplicationId appId,
AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException {
if (constraint == null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Constraint is found empty during constraint validation for app:"
+ + appId);
+ }
return true;
}
@@ -263,4 +335,20 @@ public static boolean canSatisfyConstraints(ApplicationId applicationId,
pcm.getMultilevelConstraint(applicationId, sourceTags, pc),
schedulerNode, atm);
}
+
+ private static NodeAttribute getNodeConstraintFromRequest(String attrKey, String attrString) {
+ NodeAttribute nodeAttribute = null;
+ LOG.info("Incoming node attribute: " + attrKey + "=" + attrString);
+ // Input node attribute could be like 1.8
+ String[] name = attrKey.split("/");
+ if(name == null || name.length == 1) {
+ nodeAttribute = NodeAttribute
+ .newInstance(attrKey, NodeAttributeType.STRING, attrString);
+ } else {
+ nodeAttribute = NodeAttribute
+ .newInstance(name[0], name[1], NodeAttributeType.STRING, attrString);
+ }
+
+ return nodeAttribute;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
new file mode 100644
index 00000000000..2a3effbde8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent {
+ private Map> nodeToAttributes;
+
+ public NodeAttributesUpdateSchedulerEvent(
+ Map> newNodeToAttributesMap) {
+ super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE);
+ this.nodeToAttributes = newNodeToAttributesMap;
+ }
+
+ public Map> getUpdatedNodeToAttributes() {
+ return nodeToAttributes;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index b107cf4ee61..869bf0ed9e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -26,6 +26,7 @@
NODE_UPDATE,
NODE_RESOURCE_UPDATE,
NODE_LABELS_UPDATE,
+ NODE_ATTRIBUTES_UPDATE,
// Source: RMApp
APP_ADDED,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index e1239a9db34..b35cbb63ec9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -366,6 +366,10 @@ public boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode) {
// We will only look at node label = nodeLabelToLookAt according to
// schedulingMode and partition of node.
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("precheckNode is invoked for " + schedulerNode.getNodeID() + ","
+ + schedulingMode);
+ }
String nodePartitionToLookAt;
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
nodePartitionToLookAt = schedulerNode.getPartition();
--
2.15.2 (Apple Git-101.1)