diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 36bb394..29d7ebb 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1188,6 +1188,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "Whether to transitively replicate predicate filters over equijoin conditions."), HIVEPPDREMOVEDUPLICATEFILTERS("hive.ppd.remove.duplicatefilters", true, "Whether to push predicates down into storage handlers. Ignored when hive.optimize.ppd is false."), + HIVEPOINTLOOKUPOPTIMIZER("hive.optimize.point.lookup", true, + "Whether to transform OR clauses in Filter operators into IN clauses"), // Constant propagation optimizer HIVEOPTCONSTANTPROPAGATION("hive.optimize.constant.propagation", true, "Whether to enable constant propagation optimizer"), HIVEIDENTITYPROJECTREMOVER("hive.optimize.remove.identity.project", true, "Removes identity project from operator tree"), diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderOnceWalker.java ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderOnceWalker.java new file mode 100644 index 0000000..d891fc2 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderOnceWalker.java @@ -0,0 +1,44 @@ +package org.apache.hadoop.hive.ql.lib; + +import org.apache.hadoop.hive.ql.parse.SemanticException; + +/** + * Graph walker this class takes list of starting nodes and walks them in pre-order. + * If a rule fires up against a given node, we do not try to apply the rule + * on its children. + */ +public class PreOrderOnceWalker extends PreOrderWalker { + + public PreOrderOnceWalker(Dispatcher disp) { + super(disp); + } + + /** + * Walk the current operator and its descendants. 
+ * + * @param nd + * current operator in the graph + * @throws SemanticException + */ + @Override + public void walk(Node nd) throws SemanticException { + opStack.push(nd); + dispatch(nd, opStack); + + // The rule has been applied, we bail out + if (retMap.get(nd) != null) { + opStack.pop(); + return; + } + + // otherwise, recursively walk each child of the current node + if (nd.getChildren() != null) { + for (Node n : nd.getChildren()) { + walk(n); + } + } + + opStack.pop(); + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index c4e11b9..266b53e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -66,6 +66,11 @@ public void initialize(HiveConf hiveConf) { // we are translating Calcite operators into Hive operators. transformations.add(new HiveOpConverterPostProc()); + // Try to transform OR predicates in Filter into IN clauses. + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZER)) { + transformations.add(new PointLookupOptimizer()); + } + // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/PointLookupOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/PointLookupOptimizer.java new file mode 100644 index 0000000..0c4112c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/PointLookupOptimizer.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import org.apache.calcite.util.Pair; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.ForwardWalker; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.lib.TypeRule; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import 
org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +/** + * This optimization will take a Filter expression, and if its predicate contains + * an OR operator whose children are constant equality expressions, it will try + * to generate an IN clause (which is more efficient). If the OR operator contains + * AND operator children, the optimization might generate an IN clause that uses + * structs. + */ +public class PointLookupOptimizer implements Transform { + + private static final Log LOG = LogFactory.getLog(PointLookupOptimizer.class); + private static final String IN_UDF = + GenericUDFIn.class.getAnnotation(Description.class).name(); + private static final String STRUCT_UDF = + GenericUDFStruct.class.getAnnotation(Description.class).name(); + + + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + // 1. Trigger transformation + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"), new FilterTransformer()); + + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null); + GraphWalker ogw = new ForwardWalker(disp); + + List topNodes = new ArrayList(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + return pctx; + } + + private class FilterTransformer implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + FilterOperator filterOp = (FilterOperator) nd; + ExprNodeDesc predicate = filterOp.getConf().getPredicate(); + + // Try to generate an equivalent IN-clause predicate from the OR predicate + ExprNodeDesc newPredicate = generateInClause(predicate); + if (newPredicate != null) { + // Replace filter in current FIL with new FIL + if (LOG.isDebugEnabled()) { + LOG.debug("Generated new predicate with IN clause: " + newPredicate); + } + filterOp.getConf().setPredicate(newPredicate); + } + + return null; + } + + private ExprNodeDesc generateInClause(ExprNodeDesc predicate) throws SemanticException { + Map exprRules = new LinkedHashMap(); + exprRules.put(new TypeRule(ExprNodeGenericFuncDesc.class), new OrExprProcessor()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(null, exprRules, null); + GraphWalker egw = new PreOrderOnceWalker(disp); + + List startNodes = new ArrayList(); + startNodes.add(predicate); + + HashMap outputMap = new HashMap(); + egw.startWalking(startNodes, outputMap); + return (ExprNodeDesc) outputMap.get(predicate); + } + } + + private class OrExprProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) nd; + + // 1. If it is not an OR operator, we bail out. + if (!FunctionRegistry.isOpOr(fd)) { + return null; + } + + // 2. 
It is an OR operator + List children = fd.getChildren(); +// Map columnToColumnDesc = +// new HashMap(); + ListMultimap> columnConstantsMap = + ArrayListMultimap.create(); + boolean modeAnd = false; + for (int i = 0; i < children.size(); i++) { + ExprNodeDesc child = children.get(i); + + // - If the child is an AND operator, extract its children + // - Otherwise, take the child itself + final List conjunctions; + if (FunctionRegistry.isOpAnd(child)) { + // If it is the first child, we set the mode variable value + // Otherwise, if the mode we are working on is different, we + // bail out + if (i == 0) { + modeAnd = true; + } else { + if (!modeAnd) { + return null; + } + } + + // Multiple children + conjunctions = child.getChildren(); + } else { + // If it is the first child, we set the mode variable value + // Otherwise, if the mode we are working on is different, we + // bail out + if (i == 0) { + modeAnd = false; + } else { + if (modeAnd) { + return null; + } + } + + // One child + conjunctions = new ArrayList(1); + conjunctions.add(child); + } + + // 3. We will extract the literals to introduce in the IN clause. 
+ // If the patterns OR-AND-EqOp or OR-EqOp are not matched, we bail out + for (ExprNodeDesc conjunction: conjunctions) { + if (!(conjunction instanceof ExprNodeGenericFuncDesc)) { + return null; + } + + ExprNodeGenericFuncDesc conjCall = (ExprNodeGenericFuncDesc) conjunction; + Class genericUdfClass = conjCall.getGenericUDF().getClass(); + if(GenericUDFOPEqual.class == genericUdfClass) { + if (conjCall.getChildren().get(0) instanceof ExprNodeColumnDesc && + conjCall.getChildren().get(1) instanceof ExprNodeConstantDesc) { + ExprNodeColumnDesc ref = (ExprNodeColumnDesc) conjCall.getChildren().get(0); + String refString = ref.toString(); + columnConstantsMap.put(refString, + new Pair( + ref, (ExprNodeConstantDesc) conjCall.getChildren().get(1))); + if (columnConstantsMap.get(refString).size() != i+1) { + // If we have not added to this column desc before, we bail out + return null; + } + } else if (conjCall.getChildren().get(1) instanceof ExprNodeColumnDesc && + conjCall.getChildren().get(0) instanceof ExprNodeConstantDesc) { + ExprNodeColumnDesc ref = (ExprNodeColumnDesc) conjCall.getChildren().get(1); + String refString = ref.toString(); + columnConstantsMap.put(refString, + new Pair( + ref, (ExprNodeConstantDesc) conjCall.getChildren().get(0))); + if (columnConstantsMap.get(refString).size() != i+1) { + // If we have not added to this column desc before, we bail out + return null; + } + } else { + // We bail out + return null; + } + } else { + // We bail out + return null; + } + } + } + + // 4. 
We build the new predicate and return it + ExprNodeDesc newPredicate = null; + List newChildren = new ArrayList(children.size()); + // 4.1 Create structs + List columns = new ArrayList(); + List names = new ArrayList(); + List typeInfos = new ArrayList(); + for (int i = 0; i < children.size(); i++) { + List constantFields = new ArrayList(children.size()); + + for (String keyString : columnConstantsMap.keySet()) { + Pair columnConstant = + columnConstantsMap.get(keyString).get(i); + if (i == 0) { + columns.add(columnConstant.left); + names.add(columnConstant.left.getColumn()); + typeInfos.add(columnConstant.left.getTypeInfo()); + } + constantFields.add(columnConstant.right); + } + + if (i == 0) { + ExprNodeDesc columnsStruct = new ExprNodeGenericFuncDesc( + TypeInfoFactory.getStructTypeInfo(names, typeInfos), + FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(), + columns); + newChildren.add(columnsStruct); + } + ExprNodeDesc valuesStruct = new ExprNodeGenericFuncDesc( + TypeInfoFactory.getStructTypeInfo(names, typeInfos), + FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(), + constantFields); + newChildren.add(valuesStruct); + } + newPredicate = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getFunctionInfo(IN_UDF).getGenericUDF(), newChildren); + + return newPredicate; + } + + } + +} diff --git ql/src/test/results/clientpositive/flatten_and_or.q.out ql/src/test/results/clientpositive/flatten_and_or.q.out index 9c51ff3..d63f5ed 100644 --- ql/src/test/results/clientpositive/flatten_and_or.q.out +++ ql/src/test/results/clientpositive/flatten_and_or.q.out @@ -44,15 +44,15 @@ STAGE PLANS: alias: src Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE Filter Operator - predicate: (((key = '0') and (value = '8')) or ((key = '1') and (value = '5')) or ((key = '2') and (value = '6')) or ((key = '3') and (value = '8')) or ((key = '4') and (value = '1')) or ((key = '5') and (value = '6')) or 
((key = '6') and (value = '1')) or ((key = '7') and (value = '1')) or ((key = '8') and (value = '1')) or ((key = '9') and (value = '1')) or ((key = '10') and (value = '3'))) (type: boolean) - Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + predicate: (struct(key,value)) IN (struct('0','8'), struct('1','5'), struct('2','6'), struct('3','8'), struct('4','1'), struct('5','6'), struct('6','1'), struct('7','1'), struct('8','1'), struct('9','1'), struct('10','3')) (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat