diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
new file mode 100644
index 0000000..b53c5eb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveWindowingFixRule.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+
+/**
+ * Rule to fix windowing issue when it is done over
+ * aggregation columns (HIVE-10627)
+ */
+public class HiveWindowingFixRule extends RelOptRule {
+
+  public static final HiveWindowingFixRule INSTANCE = new HiveWindowingFixRule();
+
+  private final ProjectFactory projectFactory;
+
+
+  private HiveWindowingFixRule() {
+    super(
+        operand(Project.class,
+            operand(Aggregate.class, any())));
+    this.projectFactory = HiveProject.DEFAULT_PROJECT_FACTORY;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Project select = call.rel(0);
+    Aggregate groupBy = call.rel(1);
+
+    Set<String> projectNodes = new HashSet<String>();
+    Map<String, RexNode> windowingNodes = new HashMap<String, RexNode>();
+    for (RexNode r : select.getChildExps()) {
+      if (r instanceof RexOver) {
+        for (RexNode operand : ((RexOver) r).getOperands()) {
+          windowingNodes.put(operand.toString(), operand);
+        }
+      } else {
+        projectNodes.add(r.toString());
+      }
+    }
+
+    final int projectCount = select.getChildExps().size();
+    final List<RexNode> newProjects = new ArrayList<RexNode>();
+    final List<String> newProjectColumnNames = new ArrayList<String>();
+    final List<RexNode> newTopProjects = new ArrayList<RexNode>();
+    for (int i = 0; i < projectCount; i++) {
+      newProjects.add(select.getChildExps().get(i));
+      newProjectColumnNames.add(select.getRowType().getFieldNames().get(i));
+      newTopProjects.add(RexInputRef.of(i, select.getRowType()));
+    }
+    boolean windowingFix = false;
+    for (Entry<String, RexNode> windowingNode : windowingNodes.entrySet()) {
+      if (!projectNodes.contains(windowingNode.getKey())) {
+        windowingFix = true;
+        newProjects.add(windowingNode.getValue());
+        int colIndex = 0;
+        String alias = "window_col_" + colIndex;
+        while (newProjectColumnNames.contains(alias)) {
+          alias = "window_col_" + (colIndex++);
+        }
+        newProjectColumnNames.add(alias);
+      }
+    }
+
+    if (!windowingFix) {
+      // We do not need to do anything
+      return;
+    }
+
+    RelNode newProjectRel = projectFactory.createProject(
+        groupBy, newProjects, newProjectColumnNames);
+
+    RelNode newTopProjectRel = projectFactory.createProject(
+        newProjectRel, newTopProjects, select.getRowType().getFieldNames());
+
+    call.transformTo(newTopProjectRel);
+
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
index 0ada068..95f43d4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
@@ -54,10 +54,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
-import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 48f488f..9a8ef64 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -144,6 +144,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinToMultiJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory;
@@ -854,6 +855,16 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu
 
       calciteOptimizedPlan = hepPlanner.findBestExp();
 
+      // run rule to fix windowing issue when it is done over
+      // aggregation columns (HIVE-10627)
+      hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+      hepPgmBldr.addRuleInstance(HiveWindowingFixRule.INSTANCE);
+      hepPlanner = new HepPlanner(hepPgmBldr.build());
+      hepPlanner.registerMetadataProviders(list);
+      cluster.setMetadataProvider(new CachingRelMetadataProvider(chainedProvider, hepPlanner));
+      hepPlanner.setRoot(calciteOptimizedPlan);
+      calciteOptimizedPlan = hepPlanner.findBestExp();
+
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
         // run rules to aid in translation from Optiq tree -> Hive tree
         hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
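For reference, the sketch below distills the wiring that the CalcitePlanner hunk adds: HiveWindowingFixRule runs in its own bottom-up HepPlanner pass over the already-optimized plan. It is not part of the patch; the class and method names (WindowingFixPassSketch, run, basePlan) are illustrative only, and the metadata-provider bookkeeping that CalcitePlanner performs (registerMetadataProviders, CachingRelMetadataProvider) is omitted for brevity.

// Illustrative sketch only -- not part of the patch above.
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;

public final class WindowingFixPassSketch {

  private WindowingFixPassSketch() {
  }

  /** Applies HiveWindowingFixRule to basePlan and returns the rewritten plan. */
  public static RelNode run(RelNode basePlan) {
    // Build a single-rule HEP program, matching bottom-up as the CalcitePlanner hunk does.
    HepProgramBuilder programBuilder = new HepProgramBuilder()
        .addMatchOrder(HepMatchOrder.BOTTOM_UP);
    programBuilder.addRuleInstance(HiveWindowingFixRule.INSTANCE);

    // Run the program; the rule fires on a Project over an Aggregate whenever a
    // windowing expression's operands are not among the projected expressions,
    // inserting an intermediate Project that exposes them and a top Project that
    // restores the original row type.
    HepPlanner planner = new HepPlanner(programBuilder.build());
    planner.setRoot(basePlan);
    return planner.findBestExp();
  }
}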