commit 3a3e788bf2c6356a8148aa758f8aa70a8f86ff6b Author: Ashutosh Chauhan Date: Mon Mar 2 17:14:04 2015 -0800 Add rule for inserting exchange for GBy. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchangeForGroupBy.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchangeForGroupBy.java new file mode 100644 index 0000000..b6d2e49 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchangeForGroupBy.java @@ -0,0 +1,57 @@ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelDistribution.Type; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution; + +/** + * Not an optimization rule. Aids in translation of Calcite tree -> Hive Op tree + * + * Group-by (map-side) + * Groupby -> | + * Hash-Exchange + * | + * Group-by (reduce-side) + */ +public class HiveInsertExchangeForGroupBy extends RelOptRule{ + + private final boolean mapAggr; + + public HiveInsertExchangeForGroupBy(HiveConf conf) { + super(operand(Aggregate.class, any())); + this.mapAggr = conf.getBoolVar(ConfVars.HIVEMAPSIDEAGGREGATE); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Aggregate gby = call.rel(0); + if (call.rel(1) instanceof LogicalExchange) { + return; + } + + List aggrCalls = gby.getAggCallList(); + List shuffleCols = new ArrayList<>(aggrCalls.size()); + for (AggregateCall aggrCall : aggrCalls) { + shuffleCols.addAll(aggrCall.getArgList()); + } + LogicalExchange ex = LogicalExchange.create(gby, new HiveRelDistribution(Type.HASH_DISTRIBUTED, shuffleCols)); + Aggregate newGby = gby.copy(gby.getTraitSet(), ex, gby.indicator, gby.getGroupSet(), gby.getGroupSets(), gby.getAggCallList()); + call.getPlanner().onCopy(gby, newGby); + call.transformTo(newGby); + + if(mapAggr) { + Aggregate mapGby = gby.copy(gby.getTraitSet(), ex.getInput(), gby.indicator, gby.getGroupSet(), gby.getGroupSets(), gby.getAggCallList()); + call.getPlanner().onCopy(gby, mapGby); + call.transformTo(mapGby); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index e170db1..71bb2f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -134,6 +134,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4JoinRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchangeForGroupBy; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter; @@ -791,7 +792,9 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) { // run rules to aid in translation from Optiq tree -> Hive tree hepPgm = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP) - .addRuleInstance(new HiveInsertExchange4JoinRule()).build(); + .addRuleInstance(new HiveInsertExchange4JoinRule()) + .addRuleInstance(new HiveInsertExchangeForGroupBy(conf)) + .build(); hepPlanner = new HepPlanner(hepPgm); hepPlanner.registerMetadataProviders(list);