diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectMergeRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectMergeRule.java index fc48a26..e963546 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectMergeRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectMergeRule.java @@ -17,13 +17,53 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.rules; +import java.util.Set; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.rules.ProjectMergeRule; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; +/** + * ProjectMergeRule merges a {@link org.apache.calcite.rel.core.Project} into + * another {@link org.apache.calcite.rel.core.Project}, + * provided the projects aren't projecting identical sets of input references. + */ public class HiveProjectMergeRule extends ProjectMergeRule { - public static final HiveProjectMergeRule INSTANCE = new HiveProjectMergeRule(); - public HiveProjectMergeRule() { - super(true, HiveRelFactories.HIVE_BUILDER); + public static final HiveProjectMergeRule INSTANCE = + new HiveProjectMergeRule(true, HiveRelFactories.HIVE_BUILDER); + + public static final HiveProjectMergeRule INSTANCE_NO_FORCE = + new HiveProjectMergeRule(false, HiveRelFactories.HIVE_BUILDER); + + + private HiveProjectMergeRule(boolean force, RelBuilderFactory relBuilderFactory) { + super(force, relBuilderFactory); + } + + @Override + public boolean matches(RelOptRuleCall call) { + // Currently we do not support merging windowing functions with other + // windowing functions i.e. 
embedding windowing functions within each + // other + final Project topProject = call.rel(0); + final Project bottomProject = call.rel(1); + for (RexNode expr : topProject.getChildExps()) { + if (expr instanceof RexOver) { + Set<Integer> positions = HiveCalciteUtil.getInputRefs(expr); + for (int pos : positions) { + if (bottomProject.getChildExps().get(pos) instanceof RexOver) { + return false; + } + } + } + } + return super.matches(call); } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 6c57d3e..1e44ccf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -1016,10 +1016,9 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu // 4. Run other optimizations that do not need stats perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, - HepMatchOrder.BOTTOM_UP, - ProjectRemoveRule.INSTANCE, UnionMergeRule.INSTANCE, - new ProjectMergeRule(false, HiveRelFactories.HIVE_PROJECT_FACTORY), - HiveAggregateProjectMergeRule.INSTANCE, HiveJoinCommuteRule.INSTANCE); + HepMatchOrder.BOTTOM_UP, ProjectRemoveRule.INSTANCE, UnionMergeRule.INSTANCE, + HiveProjectMergeRule.INSTANCE_NO_FORCE, HiveAggregateProjectMergeRule.INSTANCE, + HiveJoinCommuteRule.INSTANCE); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Optimizations without stats"); // 5. Run aggregate-join transpose (cost based) @@ -1119,8 +1118,6 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv // TODO: Decorelation of subquery should be done before attempting // Partition Pruning; otherwise Expression evaluation may try to execute // corelated sub query. 
- - LOG.info("Jesus - Plan0: " + RelOptUtil.toString(basePlan)); PerfLogger perfLogger = SessionState.getPerfLogger(); @@ -1151,8 +1148,6 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, factor out common filter elements and separating deterministic vs non-deterministic UDF"); - LOG.info("Jesus - Plan2: " + RelOptUtil.toString(basePlan)); - // 3. Run exhaustive PPD, add not null filters, transitive inference, // constant propagation, constant folding List rules = Lists.newArrayList(); @@ -1191,8 +1186,6 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, PPD, not null predicates, transitive inference, constant folding"); - LOG.info("Jesus - Plan3: " + RelOptUtil.toString(basePlan)); - // 4. Push down limit through outer join // NOTE: We run this after PPD to support old style join syntax. 
// Ex: select * from R1 left outer join R2 where ((R1.x=R2.x) and R1.y<10) or diff --git ql/src/test/queries/clientpositive/windowing_double.q ql/src/test/queries/clientpositive/windowing_double.q new file mode 100644 index 0000000..ebdecd7 --- /dev/null +++ ql/src/test/queries/clientpositive/windowing_double.q @@ -0,0 +1,13 @@ +create table mytable1 ( + mytime timestamp, + string1 string); + +create table t1 as +select + sum(bound3) OVER (PARTITION BY string1 ORDER BY mytime) as bound1 +from ( + select + string1, mytime, + lag(mytime) over (partition by string1 order by mytime) as bound3 + from mytable1 +) sub; diff --git ql/src/test/results/clientpositive/windowing_double.q.out ql/src/test/results/clientpositive/windowing_double.q.out new file mode 100644 index 0000000..c7b6d4f --- /dev/null +++ ql/src/test/results/clientpositive/windowing_double.q.out @@ -0,0 +1,39 @@ +PREHOOK: query: create table mytable1 ( + mytime timestamp, + string1 string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@mytable1 +POSTHOOK: query: create table mytable1 ( + mytime timestamp, + string1 string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@mytable1 +PREHOOK: query: create table t1 as +select + sum(bound3) OVER (PARTITION BY string1 ORDER BY mytime) as bound1 +from ( + select + string1, mytime, + lag(mytime) over (partition by string1 order by mytime) as bound3 + from mytable1 +) sub +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@mytable1 +PREHOOK: Output: database:default +PREHOOK: Output: default@t1 +POSTHOOK: query: create table t1 as +select + sum(bound3) OVER (PARTITION BY string1 ORDER BY mytime) as bound1 +from ( + select + string1, mytime, + lag(mytime) over (partition by string1 order by mytime) as bound3 + from mytable1 +) sub +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@mytable1 +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 
+POSTHOOK: Lineage: t1.bound1 SCRIPT [(mytable1)mytable1.FieldSchema(name:mytime, type:timestamp, comment:null), (mytable1)mytable1.FieldSchema(name:string1, type:string, comment:null), ]