diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintParseException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintParseException.java new file mode 100644 index 00000000000..9e65e7b0d41 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintParseException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +public class PlacementConstraintParseException extends Exception { + + public PlacementConstraintParseException(String msg) { + super(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintParser.java new file mode 100644 index 00000000000..9aba568d52b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintParser.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; + +import java.util.List; +import java.util.Enumeration; +import java.util.StringTokenizer; +import java.util.TreeSet; +import java.util.Set; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class PlacementConstraintParser { + + private interface ConstraintTokenizer extends Enumeration { + + default void validate() throws PlacementConstraintParseException { + // do nothing + } + } + + private static class BaseStringTokenizer implements ConstraintTokenizer { + final StringTokenizer tokenizer; + BaseStringTokenizer(String expr, String delimiter) { + tokenizer = new StringTokenizer(expr, delimiter); + } + + @Override + public boolean hasMoreElements() { + return tokenizer.hasMoreTokens(); + } + + @Override + public String nextElement() { + return tokenizer.nextToken(); + } + } + + private static class ConjunctionTokenizer implements ConstraintTokenizer { + + // AND(A:B:C) + final String expression; + + Iterator iterator; + ConjunctionTokenizer(String expr) { + this.expression = expr; + } + + // Traverse the expression and try to get a list of parsed elements + // based on schema. + @Override + public void validate() throws PlacementConstraintParseException { + List parsedElements = new ArrayList<>(); + // expression should start with AND or OR + String op; + if (expression.startsWith("AND")) { + op = "AND"; + } else if(expression.startsWith("OR")) { + op = "OR"; + } else { + throw new PlacementConstraintParseException( + "Excepting starting with AND or OR, but get " + expression); + } + parsedElements.add(op); + // TODO need to handle nested mode + Pattern p = Pattern.compile("\\((.*)\\)"); + Matcher m = p.matcher(expression); + if (!m.find()) { + throw new PlacementConstraintParseException("Unexpected format," + + " expecting AND(A:B...) but current expression is " + expression); + } + + String childStrs = m.group(1); + StringTokenizer st = new StringTokenizer(childStrs, ":"); + while(st.hasMoreTokens()) { + parsedElements.add(st.nextToken()); + } + this.iterator = parsedElements.iterator(); + } + + @Override + public boolean hasMoreElements() { + return iterator.hasNext(); + } + + @Override + public String nextElement() { + return iterator.next(); + } + } + + public static abstract class ConstraintParser { + + final ConstraintTokenizer tokenizer; + public ConstraintParser(ConstraintTokenizer tk){ + this.tokenizer = tk; + } + + void validate() throws PlacementConstraintParseException { + tokenizer.validate(); + } + + void shouldHaveNext() + throws PlacementConstraintParseException { + if (!tokenizer.hasMoreElements()) { + throw new PlacementConstraintParseException("Expecting more tokens"); + } + } + + String nextToken() { + return this.tokenizer.nextElement().trim(); + } + + boolean hasMoreTokens() { + return this.tokenizer.hasMoreElements(); + } + + int toInt(String name) throws PlacementConstraintParseException { + try { + return Integer.parseInt(name); + } catch (NumberFormatException e) { + throw new PlacementConstraintParseException( + "Expecting an Integer, but get " + name); + } + } + + void verifyScope(String scopeString) + throws PlacementConstraintParseException { + if (!scopeString.equals("NODE") && !scopeString.equals("RACK")) { + throw new PlacementConstraintParseException( + "expecting scope to NODE or RACK, but get " + scopeString); + } + } + + AbstractConstraint tryParse() { + try { + return parse(); + } catch (PlacementConstraintParseException e) { + // unable to parse, simply return null + return null; + } + } + + abstract AbstractConstraint parse() + throws PlacementConstraintParseException; + } + + // TODO move foo=4 to a different place... + + + // A + // foo=4, NOTIN, NODE, foo + // SRC_TAG=NUM, OP, SCOPE, ...TARGET_TAG + /** + * + * targetIn + * targetNotIn + * + */ + public static class TargetConstraintParser extends ConstraintParser { + + public TargetConstraintParser(String expression) { + super(new BaseStringTokenizer(expression, ",")); + } + + @Override + AbstractConstraint parse() + throws PlacementConstraintParseException { + PlacementConstraint.AbstractConstraint placementConstraints; + String op = nextToken(); + if (op.equals("IN") || op.equals("NOTIN")) { + String scope = nextToken(); + verifyScope(scope); + + Set allocationTags = new TreeSet<>(); + while(hasMoreTokens()) { + String tag = nextToken(); + allocationTags.add(tag); + } + PlacementConstraint.TargetExpression target = + PlacementConstraints.PlacementTargets.allocationTag( + allocationTags.toArray(new String[allocationTags.size()])); + if (op.equals("IN")) { + placementConstraints = PlacementConstraints + .targetIn(scope, target); + } else { + placementConstraints = PlacementConstraints + .targetNotIn(scope, target); + } + } else { + throw new PlacementConstraintParseException( + "expecting IN or NOTIN, but get " + op); + } + return placementConstraints; + } + } + + // B + // foo=4, cardinality, NODE, foo, 0, 1 + // SRC_TAG=NUM, CARD, SCOPE, TARGET_TAG, MIN, MAX + /** + * minCardinality + * maxCardinality + * targetCardinality + */ + public static class CardinalityConstraintParser extends ConstraintParser { + + public CardinalityConstraintParser(String expr) { + super(new BaseStringTokenizer(expr, ",")); + } + + @Override + AbstractConstraint parse() + throws PlacementConstraintParseException { + String op = nextToken(); + if (!op.equals("cardinality")) { + throw new PlacementConstraintParseException("expecting cardinality," + + " but get " + op); + } + + shouldHaveNext(); + String scope = nextToken(); + verifyScope(scope); + + //TODO support more tags + shouldHaveNext(); + String targetTag = nextToken(); + + shouldHaveNext(); + String minCardinalityStr = nextToken(); + Integer min = toInt(minCardinalityStr); + + shouldHaveNext(); + String maxCarinalityStr = nextToken(); + Integer max = toInt(maxCarinalityStr); + + return PlacementConstraints.cardinality(scope, min, max, targetTag); + } + } + + // C + // AND(A, ..., B), OR(A, ..., B) + /** + * and + * or + */ + public static class ConjunctionConstraintParser extends ConstraintParser { + + public ConjunctionConstraintParser(String expr) { + super(new ConjunctionTokenizer(expr)); + } + + @Override + AbstractConstraint parse() throws PlacementConstraintParseException { + // do pre-process, validate input + validate(); + String op = nextToken(); + shouldHaveNext(); + List constraints = new ArrayList<>(); + while(hasMoreTokens()) { + String constraintStr = nextToken(); + TargetConstraintParser tp = new TargetConstraintParser(constraintStr); + Optional constraintOptional = + Optional.ofNullable(tp.tryParse()); + if (!constraintOptional.isPresent()) { + CardinalityConstraintParser cp = + new CardinalityConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(cp.tryParse()); + if (!constraintOptional.isPresent()) { + ConjunctionConstraintParser jp = + new ConjunctionConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(jp.tryParse()); + } + if (!constraintOptional.isPresent()) { + throw new PlacementConstraintParseException("Invalid child expr " + + constraintStr); + } + } + constraints.add(constraintOptional.get()); + } + if ("AND".equals(op)) { + return PlacementConstraints.and( + constraints.toArray( + new AbstractConstraint[constraints.size()])); + } else { + return PlacementConstraints.or( + constraints.toArray( + new AbstractConstraint[constraints.size()])); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintParser.java new file mode 100644 index 00000000000..89eedc62f83 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintParser.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintParser.CardinalityConstraintParser; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintParser.ConjunctionConstraintParser; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintParser.ConstraintParser; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintParser.TargetConstraintParser; +import org.junit.Assert; +import org.junit.Test; + +public class TestPlacementConstraintParser { + + @Test + public void testTargetExpressionParser() + throws PlacementConstraintParseException { + ConstraintParser parser; + AbstractConstraint constraint; + SingleConstraint single; + + // Anti-affinity with single target tag + // NOTIN,NDOE,foo + parser = new TargetConstraintParser("NOTIN, NODE, foo"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("NODE", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(0, single.getMaxCardinality()); + + // Affinity with single target tag + // IN,NODE,foo + parser = new TargetConstraintParser("IN, NODE, foo"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("NODE", single.getScope()); + Assert.assertEquals(1, single.getMinCardinality()); + Assert.assertEquals(Integer.MAX_VALUE, single.getMaxCardinality()); + + // Anti-affinity with multiple target tags + // NOTIN,NDOE,foo,bar,exp + parser = new TargetConstraintParser("NOTIN, NODE, foo, bar, exp"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("NODE", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(0, single.getMaxCardinality()); + Assert.assertEquals(1, single.getTargetExpressions().size()); + TargetExpression exp = + single.getTargetExpressions().iterator().next(); + Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString()); + Assert.assertEquals(3, exp.getTargetValues().size()); + + // Invalid OP + parser = new TargetConstraintParser("XYZ, NODE, foo"); + try { + parser.parse(); + } catch (Exception e) { + Assert.assertTrue(e instanceof PlacementConstraintParseException); + Assert.assertTrue(e.getMessage().contains("expecting IN or NOTIN")); + } + } + + @Test + public void testCardinalityConstraintParser() + throws PlacementConstraintParseException { + ConstraintParser parser; + AbstractConstraint constraint; + SingleConstraint single; + + // cardinality,NODE,foo,0,1 + parser = new CardinalityConstraintParser("cardinality, NODE, foo, 0, 1"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals("NODE", single.getScope()); + Assert.assertEquals(0, single.getMinCardinality()); + Assert.assertEquals(1, single.getMaxCardinality()); + Assert.assertEquals(1, single.getTargetExpressions().size()); + TargetExpression exp = + single.getTargetExpressions().iterator().next(); + Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString()); + Assert.assertEquals(1, exp.getTargetValues().size()); + Assert.assertEquals("foo", exp.getTargetValues().iterator().next()); + } + + @Test + public void testAndConstraintParser() throws PlacementConstraintParseException { + ConstraintParser parser; + AbstractConstraint constraint; + And and; + SingleConstraint single; + + parser = new ConjunctionConstraintParser( + "AND(NOTIN,NODE,foo:NOTIN,NODE,bar)"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof And); + and = (And) constraint; + Assert.assertEquals(2, and.getChildren().size()); + + parser = new ConjunctionConstraintParser( + "AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1)"); + constraint = parser.parse(); + Assert.assertTrue(constraint instanceof And); + Assert.assertEquals(2, and.getChildren().size()); + +// parser = new ConjunctionConstraintParser( +// "AND(NOTIN,NODE,foo:AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1))"); +// constraint = parser.parse(); +// Assert.assertTrue(constraint instanceof And); +// and = (And) constraint; +// Assert.assertTrue(and.getChildren().get(0) instanceof SingleConstraint); +// Assert.assertTrue(and.getChildren().get(1) instanceof And); + } +}