diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java new file mode 100644 index 00000000000..8f3e28c815e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util.constraint; + +/** + * Exception when the placement constraint parser fails to parse an expression. + */ +public class PlacementConstraintParseException extends Exception { + + public PlacementConstraintParseException(String msg) { + super(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java new file mode 100644 index 00000000000..603e692bf49 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java @@ -0,0 +1,615 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util.constraint; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; + +import java.util.ArrayList; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.StringTokenizer; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.Stack; +import java.util.Set; +import java.util.TreeSet; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Placement constraint expression parser. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public final class PlacementConstraintParser { + + private static final char EXPRESSION_DELIM = ':'; + private static final char KV_SPLIT_DELIM = '='; + private static final char EXPRESSION_VAL_DELIM = ','; + private static final char BRACKET_START = '('; + private static final char BRACKET_END = ')'; + private static final String IN = "in"; + private static final String NOT_IN = "notin"; + private static final String AND = "and"; + private static final String OR = "or"; + private static final String CARDINALITY = "cardinality"; + private static final String SCOPE_NODE = PlacementConstraints.NODE; + private static final String SCOPE_RACK = PlacementConstraints.RACK; + + private PlacementConstraintParser() { + // Private constructor for this utility class. + } + + /** + * Constraint Parser used to parse placement constraints from a + * given expression. + */ + public static abstract class ConstraintParser { + + private final ConstraintTokenizer tokenizer; + + public ConstraintParser(ConstraintTokenizer tk){ + this.tokenizer = tk; + } + + void validate() throws PlacementConstraintParseException { + tokenizer.validate(); + } + + void shouldHaveNext() + throws PlacementConstraintParseException { + if (!tokenizer.hasMoreElements()) { + throw new PlacementConstraintParseException("Expecting more tokens"); + } + } + + String nextToken() { + return this.tokenizer.nextElement().trim(); + } + + boolean hasMoreTokens() { + return this.tokenizer.hasMoreElements(); + } + + int toInt(String name) throws PlacementConstraintParseException { + try { + return Integer.parseInt(name); + } catch (NumberFormatException e) { + throw new PlacementConstraintParseException( + "Expecting an Integer, but get " + name); + } + } + + String parseScope(String scopeString) + throws PlacementConstraintParseException { + if (scopeString.equalsIgnoreCase(SCOPE_NODE)) { + return SCOPE_NODE; + } else if (scopeString.equalsIgnoreCase(SCOPE_RACK)) { + return SCOPE_RACK; + } else { + throw new PlacementConstraintParseException( + "expecting scope to " + SCOPE_NODE + " or " + SCOPE_RACK + + ", but met " + scopeString); + } + } + + public AbstractConstraint tryParse() { + try { + return parse(); + } catch (PlacementConstraintParseException e) { + // unable to parse, simply return null + return null; + } + } + + public abstract AbstractConstraint parse() + throws PlacementConstraintParseException; + } + + /** + * Tokenizer interface that used to parse an expression. It first + * validates if the syntax of the given expression is valid, then traverse + * the expression and parse it to an enumeration of strings. Each parsed + * string can be further consumed by a {@link ConstraintParser} and + * transformed to a {@link AbstractConstraint}. + */ + public interface ConstraintTokenizer extends Enumeration { + + /** + * Validate the schema before actual parsing the expression. + * @throws PlacementConstraintParseException + */ + default void validate() throws PlacementConstraintParseException { + // do nothing + } + } + + /** + * A basic tokenizer that splits an expression by a given delimiter. + */ + public static class BaseStringTokenizer implements ConstraintTokenizer { + private final StringTokenizer tokenizer; + BaseStringTokenizer(String expr, String delimiter) { + this.tokenizer = new StringTokenizer(expr, delimiter); + } + + @Override + public boolean hasMoreElements() { + return tokenizer.hasMoreTokens(); + } + + @Override + public String nextElement() { + return tokenizer.nextToken(); + } + } + + /** + * Tokenizer used to parse conjunction form of a constraint expression, + * [AND|OR](C1:C2:...:Cn). Each Cn is a constraint expression. + */ + public static final class ConjunctionTokenizer + implements ConstraintTokenizer { + + private final String expression; + private Iterator iterator; + + private ConjunctionTokenizer(String expr) { + this.expression = expr; + } + + // Traverse the expression and try to get a list of parsed elements + // based on schema. + @Override + public void validate() throws PlacementConstraintParseException { + List parsedElements = new ArrayList<>(); + // expression should start with AND or OR + String op; + if (expression.startsWith(AND) || + expression.startsWith(AND.toUpperCase())) { + op = AND; + } else if(expression.startsWith(OR) || + expression.startsWith(OR.toUpperCase())) { + op = OR; + } else { + throw new PlacementConstraintParseException( + "Excepting starting with \"" + AND + "\" or \"" + OR + "\"," + + " but met " + expression); + } + parsedElements.add(op); + Pattern p = Pattern.compile("\\((.*)\\)"); + Matcher m = p.matcher(expression); + if (!m.find()) { + throw new PlacementConstraintParseException("Unexpected format," + + " expecting [AND|OR](A:B...) " + + "but current expression is " + expression); + } + String childStrs = m.group(1); + MultipleConstraintsTokenizer ct = + new MultipleConstraintsTokenizer(childStrs); + ct.validate(); + while(ct.hasMoreElements()) { + parsedElements.add(ct.nextElement()); + } + this.iterator = parsedElements.iterator(); + } + + @Override + public boolean hasMoreElements() { + return iterator.hasNext(); + } + + @Override + public String nextElement() { + return iterator.next(); + } + } + + /** + * Tokenizer used to parse allocation tags expression, which should be + * in tag=numOfAllocations syntax. + */ + public static class SourceTagsTokenizer implements ConstraintTokenizer { + + private final String expression; + private StringTokenizer st; + private Iterator iterator; + public SourceTagsTokenizer(String expr) { + this.expression = expr; + st = new StringTokenizer(expr, String.valueOf(KV_SPLIT_DELIM)); + } + + @Override + public void validate() throws PlacementConstraintParseException { + ArrayList parsedValues = new ArrayList<>(); + if (st.countTokens() != 2) { + throw new PlacementConstraintParseException( + "Expecting source allocation tag to be specified" + + " sourceTag=numOfAllocations syntax," + + " but met " + expression); + } + + String sourceTag = st.nextToken(); + parsedValues.add(sourceTag); + String num = st.nextToken(); + try { + Integer.parseInt(num); + parsedValues.add(num); + } catch (NumberFormatException e) { + throw new PlacementConstraintParseException("Value of the expression" + + " must be an integer, but met " + num); + } + iterator = parsedValues.iterator(); + } + + @Override + public boolean hasMoreElements() { + return iterator.hasNext(); + } + + @Override + public String nextElement() { + return iterator.next(); + } + } + + /** + * Tokenizer used to handle a placement spec composed by multiple + * constraint expressions. Each of them is delimited with the + * given delimiter, e.g ':'. + */ + public static class MultipleConstraintsTokenizer + implements ConstraintTokenizer { + + private final String expr; + private Iterator iterator; + + public MultipleConstraintsTokenizer(String expression) { + this.expr = expression; + } + + @Override + public void validate() throws PlacementConstraintParseException { + ArrayList parsedElements = new ArrayList<>(); + char[] arr = expr.toCharArray(); + // Memorize the location of each delimiter in a stack, + // removes invalid delimiters that embraced in brackets. + Stack stack = new Stack<>(); + for (int i=0; i it = stack.iterator(); + int currentPos = 0; + while (it.hasNext()) { + int pos = it.next(); + String sub = expr.substring(currentPos, pos); + if (sub != null && !sub.isEmpty()) { + parsedElements.add(sub); + } + currentPos = pos+1; + } + if (currentPos < expr.length()) { + parsedElements.add(expr.substring(currentPos, expr.length())); + } + } + iterator = parsedElements.iterator(); + } + + @Override + public boolean hasMoreElements() { + return iterator.hasNext(); + } + + @Override + public String nextElement() { + return iterator.next(); + } + } + + /** + * Constraint parser used to parse a given target expression, such as + * "NOTIN, NODE, foo, bar". + */ + public static class TargetConstraintParser extends ConstraintParser { + + public TargetConstraintParser(String expression) { + super(new BaseStringTokenizer(expression, + String.valueOf(EXPRESSION_VAL_DELIM))); + } + + @Override + public AbstractConstraint parse() + throws PlacementConstraintParseException { + PlacementConstraint.AbstractConstraint placementConstraints; + String op = nextToken(); + if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) { + String scope = nextToken(); + scope = parseScope(scope); + + Set allocationTags = new TreeSet<>(); + while(hasMoreTokens()) { + String tag = nextToken(); + allocationTags.add(tag); + } + PlacementConstraint.TargetExpression target = + PlacementConstraints.PlacementTargets.allocationTag( + allocationTags.toArray(new String[allocationTags.size()])); + if (op.equalsIgnoreCase(IN)) { + placementConstraints = PlacementConstraints + .targetIn(scope, target); + } else { + placementConstraints = PlacementConstraints + .targetNotIn(scope, target); + } + } else { + throw new PlacementConstraintParseException( + "expecting " + IN + " or " + NOT_IN + ", but get " + op); + } + return placementConstraints; + } + } + + /** + * Constraint parser used to parse a given target expression, such as + * "cardinality, NODE, foo, 0, 1". + */ + public static class CardinalityConstraintParser extends ConstraintParser { + + public CardinalityConstraintParser(String expr) { + super(new BaseStringTokenizer(expr, + String.valueOf(EXPRESSION_VAL_DELIM))); + } + + @Override + public AbstractConstraint parse() + throws PlacementConstraintParseException { + String op = nextToken(); + if (!op.equalsIgnoreCase(CARDINALITY)) { + throw new PlacementConstraintParseException("expecting " + CARDINALITY + + " , but met " + op); + } + + shouldHaveNext(); + String scope = nextToken(); + scope = parseScope(scope); + + Stack resetElements = new Stack<>(); + while(hasMoreTokens()) { + resetElements.add(nextToken()); + } + + // At least 3 elements + if (resetElements.size() < 3) { + throw new PlacementConstraintParseException( + "Invalid syntax for a cardinality expression, expecting" + + " \"cardinality,SCOPE,TARGET_TAG,...,TARGET_TAG," + + "MIN_CARDINALITY,MAX_CARDINALITY\" at least 5 elements," + + " but only " + (resetElements.size() + 2) + " is given."); + } + + String maxCardinalityStr = resetElements.pop(); + Integer max = toInt(maxCardinalityStr); + + String minCardinalityStr = resetElements.pop(); + Integer min = toInt(minCardinalityStr); + + ArrayList targetTags = new ArrayList<>(); + while (!resetElements.empty()) { + targetTags.add(resetElements.pop()); + } + + return PlacementConstraints.cardinality(scope, min, max, + targetTags.toArray(new String[targetTags.size()])); + } + } + + /** + * Parser used to parse conjunction form of constraints, such as + * AND(A, ..., B), OR(A, ..., B). + */ + public static class ConjunctionConstraintParser extends ConstraintParser { + + public ConjunctionConstraintParser(String expr) { + super(new ConjunctionTokenizer(expr)); + } + + @Override + public AbstractConstraint parse() throws PlacementConstraintParseException { + // do pre-process, validate input. + validate(); + String op = nextToken(); + shouldHaveNext(); + List constraints = new ArrayList<>(); + while(hasMoreTokens()) { + // each child expression can be any valid form of + // constraint expressions. + String constraintStr = nextToken(); + AbstractConstraint constraint = parseExpression(constraintStr); + constraints.add(constraint); + } + if (AND.equalsIgnoreCase(op)) { + return PlacementConstraints.and( + constraints.toArray( + new AbstractConstraint[constraints.size()])); + } else if (OR.equalsIgnoreCase(op)) { + return PlacementConstraints.or( + constraints.toArray( + new AbstractConstraint[constraints.size()])); + } else { + throw new PlacementConstraintParseException( + "Unexpected conjunction operator : " + op + + ", expecting " + AND + " or " + OR); + } + } + } + + /** + * A helper class to encapsulate source tags and allocations in the + * placement specification. + */ + public static final class SourceTags { + private String tag; + private int num; + + private SourceTags(String sourceTag, int number) { + this.tag = sourceTag; + this.num = number; + } + + public String getTag() { + return this.tag; + } + + public int getNumOfAllocations() { + return this.num; + } + + /** + * Parses source tags from expression "sourceTags=numOfAllocations". + * @param expr + * @return source tags, see {@link SourceTags} + * @throws PlacementConstraintParseException + */ + public static SourceTags parseFrom(String expr) + throws PlacementConstraintParseException { + SourceTagsTokenizer stt = new SourceTagsTokenizer(expr); + stt.validate(); + + // During validation we already checked the number of parsed elements. + String allocTag = stt.nextElement(); + int allocNum = Integer.parseInt(stt.nextElement()); + return new SourceTags(allocTag, allocNum); + } + } + + /** + * Parses a given constraint expression to a {@link AbstractConstraint}, + * this expression can be any valid form of constraint expressions. + * + * @param constraintStr expression string + * @return a parsed {@link AbstractConstraint} + * @throws PlacementConstraintParseException when given expression + * is malformed + */ + public static AbstractConstraint parseExpression(String constraintStr) + throws PlacementConstraintParseException { + // Try parse given expression with all allowed constraint parsers, + // fails if no one could parse it. + TargetConstraintParser tp = new TargetConstraintParser(constraintStr); + Optional constraintOptional = + Optional.ofNullable(tp.tryParse()); + if (!constraintOptional.isPresent()) { + CardinalityConstraintParser cp = + new CardinalityConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(cp.tryParse()); + if (!constraintOptional.isPresent()) { + ConjunctionConstraintParser jp = + new ConjunctionConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(jp.tryParse()); + } + if (!constraintOptional.isPresent()) { + throw new PlacementConstraintParseException( + "Invalid constraint expression " + constraintStr); + } + } + return constraintOptional.get(); + } + + /** + * Parses a placement constraint specification. A placement constraint spec + * is a composite expression which is composed by multiple sub constraint + * expressions delimited by ":". With following syntax: + * + *

Tag1=N1,P1:Tag2=N2,P2:...:TagN=Nn,Pn

+ * + * where TagN=Nn is a key value pair to determine the source + * allocation tag and the number of allocations, such as: + * + *

foo=3

+ * + * and where Pn can be any form of a valid constraint expression, + * such as: + * + *
    + *
  • in,node,foo,bar
  • + *
  • notin,node,foo,bar,1,2
  • + *
  • and(notin,node,foo:notin,node,bar)
  • + *
+ * @param expression expression string. + * @return a map of source tags to placement constraint mapping. + * @throws PlacementConstraintParseException + */ + public static Map parsePlacementSpec( + String expression) throws PlacementConstraintParseException { + // Respect insertion order. + Map result = new LinkedHashMap<>(); + PlacementConstraintParser.ConstraintTokenizer tokenizer = + new PlacementConstraintParser.MultipleConstraintsTokenizer(expression); + tokenizer.validate(); + while(tokenizer.hasMoreElements()) { + String specStr = tokenizer.nextElement(); + // each spec starts with sourceAllocationTag=numOfContainers and + // followed by a constraint expression. + // foo=4,Pn + String[] splitted = specStr.split( + String.valueOf(EXPRESSION_VAL_DELIM), 2); + if (splitted.length != 2) { + throw new PlacementConstraintParseException( + "Unexpected placement constraint expression " + specStr); + } + + String tagAlloc = splitted[0]; + SourceTags st = SourceTags.parseFrom(tagAlloc); + String exprs = splitted[1]; + AbstractConstraint constraint = + PlacementConstraintParser.parseExpression(exprs); + + result.put(st, constraint.build()); + } + + return result; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java new file mode 100644 index 00000000000..890d5ecf6d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package org.apache.hadoop.yarn.util.constraint contains classes + * which is used as utility class for placement constraints. + */ +package org.apache.hadoop.yarn.util.constraint; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java new file mode 100644 index 00000000000..941f9716dc2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.resource; + +import com.google.common.collect.Sets; + +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.TargetConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.CardinalityConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConjunctionConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.MultipleConstraintsTokenizer; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTagsTokenizer; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintTokenizer; + +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.*; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Class to test placement constraint parser. + */ +public class TestPlacementConstraintParser { + + @Test + public void testTargetExpressionParser() + throws PlacementConstraintParseException { + ConstraintParser parser; + AbstractConstraint constraint; + SingleConstraint single; + + // Anti-affinity with single target tag + // NOTIN,NDOE,foo + parser = new TargetConstraintParser("NOTIN, NODE, foo"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("node", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(0, single.getMaxCardinality()); + + // lower cases is also valid + parser = new TargetConstraintParser("notin, node, foo"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("node", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(0, single.getMaxCardinality()); + + // Affinity with single target tag + // IN,NODE,foo + parser = new TargetConstraintParser("IN, NODE, foo"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("node", single.getScope()); + Assert.assertEquals(1, single.getMinCardinality()); + Assert.assertEquals(Integer.MAX_VALUE, single.getMaxCardinality()); + + // Anti-affinity with multiple target tags + // NOTIN,NDOE,foo,bar,exp + parser = new TargetConstraintParser("NOTIN, NODE, foo, bar, exp"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("node", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(0, single.getMaxCardinality()); + Assert.assertEquals(1, single.getTargetExpressions().size()); + TargetExpression exp = + single.getTargetExpressions().iterator().next(); + Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString()); + Assert.assertEquals(3, exp.getTargetValues().size()); + + // Invalid OP + parser = new TargetConstraintParser("XYZ, NODE, foo"); + try { + parser.parse(); + } catch (Exception e) { + Assert.assertTrue(e instanceof PlacementConstraintParseException); + Assert.assertTrue(e.getMessage().contains("expecting in or notin")); + } + } + + @Test + public void testCardinalityConstraintParser() + throws PlacementConstraintParseException { + ConstraintParser parser; + AbstractConstraint constraint; + SingleConstraint single; + + // cardinality,NODE,foo,0,1 + parser = new CardinalityConstraintParser("cardinality, NODE, foo, 0, 1"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("node", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(1, single.getMaxCardinality()); + Assert.assertEquals(1, single.getTargetExpressions().size()); + TargetExpression exp = + single.getTargetExpressions().iterator().next(); + Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString()); + Assert.assertEquals(1, exp.getTargetValues().size()); + Assert.assertEquals("foo", exp.getTargetValues().iterator().next()); + + // cardinality,NODE,foo,bar,moo,0,1 + parser = new CardinalityConstraintParser( + "cardinality,RACK,foo,bar,moo,0,1"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("rack", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(1, single.getMaxCardinality()); + Assert.assertEquals(1, single.getTargetExpressions().size()); + exp = single.getTargetExpressions().iterator().next(); + Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString()); + Assert.assertEquals(3, exp.getTargetValues().size()); + Set expectedTags = Sets.newHashSet("foo", "bar", "moo"); + Assert.assertTrue(Sets.difference(expectedTags, exp.getTargetValues()) + .isEmpty()); + + // Invalid scope string + try { + parser = new CardinalityConstraintParser( + "cardinality,NOWHERE,foo,bar,moo,0,1"); + parser.parse(); + Assert.fail("Expecting a parsing failure!"); + } catch (PlacementConstraintParseException e) { + Assert.assertTrue(e.getMessage() + .contains("expecting scope to node or rack, but met NOWHERE")); + } + + // Invalid number of expression elements + try { + parser = new CardinalityConstraintParser( + "cardinality,NODE,0,1"); + parser.parse(); + Assert.fail("Expecting a parsing failure!"); + } catch (PlacementConstraintParseException e) { + Assert.assertTrue(e.getMessage() + .contains("at least 5 elements, but only 4 is given")); + } + } + + @Test + public void testAndConstraintParser() + throws PlacementConstraintParseException { + ConstraintParser parser; + AbstractConstraint constraint; + And and; + + parser = new ConjunctionConstraintParser( + "AND(NOTIN,NODE,foo:NOTIN,NODE,bar)"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof And); + and = (And) constraint; + Assert.assertEquals(2, and.getChildren().size()); + + parser = new ConjunctionConstraintParser( + "AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1)"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof And); + Assert.assertEquals(2, and.getChildren().size()); + + parser = new ConjunctionConstraintParser( + "AND(NOTIN,NODE,foo:AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1))"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof And); + and = (And) constraint; + Assert.assertTrue(and.getChildren().get(0) instanceof SingleConstraint); + Assert.assertTrue(and.getChildren().get(1) instanceof And); + and = (And) and.getChildren().get(1); + Assert.assertEquals(2, and.getChildren().size()); + } + + @Test + public void testMultipleConstraintsTokenizer() + throws PlacementConstraintParseException { + MultipleConstraintsTokenizer ct; + SourceTagsTokenizer st; + TokenizerTester mp; + + ct = new MultipleConstraintsTokenizer( + "foo=1,A1,A2,A3:bar=2,B1,B2:moo=3,C1,C2"); + mp = new TokenizerTester(ct, + "foo=1,A1,A2,A3", "bar=2,B1,B2", "moo=3,C1,C2"); + mp.verify(); + + ct = new MultipleConstraintsTokenizer( + "foo=1,AND(A2:A3):bar=2,OR(B1:AND(B2:B3)):moo=3,C1,C2"); + mp = new TokenizerTester(ct, + "foo=1,AND(A2:A3)", "bar=2,OR(B1:AND(B2:B3))", "moo=3,C1,C2"); + mp.verify(); + + ct = new MultipleConstraintsTokenizer("A:B:C"); + mp = new TokenizerTester(ct, "A", "B", "C"); + mp.verify(); + + ct = new MultipleConstraintsTokenizer("A:AND(B:C):D"); + mp = new TokenizerTester(ct, "A", "AND(B:C)", "D"); + mp.verify(); + + ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E"); + mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E"); + mp.verify(); + + ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E"); + mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E"); + mp.verify(); + + st = new SourceTagsTokenizer("A=4"); + mp = new TokenizerTester(st, "A", "4"); + mp.verify(); + + try { + st = new SourceTagsTokenizer("A=B"); + mp = new TokenizerTester(st, "A", "B"); + mp.verify(); + Assert.fail("Expecting a parsing failure"); + } catch (PlacementConstraintParseException e) { + Assert.assertTrue(e.getMessage() + .contains("Value of the expression must be an integer")); + } + } + + private static class TokenizerTester { + + private ConstraintTokenizer tokenizer; + private String[] expectedExtractions; + + protected TokenizerTester(ConstraintTokenizer tk, + String... expctedStrings) { + this.tokenizer = tk; + this.expectedExtractions = expctedStrings; + } + + void verify() + throws PlacementConstraintParseException { + tokenizer.validate(); + int i = 0; + while (tokenizer.hasMoreElements()) { + String current = tokenizer.nextElement(); + Assert.assertTrue(i < expectedExtractions.length); + Assert.assertEquals(expectedExtractions[i], current); + i++; + } + } + } + + @Test + public void testParsePlacementSpec() + throws PlacementConstraintParseException { + Map result; + PlacementConstraint expectedPc1, expectedPc2; + PlacementConstraint actualPc1, actualPc2; + SourceTags tag1, tag2; + + // A single anti-affinity constraint + result = PlacementConstraintParser + .parsePlacementSpec("foo=3,notin,node,foo"); + Assert.assertEquals(1, result.size()); + tag1 = result.keySet().iterator().next(); + Assert.assertEquals("foo", tag1.getTag()); + Assert.assertEquals(3, tag1.getNumOfAllocations()); + expectedPc1 = targetNotIn("node", allocationTag("foo")).build(); + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1); + + // Upper case + result = PlacementConstraintParser + .parsePlacementSpec("foo=3,NOTIN,NODE,foo"); + Assert.assertEquals(1, result.size()); + tag1 = result.keySet().iterator().next(); + Assert.assertEquals("foo", tag1.getTag()); + Assert.assertEquals(3, tag1.getNumOfAllocations()); + expectedPc1 = targetNotIn("node", allocationTag("foo")).build(); + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1); + + // A single cardinality constraint + result = PlacementConstraintParser + .parsePlacementSpec("foo=10,cardinality,node,foo,bar,0,100"); + Assert.assertEquals(1, result.size()); + tag1 = result.keySet().iterator().next(); + Assert.assertEquals("foo", tag1.getTag()); + Assert.assertEquals(10, tag1.getNumOfAllocations()); + expectedPc1 = cardinality("node", 0, 100, "foo", "bar").build(); + Assert.assertEquals(expectedPc1, result.values().iterator().next()); + + // Two constraint expressions + result = PlacementConstraintParser + .parsePlacementSpec("foo=3,notin,node,foo:bar=2,in,node,foo"); + Assert.assertEquals(2, result.size()); + Iterator keyIt = result.keySet().iterator(); + tag1 = keyIt.next(); + Assert.assertEquals("foo", tag1.getTag()); + Assert.assertEquals(3, tag1.getNumOfAllocations()); + tag2 = keyIt.next(); + Assert.assertEquals("bar", tag2.getTag()); + Assert.assertEquals(2, tag2.getNumOfAllocations()); + Iterator valueIt = result.values().iterator(); + expectedPc1 = targetNotIn("node", allocationTag("foo")).build(); + expectedPc2 = targetIn("node", allocationTag("foo")).build(); + Assert.assertEquals(expectedPc1, valueIt.next()); + Assert.assertEquals(expectedPc2, valueIt.next()); + + // And constraint + result = PlacementConstraintParser + .parsePlacementSpec("foo=1000,and(notin,node,bar:in,node,foo)"); + Assert.assertEquals(1, result.size()); + keyIt = result.keySet().iterator(); + tag1 = keyIt.next(); + Assert.assertEquals("foo", tag1.getTag()); + Assert.assertEquals(1000, tag1.getNumOfAllocations()); + actualPc1 = result.values().iterator().next(); + expectedPc1 = and(targetNotIn("node", allocationTag("bar")), + targetIn("node", allocationTag("foo"))).build(); + Assert.assertEquals(expectedPc1, actualPc1); + + // Multiple constraints with nested forms. + result = PlacementConstraintParser.parsePlacementSpec( + "foo=1000,and(notin,node,bar:or(in,node,foo:in,node,moo))" + + ":bar=200,notin,node,foo"); + Assert.assertEquals(2, result.size()); + keyIt = result.keySet().iterator(); + tag1 = keyIt.next(); + tag2 = keyIt.next(); + Assert.assertEquals("foo", tag1.getTag()); + Assert.assertEquals(1000, tag1.getNumOfAllocations()); + Assert.assertEquals("bar", tag2.getTag()); + Assert.assertEquals(200, tag2.getNumOfAllocations()); + valueIt = result.values().iterator(); + actualPc1 = valueIt.next(); + actualPc2 = valueIt.next(); + + expectedPc1 = and(targetNotIn("node", allocationTag("bar")), + or(targetIn("node", allocationTag("foo")), + targetIn("node", allocationTag("moo")))).build(); + Assert.assertEquals(actualPc1, expectedPc1); + expectedPc2 = targetNotIn("node", allocationTag("foo")).build(); + Assert.assertEquals(expectedPc2, actualPc2); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 9ba2138d3ac..a06ee7c6d36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -28,6 +28,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; @@ -38,6 +39,7 @@ import java.util.Map; import java.util.Set; import java.util.Vector; +import java.util.Base64; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -671,8 +673,14 @@ public boolean init(String[] args) throws ParseException, IOException { } private void parsePlacementSpecs(String placementSpecifications) { + // Client sends placement spec in encoded format + Base64.Decoder decoder = Base64.getDecoder(); + byte[] decodedBytes = decoder.decode( + placementSpecifications.getBytes(StandardCharsets.UTF_8)); + String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8); + LOG.info("Decode placement spec: " + decodedSpec); Map pSpecs = - PlacementSpec.parse(placementSpecifications); + PlacementSpec.parse(decodedSpec); this.placementSpecs = new HashMap<>(); this.numTotalContainers = 0; for (PlacementSpec pSpec : pSpecs.values()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 2aafa942a90..7a84c5d527c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -28,6 +29,7 @@ import java.util.Set; import java.util.Vector; import java.util.Arrays; +import java.util.Base64; import com.google.common.base.Joiner; import org.apache.commons.cli.CommandLine; @@ -846,7 +848,11 @@ public boolean run() throws IOException, YarnException { } vargs.add("--num_containers " + String.valueOf(numContainers)); if (placementSpec != null && placementSpec.length() > 0) { - vargs.add("--placement_spec " + placementSpec); + // Encode the spec to avoid passing special chars via shell arguments. + String encodedSpec = Base64.getEncoder() + .encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8)); + LOG.info("Encode placement spec: " + encodedSpec); + vargs.add("--placement_spec " + encodedSpec); } if (null != nodeLabelExpression) { appContext.setNodeLabelExpression(nodeLabelExpression); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java index ed13ee0aa93..290925980a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java @@ -18,13 +18,14 @@ package org.apache.hadoop.yarn.applications.distributedshell; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; -import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import java.util.Scanner; /** * Class encapsulating a SourceTag, number of container and a Placement @@ -34,12 +35,6 @@ private static final Logger LOG = LoggerFactory.getLogger(PlacementSpec.class); - private static final String SPEC_DELIM = ":"; - private static final String KV_SPLIT_DELIM = "="; - private static final String SPEC_VAL_DELIM = ","; - private static final String IN = "in"; - private static final String NOT_IN = "notin"; - private static final String CARDINALITY = "cardinality"; public final String sourceTag; public final int numContainers; @@ -73,65 +68,28 @@ public PlacementSpec(String sourceTag, int numContainers, * @param specs Placement spec. * @return Mapping from source tag to placement constraint. */ - public static Map parse(String specs) { + public static Map parse(String specs) + throws IllegalArgumentException { LOG.info("Parsing Placement Specs: [{}]", specs); - Scanner s = new Scanner(specs).useDelimiter(SPEC_DELIM); Map pSpecs = new HashMap<>(); - while (s.hasNext()) { - String sp = s.next(); - LOG.info("Parsing Spec: [{}]", sp); - String[] specSplit = sp.split(KV_SPLIT_DELIM); - String sourceTag = specSplit[0]; - Scanner ps = new Scanner(specSplit[1]).useDelimiter(SPEC_VAL_DELIM); - int numContainers = ps.nextInt(); - if (!ps.hasNext()) { - pSpecs.put(sourceTag, - new PlacementSpec(sourceTag, numContainers, null)); - LOG.info("Creating Spec without constraint {}: num[{}]", - sourceTag, numContainers); - continue; + Map parsed; + try { + parsed = PlacementConstraintParser.parsePlacementSpec(specs); + for (Map.Entry entry : + parsed.entrySet()) { + LOG.info("Parsed source tag: {}, number of allocations: {}", + entry.getKey().getTag(), entry.getKey().getNumOfAllocations()); + LOG.info("Parsed constraint: {}", entry.getValue() + .getConstraintExpr().getClass().getSimpleName()); + pSpecs.put(entry.getKey().getTag(), new PlacementSpec( + entry.getKey().getTag(), + entry.getKey().getNumOfAllocations(), + entry.getValue())); } - String cType = ps.next().toLowerCase(); - String scope = ps.next().toLowerCase(); - - String targetTag = ps.next(); - scope = scope.equals("rack") ? PlacementConstraints.RACK : - PlacementConstraints.NODE; - - PlacementConstraint pc; - if (cType.equals(IN)) { - pc = PlacementConstraints.build( - PlacementConstraints.targetIn(scope, - PlacementConstraints.PlacementTargets.allocationTag( - targetTag))); - LOG.info("Creating IN Constraint for source tag [{}], num[{}]: " + - "scope[{}], target[{}]", - sourceTag, numContainers, scope, targetTag); - } else if (cType.equals(NOT_IN)) { - pc = PlacementConstraints.build( - PlacementConstraints.targetNotIn(scope, - PlacementConstraints.PlacementTargets.allocationTag( - targetTag))); - LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: " + - "scope[{}], target[{}]", - sourceTag, numContainers, scope, targetTag); - } else if (cType.equals(CARDINALITY)) { - int minCard = ps.nextInt(); - int maxCard = ps.nextInt(); - pc = PlacementConstraints.build( - PlacementConstraints.targetCardinality(scope, minCard, maxCard, - PlacementConstraints.PlacementTargets.allocationTag( - targetTag))); - LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: " + - "scope[{}], min[{}], max[{}], target[{}]", - sourceTag, numContainers, scope, minCard, maxCard, targetTag); - } else { - throw new RuntimeException( - "Could not parse constraintType [" + cType + "]" + - " in [" + specSplit[1] + "]"); - } - pSpecs.put(sourceTag, new PlacementSpec(sourceTag, numContainers, pc)); + return pSpecs; + } catch (PlacementConstraintParseException e) { + throw new IllegalArgumentException( + "Invalid placement spec: " + specs, e); } - return pSpecs; } }