diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cc95008..1187e84 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -945,6 +945,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     // CBO related
     HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."),
+    HIVE_CBO_CNF_NODES_LIMIT("hive.cbo.cnf.maxnodes", -1, "When converting to conjunctive normal form (CNF), fail if" +
+        " the expression exceeds this threshold; the threshold is expressed in terms of the number of nodes (leaves" +
+        " and interior nodes). -1 to not set up a threshold."),
     HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"),
     HIVE_CBO_EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on" +
         "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."),
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java
index d466378..87fc1b9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexUtil.java
@@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.slf4j.Logger;
@@ -51,6 +52,160 @@ protected static final Logger LOG = LoggerFactory.getLogger(HiveRexUtil.class);
+
+  /** Converts an expression to conjunctive normal form (CNF).
+   *
+   * <p>The following expression is in CNF:
+   *
+   * <blockquote>(a OR b) AND (c OR d)</blockquote>
+   *
+   * <p>The following expression is not in CNF:
+   *
+   * <blockquote>(a AND b) OR c</blockquote>
+   *
+   * <p>but can be converted to CNF:
+   *
+   * <blockquote>(a OR c) AND (b OR c)</blockquote>
+   *
+   * <p>The following expression is not in CNF:
+   *
+   * <blockquote>NOT (a OR NOT b)</blockquote>
+   *
+   * <p>but can be converted to CNF by applying de Morgan's theorem:
+   *
+   * <blockquote>NOT a AND b</blockquote>
+   *
+   * <p>Expressions not involving AND, OR or NOT at the top level are in CNF.
+   */
+  public static RexNode toCnf(RexBuilder rexBuilder, RexNode rex) {
+    return new CnfHelper(rexBuilder).toCnf(rex);
+  }
+
+  public static RexNode toCnf(RexBuilder rexBuilder, int maxCNFNodeCount, RexNode rex) {
+    return new CnfHelper(rexBuilder, maxCNFNodeCount).toCnf(rex);
+  }
+
+  /** Helps {@link HiveRexUtil#toCnf}. */
+  private static class CnfHelper {
+    final RexBuilder rexBuilder;
+    int currentCount;
+    final int maxNodeCount;
+
+    private CnfHelper(RexBuilder rexBuilder) {
+      this(rexBuilder, Integer.MAX_VALUE);
+    }
+
+    private CnfHelper(RexBuilder rexBuilder, int maxNodeCount) {
+      this.rexBuilder = rexBuilder;
+      this.maxNodeCount = maxNodeCount == -1 ? Integer.MAX_VALUE : maxNodeCount;
+    }
+
+    public RexNode toCnf(RexNode rex) {
+      try {
+        this.currentCount = 0;
+        return toCnf2(rex);
+      } catch (OverflowError e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Transformation to CNF not carried out as the number of resulting nodes "
+              + "in the expression is greater than the max number of nodes allowed");
+        }
+        Util.swallow(e, null);
+        return rex;
+      }
+    }
+
+    private RexNode toCnf2(RexNode rex) {
+      final List<RexNode> operands;
+      switch (rex.getKind()) {
+        case AND:
+          incrementAndCheck();
+          operands = RexUtil.flattenAnd(((RexCall) rex).getOperands());
+          final List<RexNode> cnfOperands = Lists.newArrayList();
+          for (RexNode node : operands) {
+            RexNode cnf = toCnf2(node);
+            switch (cnf.getKind()) {
+              case AND:
+                incrementAndCheck();
+                cnfOperands.addAll(((RexCall) cnf).getOperands());
+                break;
+              default:
+                incrementAndCheck();
+                cnfOperands.add(cnf);
+            }
+          }
+          return and(cnfOperands);
+        case OR:
+          incrementAndCheck();
+          operands = RexUtil.flattenOr(((RexCall) rex).getOperands());
+          final RexNode head = operands.get(0);
+          final RexNode headCnf = toCnf2(head);
+          final List<RexNode> headCnfs = RelOptUtil.conjunctions(headCnf);
+          final RexNode tail = or(Util.skip(operands));
+          final RexNode tailCnf = toCnf2(tail);
+          final List<RexNode> tailCnfs = RelOptUtil.conjunctions(tailCnf);
+          final List<RexNode> list = Lists.newArrayList();
+          for (RexNode h : headCnfs) {
+            for (RexNode t : tailCnfs) {
+              list.add(or(ImmutableList.of(h, t)));
+            }
+          }
+          return and(list);
+        case NOT:
+          final RexNode arg = ((RexCall) rex).getOperands().get(0);
+          switch (arg.getKind()) {
+            case NOT:
+              return toCnf2(((RexCall) arg).getOperands().get(0));
+            case OR:
+              operands = ((RexCall) arg).getOperands();
+              List<RexNode> transformedDisj = new ArrayList<>();
+              for (RexNode input : RexUtil.flattenOr(operands)) {
+                transformedDisj.add(rexBuilder.makeCall(input.getType(), SqlStdOperatorTable.NOT,
+                    ImmutableList.of(input)));
+              }
+              return toCnf2(and(transformedDisj));
+            case AND:
+              operands = ((RexCall) arg).getOperands();
+              List<RexNode> transformedConj = new ArrayList<>();
+              for (RexNode input : RexUtil.flattenAnd(operands)) {
+                transformedConj.add(rexBuilder.makeCall(input.getType(), SqlStdOperatorTable.NOT,
+                    ImmutableList.of(input)));
+              }
+              return toCnf2(or(transformedConj));
+            default:
+              incrementAndCheck();
+              return rex;
+          }
+        default:
+          incrementAndCheck();
+          return rex;
+      }
+    }
+
+    private RexNode and(Iterable<? extends RexNode> nodes) {
+      return RexUtil.composeConjunction(rexBuilder, nodes, false);
+    }
+
+    private RexNode or(Iterable<? extends RexNode> nodes) {
+      return RexUtil.composeDisjunction(rexBuilder, nodes, false);
+    }
+
+    private void incrementAndCheck() {
+      this.currentCount++;
+      if (this.currentCount > this.maxNodeCount) {
+        throw OverflowError.INSTANCE;
+      }
+    }
+
+    @SuppressWarnings("serial")
+    private static class OverflowError extends ControlFlowException {
+
+      public static final OverflowError INSTANCE = new OverflowError();
+
+      private OverflowError() {}
+    }
+  }
+
+
   /**
    * Simplifies a boolean expression.
    *
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java
index 9609a1e..1f85a42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePointLookupOptimizerRule.java
@@ -42,6 +42,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -69,11 +70,15 @@
   // Minimum number of OR clauses needed to transform into IN clauses
-  private final int min;
+  private final int minNumORClauses;
 
-  public HivePointLookupOptimizerRule(int min) {
+  // Max number of nodes when converting to CNF
+  private final int maxCNFNodeCount;
+
+  public HivePointLookupOptimizerRule(int minNumORClauses, int maxCNFNodeCount) {
     super(operand(Filter.class, any()));
-    this.min = min;
+    this.minNumORClauses = minNumORClauses;
+    this.maxCNFNodeCount = maxCNFNodeCount;
   }
 
   public void onMatch(RelOptRuleCall call) {
@@ -81,10 +86,12 @@ public void onMatch(RelOptRuleCall call) {
     final RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
 
-    final RexNode condition = RexUtil.pullFactors(rexBuilder, filter.getCondition());
+    RexNode condition = RexUtil.pullFactors(rexBuilder, filter.getCondition());
+    condition = HiveRexUtil.toCnf(rexBuilder, maxCNFNodeCount, condition);
 
     // 1. We try to transform possible candidates
-    RexTransformIntoInClause transformIntoInClause = new RexTransformIntoInClause(rexBuilder, filter, min);
+    RexTransformIntoInClause transformIntoInClause = new RexTransformIntoInClause(rexBuilder, filter,
+        minNumORClauses);
     RexNode newCondition = transformIntoInClause.apply(condition);
 
     // 2. We merge IN expressions
@@ -109,12 +116,12 @@ public void onMatch(RelOptRuleCall call) {
   protected static class RexTransformIntoInClause extends RexShuttle {
     private final RexBuilder rexBuilder;
     private final Filter filterOp;
-    private final int min;
+    private final int minNumORClauses;
 
-    RexTransformIntoInClause(RexBuilder rexBuilder, Filter filterOp, int min) {
+    RexTransformIntoInClause(RexBuilder rexBuilder, Filter filterOp, int minNumORClauses) {
       this.filterOp = filterOp;
       this.rexBuilder = rexBuilder;
-      this.min = min;
+      this.minNumORClauses = minNumORClauses;
     }
 
     @Override public RexNode visitCall(RexCall call) {
@@ -128,9 +135,9 @@ public void onMatch(RelOptRuleCall call) {
           if (operand.getKind() == SqlKind.OR) {
             try {
               newOperand = transformIntoInClauseCondition(rexBuilder,
-                  filterOp.getRowType(), operand, min);
+                  filterOp.getRowType(), operand, minNumORClauses);
               if (newOperand == null) {
-                return call;
+                newOperand = operand;
               }
             } catch (SemanticException e) {
               LOG.error("Exception in HivePointLookupOptimizerRule", e);
@@ -146,7 +153,7 @@ public void onMatch(RelOptRuleCall call) {
         case OR:
           try {
             node = transformIntoInClauseCondition(rexBuilder,
-                filterOp.getRowType(), call, min);
+                filterOp.getRowType(), call, minNumORClauses);
             if (node == null) {
               return call;
             }
@@ -162,22 +169,20 @@ public void onMatch(RelOptRuleCall call) {
     }
 
     private static RexNode transformIntoInClauseCondition(RexBuilder rexBuilder, RelDataType inputSchema,
-        RexNode condition, int min) throws SemanticException {
+        RexNode condition, int minNumORClauses) throws SemanticException {
       assert condition.getKind() == SqlKind.OR;
 
       // 1. We extract the information necessary to create the predicate for the new
       //    filter
       ListMultimap<RexNode, RexLiteral> columnConstantsMap = ArrayListMultimap.create();
       ImmutableList<RexNode> operands = RexUtil.flattenOr(((RexCall) condition).getOperands());
-      if (operands.size() < min) {
+      if (operands.size() < minNumORClauses) {
         // We bail out
         return null;
       }
       for (int i = 0; i < operands.size(); i++) {
         RexNode operand = operands.get(i);
-
-        final RexNode operandCNF = RexUtil.toCnf(rexBuilder, operand);
-        final List<RexNode> conjunctions = RelOptUtil.conjunctions(operandCNF);
+        final List<RexNode> conjunctions = RelOptUtil.conjunctions(operand);
         for (RexNode conjunction: conjunctions) {
           // 1.1. If it is not a RexCall, we bail out
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java
index 17fcc82..3a2dd7f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java
@@ -39,6 +39,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,19 +54,21 @@
       .getLogger(HivePreFilteringRule.class
           .getName());
 
-  public static final HivePreFilteringRule INSTANCE = new HivePreFilteringRule();
-
-  private final FilterFactory filterFactory;
-
   private static final Set<SqlKind> COMPARISON = EnumSet.of(SqlKind.EQUALS,
       SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL,
       SqlKind.GREATER_THAN, SqlKind.LESS_THAN, SqlKind.NOT_EQUALS);
 
-  private HivePreFilteringRule() {
+  private final FilterFactory filterFactory;
+
+  // Max number of nodes when converting to CNF
+  private final int maxCNFNodeCount;
+
+  public HivePreFilteringRule(int maxCNFNodeCount) {
     super(operand(Filter.class, operand(RelNode.class, any())));
     this.filterFactory = HiveRelFactories.HIVE_FILTER_FACTORY;
+    this.maxCNFNodeCount = maxCNFNodeCount;
   }
 
   @Override
@@ -105,6 +108,7 @@ public void onMatch(RelOptRuleCall call) {
     // 1. Recompose filter possibly by pulling out common elements from DNF
     //    expressions
     RexNode topFilterCondition = RexUtil.pullFactors(rexBuilder, filter.getCondition());
+    topFilterCondition = HiveRexUtil.toCnf(rexBuilder, maxCNFNodeCount, topFilterCondition);
 
     // 2. We extract possible candidates to be pushed down
     List<RexNode> operandsToPushDown = new ArrayList<>();
@@ -120,7 +124,7 @@ public void onMatch(RelOptRuleCall call) {
       for (RexNode operand : operands) {
         if (operand.getKind() == SqlKind.OR) {
-          extractedCommonOperands = extractCommonOperands(rexBuilder, operand);
+          extractedCommonOperands = extractCommonOperands(rexBuilder, operand, maxCNFNodeCount);
           for (RexNode extractedExpr : extractedCommonOperands) {
             if (operandsToPushDownDigest.add(extractedExpr.toString())) {
               operandsToPushDown.add(extractedExpr);
@@ -155,7 +159,7 @@ public void onMatch(RelOptRuleCall call) {
       break;
     case OR:
-      operandsToPushDown = extractCommonOperands(rexBuilder, topFilterCondition);
+      operandsToPushDown = extractCommonOperands(rexBuilder, topFilterCondition, maxCNFNodeCount);
       break;
     default:
       return;
@@ -191,7 +195,8 @@ public void onMatch(RelOptRuleCall call) {
   }
 
-  private static List<RexNode> extractCommonOperands(RexBuilder rexBuilder, RexNode condition) {
+  private static List<RexNode> extractCommonOperands(RexBuilder rexBuilder, RexNode condition,
+      int maxCNFNodeCount) {
     assert condition.getKind() == SqlKind.OR;
     Multimap<String, RexNode> reductionCondition = LinkedHashMultimap.create();
@@ -205,9 +210,7 @@ public void onMatch(RelOptRuleCall call) {
     ImmutableList<RexNode> operands = RexUtil.flattenOr(((RexCall) condition).getOperands());
     for (int i = 0; i < operands.size(); i++) {
       final RexNode operand = operands.get(i);
-
-      final RexNode operandCNF = RexUtil.toCnf(rexBuilder, operand);
-      final List<RexNode> conjunctions = RelOptUtil.conjunctions(operandCNF);
+      final List<RexNode> conjunctions = RelOptUtil.conjunctions(operand);
 
       Set<String> refsInCurrentOperand = Sets.newHashSet();
       for (RexNode conjunction : conjunctions) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index d084552..4a7d0b1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -1119,6 +1119,9 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
     PerfLogger perfLogger = SessionState.getPerfLogger();
 
+    final int maxCNFNodeCount = conf.getIntVar(HiveConf.ConfVars.HIVE_CBO_CNF_NODES_LIMIT);
+    final int minNumORClauses = conf.getIntVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZERMIN);
+
     //1. Distinct aggregate rewrite
     // Run this optimization early, since it is expanding the operator pipeline.
     if (!conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr") &&
@@ -1139,7 +1142,7 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
     // ((R1.x=R2.x) and R1.z=10)) and rand(1) < 0.1
     perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
     basePlan = hepPlan(basePlan, false, mdProvider, null, HepMatchOrder.ARBITRARY,
-        HivePreFilteringRule.INSTANCE);
+        new HivePreFilteringRule(maxCNFNodeCount));
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
       "Calcite: Prejoin ordering transformation, factor out common filter elements and separating deterministic vs non-deterministic UDF");
@@ -1165,8 +1168,7 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
     rules.add(HiveReduceExpressionsRule.FILTER_INSTANCE);
     rules.add(HiveReduceExpressionsRule.JOIN_INSTANCE);
     if (conf.getBoolVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZER)) {
-      final int min = conf.getIntVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZERMIN);
-      rules.add(new HivePointLookupOptimizerRule(min));
+      rules.add(new HivePointLookupOptimizerRule(minNumORClauses, maxCNFNodeCount));
    }
     rules.add(HiveJoinAddNotNullRule.INSTANCE_JOIN);
     rules.add(HiveJoinAddNotNullRule.INSTANCE_SEMIJOIN);
diff --git ql/src/test/queries/clientpositive/tocnf.q ql/src/test/queries/clientpositive/tocnf.q
new file mode 100644
index 0000000..67ebff0
--- /dev/null
+++ ql/src/test/queries/clientpositive/tocnf.q
@@ -0,0 +1,22 @@
+set hive.cbo.cnf.maxnodes=8;
+set hive.optimize.point.lookup.min=0;
+
+-- the CNF transformation creates 7 nodes: (key='0' OR key='1') AND (value='8' OR key='1')
+-- thus, the IN transformation yields: (key IN ('0','1')) AND (value='8' OR key='1')
+explain
+SELECT key
+FROM src
+WHERE
+  ((key = '0'
+    AND value = '8') OR key = '1')
+;
+
+-- the CNF transformation would create 9 nodes: (key='0' OR key='1' OR key='2') AND (value='8' OR key='1' OR key='2')
+-- thus, the IN transformation is not triggered, as we exceed the max number of nodes in CNF
+explain
+SELECT key
+FROM src
+WHERE
+  ((key = '0'
+    AND value = '8') OR key = '1' OR key = '2')
+;
diff --git ql/src/test/results/clientpositive/tocnf.q.out ql/src/test/results/clientpositive/tocnf.q.out
new file mode 100644
index 0000000..4a58add
--- /dev/null
+++ ql/src/test/results/clientpositive/tocnf.q.out
@@ -0,0 +1,100 @@
+PREHOOK: query: -- the CNF transformation creates 7 nodes: (key='0' OR key='1') AND (value='8' OR key='1')
+-- thus, the IN transformation yields: (key IN ('0','1')) AND (value='8' OR key='1')
+explain
+SELECT key
+FROM src
+WHERE
+  ((key = '0'
+    AND value = '8') OR key = '1')
+PREHOOK: type: QUERY
+POSTHOOK: query: -- the CNF transformation creates 7 nodes: (key='0' OR key='1') AND (value='8' OR key='1')
+-- thus, the IN transformation yields: (key IN ('0','1')) AND (value='8' OR key='1')
+explain
+SELECT key
+FROM src
+WHERE
+  ((key = '0'
+    AND value = '8') OR key = '1')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: ((key) IN ('0', '1') and ((value = '8') or (key = '1'))) (type: boolean)
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: -- the CNF transformation would create 9 nodes: (key='0' OR key='1' OR key='2') AND (value='8' OR key='1' OR key='2')
+-- thus, the IN transformation is not triggered, as we exceed the max number of nodes in CNF
+explain
+SELECT key
+FROM src
+WHERE
+  ((key = '0'
+    AND value = '8') OR key = '1' OR key = '2')
+PREHOOK: type: QUERY
+POSTHOOK: query: -- the CNF transformation would create 9 nodes: (key='0' OR key='1' OR key='2') AND (value='8' OR key='1' OR key='2')
+-- thus, the IN transformation is not triggered, as we exceed the max number of nodes in CNF
+explain
+SELECT key
+FROM src
+WHERE
+  ((key = '0'
+    AND value = '8') OR key = '1' OR key = '2')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (((key = '0') and (value = '8')) or (key = '1') or (key = '2')) (type: boolean)
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
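
For illustration, the node counts cited in tocnf.q follow from counting leaves plus flattened interior AND/OR operators in the rewritten predicate. The sketch below is plain Java; the toy Expr/Leaf/Op classes stand in for Calcite's RexNode and are not part of the patch. It reproduces the 7-node and 9-node counts that fall on either side of hive.cbo.cnf.maxnodes=8:

// CnfBudgetDemo.java -- toy model of the node accounting bounded by
// hive.cbo.cnf.maxnodes (assumed classes, not the patch's Calcite types).
public class CnfBudgetDemo {

  abstract static class Expr {
    abstract int size(); // leaves + interior nodes, with n-ary AND/OR flattened
  }

  static class Leaf extends Expr {
    final String text;
    Leaf(String text) { this.text = text; }
    @Override int size() { return 1; }
    @Override public String toString() { return text; }
  }

  static class Op extends Expr {
    final String kind; // "AND" or "OR"
    final Expr[] args;
    Op(String kind, Expr... args) { this.kind = kind; this.args = args; }
    @Override int size() {
      int n = 1; // this interior node
      for (Expr a : args) {
        n += a.size();
      }
      return n;
    }
    @Override public String toString() {
      StringBuilder sb = new StringBuilder("(");
      for (int i = 0; i < args.length; i++) {
        if (i > 0) sb.append(' ').append(kind).append(' ');
        sb.append(args[i]);
      }
      return sb.append(')').toString();
    }
  }

  public static void main(String[] args) {
    int maxNodes = 8; // hive.cbo.cnf.maxnodes in tocnf.q

    // First query: ((key='0' AND value='8') OR key='1') distributes to
    // (key='0' OR key='1') AND (value='8' OR key='1'): 4 leaves + 2 ORs
    // + 1 AND = 7 nodes, within budget, so the IN rewrite can fire.
    Expr cnf1 = new Op("AND",
        new Op("OR", new Leaf("key='0'"), new Leaf("key='1'")),
        new Op("OR", new Leaf("value='8'"), new Leaf("key='1'")));
    System.out.println(cnf1 + " -> " + cnf1.size() + " nodes, limit " + maxNodes);

    // Second query adds OR key='2': the CNF form
    // (key='0' OR key='1' OR key='2') AND (value='8' OR key='1' OR key='2')
    // has 6 leaves + 2 ORs + 1 AND = 9 nodes, exceeding the budget.
    Expr cnf2 = new Op("AND",
        new Op("OR", new Leaf("key='0'"), new Leaf("key='1'"), new Leaf("key='2'")),
        new Op("OR", new Leaf("value='8'"), new Leaf("key='1'"), new Leaf("key='2'")));
    System.out.println(cnf2 + " -> " + cnf2.size() + " nodes, limit " + maxNodes);
  }
}

In the patch itself, the same accounting is done incrementally by CnfHelper.incrementAndCheck(), which throws OverflowError once the running count exceeds the limit so that toCnf can swallow the exception and fall back to the original, unconverted expression.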