diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/InClauseGeneratorOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/InClauseGeneratorOptimizer.java new file mode 100644 index 0000000..6c362c6 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/InClauseGeneratorOptimizer.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.ForwardWalker; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.lib.TypeRule; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +public class InClauseGeneratorOptimizer implements Transform { + + private static final Log LOG = LogFactory.getLog(InClauseGeneratorOptimizer.class); + private static final String IN_UDF = + GenericUDFIn.class.getAnnotation(Description.class).name(); + private static final String STRUCT_UDF = + GenericUDFStruct.class.getAnnotation(Description.class).name(); + + + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + // 1. 
Trigger the transformation
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"),
+        new FilterTransformer());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+    GraphWalker ogw = new ForwardWalker(disp);
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  private class FilterTransformer implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      FilterOperator filterOp = (FilterOperator) nd;
+      ExprNodeDesc predicate = filterOp.getConf().getPredicate();
+
+      // Generate the IN clause predicate if the pattern matches
+      ExprNodeDesc newPredicate = generateInClause(predicate);
+      if (newPredicate != null) {
+        // Replace the filter in the current FIL with the new one
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Predicate " + predicate + " transformed into " + newPredicate);
+        }
+        filterOp.getConf().setPredicate(newPredicate);
+      }
+
+      return null;
+    }
+
+    private ExprNodeDesc generateInClause(ExprNodeDesc predicate) throws SemanticException {
+      Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule, NodeProcessor>();
+      exprRules.put(new TypeRule(ExprNodeGenericFuncDesc.class), new OrExprProcessor());
+
+      // The dispatcher fires the processor corresponding to the closest matching
+      // rule and passes the context along
+      Dispatcher disp = new DefaultRuleDispatcher(null, exprRules, null);
+      GraphWalker egw = new DefaultGraphWalker(disp);
+
+      List<Node> startNodes = new ArrayList<Node>();
+      startNodes.add(predicate);
+
+      HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+      egw.startWalking(startNodes, outputMap);
+      return (ExprNodeDesc) outputMap.get(predicate);
+    }
+  }
+
+  private class OrExprProcessor implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) nd;
+
+      // 1. If it is not an OR operator, we bail out
+      if (!FunctionRegistry.isOpOr(fd)) {
+        return null;
+      }
+
+      // 2. It is an OR operator
+      List<ExprNodeDesc> children = fd.getChildren();
+      Map<String, ExprNodeColumnDesc> columnToColumnDesc =
+          new HashMap<String, ExprNodeColumnDesc>();
+      ListMultimap<String, ExprNodeConstantDesc> columnToConstantDesc =
+          ArrayListMultimap.create();
+      boolean modeAnd = false;
+      for (int i = 0; i < children.size(); i++) {
+        ExprNodeDesc child = children.get(i);
+
+        // - If the child is an AND operator, extract its children
+        // - Otherwise, take the child itself
+        final List<ExprNodeDesc> conjunctions;
+        if (FunctionRegistry.isOpAnd(child)) {
+          // If it is the first child, we set the mode variable value;
+          // otherwise, if the mode we are working on is different, we
+          // bail out
+          if (i == 0) {
+            modeAnd = true;
+          } else if (!modeAnd) {
+            return null;
+          }
+
+          // Multiple children
+          conjunctions = child.getChildren();
+        } else {
+          // If it is the first child, we set the mode variable value;
+          // otherwise, if the mode we are working on is different, we
+          // bail out
+          if (i == 0) {
+            modeAnd = false;
+          } else if (modeAnd) {
+            return null;
+          }
+
+          // One child
+          conjunctions = new ArrayList<ExprNodeDesc>(1);
+          conjunctions.add(child);
+        }
+
+        // 3. We extract the literals to introduce in the IN clause. If the
+        // patterns OR-AND-EqOp or OR-EqOp are not matched, we bail out
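+        // (For instance, the flatten_and_or.q predicate shown in the test
+        // output below, (key = '0' and value = '8') or (key = '1' and
+        // value = '5') or ..., matches OR-AND-EqOp, while a single-column
+        // predicate such as (key = '0') or (key = '1') matches OR-EqOp.)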
+        for (ExprNodeDesc conjunction : conjunctions) {
+          if (!(conjunction instanceof ExprNodeGenericFuncDesc)) {
+            return null;
+          }
+
+          ExprNodeGenericFuncDesc conjCall = (ExprNodeGenericFuncDesc) conjunction;
+          Class<? extends GenericUDF> genericUdfClass = conjCall.getGenericUDF().getClass();
+          if (GenericUDFOPEqual.class == genericUdfClass) {
+            if (conjCall.getChildren().get(0) instanceof ExprNodeColumnDesc &&
+                conjCall.getChildren().get(1) instanceof ExprNodeConstantDesc) {
+              ExprNodeColumnDesc ref = (ExprNodeColumnDesc) conjCall.getChildren().get(0);
+              String refString = ref.toString();
+              columnToColumnDesc.put(refString, ref);
+              columnToConstantDesc.put(refString,
+                  (ExprNodeConstantDesc) conjCall.getChildren().get(1));
+              if (columnToConstantDesc.get(refString).size() != i + 1) {
+                // The column has not appeared exactly once in every disjunct
+                // so far, so we bail out
+                return null;
+              }
+            } else if (conjCall.getChildren().get(1) instanceof ExprNodeColumnDesc &&
+                conjCall.getChildren().get(0) instanceof ExprNodeConstantDesc) {
+              ExprNodeColumnDesc ref = (ExprNodeColumnDesc) conjCall.getChildren().get(1);
+              String refString = ref.toString();
+              columnToColumnDesc.put(refString, ref);
+              columnToConstantDesc.put(refString,
+                  (ExprNodeConstantDesc) conjCall.getChildren().get(0));
+              if (columnToConstantDesc.get(refString).size() != i + 1) {
+                // The column has not appeared exactly once in every disjunct
+                // so far, so we bail out
+                return null;
+              }
+            } else {
+              // Not a column-constant equality; we bail out
+              return null;
+            }
+          } else {
+            // Not an equality operator; we bail out
+            return null;
+          }
+        }
+      }
+
+      // Every column must appear in every disjunct; otherwise the predicate
+      // is not equivalent to a single IN clause and we bail out. This also
+      // prevents an out-of-bounds access when building the value structs
+      for (String keyString : columnToConstantDesc.keySet()) {
+        if (columnToConstantDesc.get(keyString).size() != children.size()) {
+          return null;
+        }
+      }
+
+      // 4. We build the new predicate and return it
+      List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>(children.size());
+      // 4.1 Create the structs: one for the columns, one per disjunct for the values
+      List<ExprNodeDesc> columns = new ArrayList<ExprNodeDesc>();
+      List<String> names = new ArrayList<String>();
+      List<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
+      for (int i = 0; i < children.size(); i++) {
+        List<ExprNodeDesc> constantFields = new ArrayList<ExprNodeDesc>(children.size());
+
+        for (String keyString : columnToConstantDesc.keySet()) {
+          ExprNodeColumnDesc key = columnToColumnDesc.get(keyString);
+          if (i == 0) {
+            columns.add(key);
+            names.add(key.getColumn());
+            typeInfos.add(key.getTypeInfo());
+          }
+          constantFields.add(columnToConstantDesc.get(keyString).get(i));
+        }
+
+        if (i == 0) {
+          ExprNodeDesc columnsStruct = new ExprNodeGenericFuncDesc(
+              TypeInfoFactory.getStructTypeInfo(names, typeInfos),
+              FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+              columns);
+          newChildren.add(columnsStruct);
+        }
+        ExprNodeDesc valuesStruct = new ExprNodeGenericFuncDesc(
+            TypeInfoFactory.getStructTypeInfo(names, typeInfos),
+            FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+            constantFields);
+        newChildren.add(valuesStruct);
+      }
+      ExprNodeDesc newPredicate = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
+          FunctionRegistry.getFunctionInfo(IN_UDF).getGenericUDF(), newChildren);
+
+      return newPredicate;
+    }
+
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index c4e11b9..2bc3670 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -66,6 +66,8 @@ public void initialize(HiveConf hiveConf) {
     // we are translating Calcite operators into Hive operators.
     transformations.add(new HiveOpConverterPostProc());
 
+    transformations.add(new InClauseGeneratorOptimizer());
+
     // Add the transformation that computes the lineage information.
transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) { diff --git ql/src/test/results/clientpositive/flatten_and_or.q.out ql/src/test/results/clientpositive/flatten_and_or.q.out index 9c51ff3..d63f5ed 100644 --- ql/src/test/results/clientpositive/flatten_and_or.q.out +++ ql/src/test/results/clientpositive/flatten_and_or.q.out @@ -44,15 +44,15 @@ STAGE PLANS: alias: src Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((key = '0') and (value = '8')) or ((key = '1') and (value = '5')) or ((key = '2') and (value = '6')) or ((key = '3') and (value = '8')) or ((key = '4') and (value = '1')) or ((key = '5') and (value = '6')) or ((key = '6') and (value = '1')) or ((key = '7') and (value = '1')) or ((key = '8') and (value = '1')) or ((key = '9') and (value = '1')) or ((key = '10') and (value = '3'))) (type: boolean) - Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + predicate: (struct(key,value)) IN (struct('0','8'), struct('1','5'), struct('2','6'), struct('3','8'), struct('4','1'), struct('5','6'), struct('6','1'), struct('7','1'), struct('8','1'), struct('9','1'), struct('10','3')) (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
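Note: the following is a minimal standalone sketch (not part of the patch) that builds the same struct-IN expression shape that step 4 of OrExprProcessor produces, for the first two disjuncts of the flatten_and_or.q predicate above. It assumes Hive's ql and serde2 modules are on the classpath; the class name StructInSketch and the table alias "src" are illustrative only, and "struct"/"in" are the registered function names that the STRUCT_UDF/IN_UDF constants in the patch resolve to.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class StructInSketch {

  public static void main(String[] args) throws Exception {
    List<String> names = Arrays.asList("key", "value");
    List<TypeInfo> types = Arrays.<TypeInfo>asList(
        TypeInfoFactory.stringTypeInfo, TypeInfoFactory.stringTypeInfo);

    // struct(key, value): the column side of the IN clause
    ExprNodeDesc columnsStruct = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.getStructTypeInfo(names, types),
        FunctionRegistry.getFunctionInfo("struct").getGenericUDF(),
        Arrays.<ExprNodeDesc>asList(
            new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "key", "src", false),
            new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "value", "src", false)));

    // struct('0','8') and struct('1','5'): one value struct per disjunct
    ExprNodeDesc firstValues = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.getStructTypeInfo(names, types),
        FunctionRegistry.getFunctionInfo("struct").getGenericUDF(),
        Arrays.<ExprNodeDesc>asList(
            new ExprNodeConstantDesc("0"), new ExprNodeConstantDesc("8")));
    ExprNodeDesc secondValues = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.getStructTypeInfo(names, types),
        FunctionRegistry.getFunctionInfo("struct").getGenericUDF(),
        Arrays.<ExprNodeDesc>asList(
            new ExprNodeConstantDesc("1"), new ExprNodeConstantDesc("5")));

    // struct(key, value) IN (struct('0','8'), struct('1','5'))
    ExprNodeDesc inClause = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.booleanTypeInfo,
        FunctionRegistry.getFunctionInfo("in").getGenericUDF(),
        Arrays.asList(columnsStruct, firstValues, secondValues));

    // Prints the expression in roughly the form shown in the EXPLAIN output above
    System.out.println(inClause.getExprString());
  }
}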