From 51bcfb0e163b9e63327a305eaf287f11e5764f5f Mon Sep 17 00:00:00 2001
From: Ashutosh Chauhan
Date: Wed, 16 Nov 2016 17:11:03 -0800
Subject: [PATCH] HIVE-15227 : Optimize join + gby into semijoin

---
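Notes: the new HiveSemiJoinRule rewrites a join whose right input is a group-by
used only to feed the join keys into a left semi join, so the aggregate can be
dropped. Roughly, using the tables from the new join_aggr.q test (the SQL below
is only an illustration of the transformation, not generated output):

  -- before: join + group-by
  select t1.b from t1 join (select c from t2 group by c) t3 on t1.a = t3.c;

  -- after the rule fires, the plan is roughly equivalent to
  select t1.b from t1 left semi join t2 on (t1.a = t2.c);

The rewritten plan is what the new join_aggr.q.out golden file shows
("Left Semi Join 0 to 1").
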
 .../optimizer/calcite/rules/HiveSemiJoinRule.java  | 106 +++++++++++++++++++++
 .../hadoop/hive/ql/parse/CalcitePlanner.java       |   6 ++
 ql/src/test/queries/clientpositive/join_aggr.q     |   4 +
 ql/src/test/results/clientpositive/join_aggr.q.out |  91 ++++++++++++++++++
 4 files changed, 207 insertions(+)
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSemiJoinRule.java
 create mode 100644 ql/src/test/queries/clientpositive/join_aggr.q
 create mode 100644 ql/src/test/results/clientpositive/join_aggr.q.out

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSemiJoinRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSemiJoinRule.java
new file mode 100644
index 0000000..5e068d1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSemiJoinRule.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Planner rule that creates a {@code SemiJoin} from a
+ * {@link org.apache.calcite.rel.core.Join} on top of a
+ * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
+ *
+ * TODO: Remove this rule and use Calcite's SemiJoinRule. Not possible currently
+ * since Calcite doesn't use RelBuilder for that rule and we want to generate a HiveSemiJoin rel here.
+ */
+public class HiveSemiJoinRule extends RelOptRule {
+
+  public static final HiveSemiJoinRule INSTANCE = new HiveSemiJoinRule(HiveRelFactories.HIVE_BUILDER);
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveSemiJoinRule.class);
+
+  private HiveSemiJoinRule(RelBuilderFactory relBuilder) {
+    super(
+        operand(Project.class,
+            some(
+                operand(Join.class,
+                    some(operand(RelNode.class, any()),
+                        operand(Aggregate.class, any()))))), relBuilder, null);
+  }
+
+  @Override public void onMatch(RelOptRuleCall call) {
+    LOG.debug("Matched HiveSemiJoinRule");
+    final Project project = call.rel(0);
+    final Join join = call.rel(1);
+    final RelNode left = call.rel(2);
+    final Aggregate aggregate = call.rel(3);
+    final RelOptCluster cluster = join.getCluster();
+    final RexBuilder rexBuilder = cluster.getRexBuilder();
+    final ImmutableBitSet bits =
+        RelOptUtil.InputFinder.bits(project.getProjects(), null);
+    final ImmutableBitSet rightBits =
+        ImmutableBitSet.range(left.getRowType().getFieldCount(),
+            join.getRowType().getFieldCount());
+    if (bits.intersects(rightBits)) {
+      return;
+    }
+    final JoinInfo joinInfo = join.analyzeCondition();
+    if (!joinInfo.rightSet().equals(
+        ImmutableBitSet.range(aggregate.getGroupCount()))) {
+      // Rule requires the aggregate keys to be exactly the join keys;
+      // neither a super-set nor a sub-set would work.
+      return;
+    }
+    if (!joinInfo.isEqui()) {
+      return;
+    }
+    LOG.debug("All conditions matched for HiveSemiJoinRule. Going to apply transformation.");
+    final List<Integer> newRightKeyBuilder = Lists.newArrayList();
+    final List<Integer> aggregateKeys = aggregate.getGroupSet().asList();
+    for (int key : joinInfo.rightKeys) {
+      newRightKeyBuilder.add(aggregateKeys.get(key));
+    }
+    final ImmutableIntList newRightKeys =
+        ImmutableIntList.copyOf(newRightKeyBuilder);
+    final RelNode newRight = aggregate.getInput();
+    final RexNode newCondition =
+        RelOptUtil.createEquiJoinCondition(left, joinInfo.leftKeys, newRight,
+            newRightKeys, rexBuilder);
+    RelNode semi = call.builder().push(left).push(aggregate.getInput()).semiJoin(newCondition).build();
+    call.transformTo(call.builder().push(semi).project(project.getProjects(), project.getRowType().getFieldNames()).build());
+  }
+}
+
+// End HiveSemiJoinRule.java
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 2bb6aa1..cab67c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -81,6 +81,7 @@
 import org.apache.calcite.rel.rules.SemiJoinFilterTransposeRule;
 import org.apache.calcite.rel.rules.SemiJoinJoinTransposeRule;
 import org.apache.calcite.rel.rules.SemiJoinProjectTransposeRule;
+import org.apache.calcite.rel.rules.SemiJoinRule;
 import org.apache.calcite.rel.rules.UnionMergeRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -183,6 +184,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsWithStatsRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelFieldTrimmer;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSemiJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortJoinReduceRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortLimitPullUpConstantsRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortMergeRule;
@@ -1115,6 +1117,10 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu
         perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Aggregate join transpose");
       }
 
+      // Convert Join + GBy to semijoin.
+      // Run this rule at a later stage, since many Calcite rules can't deal with semijoin.
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, HiveSemiJoinRule.INSTANCE);
+
       // 7. Run rule to fix windowing issue when it is done over
       // aggregation columns (HIVE-10627)
       if (profilesCBO.contains(ExtendedCBOProfile.WINDOWING_POSTPROCESSING)) {
diff --git a/ql/src/test/queries/clientpositive/join_aggr.q b/ql/src/test/queries/clientpositive/join_aggr.q
new file mode 100644
index 0000000..3887433
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/join_aggr.q
@@ -0,0 +1,4 @@
+create table t1 (a int, b int);
+create table t2 (c int, d int);
+
+explain select t1.b from t1 join (select c from t2 group by c)t3 on t1.a = t3.c;
diff --git a/ql/src/test/results/clientpositive/join_aggr.q.out b/ql/src/test/results/clientpositive/join_aggr.q.out
new file mode 100644
index 0000000..11a168b
--- /dev/null
+++ b/ql/src/test/results/clientpositive/join_aggr.q.out
@@ -0,0 +1,91 @@
+PREHOOK: query: create table t1 (a int, b int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1 (a int, b int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: create table t2 (c int, d int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t2
+POSTHOOK: query: create table t2 (c int, d int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t2
+PREHOOK: query: explain select t1.b from t1 join (select c from t2 group by c)t3 on t1.a = t3.c
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select t1.b from t1 join (select c from t2 group by c)t3 on t1.a = t3.c
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            Filter Operator
+              predicate: a is not null (type: boolean)
+              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+              Select Operator
+                expressions: a (type: int), b (type: int)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: int)
+                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                  value expressions: _col1 (type: int)
+          TableScan
+            alias: t2
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            Filter Operator
+              predicate: c is not null (type: boolean)
+              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+              Select Operator
+                expressions: c (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                Group By Operator
+                  keys: _col0 (type: int)
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: int)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: int)
+                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Left Semi Join 0 to 1
+          keys:
+            0 _col0 (type: int)
+            1 _col0 (type: int)
+          outputColumnNames: _col1
+          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+          Select Operator
+            expressions: _col1 (type: int)
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
-- 
1.7.12.4 (Apple Git-37)
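
For reference, the guard conditions in onMatch() mean the rewrite is
deliberately skipped for queries such as the following (illustrative HiveQL
against the same t1/t2 tables; these queries are not part of the patch):

  -- not rewritten: the outer query also projects the aggregate side (t3.c)
  select t1.b, t3.c from t1 join (select c from t2 group by c) t3 on t1.a = t3.c;

  -- not rewritten: the group-by keys are a superset of the join keys
  select t1.b from t1 join (select c, d from t2 group by c, d) t3 on t1.a = t3.c;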