diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3783dc4..6adcb36 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -945,6 +945,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal // CBO related HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."), + HIVE_CBO_CNF_NODES_LIMIT("hive.cbo.cnf.maxnodes", -1, "When converting to conjunctive normal form (CNF), fail if" + + "the expression exceeds this threshold; the threshold is expressed in terms of number of nodes (leaves and" + + "interior nodes). -1 to not set up a threshold."), HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"), HIVE_CBO_EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on" + "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."), diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java index d466378..87fc1b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java @@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ControlFlowException; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; import org.slf4j.Logger; @@ -51,6 +52,160 @@ protected static final Logger LOG = LoggerFactory.getLogger(HiveRexUtil.class); + + /** Converts an expression to conjunctive normal form (CNF). + * + *

+   * <p>The following expression is in CNF:
+   *
+   * <blockquote>(a OR b) AND (c OR d)</blockquote>
+   *
+   * <p>The following expression is not in CNF:
+   *
+   * <blockquote>(a AND b) OR c</blockquote>
+   *
+   * but can be converted to CNF:
+   *
+   * <blockquote>(a OR c) AND (b OR c)</blockquote>
+   *
+   * <p>The following expression is not in CNF:
+   *
+   * <blockquote>NOT (a OR NOT b)</blockquote>
+   *
+   * but can be converted to CNF by applying de Morgan's theorem:
+   *
+   * <blockquote>NOT a AND b</blockquote>
+   *
Expressions not involving AND, OR or NOT at the top level are in CNF. + */ + public static RexNode toCnf(RexBuilder rexBuilder, RexNode rex) { + return new CnfHelper(rexBuilder).toCnf(rex); + } + + public static RexNode toCnf(RexBuilder rexBuilder, int maxCNFNodeCount, RexNode rex) { + return new CnfHelper(rexBuilder, maxCNFNodeCount).toCnf(rex); + } + + /** Helps {@link org.apache.calcite.rex.RexUtil#toCnf}. */ + private static class CnfHelper { + final RexBuilder rexBuilder; + int currentCount; + final int maxNodeCount; + + private CnfHelper(RexBuilder rexBuilder) { + this(rexBuilder, Integer.MAX_VALUE); + } + + private CnfHelper(RexBuilder rexBuilder, int maxNodeCount) { + this.rexBuilder = rexBuilder; + this.maxNodeCount = maxNodeCount == -1 ? Integer.MAX_VALUE : maxNodeCount; + } + + public RexNode toCnf(RexNode rex) { + try { + this.currentCount = 0; + return toCnf2(rex); + } catch (OverflowError e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Transformation to CNF not carried out as number of resulting nodes " + + "in expression is greater than the max number of nodes allowed"); + } + Util.swallow(e, null); + return rex; + } + } + + private RexNode toCnf2(RexNode rex) { + final List operands; + switch (rex.getKind()) { + case AND: + incrementAndCheck(); + operands = RexUtil.flattenAnd(((RexCall) rex).getOperands()); + final List cnfOperands = Lists.newArrayList(); + for (RexNode node : operands) { + RexNode cnf = toCnf2(node); + switch (cnf.getKind()) { + case AND: + incrementAndCheck(); + cnfOperands.addAll(((RexCall) cnf).getOperands()); + break; + default: + incrementAndCheck(); + cnfOperands.add(cnf); + } + } + return and(cnfOperands); + case OR: + incrementAndCheck(); + operands = RexUtil.flattenOr(((RexCall) rex).getOperands()); + final RexNode head = operands.get(0); + final RexNode headCnf = toCnf2(head); + final List headCnfs = RelOptUtil.conjunctions(headCnf); + final RexNode tail = or(Util.skip(operands)); + final RexNode tailCnf = toCnf2(tail); + 
+        final List<RexNode> tailCnfs = RelOptUtil.conjunctions(tailCnf);
+        final List<RexNode> list = Lists.newArrayList();
+        for (RexNode h : headCnfs) {
+          for (RexNode t : tailCnfs) {
+            list.add(or(ImmutableList.of(h, t)));
+          }
+        }
+        return and(list);
+      case NOT:
+        final RexNode arg = ((RexCall) rex).getOperands().get(0);
+        switch (arg.getKind()) {
+        case NOT:
+          return toCnf2(((RexCall) arg).getOperands().get(0));
+        case OR:
+          operands = ((RexCall) arg).getOperands();
+          List<RexNode> transformedDisj = new ArrayList<>();
+          for (RexNode input : RexUtil.flattenOr(operands)) {
+            transformedDisj.add(rexBuilder.makeCall(input.getType(), SqlStdOperatorTable.NOT,
+                ImmutableList.of(input)));
+          }
+          return toCnf2(and(transformedDisj));
+        case AND:
+          operands = ((RexCall) arg).getOperands();
+          List<RexNode> transformedConj = new ArrayList<>();
+          for (RexNode input : RexUtil.flattenAnd(operands)) {
+            transformedConj.add(rexBuilder.makeCall(input.getType(), SqlStdOperatorTable.NOT,
+                ImmutableList.of(input)));
+          }
+          return toCnf2(or(transformedConj));
+        default:
+          incrementAndCheck();
+          return rex;
+        }
+      default:
+        incrementAndCheck();
+        return rex;
+      }
+    }
+
+    private RexNode and(Iterable<? extends RexNode> nodes) {
+      return RexUtil.composeConjunction(rexBuilder, nodes, false);
+    }
+
+    private RexNode or(Iterable<? extends RexNode> nodes) {
+      return RexUtil.composeDisjunction(rexBuilder, nodes, false);
+    }
+
+    private void incrementAndCheck() {
+      this.currentCount++;
+      if (this.currentCount > this.maxNodeCount) {
+        throw OverflowError.INSTANCE;
+      }
+    }
+
+    @SuppressWarnings("serial")
+    private static class OverflowError extends ControlFlowException {
+
+      public static final OverflowError INSTANCE = new OverflowError();
+
+      private OverflowError() {}
+    }
+  }
+
+
   /**
    * Simplifies a boolean expression.
* diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java index 9609a1e..4cfe782 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java @@ -69,11 +69,11 @@ // Minimum number of OR clauses needed to transform into IN clauses - private final int min; + private final int minNumORClauses; - public HivePointLookupOptimizerRule(int min) { + public HivePointLookupOptimizerRule(int minNumORClauses) { super(operand(Filter.class, any())); - this.min = min; + this.minNumORClauses = minNumORClauses; } public void onMatch(RelOptRuleCall call) { @@ -84,7 +84,8 @@ public void onMatch(RelOptRuleCall call) { final RexNode condition = RexUtil.pullFactors(rexBuilder, filter.getCondition()); // 1. We try to transform possible candidates - RexTransformIntoInClause transformIntoInClause = new RexTransformIntoInClause(rexBuilder, filter, min); + RexTransformIntoInClause transformIntoInClause = new RexTransformIntoInClause(rexBuilder, filter, + minNumORClauses); RexNode newCondition = transformIntoInClause.apply(condition); // 2. 
We merge IN expressions @@ -109,12 +110,12 @@ public void onMatch(RelOptRuleCall call) { protected static class RexTransformIntoInClause extends RexShuttle { private final RexBuilder rexBuilder; private final Filter filterOp; - private final int min; + private final int minNumORClauses; - RexTransformIntoInClause(RexBuilder rexBuilder, Filter filterOp, int min) { + RexTransformIntoInClause(RexBuilder rexBuilder, Filter filterOp, int minNumORClauses) { this.filterOp = filterOp; this.rexBuilder = rexBuilder; - this.min = min; + this.minNumORClauses = minNumORClauses; } @Override public RexNode visitCall(RexCall call) { @@ -128,9 +129,9 @@ public void onMatch(RelOptRuleCall call) { if (operand.getKind() == SqlKind.OR) { try { newOperand = transformIntoInClauseCondition(rexBuilder, - filterOp.getRowType(), operand, min); + filterOp.getRowType(), operand, minNumORClauses); if (newOperand == null) { - return call; + newOperand = operand; } } catch (SemanticException e) { LOG.error("Exception in HivePointLookupOptimizerRule", e); @@ -146,7 +147,7 @@ public void onMatch(RelOptRuleCall call) { case OR: try { node = transformIntoInClauseCondition(rexBuilder, - filterOp.getRowType(), call, min); + filterOp.getRowType(), call, minNumORClauses); if (node == null) { return call; } @@ -162,23 +163,19 @@ public void onMatch(RelOptRuleCall call) { } private static RexNode transformIntoInClauseCondition(RexBuilder rexBuilder, RelDataType inputSchema, - RexNode condition, int min) throws SemanticException { + RexNode condition, int minNumORClauses) throws SemanticException { assert condition.getKind() == SqlKind.OR; // 1. 
We extract the information necessary to create the predicate for the new // filter ListMultimap columnConstantsMap = ArrayListMultimap.create(); ImmutableList operands = RexUtil.flattenOr(((RexCall) condition).getOperands()); - if (operands.size() < min) { + if (operands.size() < minNumORClauses) { // We bail out return null; } for (int i = 0; i < operands.size(); i++) { - RexNode operand = operands.get(i); - - final RexNode operandCNF = RexUtil.toCnf(rexBuilder, operand); - final List conjunctions = RelOptUtil.conjunctions(operandCNF); - + final List conjunctions = RelOptUtil.conjunctions(operands.get(i)); for (RexNode conjunction: conjunctions) { // 1.1. If it is not a RexCall, we bail out if (!(conjunction instanceof RexCall)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java index 17fcc82..7c2a7e5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java @@ -39,6 +39,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,19 +54,21 @@ .getLogger(HivePreFilteringRule.class .getName()); - public static final HivePreFilteringRule INSTANCE = new HivePreFilteringRule(); - - private final FilterFactory filterFactory; - private static final Set COMPARISON = EnumSet.of(SqlKind.EQUALS, SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL, SqlKind.GREATER_THAN, SqlKind.LESS_THAN, SqlKind.NOT_EQUALS); - private HivePreFilteringRule() { + private final FilterFactory filterFactory; + + // Max number of nodes when converting to CNF + private final int 
maxCNFNodeCount; + + public HivePreFilteringRule(int maxCNFNodeCount) { super(operand(Filter.class, operand(RelNode.class, any()))); this.filterFactory = HiveRelFactories.HIVE_FILTER_FACTORY; + this.maxCNFNodeCount = maxCNFNodeCount; } @Override @@ -120,7 +123,7 @@ public void onMatch(RelOptRuleCall call) { for (RexNode operand : operands) { if (operand.getKind() == SqlKind.OR) { - extractedCommonOperands = extractCommonOperands(rexBuilder, operand); + extractedCommonOperands = extractCommonOperands(rexBuilder, operand, maxCNFNodeCount); for (RexNode extractedExpr : extractedCommonOperands) { if (operandsToPushDownDigest.add(extractedExpr.toString())) { operandsToPushDown.add(extractedExpr); @@ -155,7 +158,7 @@ public void onMatch(RelOptRuleCall call) { break; case OR: - operandsToPushDown = extractCommonOperands(rexBuilder, topFilterCondition); + operandsToPushDown = extractCommonOperands(rexBuilder, topFilterCondition, maxCNFNodeCount); break; default: return; @@ -191,7 +194,8 @@ public void onMatch(RelOptRuleCall call) { } - private static List extractCommonOperands(RexBuilder rexBuilder, RexNode condition) { + private static List extractCommonOperands(RexBuilder rexBuilder, RexNode condition, + int maxCNFNodeCount) { assert condition.getKind() == SqlKind.OR; Multimap reductionCondition = LinkedHashMultimap.create(); @@ -200,13 +204,12 @@ public void onMatch(RelOptRuleCall call) { Set refsInAllOperands = null; // 1. 
We extract the information necessary to create the predicate for the - // new - // filter; currently we support comparison functions, in and between + // new filter; currently we support comparison functions, in and between ImmutableList operands = RexUtil.flattenOr(((RexCall) condition).getOperands()); for (int i = 0; i < operands.size(); i++) { final RexNode operand = operands.get(i); - final RexNode operandCNF = RexUtil.toCnf(rexBuilder, operand); + final RexNode operandCNF = HiveRexUtil.toCnf(rexBuilder, maxCNFNodeCount, operand); final List conjunctions = RelOptUtil.conjunctions(operandCNF); Set refsInCurrentOperand = Sets.newHashSet(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index d084552..0d4c1bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -1119,6 +1119,9 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv PerfLogger perfLogger = SessionState.getPerfLogger(); + final int maxCNFNodeCount = conf.getIntVar(HiveConf.ConfVars.HIVE_CBO_CNF_NODES_LIMIT); + final int minNumORClauses = conf.getIntVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZERMIN); + //1. Distinct aggregate rewrite // Run this optimization early, since it is expanding the operator pipeline. 
if (!conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr") && @@ -1139,7 +1142,7 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv // ((R1.x=R2.x) and R1.z=10)) and rand(1) < 0.1 perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); basePlan = hepPlan(basePlan, false, mdProvider, null, HepMatchOrder.ARBITRARY, - HivePreFilteringRule.INSTANCE); + new HivePreFilteringRule(maxCNFNodeCount)); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, factor out common filter elements and separating deterministic vs non-deterministic UDF"); @@ -1165,8 +1168,7 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv rules.add(HiveReduceExpressionsRule.FILTER_INSTANCE); rules.add(HiveReduceExpressionsRule.JOIN_INSTANCE); if (conf.getBoolVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZER)) { - final int min = conf.getIntVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZERMIN); - rules.add(new HivePointLookupOptimizerRule(min)); + rules.add(new HivePointLookupOptimizerRule(minNumORClauses)); } rules.add(HiveJoinAddNotNullRule.INSTANCE_JOIN); rules.add(HiveJoinAddNotNullRule.INSTANCE_SEMIJOIN); diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBOMaxNumToCNF.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBOMaxNumToCNF.java new file mode 100644 index 0000000..277ac1e --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBOMaxNumToCNF.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite; + +import static org.junit.Assert.assertEquals; + +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +public class TestCBOMaxNumToCNF { + + final int maxNumNodesCNF = 8; + + @Test + public void testCBOMaxNumToCNF1() { + // OR(=($0, 1), AND(=($0, 0), =($1, 8))) + // transformation creates 7 nodes AND(OR(=($0, 1), =($0, 0)), OR(=($0, 1), =($1, 8))) + // thus, it is triggered + final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); + final RexBuilder rexBuilder = new RexBuilder(typeFactory); + final RexNode cond = rexBuilder.makeCall(SqlStdOperatorTable.OR, + rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0), + rexBuilder.makeLiteral(1, typeFactory.createSqlType(SqlTypeName.INTEGER), false)), + rexBuilder.makeCall(SqlStdOperatorTable.AND, + rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0), + rexBuilder.makeLiteral(0, typeFactory.createSqlType(SqlTypeName.INTEGER), false)), + rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 1), + rexBuilder.makeLiteral(8, 
+                typeFactory.createSqlType(SqlTypeName.INTEGER), false))));
+    final RexNode newCond = HiveRexUtil.toCnf(rexBuilder, maxNumNodesCNF, cond);
+
+    assertEquals(newCond.toString(), "AND(OR(=($0, 1), =($0, 0)), OR(=($0, 1), =($1, 8)))");
+  }
+
+  @Test
+  public void testCBOMaxNumToCNF2() {
+    // OR(=($0, 1), =($0, 2), AND(=($0, 0), =($1, 8)))
+    // transformation creates 9 nodes AND(OR(=($0, 1), =($0, 2), =($0, 0)), OR(=($0, 1), =($0, 2), =($1, 8)))
+    // thus, it is NOT triggered
+    final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
+    final RexBuilder rexBuilder = new RexBuilder(typeFactory);
+    final RexNode cond = rexBuilder.makeCall(SqlStdOperatorTable.OR,
+        rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+            rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0),
+            rexBuilder.makeLiteral(1, typeFactory.createSqlType(SqlTypeName.INTEGER), false)),
+        rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+            rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0),
+            rexBuilder.makeLiteral(2, typeFactory.createSqlType(SqlTypeName.INTEGER), false)),
+        rexBuilder.makeCall(SqlStdOperatorTable.AND,
+            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0),
+                rexBuilder.makeLiteral(0, typeFactory.createSqlType(SqlTypeName.INTEGER), false)),
+            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 1),
+                rexBuilder.makeLiteral(8, typeFactory.createSqlType(SqlTypeName.INTEGER), false))));
+    final RexNode newCond = HiveRexUtil.toCnf(rexBuilder, maxNumNodesCNF, cond);
+
+    assertEquals(newCond.toString(), "OR(=($0, 1), =($0, 2), AND(=($0, 0), =($1, 8)))");
+  }
+
+}