diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 14f362f..a80afa7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -88,6 +88,7 @@ public void initialize(HiveConf hiveConf) {
     }
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
+      transformations.add(new PartitionColumnsSeparator());
       transformations.add(new PartitionPruner());
       transformations.add(new PartitionConditionRemover());
       if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PartitionColumnsSeparator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PartitionColumnsSeparator.java
new file mode 100644
index 0000000..231f320
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PartitionColumnsSeparator.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TypeRule;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * This optimization looks at a Filter predicate that contains an IN operator whose
+ * first child is a struct of columns. If that struct mixes partition (or virtual)
+ * columns with non-partition columns, the predicate is rewritten as the AND of two
+ * IN clauses: one over a struct of the non-partition columns and one over a struct
+ * of the partition columns, so that the partition pruner can evaluate the
+ * partition-column clause on its own.
+ */
+public class PartitionColumnsSeparator implements Transform {
+
+  private static final Log LOG = LogFactory.getLog(PartitionColumnsSeparator.class);
+  private static final String IN_UDF =
+    GenericUDFIn.class.getAnnotation(Description.class).name();
+  private static final String STRUCT_UDF =
+    GenericUDFStruct.class.getAnnotation(Description.class).name();
+  private static final String AND_UDF =
+    GenericUDFOPAnd.class.getAnnotation(Description.class).name();
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    // 1. Trigger the transformation on every FilterOperator in the plan
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"), new StructInTransformer());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+    GraphWalker ogw = new ForwardWalker(disp);
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
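+  /**
+   * Rewrites the predicate of a FilterOperator by splitting an IN clause over a
+   * struct of mixed columns into two IN clauses joined by AND. Illustrative
+   * example with hypothetical columns, where pcol is a partition column and
+   * col is a regular column:
+   *
+   *   struct(col, pcol) IN (const struct(1, 'a'), const struct(2, 'b'))
+   *
+   * is rewritten into
+   *
+   *   struct(col) IN (const struct(1), const struct(2))
+   *   AND struct(pcol) IN (const struct('a'), const struct('b'))
+   */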
+  private class StructInTransformer implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      FilterOperator filterOp = (FilterOperator) nd;
+      ExprNodeDesc predicate = filterOp.getConf().getPredicate();
+
+      // Generate a new predicate with partition columns separated into their own IN clause
+      ExprNodeDesc newPredicate = generateInClause(predicate);
+      if (newPredicate != null) {
+        // Replace the filter in the current FIL with the new filter
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Generated new predicate with IN clause: " + newPredicate);
+        }
+        filterOp.getConf().setOrigPredicate(predicate);
+        filterOp.getConf().setPredicate(newPredicate);
+      }
+
+      return null;
+    }
+
+    private ExprNodeDesc generateInClause(ExprNodeDesc predicate) throws SemanticException {
+      Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule, NodeProcessor>();
+      exprRules.put(new TypeRule(ExprNodeGenericFuncDesc.class), new StructInExprProcessor());
+
+      // The dispatcher fires the processor corresponding to the closest matching
+      // rule and passes the context along
+      Dispatcher disp = new DefaultRuleDispatcher(null, exprRules, null);
+      GraphWalker egw = new PreOrderOnceWalker(disp);
+
+      List<Node> startNodes = new ArrayList<Node>();
+      startNodes.add(predicate);
+
+      HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+      egw.startWalking(startNodes, outputMap);
+      return (ExprNodeDesc) outputMap.get(predicate);
+    }
+  }
+
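+  /**
+   * Expression processor that performs the actual rewrite. It fires on
+   * ExprNodeGenericFuncDesc nodes and, when the node is an IN whose first child
+   * is a struct of columns containing at least one partition or virtual column,
+   * returns the equivalent AND of two IN clauses. It returns null whenever the
+   * expression does not qualify, in which case the original predicate is left
+   * untouched.
+   */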
+  private class StructInExprProcessor implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) nd;
+
+      // 1. If it is not an IN operator, we bail out.
+      if (!(fd.getGenericUDF() instanceof GenericUDFIn)) {
+        return null;
+      }
+
+      // 2. Bail out unless the first child of the IN operator is a struct.
+      List<ExprNodeDesc> children = fd.getChildren();
+      if (children.size() < 2 || !(children.get(0) instanceof ExprNodeGenericFuncDesc) ||
+          !(((ExprNodeGenericFuncDesc) children.get(0)).getGenericUDF() instanceof GenericUDFStruct)) {
+        return null;
+      }
+
+      // 3. See if there are partition columns in the struct; if not, bail out.
+      boolean isValidOptimization = false;
+      for (ExprNodeDesc ed : ((ExprNodeGenericFuncDesc) children.get(0)).getChildren()) {
+        // Check if the current column is a partition column or a virtual column.
+        // If yes, this filter predicate is a candidate for this optimization.
+        if (ed instanceof ExprNodeColumnDesc &&
+            ((ExprNodeColumnDesc) ed).getIsPartitionColOrVirtualCol()) {
+          isValidOptimization = true;
+        }
+        // Sanity check. If the struct field is not of column type, exit.
+        if (!(ed instanceof ExprNodeColumnDesc)) {
+          isValidOptimization = false;
+          break;
+        }
+      }
+      if (!isValidOptimization) {
+        return null;
+      }
+
+      // Columns, names and types for the non-partition column struct
+      List<ExprNodeDesc> nonPartcolumns = new ArrayList<ExprNodeDesc>();
+      List<String> nonPartnames = new ArrayList<String>();
+      List<TypeInfo> nonParttypeInfos = new ArrayList<TypeInfo>();
+
+      // Columns, names and types for the partition column struct
+      List<ExprNodeDesc> partColumns = new ArrayList<ExprNodeDesc>();
+      List<String> partNames = new ArrayList<String>();
+      List<TypeInfo> parttypeInfos = new ArrayList<TypeInfo>();
+
+      ExprNodeGenericFuncDesc originalStructDesc = (ExprNodeGenericFuncDesc) children.get(0);
+      List<ExprNodeDesc> originalDescChildren = originalStructDesc.getChildren();
+
+      // Split the original struct's columns into the column structs that head the two new IN clauses
+      for (ExprNodeDesc en : originalDescChildren) {
+        if (!((ExprNodeColumnDesc) en).getIsPartitionColOrVirtualCol()) {
+          nonPartcolumns.add(en);
+          nonPartnames.add(en.getName());
+          nonParttypeInfos.add(en.getTypeInfo());
+        } else {
+          partColumns.add(en);
+          partNames.add(en.getName());
+          parttypeInfos.add(en.getTypeInfo());
+        }
+      }
+      List<ExprNodeDesc> nonPartStructIn = new ArrayList<ExprNodeDesc>();
+      nonPartStructIn.add(new ExprNodeGenericFuncDesc(
+          TypeInfoFactory.getStructTypeInfo(nonPartnames, nonParttypeInfos),
+          FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+          nonPartcolumns));
+      List<ExprNodeDesc> partStructIn = new ArrayList<ExprNodeDesc>();
+      partStructIn.add(new ExprNodeGenericFuncDesc(
+          TypeInfoFactory.getStructTypeInfo(partNames, parttypeInfos),
+          FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+          partColumns));
+
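+      // In the IN expression, child 0 is the struct of columns (handled above) and
+      // children 1..n are constant struct values; each constant is split below along
+      // the same partition / non-partition boundary as the columns.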
+      // For each constant value, split it across the two new IN clauses.
+      for (int i = 1; i < children.size(); i++) {
+        // 4. Sanity check. If the constant values do not match the metadata of the struct, return null.
+        if (!(((ExprNodeConstantDesc) children.get(i)).getValue() instanceof List) ||
+            ((List<?>) ((ExprNodeConstantDesc) children.get(i)).getValue()).size() != originalDescChildren.size()) {
+          return null;
+        }
+        List<?> cnCols = (List<?>) ((ExprNodeConstantDesc) children.get(i)).getValue();
+        List<ExprNodeDesc> constPartColumns = new ArrayList<ExprNodeDesc>();
+        List<ExprNodeDesc> constNonPartColumns = new ArrayList<ExprNodeDesc>();
+        for (int j = 0; j < originalDescChildren.size(); j++) {
+          if (((ExprNodeColumnDesc) originalDescChildren.get(j)).getIsPartitionColOrVirtualCol()) {
+            constPartColumns.add(new ExprNodeConstantDesc(cnCols.get(j)));
+          } else {
+            constNonPartColumns.add(new ExprNodeConstantDesc(cnCols.get(j)));
+          }
+        }
+        // Add the constant structs to the partition-column and non-partition-column IN clauses.
+        nonPartStructIn.add(new ExprNodeGenericFuncDesc(
+            TypeInfoFactory.getStructTypeInfo(nonPartnames, nonParttypeInfos),
+            FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+            constNonPartColumns));
+        partStructIn.add(new ExprNodeGenericFuncDesc(
+            TypeInfoFactory.getStructTypeInfo(partNames, parttypeInfos),
+            FunctionRegistry.getFunctionInfo(STRUCT_UDF).getGenericUDF(),
+            constPartColumns));
+      }
+
+      // Connect the two IN branches with an AND and return the new expression
+      final List<ExprNodeDesc> subExpr = new ArrayList<ExprNodeDesc>(originalDescChildren.size() + 1);
+      subExpr.add(new ExprNodeGenericFuncDesc(
+          TypeInfoFactory.booleanTypeInfo, FunctionRegistry
+          .getFunctionInfo(IN_UDF).getGenericUDF(), nonPartStructIn));
+      subExpr.add(new ExprNodeGenericFuncDesc(
+          TypeInfoFactory.booleanTypeInfo, FunctionRegistry
+          .getFunctionInfo(IN_UDF).getGenericUDF(), partStructIn));
+      return new ExprNodeGenericFuncDesc(
+          TypeInfoFactory.booleanTypeInfo, FunctionRegistry
+          .getFunctionInfo(AND_UDF).getGenericUDF(), subExpr);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/OpProcFactory.java
index 7262164..4c39452 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/OpProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/OpProcFactory.java
@@ -55,7 +55,7 @@ protected void generatePredicate(NodeProcessorCtx procCtx, FilterOperator fop,
       TableScanOperator top) throws SemanticException, UDFArgumentException {
     OpWalkerCtx owc = (OpWalkerCtx) procCtx;
     // Otherwise this is not a sampling predicate and we need to
-    ExprNodeDesc predicate = fop.getConf().getOrigPredicate();
+    ExprNodeDesc predicate = fop.getConf().getPredicate() != null ? fop.getConf().getPredicate() : fop.getConf().getOrigPredicate();
     predicate = predicate == null ? fop.getConf().getPredicate() : predicate;
     String alias = top.getConf().getAlias();