diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
index 0200506..ed5c018 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
@@ -784,6 +784,7 @@ public Boolean visitFieldAccess(RexFieldAccess fieldAccess) {
 
   public static Set<Integer> getInputRefs(RexNode expr) {
     InputRefsCollector irefColl = new InputRefsCollector(true);
+    expr.accept(irefColl);
     return irefColl.getInputRefSet();
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
index 6c0bd25..42e7737 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
@@ -248,7 +248,7 @@ public void computePartitionList(HiveConf conf, RexNode pruneNode) {
 
       // We have valid pruning expressions, only retrieve qualifying partitions
       ExprNodeDesc pruneExpr = pruneNode.accept(new ExprNodeConverter(getName(), getRowType(),
-          HiveCalciteUtil.getInputRefs(pruneNode), this.getRelOptSchema().getTypeFactory()));
+          new HashSet<Integer>(), this.getRelOptSchema().getTypeFactory()));
       partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(),
           partitionCache);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java
index b52779c..16292a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelOptUtil.InputReferencedVisitor;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -31,8 +36,10 @@
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 
 public class FilterSelectivityEstimator extends RexVisitorImpl<Double> {
   private final RelNode childRel;
@@ -81,6 +88,21 @@ public Double visitCall(RexCall call) {
       break;
     }
 
+    case IS_NOT_NULL: {
+      if (childRel instanceof HiveTableScan) {
+        double noOfNulls = getMaxNulls(call, (HiveTableScan) childRel);
+        double totalNoOfTuples = childRel.getRows();
+        if (totalNoOfTuples >= noOfNulls) {
+          selectivity = (totalNoOfTuples - noOfNulls) / totalNoOfTuples;
+        } else {
+          throw new RuntimeException("Invalid Stats number of null > no of tuples");
+        }
+      } else {
+        selectivity = computeNotEqualitySelectivity(call);
+      }
+      break;
+    }
+
     case LESS_THAN_OR_EQUAL:
     case GREATER_THAN_OR_EQUAL:
     case LESS_THAN:
@@ -199,6 +221,23 @@ private Double computeConjunctionSelectivity(RexCall call) {
     return selectivity;
   }
 
+  private long getMaxNulls(RexCall call, HiveTableScan t) {
+    long tmpNoNulls = 0;
+    long maxNoNulls = 0;
+
+    Set<Integer> iRefSet = HiveCalciteUtil.getInputRefs(call);
+    List<ColStatistics> colStats = t.getColStat(new ArrayList<Integer>(iRefSet));
+
+    for (ColStatistics cs : colStats) {
+      tmpNoNulls = cs.getNumNulls();
+      if (tmpNoNulls > maxNoNulls) {
+        maxNoNulls = tmpNoNulls;
+      }
+    }
+
+    return maxNoNulls;
+  }
+
   private Double getMaxNDV(RexCall call) {
     double tmpNDV;
     double maxNDV = 1.0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
index 219289c..ce163c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
@@ -194,6 +194,12 @@ public static ASTNode buildAST(SqlOperator op, List<ASTNode> children) {
     ASTNode node;
     if (hToken != null) {
       node = (ASTNode) ParseDriver.adaptor.create(hToken.type, hToken.text);
+      if (hToken.type == HiveParser.TOK_ISNOTNULL) {
+        ASTNode funcNode = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTION,
+            "TOK_FUNCTION");
+        funcNode.addChild(node);
+        node = funcNode;
+      }
     } else {
       node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTION, "TOK_FUNCTION");
       if (op.kind != SqlKind.CAST) {
@@ -296,6 +302,7 @@ private static String getName(GenericUDF hiveUDF) {
           hToken(HiveParser.GREATERTHANOREQUALTO, ">="));
       registerFunction("!", SqlStdOperatorTable.NOT, hToken(HiveParser.KW_NOT, "not"));
       registerFunction("<>", SqlStdOperatorTable.NOT_EQUALS, hToken(HiveParser.NOTEQUAL, "<>"));
+      registerFunction("isnotnull", SqlStdOperatorTable.IS_NOT_NULL, hToken(HiveParser.TOK_ISNOTNULL, "TOK_ISNOTNULL"));
     }
 
     private void registerFunction(String name, SqlOperator calciteFn, HiveToken hiveToken) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 3b5dbe2..0f04578 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -59,11 +59,13 @@
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
 import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
+import org.apache.calcite.rel.rules.FilterMergeRule;
 import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
 import org.apache.calcite.rel.rules.JoinToMultiJoinRule;
 import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
@@ -938,59 +940,76 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlus
   private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProvider mdProvider) {
     // TODO: Decorelation of subquery should be done before attempting
-    // Partition Pruning; otherwise Expression evaluation may try to execute
+    // Partition Pruning; otherwise Expression evaluation may try to
+    // execute
     // corelated sub query.
 
-    //0. Distinct aggregate rewrite
-    // Run this optimization early, since it is expanding the operator pipeline.
-    if (conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") &&
-        conf.getBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEDISTINCTREWRITE)) {
-      // Its not clear, if this rewrite is always performant on MR, since extra map phase
-      // introduced for 2nd MR job may offset gains of this multi-stage aggregation.
+    // 1. Projection Pruning
+    HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null,
+        HiveProject.DEFAULT_PROJECT_FACTORY,
+        HiveFilter.DEFAULT_FILTER_FACTORY, HiveJoin.HIVE_JOIN_FACTORY,
+        RelFactories.DEFAULT_SEMI_JOIN_FACTORY,
+        HiveSort.HIVE_SORT_REL_FACTORY, HiveAggregate.HIVE_AGGR_REL_FACTORY,
+        HiveUnion.UNION_REL_FACTORY);
+    basePlan = fieldTrimmer.trim(basePlan);
+
+    // 2. Distinct aggregate rewrite
+    // Run this optimization early, since it is expanding the operator
+    // pipeline.
+    if (conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+        && conf.getBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEDISTINCTREWRITE)) {
+      // Its not clear, if this rewrite is always performant on MR,
+      // since extra map phase
+      // introduced for 2nd MR job may offset gains of this
+      // multi-stage aggregation.
       // We need a cost model for MR to enable this on MR.
-      basePlan = hepPlan(basePlan, true, mdProvider, HiveExpandDistinctAggregatesRule.INSTANCE);
+      basePlan = hepPlan(basePlan, true, mdProvider,
+          HiveExpandDistinctAggregatesRule.INSTANCE);
     }
 
-    // 1. Push Down Semi Joins
-    basePlan = hepPlan(basePlan, true, mdProvider, SemiJoinJoinTransposeRule.INSTANCE,
-        SemiJoinFilterTransposeRule.INSTANCE, SemiJoinProjectTransposeRule.INSTANCE);
+    // 3. Push Down Semi Joins
+    // Run semi Join Push Down Before any PPD
+    basePlan = hepPlan(basePlan, true, mdProvider,
+        SemiJoinJoinTransposeRule.INSTANCE,
+        SemiJoinFilterTransposeRule.INSTANCE,
+        SemiJoinProjectTransposeRule.INSTANCE);
 
-    // 2. Add not null filters
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
-      basePlan = hepPlan(basePlan, true, mdProvider, HiveJoinAddNotNullRule.INSTANCE);
-    }
+    // 4. common filter extraction, and PPD
+    // NOTE: PPD needs to run before adding not null filters
+    // in order to support old style join syntax.
+    basePlan = hepPlan(basePlan, true, mdProvider,
+        HivePreFilteringRule.INSTANCE, new HiveFilterProjectTransposeRule(
+            Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
+            HiveProject.class, HiveProject.DEFAULT_PROJECT_FACTORY),
+        new HiveFilterSetOpTransposeRule(HiveFilter.DEFAULT_FILTER_FACTORY),
+        HiveFilterJoinRule.JOIN, HiveFilterJoinRule.FILTER_ON_JOIN,
+        new FilterAggregateTransposeRule(Filter.class,
+            HiveFilter.DEFAULT_FILTER_FACTORY, Aggregate.class));
+
+    // 5. Add not null filters below joins for NON-NULL Safe Joins
+    basePlan = hepPlan(basePlan, false, mdProvider,
+        HiveJoinAddNotNullRule.INSTANCE);
 
-    // 3. Constant propagation, common filter extraction, and PPD
+    // 6. Constant propagation, common filter extraction, and PPD
+    // NOTE: Need to rerun PPD in order to push NOT NULL filters down
     basePlan = hepPlan(basePlan, true, mdProvider,
         ReduceExpressionsRule.PROJECT_INSTANCE,
         ReduceExpressionsRule.FILTER_INSTANCE,
-        ReduceExpressionsRule.JOIN_INSTANCE,
-        HivePreFilteringRule.INSTANCE,
-        new HiveFilterProjectTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
-            HiveProject.class, HiveProject.DEFAULT_PROJECT_FACTORY),
+        ReduceExpressionsRule.JOIN_INSTANCE, HivePreFilteringRule.INSTANCE,
+        new HiveFilterProjectTransposeRule(Filter.class,
+            HiveFilter.DEFAULT_FILTER_FACTORY, HiveProject.class,
+            HiveProject.DEFAULT_PROJECT_FACTORY),
         new HiveFilterSetOpTransposeRule(HiveFilter.DEFAULT_FILTER_FACTORY),
-        HiveFilterJoinRule.JOIN,
-        HiveFilterJoinRule.FILTER_ON_JOIN,
+        HiveFilterJoinRule.JOIN, HiveFilterJoinRule.FILTER_ON_JOIN,
         new FilterAggregateTransposeRule(Filter.class,
             HiveFilter.DEFAULT_FILTER_FACTORY, Aggregate.class));
 
-    // 4. Transitive inference & Partition Pruning
-    basePlan = hepPlan(basePlan, false, mdProvider, new HiveJoinPushTransitivePredicatesRule(
-        Join.class, HiveFilter.DEFAULT_FILTER_FACTORY),
-        new HivePartitionPruneRule(conf));
-
-    // 5. Projection Pruning
-    HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null, HiveProject.DEFAULT_PROJECT_FACTORY,
-        HiveFilter.DEFAULT_FILTER_FACTORY, HiveJoin.HIVE_JOIN_FACTORY,
-        HiveSemiJoin.HIVE_SEMIJOIN_FACTORY, HiveSort.HIVE_SORT_REL_FACTORY,
-        HiveAggregate.HIVE_AGGR_REL_FACTORY, HiveUnion.UNION_REL_FACTORY);
-    basePlan = fieldTrimmer.trim(basePlan);
-
-    // 6. Rerun PPD through Project as column pruning would have introduced DT
-    // above scans
-    basePlan = hepPlan(basePlan, true, mdProvider,
-        new FilterProjectTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
-            HiveProject.class, HiveProject.DEFAULT_PROJECT_FACTORY));
+    // 7. Transitive Inference, Filter Merging & Partition Pruning
+    basePlan = hepPlan(basePlan, false, mdProvider,
+        new HiveJoinPushTransitivePredicatesRule(Join.class,
+            HiveFilter.DEFAULT_FILTER_FACTORY), new FilterMergeRule(
+            HiveFilter.DEFAULT_FILTER_FACTORY), new HivePartitionPruneRule(
+            conf));
 
     return basePlan;
   }
 
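
Note (not part of the patch): the IS_NOT_NULL branch added to FilterSelectivityEstimator reduces to selectivity = (rowCount - maxNulls) / rowCount, where maxNulls is the largest per-column null count among the columns referenced by the predicate's operand (see getMaxNulls above, which reads ColStatistics via HiveTableScan.getColStat). The following is a minimal, self-contained Java sketch of that arithmetic; class and method names are illustrative only, and the divide-by-zero guard for empty tables is an addition the patch itself does not make.

// Standalone sketch of the IS_NOT_NULL selectivity formula introduced above.
// Illustrative names only; the real code lives in FilterSelectivityEstimator.
public final class IsNotNullSelectivitySketch {

  // selectivity(IS NOT NULL) = (rowCount - maxNulls) / rowCount
  static double estimate(double rowCount, long... numNullsPerColumn) {
    long maxNulls = 0;
    for (long nulls : numNullsPerColumn) {
      // Mirrors getMaxNulls(): keep the largest null count over the
      // columns referenced by the predicate.
      maxNulls = Math.max(maxNulls, nulls);
    }
    if (rowCount < maxNulls) {
      // Mirrors the patch: stats reporting more nulls than rows are invalid.
      throw new RuntimeException("Invalid Stats number of null > no of tuples");
    }
    // Math.max(..., 1) guards the empty-table case; the patch divides by
    // totalNoOfTuples directly.
    return (rowCount - maxNulls) / Math.max(rowCount, 1);
  }

  public static void main(String[] args) {
    // 1000 rows; the predicate's operand touches columns with 250 and
    // 10 nulls. Expected: (1000 - 250) / 1000 = 0.75
    System.out.println(estimate(1000.0, 250L, 10L));
  }
}

Taking the maximum of the per-column null counts, rather than their sum, treats the most-null column as dominating when the operand spans several nullable columns; the null sets are assumed to overlap rather than add up.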