diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveFilterMergeRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveFilterMergeRule.java
new file mode 100644
index 0000000..094e1c3
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveFilterMergeRule.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+
+/**
+ * Planner rule that merges two adjacent {@link HiveFilter} operators into a
+ * single filter whose condition is the conjunction of the two original
+ * conditions.
+ */
+public class HiveFilterMergeRule extends FilterMergeRule {
+
+  public static final HiveFilterMergeRule INSTANCE =
+      new HiveFilterMergeRule();
+
+  private HiveFilterMergeRule() {
+    super(HiveFilter.DEFAULT_FILTER_FACTORY);
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveOrToInClauseRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveOrToInClauseRule.java
new file mode 100644
index 0000000..993180c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveOrToInClauseRule.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
+
+
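+/**
+ * Rule that rewrites a disjunction of equality predicates over the same
+ * column(s) into a single IN clause. For instance,
+ * (a=1 AND b=2) OR (a=3 AND b=4) becomes
+ * struct(a,b) IN (struct(1,2), struct(3,4)), or a plain single-column
+ * IN clause when only one column is referenced.
+ */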
+public class HiveOrToInClauseRule extends RelOptRule {
+
+  protected static final Log LOG = LogFactory
+      .getLog(HiveOrToInClauseRule.class.getName());
+
+
+  public static final HiveOrToInClauseRule INSTANCE =
+      new HiveOrToInClauseRule();
+
+
+  private static final String EQUAL_UDF =
+      GenericUDFOPEqual.class.getAnnotation(Description.class).name();
+  private static final String IN_UDF =
+      GenericUDFIn.class.getAnnotation(Description.class).name();
+  private static final String STRUCT_UDF =
+      GenericUDFStruct.class.getAnnotation(Description.class).name();
+
+  private HiveOrToInClauseRule() {
+    super(operand(Filter.class, any()));
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Filter filter = call.rel(0);
+
+    final RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
+
+    final RexNode condition = RexUtil.pullFactors(rexBuilder, filter.getCondition());
+
+    // 1. We try to transform possible candidates
+    RexNode newCondition;
+    switch (condition.getKind()) {
+      case AND:
+        ImmutableList<RexNode> operands = RexUtil.flattenAnd(((RexCall) condition).getOperands());
+        List<RexNode> newOperands = new ArrayList<RexNode>();
+        for (RexNode operand : operands) {
+          RexNode newOperand;
+          if (operand.getKind() == SqlKind.OR) {
+            try {
+              newOperand = transformIntoInClauseCondition(rexBuilder,
+                      filter.getRowType(), operand);
+              if (newOperand == null) {
+                return;
+              }
+            } catch (SemanticException e) {
+              LOG.error("Exception in HiveOrToInClauseRule", e);
+              return;
+            }
+          } else {
+            newOperand = operand;
+          }
+          newOperands.add(newOperand);
+        }
+        newCondition = RexUtil.composeConjunction(rexBuilder, newOperands, false);
+        break;
+      case OR:
+        try {
+          newCondition = transformIntoInClauseCondition(rexBuilder,
+                  filter.getRowType(), condition);
+          if (newCondition == null) {
+            return;
+          }
+        } catch (SemanticException e) {
+          LOG.error("Exception in HiveOrToInClauseRule", e);
+          return;
+        }
+        break;
+      default:
+        return;
+    }
+
+    // 2. If we could not transform anything, we bail out
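+    // (RexNode does not override equals(), so comparing the string digests is
+    // a cheap way to detect whether the rewrite changed the condition)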
+    if (newCondition.toString().equals(condition.toString())) {
+      return;
+    }
+
+    // 3. We create the filter with the new condition
+    RelNode newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition);
+
+    call.transformTo(newFilter);
+
+  }
+
+  private static RexNode transformIntoInClauseCondition(RexBuilder rexBuilder, RelDataType inputSchema,
+          RexNode condition) throws SemanticException {
+    assert condition.getKind() == SqlKind.OR;
+
+    // 1. We extract the information necessary to create the predicate for the new
+    // filter
+    ListMultimap<RexInputRef, RexLiteral> columnConstantsMap = ArrayListMultimap.create();
+    ImmutableList<RexNode> operands = RexUtil.flattenOr(((RexCall) condition).getOperands());
+    for (int i = 0; i < operands.size(); i++) {
+      RexNode operand = operands.get(i);
+
+      final RexNode operandCNF = RexUtil.toCnf(rexBuilder, operand);
+      final List<RexNode> conjunctions = RelOptUtil.conjunctions(operandCNF);
+
+      for (RexNode conjunction : conjunctions) {
+        // 1.1. If it is not a RexCall, we bail out
+        if (!(conjunction instanceof RexCall)) {
+          return null;
+        }
+        // 1.2. We extract the information that we need
+        RexCall conjCall = (RexCall) conjunction;
+        if (conjCall.getOperator().getName().equals(EQUAL_UDF)) {
+          if (conjCall.operands.get(0) instanceof RexInputRef &&
+                  conjCall.operands.get(1) instanceof RexLiteral) {
+            RexInputRef ref = (RexInputRef) conjCall.operands.get(0);
+            RexLiteral literal = (RexLiteral) conjCall.operands.get(1);
+            columnConstantsMap.put(ref, literal);
+            if (columnConstantsMap.get(ref).size() != i+1) {
+              // Each disjunct must contribute exactly one constant per column;
+              // if the count does not match the disjuncts seen so far, we bail out
+              return null;
+            }
+          } else if (conjCall.operands.get(1) instanceof RexInputRef &&
+                  conjCall.operands.get(0) instanceof RexLiteral) {
+            RexInputRef ref = (RexInputRef) conjCall.operands.get(1);
+            RexLiteral literal = (RexLiteral) conjCall.operands.get(0);
+            columnConstantsMap.put(ref, literal);
+            if (columnConstantsMap.get(ref).size() != i+1) {
+              // Each disjunct must contribute exactly one constant per column;
+              // if the count does not match the disjuncts seen so far, we bail out
+              return null;
+            }
+          } else {
+            // Bail out
+            return null;
+          }
+        } else {
+          return null;
+        }
+      }
+    }
+
+    // 2. We build the new predicate and return it
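+    // At this point columnConstantsMap holds, for each referenced column, one
+    // constant per disjunct in disjunct order; the IN clause operands are
+    // built from it column-wise below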
+    List<RexNode> newOperands = new ArrayList<RexNode>(operands.size());
+    // 2.1 Create structs
+    List<RexNode> columns = new ArrayList<RexNode>();
+    List<String> names = new ArrayList<String>();
+    ImmutableList.Builder<RelDataType> paramsTypes = ImmutableList.builder();
+    List<TypeInfo> structReturnType = new ArrayList<TypeInfo>();
+    ImmutableList.Builder<RelDataType> newOperandsTypes = ImmutableList.builder();
+    for (int i = 0; i < operands.size(); i++) {
+      List<RexNode> constantFields = new ArrayList<RexNode>(operands.size());
+
+      for (RexInputRef ref : columnConstantsMap.keySet()) {
+        // If any of the columns was not referenced by every operand, we bail out
+        if (columnConstantsMap.get(ref).size() <= i) {
+          return null;
+        }
+        RexLiteral columnConstant = columnConstantsMap.get(ref).get(i);
+        if (i == 0) {
+          columns.add(ref);
+          names.add(inputSchema.getFieldNames().get(ref.getIndex()));
+          paramsTypes.add(ref.getType());
+          structReturnType.add(TypeConverter.convert(ref.getType()));
+        }
+        constantFields.add(columnConstant);
+      }
+
+      if (i == 0) {
+        RexNode columnsRefs;
+        if (columns.size() == 1) {
+          columnsRefs = columns.get(0);
+        } else {
+          // Create STRUCT clause
+          RelDataType retType = TypeConverter.convert(
+                  TypeInfoFactory.getStructTypeInfo(names, structReturnType),
+                  rexBuilder.getTypeFactory());
+          SqlOperator structOp = SqlFunctionConverter.getCalciteOperator(
+                  STRUCT_UDF, FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+                  paramsTypes.build(), retType);
+          columnsRefs = rexBuilder.makeCall(structOp, columns);
+        }
+        newOperands.add(columnsRefs);
+        newOperandsTypes.add(columnsRefs.getType());
+      }
+      RexNode values;
+      if (constantFields.size() == 1) {
+        values = constantFields.get(0);
+      } else {
+        // Create STRUCT clause
+        RelDataType retType = TypeConverter.convert(
+                TypeInfoFactory.getStructTypeInfo(names, structReturnType),
+                rexBuilder.getTypeFactory());
+        SqlOperator structOp = SqlFunctionConverter.getCalciteOperator(
+                STRUCT_UDF, FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+                paramsTypes.build(), retType);
+        values = rexBuilder.makeCall(structOp, constantFields);
+      }
+      newOperands.add(values);
+      newOperandsTypes.add(values.getType());
+    }
+
+    // 3. Create and return IN clause
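+    // (GenericUDFIn expects the expression under test as its first argument,
+    // followed by the candidate values, which is how newOperands was laid out)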
+    RelDataType retType = TypeConverter.convert(
+            TypeInfoFactory.booleanTypeInfo, rexBuilder.getTypeFactory());
+    SqlOperator inOp = SqlFunctionConverter.getCalciteOperator(
+            IN_UDF, FunctionRegistry.getFunctionInfo(IN_UDF).getGenericUDF(),
+            newOperandsTypes.build(), retType);
+    return rexBuilder.makeCall(inOp, newOperands);
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index f26d1df..a5fb29a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -136,6 +136,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExpandDistinctAggregatesRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterMergeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4JoinRule;
@@ -144,6 +145,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinProjectTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinPushTransitivePredicatesRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinToMultiJoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveOrToInClauseRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePreFilteringRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectMergeRule;
@@ -994,19 +996,25 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
         new FilterAggregateTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
             Aggregate.class));
 
-    // 4. Transitive inference & Partition Pruning
+    // 4. Transform OR into IN clauses
+    basePlan = hepPlan(basePlan, true, mdProvider,
+        HiveFilterMergeRule.INSTANCE,
+        ReduceExpressionsRule.FILTER_INSTANCE,
+        HiveOrToInClauseRule.INSTANCE);
+
+    // 5. Transitive inference & Partition Pruning
     basePlan = hepPlan(basePlan, false, mdProvider,
         new HiveJoinPushTransitivePredicatesRule(
             Join.class, HiveFilter.DEFAULT_FILTER_FACTORY),
         new HivePartitionPruneRule(conf));
 
-    // 5. Projection Pruning
+    // 6. Projection Pruning
     HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null,
         HiveProject.DEFAULT_PROJECT_FACTORY, HiveFilter.DEFAULT_FILTER_FACTORY,
         HiveJoin.HIVE_JOIN_FACTORY, HiveSemiJoin.HIVE_SEMIJOIN_FACTORY,
         HiveSort.HIVE_SORT_REL_FACTORY, HiveAggregate.HIVE_AGGR_REL_FACTORY,
         HiveUnion.UNION_REL_FACTORY);
     basePlan = fieldTrimmer.trim(basePlan);
 
-    // 6. Rerun PPD through Project as column pruning would have introduced DT
+    // 7. 
Rerun PPD through Project as column pruning would have introduced DT // above scans basePlan = hepPlan(basePlan, true, mdProvider, new FilterProjectTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY, diff --git ql/src/test/queries/clientpositive/filter_cond_pushdown.q ql/src/test/queries/clientpositive/filter_cond_pushdown.q index 5e23b71..21aa6c5 100644 --- ql/src/test/queries/clientpositive/filter_cond_pushdown.q +++ ql/src/test/queries/clientpositive/filter_cond_pushdown.q @@ -17,3 +17,8 @@ JOIN ( JOIN (SELECT * FROM cbo_t3 t3 WHERE c_int=1) t3 ON t2.key=t3.c_int WHERE ((t2.key=t3.key) AND (t2.c_float + t3.c_float > 2)) OR ((t2.key=t3.key) AND (t2.c_int + t3.c_int > 2))) t4 ON t1.key=t4.key; + +EXPLAIN +SELECT f.key, g.value +FROM cbo_t1 f JOIN cbo_t2 m JOIN cbo_t3 g ON(f.key = m.key AND g.value = m.value) +WHERE (f.key = '1' AND f.value='2008-04-08') OR (f.key = '2' AND f.value='2008-04-09'); diff --git ql/src/test/results/clientpositive/filter_cond_pushdown.q.out ql/src/test/results/clientpositive/filter_cond_pushdown.q.out index e09057a..146dc54 100644 --- ql/src/test/results/clientpositive/filter_cond_pushdown.q.out +++ ql/src/test/results/clientpositive/filter_cond_pushdown.q.out @@ -21,17 +21,17 @@ STAGE PLANS: alias: f Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((value = '2008-04-08') or (value = '2008-04-09')) and key is not null) (type: boolean) - Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + predicate: ((value) IN ('2008-04-08', '2008-04-09') and key is not null) (type: boolean) + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string), value (type: string) outputColumnNames: _col0, _col1 - Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: string) sort order: + Map-reduce partition columns: _col0 (type: string) - Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: string) TableScan alias: f @@ -57,14 +57,14 @@ STAGE PLANS: 0 _col0 (type: string) 1 _col0 (type: string) outputColumnNames: _col0, _col1, _col3 - Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (((_col1 = '2008-04-08') and (_col3 = '2008-04-08')) or (_col1 = '2008-04-09')) (type: boolean) - Statistics: Num rows: 205 Data size: 2177 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 102 Data size: 1087 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col0 (type: string), _col3 (type: string) outputColumnNames: _col0, _col3 - Statistics: Num rows: 205 Data size: 2177 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 102 Data size: 1087 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false table: @@ -80,7 +80,7 @@ STAGE PLANS: key expressions: _col3 (type: string) sort order: + Map-reduce partition columns: _col3 (type: string) - Statistics: Num rows: 205 Data size: 2177 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 102 Data size: 1087 Basic stats: COMPLETE Column stats: NONE 
value expressions: _col0 (type: string) TableScan alias: f @@ -380,3 +380,121 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: EXPLAIN +SELECT f.key, g.value +FROM cbo_t1 f JOIN cbo_t2 m JOIN cbo_t3 g ON(f.key = m.key AND g.value = m.value) +WHERE (f.key = '1' AND f.value='2008-04-08') OR (f.key = '2' AND f.value='2008-04-09') +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT f.key, g.value +FROM cbo_t1 f JOIN cbo_t2 m JOIN cbo_t3 g ON(f.key = m.key AND g.value = m.value) +WHERE (f.key = '1' AND f.value='2008-04-08') OR (f.key = '2' AND f.value='2008-04-09') +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: m + Statistics: Num rows: 20 Data size: 262 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (((key) IN ('1', '2') and key is not null) and value is not null) (type: boolean) + Statistics: Num rows: 3 Data size: 39 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 3 Data size: 39 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 3 Data size: 39 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + TableScan + alias: f + Statistics: Num rows: 20 Data size: 262 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((((key) IN ('1', '2') and (value) IN ('2008-04-08', '2008-04-09')) and (struct(key,value)) IN (const struct('1','2008-04-08'), const struct('2','2008-04-09'))) and key is not null) (type: boolean) + Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col1, _col2 + Statistics: Num rows: 3 Data size: 42 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 3 Data size: 42 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: string) + TableScan + alias: g + Statistics: Num rows: 20 Data size: 262 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: value is not null (type: boolean) + Statistics: Num rows: 10 Data size: 131 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 10 Data size: 131 Basic stats: 
COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 10 Data size: 131 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col2, _col4 + Statistics: Num rows: 11 Data size: 144 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col2 (type: string), _col4 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 11 Data size: 144 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 11 Data size: 144 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + diff --git ql/src/test/results/clientpositive/vectorized_case.q.out ql/src/test/results/clientpositive/vectorized_case.q.out index 9e47014..28a648e 100644 --- ql/src/test/results/clientpositive/vectorized_case.q.out +++ ql/src/test/results/clientpositive/vectorized_case.q.out @@ -59,6 +59,7 @@ STAGE PLANS: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized Stage: Stage-0 Fetch Operator