Index: src/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- src/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 21506)
+++ src/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 21736)
@@ -568,6 +568,7 @@
     HIVEPPDRECOGNIZETRANSITIVITY("hive.ppd.recognizetransivity", true), // predicate pushdown
     HIVEPPDREMOVEDUPLICATEFILTERS("hive.ppd.remove.duplicatefilters", true),
     HIVEMETADATAONLYQUERIES("hive.optimize.metadataonly", true),
+    HIVEOPTBUCKETFILTER("hive.optimize.bucket.filter", false), // bucket pruner
     // push predicates down to storage handlers
     HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true),
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
Index: src/RELEASE_NOTES.txt
===================================================================
--- src/RELEASE_NOTES.txt	(revision 21506)
+++ src/RELEASE_NOTES.txt	(revision 21736)
@@ -48,6 +48,7 @@
     * [HIVE-5236] - Change HCatalog spacing from 4 spaces to 2
     * [HIVE-5260] - Introduce HivePassThroughOutputFormat that allows Hive to use general purpose OutputFormats instead of HiveOutputFormats in StorageHandlers
     * [HIVE-5261] - Make the Hive HBase storage handler work from HCatalog, and use HiveStorageHandlers instead of HCatStorageHandlers
+    * [IHADOOP-6217] - Merge the bucket filter into hive-0.12 and fix bugs -- the previous implementation (IHADOOP-316) did not handle multiple conditions in a predicate. Also add support for the BETWEEN ... AND clause.
Index: src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/OpProcFactory.java
===================================================================
--- src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/OpProcFactory.java	(revision 21506)
+++ src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/OpProcFactory.java	(revision 21736)
@@ -27,6 +27,9 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * Operator factory for partition pruning processing of operator graph. We find
  * all the filter operators that appear just beneath the table scan operators.
@@ -44,6 +47,9 @@
  */
 public final class OpProcFactory extends PrunerOperatorFactory {
 
+  private static final Log LOG = LogFactory
+      .getLog("hive.ql.optimizer.ppr.OpProcFactory");
+
   /**
    * Determines the partition pruner for the filter. This is called only when
    * the filter follows a table scan operator.
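
For reference, the new hive.optimize.bucket.filter optimization keys off Hive's bucket file layout: a constant in an equality, IN, or BETWEEN predicate on the bucketing column is hashed and mapped to a bucket via (hash & Integer.MAX_VALUE) % numBuckets, which is what Hash.hashCode() and AddBucket() in BucketPruner.java below compute. A minimal standalone sketch of that computation (class and method names here are illustrative, not part of the patch):

    public final class BucketNumberSketch {
      // Hash a constant the way the patch does for int and string bucket keys.
      static int hashOf(Object constant) {
        if (constant instanceof Integer) {
          return (Integer) constant;
        }
        if (constant instanceof String) {
          int r = 0;
          for (byte b : ((String) constant).getBytes()) {
            r = r * 31 + b;                      // same rolling hash as Hash.hashCode()
          }
          return r;
        }
        throw new IllegalArgumentException("unsupported constant type");
      }

      // Map the hash to a bucket file index, as AddBucket() does.
      static int bucketFor(Object constant, int numBuckets) {
        return (hashOf(constant) & Integer.MAX_VALUE) % numBuckets;
      }

      public static void main(String[] args) {
        // e.g. WHERE user_id = 42 on a table CLUSTERED BY (user_id) INTO 32 BUCKETS:
        System.out.println(bucketFor(42, 32));   // only this bucket file has to be scanned
        System.out.println(bucketFor("abc", 32));
      }
    }

With hive.optimize.bucket.filter left at its default of false, the new BucketPruner transform is not registered, so existing query plans are unaffected.
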
Index: src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java	(revision 21506)
+++ src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java	(revision 21736)
@@ -61,6 +61,9 @@
         transformations.add(new ListBucketingPruner());
       }
     }
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETFILTER)) {
+      transformations.add(new BucketPruner());
+    }
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
       transformations.add(new ColumnPruner());
     }
Index: src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketPruner.java
===================================================================
--- src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketPruner.java	(revision 0)
+++ src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketPruner.java	(revision 21736)
@@ -0,0 +1,528 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedBucketList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+
+public class BucketPruner implements Transform {
+
+  private static final Log LOG = LogFactory
+      .getLog("hive.ql.optimizer.BucketPruner");
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", "(TS%FIL%)|(TS%FIL%FIL%)"),
+        getBucketFilterProc(pctx));
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along.
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules,
+        null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+  private NodeProcessor getBucketFilterProc(ParseContext pctx) {
+    return new BucketFilterPPR(pctx);
+  }
+
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+  static class Hash {
+    public static int hashCode(ExprNodeConstantDesc node) {
+      Object val = node.getValue();
+      LOG.info("hashCode for " + val + " "
+          + node.getTypeInfo().getCategory() + " "
+          + node.getTypeString());
+      String type = node.getTypeString();
+      if (type.equalsIgnoreCase("int")) {
+        return (Integer) val;
+      }
+      if (type.equalsIgnoreCase("string")) {
+        int r = 0;
+        String t = (String) val;
+        for (int i = 0; i < t.length(); i++) {
+          r = r * 31 + t.getBytes()[i];
+        }
+        return r;
+      }
+      if (type.equalsIgnoreCase("byte")) {
+        return ((Number) val).intValue();
+      }
+      if (type.equalsIgnoreCase("short")) {
+        return ((Number) val).intValue();
+      }
+      if (type.equalsIgnoreCase("long")) {
+        long a = (Long) val;
+        return (int) ((a >>> 32) ^ a);
+      }
+      if (type.equalsIgnoreCase("double")) {
+        // hash the IEEE bits of the double; a boxed Double cannot be cast to Long
+        long a = Double.doubleToLongBits((Double) val);
+        return (int) ((a >>> 32) ^ a);
+      }
+      if (type.equalsIgnoreCase("float")) {
+        return Float.floatToIntBits((Float) val);
+      }
+      // date type added in hive-0.12
+      if (type.equalsIgnoreCase("date")) {
+        return ((DateWritable) val).hashCode();
+      }
+      return -1;
+    }
+  }
+
+  class DefaultExprProcessor implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procCtx, Object... nodeOutputs)
+        throws SemanticException {
+      if (nd instanceof ExprNodeConstantDesc) {
+        return ((ExprNodeConstantDesc) nd).clone();
+      } else if (nd instanceof ExprNodeNullDesc) {
+        return ((ExprNodeNullDesc) nd).clone();
+      }
+      assert (false);
+      return null;
+    }
+  }
+
+  class BucketFilterPPR implements NodeProcessor {
+    ParseContext pGraphContext;
+    PrunedBucketList bucket;
+
+    public BucketFilterPPR(ParseContext pctx) {
+      this.pGraphContext = pctx;
+    }
+
+    public BucketFilterPPR() {
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procCtx, Object... nodeOutputs)
+        throws SemanticException {
+
+      LOG.info("Processing for " + nd.getName() + "("
+          + ((Operator) nd).getIdentifier() + ")");
+      FilterOperator fop = (FilterOperator) nd;
+      FilterOperator fop2 = null;
+
+      // The stack contains either ... TS, Filter or ... TS, Filter, Filter,
+      // with the head of the stack being the rightmost symbol. So we pop the
+      // two elements from the top; if the second of them is not a table scan,
+      // then the operator on the top of the stack is the table scan operator.
+      Node tmp = stack.pop();
+      Node tmp2 = stack.pop();
+      TableScanOperator top = null;
+      if (tmp2 instanceof TableScanOperator) {
+        top = (TableScanOperator) tmp2;
+      } else {
+        top = (TableScanOperator) stack.peek();
+        fop2 = (FilterOperator) tmp2;
+      }
+      stack.push(tmp2);
+      stack.push(tmp);
+
+      // If fop2 exists (i.e. this is not the top-level filter) and fop2 is
+      // not a sampling filter, then we ignore the current filter.
+      if (fop2 != null && !fop2.getConf().getIsSamplingPred()) {
+        return null;
+      }
+
+      // Ignore the predicate in case it is a sampling predicate.
+      if (fop.getConf().getIsSamplingPred()) {
+        return null;
+      }
+
+      // Otherwise this is not a sampling predicate.
+      ExprNodeDesc predicate = fop.getConf().getPredicate();
+      String alias = top.getConf().getAlias();
+      Map<TableScanOperator, Table> topToTable = this.pGraphContext
+          .getTopToTable();
+      Table tbl = topToTable.get(top);
+      LOG.info("TableScanOperator=" + alias + " " + tbl.getAllCols());
+      LOG.info("ExprNodeDesc=" + predicate);
+      if (tbl.getNumBuckets() < 0) {
+        return null;
+      }
+
+      // Currently only a single bucket key column is supported, and the
+      // column type must be int or string.
+ List columns = tbl.getBucketCols(); + if (columns.size() > 1) { + return null; + } + + PrunedBucketList ret = new PrunedBucketList(columns, + tbl.getNumBuckets()); + PredicateTree pTree = new PredicateTree(predicate, ret); + pTree.getConfirmedBucketList(); + + if(ret.getconfirmedBuckets().size()==ret.getNumBucket()){ + ret = new PrunedBucketList(columns,tbl.getNumBuckets()); + } + this.pGraphContext.getOpToBucketPruner().put(top, ret); + LOG.info("TableScanOperator=" + top + " " + ret); + // Generate the partition pruning predicate + boolean hasNonPartCols = true; + this.pGraphContext.setHasNonBucketFilter(hasNonPartCols); + + return null; + } + } + + + // the predicate represented by a tree + class PredicateTree { + // the root node, i.e. the predicate + private final ExprNodeDesc root; + // the bucketed columns + private final List bucketCols; + // num of buckets of the table + private final int numBucket; + //the pruned bucket list to be updated + private final PrunedBucketList bucketList; + + public PredicateTree(ExprNodeDesc r, PrunedBucketList list) { + root = r; + bucketList=list; + bucketCols=bucketList.getColumns(); + numBucket=bucketList.getNumBucket(); + } + + private boolean isEqualOp(ExprNodeDesc node) { + if (isOp(node)) { + ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) node; + Class genericUdfClass = fd.getGenericUDF() + .getClass(); + return GenericUDFOPEqual.class == genericUdfClass; + } + return false; + } + + private boolean isInOp(ExprNodeDesc node){ + if (isOp(node)) { + ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) node; + Class genericUdfClass = fd.getGenericUDF() + .getClass(); + return GenericUDFIn.class == genericUdfClass; + } + return false; + } + + private boolean isBetweenOp(ExprNodeDesc node){ + if (isOp(node)) { + ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) node; + Class genericUdfClass = fd.getGenericUDF() + .getClass(); + return GenericUDFBetween.class == genericUdfClass; + } + return false; + } + + private boolean isOrOp(ExprNodeDesc node) { + if (isOp(node)) { + ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) node; + Class genericUdfClass = fd.getGenericUDF() + .getClass(); + return GenericUDFOPOr.class == genericUdfClass; + } + return false; + } + + private boolean isAndOp(ExprNodeDesc node) { + if (isOp(node)) { + ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) node; + Class genericUdfClass = fd.getGenericUDF() + .getClass(); + return GenericUDFOPAnd.class == genericUdfClass; + } + return false; + } + + private boolean isOp(ExprNodeDesc node) { + return node instanceof ExprNodeGenericFuncDesc; + } + + private boolean isColDes(ExprNodeDesc node) { + return node instanceof ExprNodeColumnDesc; + } + + private boolean isConstDesc(ExprNodeDesc node) { + return node instanceof ExprNodeConstantDesc; + } + + private boolean isKey(ExprNodeDesc node) { + if (isColDes(node)) { + return bucketCols.contains(((ExprNodeColumnDesc) node).getColumn()); + } + return false; + } + + private boolean isBooleanCons(ExprNodeConstantDesc node){ + return node.getTypeString().equalsIgnoreCase("boolean"); + } + + private boolean isIntCons(ExprNodeConstantDesc node){ + return node.getTypeString().equalsIgnoreCase("int"); + } + + // merge two lists of operators + private List mergeList(List list1, + List list2) { + if (list1 == null) { + return list2; + } + if (list2 == null) { + return list1; + } + list1.addAll(list2); + return list1; + } + + //compute an operator list for a given node + private List computeList(ExprNodeDesc node) { + if 
(node == null) { + return null; + } + // return the '=' node if the column involved is a bucketed column + if (isEqualOp(node)) { + List children = node.getChildren(); + if (children != null && children.size() == 2) { + for (ExprNodeDesc child : children) { + // column desc + if (isColDes(child) && isKey(child)){ + List list=new ArrayList(); + list.add(node); + return list; + } + } + } + } + // return the 'in' node if the column involved is a bucketed column + if (isInOp(node)) { + List children = node.getChildren(); + if (children != null && children.size() > 0) { + for (ExprNodeDesc child : children) { + // column desc + if (isColDes(child) && isKey(child)){ + List list=new ArrayList(); + list.add(node); + return list; + } + } + } + } + //return the 'between' node if the column involved is a bucketed column of type int and it's not a 'NOT BETWEEN' clause + if(isBetweenOp(node)){ + List children = node.getChildren(); + if (children != null && children.size() > 0) { + boolean checkCol=false; + boolean checkCls=false; + for(ExprNodeDesc child : children){ + // column desc + if (isColDes(child) + && isKey(child) + && ((ExprNodeColumnDesc) child).getTypeString() + .equalsIgnoreCase("int")) { + checkCol = true; + } + // boolean constant desc + if (isConstDesc(child) + && ((ExprNodeConstantDesc) child).getValue() != null + && isBooleanCons((ExprNodeConstantDesc) child) + && !(Boolean) ((ExprNodeConstantDesc) child).getValue()) { + checkCls = true; + } + } + if(checkCls && checkCol){ + List list=new ArrayList(); + list.add(node); + return list; + } + } + } + // recursively get the list for 'and' operator. result is the smaller list returned from the two children + if (isAndOp(node)) { + List children = node.getChildren(); + if (children != null && children.size() == 2) { + List list1 = computeList(children.get(0)); + List list2 = computeList(children.get(1)); + if(list1!=null && list2!=null){ + return list1.size()<=list2.size() ? list1 : list2; + } + else{ + return list1!=null ? list1 : list2; + } + } + } + // recursively get the list for 'or' operator. 
result is the merged lists from the two children if neither of them is null + if (isOrOp(node)) { + List children = node.getChildren(); + if (children != null && children.size() == 2) { + List list1 = computeList(children.get(0)); + List list2 = computeList(children.get(1)); + if (list1 == null || list2 == null) { + return null; + } + return mergeList(list1, list2); + } + } + // other operators that we don't support for now + return null; + } + + // compute the pruned bucket list from this predicate + public void getConfirmedBucketList() { + List opList = computeList(root); + if (opList != null) { + for (ExprNodeDesc node : opList) { + if(isEqualOp(node)){ + ProcessEqual(node); + } + if(isInOp(node)){ + ProcessIn(node); + } + if(isBetweenOp(node)){ + ProcessBetween(node); + } + } + } + } + + //add a bucket for this constant + private void AddBucket(ExprNodeConstantDesc node){ + int num=Hash.hashCode(node); + int bucket = (num & Integer.MAX_VALUE) % numBucket; + if (bucket != -1) { + bucketList.getconfirmedBuckets().add(bucket); + } + } + + private void ProcessEqual(ExprNodeDesc node) { + List children = node.getChildren(); + if (children != null && children.size() == 2) { + for (ExprNodeDesc child : children) { + // constant desc + if (isConstDesc(child) + && ((ExprNodeConstantDesc) child).getValue() != null) { + AddBucket((ExprNodeConstantDesc) child); + } + } + } + } + + private void ProcessIn(ExprNodeDesc node){ + List children = node.getChildren(); + if (children != null && children.size() > 0) { + for (ExprNodeDesc child : children) { + //constant desc + if(isConstDesc(child) && ((ExprNodeConstantDesc) child).getValue() != null){ + AddBucket((ExprNodeConstantDesc) child); + } + } + } + } + + private void ProcessBetween(ExprNodeDesc node){ + List children = node.getChildren(); + if (children != null && children.size() > 0) { + ExprNodeConstantDesc begin=null; + ExprNodeConstantDesc end=null; + for (ExprNodeDesc child : children) { + //constant desc of int + if(isConstDesc(child) && isIntCons((ExprNodeConstantDesc) child) && ((ExprNodeConstantDesc) child).getValue() != null){ + if(begin==null){ + begin=(ExprNodeConstantDesc) child; + } + else{ + end=(ExprNodeConstantDesc) child; + } + } + } + if(begin!=null && end!=null && ((Integer) begin.getValue())<=((Integer) end.getValue())){ + AddBucket(begin); + AddBucket(end); + int b=((Integer) begin.getValue())+1; + while(b<(Integer) end.getValue()){ + AddBucket(new ExprNodeConstantDesc(b)); + b++; + } + } + } + } + } +} Index: src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 21506) +++ src/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 21736) @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.PrunedBucketList; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -443,6 +444,13 @@ ParseContext parseCtx = opProcCtx.getParseCtx(); Set inputs = opProcCtx.getInputs(); + LOG.info("parseCtx.getOpToBucketPruner()::" + parseCtx.getOpToBucketPruner()); + PrunedBucketList bucketList=parseCtx.getOpToBucketPruner().get(topOp); + if(bucketList!=null){ + 
LOG.info("PrunedBucketList::" + bucketList); + plan.getAliasToBucketInfo().put(alias_id, bucketList); + } + ArrayList partDir = new ArrayList(); ArrayList partDesc = new ArrayList(); Index: src/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (revision 21506) +++ src/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (revision 21736) @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SplitSample; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.hive.ql.parse.PrunedBucketList; /** * MapWork represents all the information used to run a map task on the cluster. @@ -71,6 +72,8 @@ private LinkedHashMap> aliasToWork = new LinkedHashMap>(); private LinkedHashMap aliasToPartnInfo = new LinkedHashMap(); + + private LinkedHashMap aliasToBucketInfo= new LinkedHashMap(); private HashMap nameToSplitSample = new LinkedHashMap(); @@ -182,6 +185,10 @@ public LinkedHashMap getAliasToPartnInfo() { return aliasToPartnInfo; } + + public LinkedHashMap getAliasToBucketInfo(){ + return aliasToBucketInfo; + } /** * @param aliasToPartnInfo Index: src/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (revision 21506) +++ src/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (revision 21736) @@ -458,7 +458,7 @@ pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks, pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(), pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(), - pCtx.getQueryProperties()); + pCtx.getQueryProperties(),pCtx.getOpToBucketPruner()); } // loop over all the tasks recursively Index: src/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 21506) +++ src/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 21736) @@ -180,6 +180,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private HashMap opToPartPruner; private HashMap opToPartList; + private HashMap opToBucketPruner; private HashMap> topOps; private HashMap> topSelOps; private LinkedHashMap, OpParseContext> opParseCtx; @@ -242,6 +243,7 @@ super(conf); opToPartPruner = new HashMap(); opToPartList = new HashMap(); + opToBucketPruner= new HashMap(); opToSamplePruner = new HashMap(); nameToSplitSample = new HashMap(); topOps = new HashMap>(); @@ -291,6 +293,7 @@ public void initParseCtx(ParseContext pctx) { opToPartPruner = pctx.getOpToPartPruner(); opToPartList = pctx.getOpToPartList(); + opToBucketPruner=pctx.getOpToBucketPruner(); opToSamplePruner = pctx.getOpToSamplePruner(); topOps = pctx.getTopOps(); topSelOps = pctx.getTopSelOps(); @@ -320,7 +323,7 @@ opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, - queryProperties); + queryProperties,opToBucketPruner); } @SuppressWarnings("nls") @@ -8374,7 +8377,7 @@ listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, - 
reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties); + reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties,opToBucketPruner); // Generate table access stats if required if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) { Index: src/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 21506) +++ src/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 21736) @@ -69,6 +69,7 @@ private QB qb; private ASTNode ast; private HashMap opToPartPruner; + private HashMap opToBucketPruner; private HashMap opToPartList; private HashMap opToSamplePruner; private Map> opToPartToSkewedPruner; @@ -113,6 +114,8 @@ private TableDesc fetchTabledesc; private Operator fetchSource; private ListSinkOperator fetchSink; + + private boolean hasNonBucketFilter; public ParseContext() { } @@ -182,7 +185,7 @@ Map> opToPartToSkewedPruner, Map viewAliasToInput, List reduceSinkOperatorsAddedByEnforceBucketingSorting, - QueryProperties queryProperties) { + QueryProperties queryProperties, HashMap opToBucketPruner) { this.conf = conf; this.qb = qb; this.ast = ast; @@ -215,6 +218,7 @@ this.reduceSinkOperatorsAddedByEnforceBucketingSorting = reduceSinkOperatorsAddedByEnforceBucketingSorting; this.queryProperties = queryProperties; + this.opToBucketPruner=opToBucketPruner; } /** @@ -677,4 +681,25 @@ public void setFetchSink(ListSinkOperator fetchSink) { this.fetchSink = fetchSink; } + + public HashMap getOpToBucketPruner(){ + return opToBucketPruner; + } + + /** + * @param opToPartPruner + * the opToPartPruner to set + */ + public void setOpToBucketPruner( + HashMap opToBucketPruner) { + this.opToBucketPruner = opToBucketPruner; + } + + public void setHasNonBucketFilter(boolean val){ + hasNonBucketFilter = val; + } + + public boolean getHasNonBucketFilter(){ + return hasNonBucketFilter; + } } Index: src/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedBucketList.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedBucketList.java (revision 0) +++ src/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedBucketList.java (revision 21736) @@ -0,0 +1,60 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + + +public class PrunedBucketList { + + private Set confirmedBuckets; + private List cols; + private int numBucket; + + public PrunedBucketList(Set confirmedBuckets){ + this.confirmedBuckets=confirmedBuckets; + + } + + public PrunedBucketList(List cols ,int numBucket) { + // TODO Auto-generated constructor stub + this.confirmedBuckets=new LinkedHashSet(); + this.cols=cols; + this.numBucket=numBucket; + } + + public Set getconfirmedBuckets() { + return confirmedBuckets; + } + + public List getColumns() { + return this.cols; + } + + public int getNumBucket() { + return this.numBucket; + } + + public String toString(){ + return "size="+this.confirmedBuckets.size()+",content="+this.confirmedBuckets; + } +} Index: src/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 21506) +++ src/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 21736) @@ -166,6 +166,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.hive.ql.parse.PrunedBucketList; /** * Utilities. @@ -2765,10 +2766,33 @@ LOG.info("Adding input file " + path); if (isEmptyPath(job, path, ctx)) { path = createDummyFileForEmptyPartition(path, job, work, - hiveScratchDir, alias, sequenceNumber++); - + hiveScratchDir, alias, sequenceNumber++); + pathsToAdd.add(path); + } else { + if (work.getAliasToBucketInfo().containsKey(alias)) { + Set list = work.getAliasToBucketInfo().get(alias) + .getconfirmedBuckets(); + if (list != null && !list.isEmpty()) { + LOG.info("PrunedBucketList:" + list); + FileSystem fs = FileSystem.get(path.toUri(), ctx.getConf()); + FileStatus[] files = fs.listStatus(path); + if (files != null) { + Iterator its = list.iterator(); + while (its.hasNext()) { + int bucketNum = (Integer) its.next(); + String oneFile = path.toString() + "/" + + files[bucketNum].getPath().getName(); + pathsToAdd.add(new Path(oneFile)); + LOG.info("Adding input file " + oneFile); + } + } + } else { + pathsToAdd.add(path); + } + } else { + pathsToAdd.add(path); + } } - pathsToAdd.add(path); } } Index: src/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java =================================================================== --- src/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 21506) +++ src/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 21736) @@ -329,10 +329,14 @@ protected static PartitionDesc getPartitionDescFromPath( Map pathToPartitionInfo, Path dir) throws IOException { + LOG.debug("pathToPartitionInfo = " + pathToPartitionInfo +" "+dir.toString()); PartitionDesc partDesc = pathToPartitionInfo.get(dir.toString()); if (partDesc == null) { partDesc = pathToPartitionInfo.get(dir.toUri().getPath()); } + if(partDesc==null){ + partDesc = pathToPartitionInfo.get(dir.getParent().toString()); + } if (partDesc == null) { throw new IOException("cannot find dir = " + dir.toString() + " in partToPartitionInfo!");
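
As a usage note, enabling the feature only requires setting hive.optimize.bucket.filter=true; the confirmed bucket numbers collected per table scan are then used in the Utilities.getInputPaths() change above to keep only the matching bucket files. A sketch of that path selection under the same assumption the patch makes, namely that FileSystem.listStatus() returns a (partition) directory's bucket files in bucket-number order (class and method names here are illustrative, not part of the patch):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class BucketPathSelectionSketch {
      static List<Path> selectBucketFiles(Path dir, Set<Integer> confirmedBuckets,
          Configuration conf) throws IOException {
        List<Path> result = new ArrayList<Path>();
        if (confirmedBuckets == null || confirmedBuckets.isEmpty()) {
          result.add(dir);                         // no pruning info: scan the whole directory
          return result;
        }
        FileSystem fs = dir.getFileSystem(conf);
        FileStatus[] files = fs.listStatus(dir);   // assumed to be in bucket-number order
        for (int bucket : confirmedBuckets) {
          if (files != null && bucket >= 0 && bucket < files.length) {
            result.add(files[bucket].getPath());   // keep only bucket files that can match
          }
        }
        return result;
      }
    }

Note that when every bucket ends up in the confirmed set, BucketFilterPPR.process() resets the PrunedBucketList to an empty one, so the whole directory is scanned instead of each bucket file being added individually.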