commit 6470b5ce49ac0ebb0e79293d1e7d299cb0e3e8a6 Author: Ashutosh Chauhan Date: Thu Feb 19 15:11:06 2015 -0800 HIVE-9732 : [CBO] Add InsertExchangeRule for join diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java new file mode 100644 index 0000000..653f1c5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
+
+/**
+ * Hive's implementation of the {@link RelDistribution} trait: a distribution
+ * type paired with the positions of the key columns.
+ *
+ * <p>{@link #satisfies(RelTrait)} only knows how to compare against
+ * {@code HASH_DISTRIBUTED} traits; any other distribution type is rejected
+ * with an exception.
+ *
+ * <p>The key list handed to the constructor is stored as-is (no copy), so it
+ * must not be modified after the distribution has been created.
+ */
+public class HiveRelDistribution implements RelDistribution {
+
+  /** Positions of the columns the data is distributed on. */
+  List<Integer> keys;
+  /** Kind of distribution. */
+  RelDistribution.Type type;
+
+  /**
+   * @param type kind of distribution
+   * @param keys positions of the key columns; stored by reference
+   */
+  public HiveRelDistribution(Type type, List<Integer> keys) {
+    this.type = type;
+    this.keys = keys;
+  }
+
+  @Override
+  public RelTraitDef<RelDistribution> getTraitDef() {
+    return RelDistributionTraitDef.INSTANCE;
+  }
+
+  @Override
+  public void register(RelOptPlanner planner) {
+    // Nothing to register with the planner.
+  }
+
+  /**
+   * Returns whether this distribution satisfies {@code trait}.
+   *
+   * @param trait the required trait
+   * @return true if {@code trait} is this very object, or is a
+   *         {@code HASH_DISTRIBUTED} distribution with the same keys; false if
+   *         {@code trait} is not a {@link RelDistribution} at all
+   * @throws UnsupportedOperationException if {@code trait} is a distribution
+   *         of any type other than {@code HASH_DISTRIBUTED}
+   */
+  @Override
+  public boolean satisfies(RelTrait trait) {
+    if (trait == this) {
+      return true;
+    }
+    if (!(trait instanceof RelDistribution)) {
+      // A trait of another kind (or null) can never be satisfied by a distribution.
+      return false;
+    }
+    RelDistribution required = (RelDistribution) trait;
+    switch (required.getType()) {
+    case HASH_DISTRIBUTED:
+      return this.getKeys().equals(required.getKeys());
+    default:
+      throw new UnsupportedOperationException("Other distributions are not used yet.");
+    }
+  }
+
+  /**
+   * Returns this distribution when it has no keys, otherwise a new instance
+   * with the same type and the same key list.
+   *
+   * <p>NOTE(review): the supplied {@code mapping} is not applied to the keys,
+   * so key positions are carried over unchanged -- confirm this is intended.
+   */
+  @Override
+  public RelDistribution apply(TargetMapping mapping) {
+    if (keys.isEmpty()) {
+      return this;
+    }
+    return new HiveRelDistribution(type, keys);
+  }
+
+  @Override
+  public List<Integer> getKeys() {
+    return keys;
+  }
+
+  @Override
+  public Type getType() {
+    return type;
+  }
+
+  /** Two distributions are equal when both their type and their keys are equal. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof HiveRelDistribution)) {
+      return false;
+    }
+    HiveRelDistribution that = (HiveRelDistribution) obj;
+    return type == that.type
+        && (keys == null ? that.keys == null : keys.equals(that.keys));
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (type == null) ? 0 : type.hashCode();
+    return 31 * result + ((keys == null) ? 0 : keys.hashCode());
+  }
+
+  /** Renders the type followed by the key positions, e.g. {@code HASH_DISTRIBUTED[0, 2]}. */
+  @Override
+  public String toString() {
+    return String.valueOf(type) + keys;
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchangeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchangeRule.java
new file mode 100644
index 0000000..c08f6f9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchangeRule.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.sql.JoinType;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+
+/** Not an optimization rule.
+ * Rule to aid in translation from Calcite tree -> Hive tree.
+ * Transforms :
+ * <pre>
+ *   Left     Right               Left              Right
+ *       \   /         ->           \                /
+ *       Join                  HashExchange    HashExchange
+ *                                      \       /
+ *                                        Join
+ * </pre>
+ * Only inner joins are rewritten. A join that already has an exchange as one
+ * of its inputs is left alone, so the rule does not fire again on its own output.
+ */
+public class HiveInsertExchangeRule extends RelOptRule {
+
+  public HiveInsertExchangeRule() {
+    // match join with exactly 2 inputs
+    super(RelOptRule.operand(Join.class,
+        operand(RelNode.class, any()),
+        operand(RelNode.class, any())));
+  }
+
+  /**
+   * Wraps both inputs of the matched inner join in a hash exchange keyed on
+   * that side's equi-join columns and replaces the join with a copy reading
+   * from the two exchanges.
+   *
+   * @param call the rule call; rel(0) is the join, rel(1) and rel(2) its inputs
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+
+    // getJoinType() yields a JoinRelType; comparing it with the SQL parser's
+    // JoinType enum is never equal, which made the rule bail out for every join.
+    if (join.getJoinType() != JoinRelType.INNER) {
+      // not ready to handle other join type yet.
+      return;
+    }
+
+    // The rewritten join still matches this rule's operands. Skip joins whose
+    // inputs are already exchanges so exchanges are not stacked repeatedly.
+    RelNode leftInput = call.rel(1);
+    RelNode rightInput = call.rel(2);
+    if (leftInput instanceof LogicalExchange || rightInput instanceof LogicalExchange) {
+      return;
+    }
+
+    JoinPredicateInfo joinPredInfo =
+        HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
+
+    // get key columns from inputs. Those are the columns on which we will distribute on.
+    List<Integer> joinLeftKeyPositions = new ArrayList<Integer>();
+    List<Integer> joinRightKeyPositions = new ArrayList<Integer>();
+    for (JoinLeafPredicateInfo joinLeafPredInfo : joinPredInfo.getEquiJoinPredicateElements()) {
+      joinLeftKeyPositions.addAll(joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema());
+      joinRightKeyPositions.addAll(joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema());
+    }
+
+    LogicalExchange left = LogicalExchange.create(join.getLeft(),
+        new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, joinLeftKeyPositions));
+    LogicalExchange right = LogicalExchange.create(join.getRight(),
+        new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, joinRightKeyPositions));
+
+    Join newJoin = join.copy(join.getTraitSet(), join.getCondition(),
+        left, right, join.getJoinType(), join.isSemiJoinDone());
+
+    call.getPlanner().onCopy(join, newJoin);
+
+    call.transformTo(newJoin);
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index d769c65..9d16645 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -132,6 +132,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchangeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
 import
org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter; @@ -601,7 +602,7 @@ Operator getOptimizedHiveOPDag() throws SemanticException { rethrowCalciteException(e); throw new AssertionError("rethrowCalciteException didn't throw for " + e.getMessage()); } - + Operator hiveRoot = new HiveOpConverter(topOps, HiveOpConverter.getAggOPMode(conf), conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict")).convert(optimizedOptiqPlan); RowResolver hiveRootRR = genRowResolver(hiveRoot, getQB()); @@ -786,6 +787,18 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu calciteOptimizedPlan = hepPlanner.findBestExp(); + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) { + // run rules to aid in translation from Optiq tree -> Hive tree + hepPgm = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP) + .addRuleInstance(new HiveInsertExchangeRule()).build(); + hepPlanner = new HepPlanner(hepPgm); + + hepPlanner.registerMetadataProviders(list); + cluster.setMetadataProvider(new CachingRelMetadataProvider(chainedProvider, hepPlanner)); + hepPlanner.setRoot(calciteOptimizedPlan); + calciteOptimizedPlan = hepPlanner.findBestExp(); + } + if (LOG.isDebugEnabled() && !conf.getBoolVar(ConfVars.HIVE_IN_TEST)) { LOG.debug("CBO Planning details:\n"); LOG.debug("Original Plan:\n" + RelOptUtil.toString(calciteGenPlan));