diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java
new file mode 100644
index 0000000..883a765
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java
@@ -0,0 +1,153 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.RelFactories.FilterFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualNS;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
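+/**
+ * Rule that matches a Filter whose condition is an OR and infers from it a
+ * weaker predicate that every qualifying row must satisfy: for each column
+ * constrained in all of the disjuncts (by a comparison with a literal, IN,
+ * or BETWEEN), the per-disjunct predicates are OR-ed together, and the
+ * per-column results are AND-ed into the new predicate. The new predicate is
+ * placed in a Filter below the original one, from where later rules may push
+ * it further down the plan.
+ */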
+public class HivePreFilteringRule extends RelOptRule {
+
+  protected static final Log LOG = LogFactory
+      .getLog(HivePreFilteringRule.class.getName());
+
+  public static final HivePreFilteringRule INSTANCE =
+      new HivePreFilteringRule();
+
+  private final FilterFactory filterFactory;
+
+  private static final Set<String> COMPARISON_UDFS = Sets.newHashSet(
+      GenericUDFOPEqual.class.getAnnotation(Description.class).name(),
+      GenericUDFOPEqualNS.class.getAnnotation(Description.class).name(),
+      GenericUDFOPEqualOrGreaterThan.class.getAnnotation(Description.class).name(),
+      GenericUDFOPEqualOrLessThan.class.getAnnotation(Description.class).name(),
+      GenericUDFOPGreaterThan.class.getAnnotation(Description.class).name(),
+      GenericUDFOPLessThan.class.getAnnotation(Description.class).name(),
+      GenericUDFOPNotEqual.class.getAnnotation(Description.class).name());
+  private static final String IN_UDF =
+      GenericUDFIn.class.getAnnotation(Description.class).name();
+  private static final String BETWEEN_UDF =
+      GenericUDFBetween.class.getAnnotation(Description.class).name();
+
+  private HivePreFilteringRule() {
+    super(operand(Filter.class,
+        operand(RelNode.class, any())));
+    this.filterFactory = HiveFilter.DEFAULT_FILTER_FACTORY;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Filter filter = call.rel(0);
+    final RelNode filterChild = call.rel(1);
+
+    final RexNode condition = filter.getCondition();
+
+    final RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
+
+    Multimap<String, RexCall> reductionCondition = HashMultimap.create();
+    RexNode newCondition;
+    switch (condition.getKind()) {
+    case OR:
+      // 1. We extract the information necessary to create the predicate for the
+      //    new filter; currently we support comparison functions, IN and BETWEEN
+      ImmutableList<RexNode> operands = RexUtil.flattenOr(((RexCall) condition).getOperands());
+      for (RexNode operand : operands) {
+        final RexNode operandCNF = RexUtil.toCnf(rexBuilder, operand);
+        final List<RexNode> conjunctions = RelOptUtil.conjunctions(operandCNF);
+        for (RexNode conjunction : conjunctions) {
+          if (!(conjunction instanceof RexCall)) {
+            continue;
+          }
+          RexCall conjCall = (RexCall) conjunction;
+          if (COMPARISON_UDFS.contains(conjCall.getOperator().getName())) {
+            if (conjCall.operands.get(0) instanceof RexInputRef &&
+                conjCall.operands.get(1) instanceof RexLiteral) {
+              reductionCondition.put(conjCall.operands.get(0).toString(),
+                  conjCall);
+            } else if (conjCall.operands.get(1) instanceof RexInputRef &&
+                conjCall.operands.get(0) instanceof RexLiteral) {
+              reductionCondition.put(conjCall.operands.get(1).toString(),
+                  conjCall);
+            }
+          } else if (conjCall.getOperator().getName().equals(IN_UDF)) {
+            reductionCondition.put(conjCall.operands.get(0).toString(),
+                conjCall);
+          } else if (conjCall.getOperator().getName().equals(BETWEEN_UDF)) {
+            reductionCondition.put(conjCall.operands.get(1).toString(),
+                conjCall);
+          }
+        }
+      }
+
+      // 2. We create the new predicate
+      List<RexNode> disjunctions = new ArrayList<>();
+      for (Entry<String, Collection<RexCall>> pair : reductionCondition.asMap().entrySet()) {
+        if (pair.getValue().size() == operands.size()) {
+          disjunctions.add(RexUtil.composeDisjunction(rexBuilder, pair.getValue(), false));
+        }
+      }
+      // If we did not generate anything for the new predicate, we bail out
+      if (disjunctions.isEmpty()) {
+        return;
+      }
+      // Otherwise, we create the new condition
+      newCondition = RexUtil.composeConjunction(rexBuilder, disjunctions, false);
+      break;
+    default:
+      return;
+    }
+
+    // 3. If the new condition is the same as the one already contained in the
+    //    current filter, we bail out
+    if (condition.toString().equals(newCondition.toString())) {
+      return;
+    }
+
+    // 4. If we already pushed down this condition, we bail out
+    if (filterChild instanceof Filter && ((Filter) filterChild)
+        .getCondition().toString().equals(newCondition.toString())) {
+      return;
+    }
+
+    // 5. We create the new filter that might be pushed down
+    RelNode newFilter = filterFactory.createFilter(filter.getInput(), newCondition);
+    RelNode newTopFilter = filterFactory.createFilter(newFilter, filter.getCondition());
+    call.transformTo(newTopFilter);
+  }
+
+}
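
For intuition, the following minimal, self-contained sketch mirrors the factoring that steps 1 and 2 of onMatch perform, using plain strings in place of RexNodes. The class and column names are hypothetical and not part of the patch; the only library assumed is Guava's HashMultimap, which the rule itself uses:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

public class PreFilterSketch {
  public static void main(String[] args) {
    // Disjuncts of: (a = 1 AND b = 2) OR (a = 3 AND c = 4),
    // each predicate stored as (column, predicate text)
    List<List<Entry<String, String>>> disjuncts = List.of(
        List.of(Map.entry("a", "a = 1"), Map.entry("b", "b = 2")),
        List.of(Map.entry("a", "a = 3"), Map.entry("c", "c = 4")));

    // Step 1: group candidate predicates by the column they constrain
    Multimap<String, String> byColumn = HashMultimap.create();
    for (List<Entry<String, String>> disjunct : disjuncts) {
      for (Entry<String, String> p : disjunct) {
        byColumn.put(p.getKey(), p.getValue());
      }
    }

    // Step 2: keep a column only if every disjunct contributed a predicate
    // for it; each kept column yields one disjunction
    List<String> conjuncts = new ArrayList<>();
    for (Entry<String, Collection<String>> e : byColumn.asMap().entrySet()) {
      if (e.getValue().size() == disjuncts.size()) {
        conjuncts.add("(" + String.join(" OR ", e.getValue()) + ")");
      }
    }

    // Prints the derived pre-filter, e.g. (a = 1 OR a = 3)
    System.out.println(String.join(" AND ", conjuncts));
  }
}

Running it prints a pre-filter equivalent to (a = 1 OR a = 3); columns b and c are dropped because only one disjunct constrains each, so a disjunction over them would not be implied by the original condition.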
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 6e6923c..37a70bb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -138,6 +138,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExpandDistinctAggregatesRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePreFilteringRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule;
@@ -926,27 +927,43 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
         ReduceExpressionsRule.PROJECT_INSTANCE,
         ReduceExpressionsRule.FILTER_INSTANCE,
         ReduceExpressionsRule.JOIN_INSTANCE,
-        new HiveFilterProjectTransposeRule(
-            Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY, HiveProject.class,
-            HiveProject.DEFAULT_PROJECT_FACTORY), new HiveFilterSetOpTransposeRule(
-            HiveFilter.DEFAULT_FILTER_FACTORY),
-        new FilterMergeRule(HiveFilter.DEFAULT_FILTER_FACTORY), HiveFilterJoinRule.JOIN,
-        HiveFilterJoinRule.FILTER_ON_JOIN, new FilterAggregateTransposeRule(Filter.class,
+        new HiveFilterProjectTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
+            HiveProject.class, HiveProject.DEFAULT_PROJECT_FACTORY),
+        new HiveFilterSetOpTransposeRule(HiveFilter.DEFAULT_FILTER_FACTORY),
+        new FilterMergeRule(HiveFilter.DEFAULT_FILTER_FACTORY),
+        HiveFilterJoinRule.JOIN,
+        HiveFilterJoinRule.FILTER_ON_JOIN,
+        new FilterAggregateTransposeRule(Filter.class,
             HiveFilter.DEFAULT_FILTER_FACTORY, Aggregate.class));
 
-    // 4. Transitive inference & Partition Pruning
+    // 4. Filter reduction
+    basePlan = hepPlan(basePlan, true, mdProvider, HivePreFilteringRule.INSTANCE);
+
+    // 5. Rerun PPD as the new filter(s) introduced by the previous step might
+    // be pushed down through the plan
+    basePlan = hepPlan(basePlan, true, mdProvider,
+        new HiveFilterProjectTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
+            HiveProject.class, HiveProject.DEFAULT_PROJECT_FACTORY),
+        new HiveFilterSetOpTransposeRule(HiveFilter.DEFAULT_FILTER_FACTORY),
+        HiveFilterJoinRule.JOIN,
+        HiveFilterJoinRule.FILTER_ON_JOIN,
+        new FilterAggregateTransposeRule(Filter.class,
+            HiveFilter.DEFAULT_FILTER_FACTORY, Aggregate.class),
+        new FilterMergeRule(HiveFilter.DEFAULT_FILTER_FACTORY));
+
+    // 6. Transitive inference & Partition Pruning
     basePlan = hepPlan(basePlan, false, mdProvider, new JoinPushTransitivePredicatesRule(
         Join.class, HiveFilter.DEFAULT_FILTER_FACTORY), new HivePartitionPruneRule(conf));
 
-    // 5. Projection Pruning
+    // 7. Projection Pruning
     RelFieldTrimmer fieldTrimmer = new RelFieldTrimmer(null, HiveProject.DEFAULT_PROJECT_FACTORY,
         HiveFilter.DEFAULT_FILTER_FACTORY, HiveJoin.HIVE_JOIN_FACTORY,
         RelFactories.DEFAULT_SEMI_JOIN_FACTORY, HiveSort.HIVE_SORT_REL_FACTORY,
         HiveAggregate.HIVE_AGGR_REL_FACTORY, HiveUnion.UNION_REL_FACTORY);
     basePlan = fieldTrimmer.trim(basePlan);
 
-    // 6. Rerun PPD through Project as column pruning would have introduced DT
+    // 8. Rerun PPD through Project as column pruning would have introduced DT
     // above scans
     basePlan = hepPlan(basePlan, true, mdProvider, new FilterProjectTransposeRule(
         Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
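
The new step 4 above applies HivePreFilteringRule in a hepPlan pass of its own, and step 5 then reruns the pushdown rules so the derived pre-filter can migrate below joins, set operations, and aggregates. hepPlan is a private CalcitePlanner helper that also wires in the metadata provider; as a rough, hypothetical illustration (the class name PreFilterPass is invented, not part of the patch), such a pass boils down to driving a bare Calcite HepPlanner:

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;

import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePreFilteringRule;

public class PreFilterPass {
  // Applies only HivePreFilteringRule to the plan and returns the rewritten root.
  public static RelNode apply(RelNode basePlan) {
    HepProgramBuilder programBuilder = new HepProgramBuilder();
    programBuilder.addRuleInstance(HivePreFilteringRule.INSTANCE);
    HepPlanner planner = new HepPlanner(programBuilder.build());
    planner.setRoot(basePlan);
    return planner.findBestExp();
  }
}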