Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1512555)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -555,6 +555,9 @@
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
     HIVEPPDRECOGNIZETRANSITIVITY("hive.ppd.recognizetransivity", true), // predicate pushdown
     HIVEPPDREMOVEDUPLICATEFILTERS("hive.ppd.remove.duplicatefilters", true),
+    HIVEPPDFILES("hive.optimize.ppd.vc.filename", false),
+    // this is used only to verify file pruning in tests
+    HIVEPPDFILESREMOVEFILTER("hive.optimize.ppd.vc.filename.remove.filter", false),
     HIVEMETADATAONLYQUERIES("hive.optimize.metadataonly", true),
     // push predicates down to storage handlers
     HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true),
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 1512555)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy)
@@ -1088,6 +1088,13 @@
   }
 
   /**
+   * A shortcut to get the "or" GenericUDF.
+   */
+  public static GenericUDF getGenericUDFForOr() {
+    return FunctionRegistry.getFunctionInfo("or").getGenericUDF();
+  }
+
+  /**
    * Create a copy of an existing GenericUDF.
    */
   public static GenericUDF cloneGenericUDF(GenericUDF genericUDF) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1512555)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -2710,7 +2710,7 @@
             hiveScratchDir, alias, sequenceNumber++);
       }
 
-      pathsToAdd.add(path);
+      pathsToAdd.addAll(work.getPathsFor(path, ctx.getConf()));
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (revision 1512555)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (working copy)
@@ -36,6 +36,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -104,11 +105,13 @@
   private static final long serialVersionUID = 1L;
   private static final String JOBCONF_FILENAME = "jobconf.xml";
 
-  protected transient JobConf job;
-  public static MemoryMXBean memoryMXBean;
+  protected static transient final Log LOG = LogFactory.getLog(ExecDriver.class);
+  protected static MemoryMXBean memoryMXBean;
+
   protected HadoopJobExecHelper jobExecHelper;
 
-  protected static transient final Log LOG = LogFactory.getLog(ExecDriver.class);
+  protected transient JobConf job;
+  protected transient ContentSummary inputSummary;
 
   private RunningJob rj;
 
@@ -121,6 +124,13 @@
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
+  public ContentSummary getInputSummary(Context ctx) throws IOException, HiveException {
+    if (inputSummary == null) {
+      inputSummary = getWork().getTotalSummary(ctx, job);
+    }
+    return inputSummary;
+  }
+
   @Override
   public boolean requireLock() {
     return true;
Index:
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (working copy) @@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -66,7 +65,6 @@ static final String HIVE_CHILD_CLIENT_DEBUG_OPTS = "HIVE_CHILD_CLIENT_DEBUG_OPTS"; static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"}; - private transient ContentSummary inputSummary = null; private transient boolean runningViaChild = false; private transient boolean inputSizeEstimated = false; @@ -96,15 +94,13 @@ } // estimate number of reducers - setNumberOfReducers(); + setNumberOfReducers(ctx); // auto-determine local mode if allowed if (!ctx.isLocalOnlyExecutionMode() && conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { - if (inputSummary == null) { - inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null); - } + ContentSummary inputSummary = getInputSummary(ctx); // set the values of totalInputFileSize and totalInputNumFiles, estimating them // if percentage block sampling is being used @@ -386,7 +382,7 @@ /** * Set the number of reducers for the mapred work. */ - private void setNumberOfReducers() throws IOException { + private void setNumberOfReducers(Context ctx) throws IOException, HiveException { ReduceWork rWork = work.getReduceWork(); // this is a temporary hack to fix things that are not fixed in the compiler Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks(); @@ -405,10 +401,8 @@ .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: " + reducers); } else { - if (inputSummary == null) { - inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null); - } - int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), + ContentSummary inputSummary = getInputSummary(ctx); + int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), work.isFinalMapRed()); rWork.setNumReduceTasks(reducers); console Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (working copy) @@ -66,10 +66,21 @@ private Set allowedColumnNames; + private boolean allowAllColumns; + private boolean allowAllFunctions; + public IndexPredicateAnalyzer() { udfNames = new HashSet(); } + public void setAllowAllColumns(boolean allowAllColumns) { + this.allowAllColumns = allowAllColumns; + } + + public void setAllowAllFunctions(boolean allowAllFunctions) { + this.allowAllFunctions = allowAllFunctions; + } + /** * Registers a comparison operator as one which can be satisfied * by an index search. 
Unless this is called, analyzePredicate @@ -189,7 +200,7 @@ } else { udfName = funcDesc.getGenericUDF().getClass().getName(); } - if (!udfNames.contains(udfName)) { + if (!allowAllFunctions && !udfNames.contains(udfName)) { return expr; } @@ -212,7 +223,7 @@ return expr; } if (allowedColumnNames != null) { - if (!allowedColumnNames.contains(columnDesc.getColumn())) { + if (!allowAllColumns && !allowedColumnNames.contains(columnDesc.getColumn())) { return expr; } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (working copy) @@ -404,6 +404,19 @@ return null; } + public static String getMatchingPath(Map> pathToAliases, String path) { + while (true) { + if (foundAlias(pathToAliases, path)) { + return path; + } + int index = path.lastIndexOf(Path.SEPARATOR); + if (index < 0) { + return null; + } + path = path.substring(0, index); + } + } + /** * Get the list of operators from the operator tree that are needed for the path * @param pathToAliases mapping from path to aliases Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy) @@ -269,7 +269,8 @@ // for each dir, get the InputFormat, and do getSplits. for (Path dir : dirs) { - PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); + PartitionDesc part = HiveFileFormatUtils. + getPartitionDescFromPathRecursively(pathToPartitionInfo, dir, null); // create a new InputFormat instance if this is the first time to see this // class Class inputFormatClass = part.getInputFileFormatClass(); @@ -312,7 +313,8 @@ // for each dir, get the InputFormat, and do validateInput. for (Path dir : dirs) { - PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); + PartitionDesc part = HiveFileFormatUtils. + getPartitionDescFromPathRecursively(pathToPartitionInfo, dir, null); // create a new InputFormat instance if this is the first time to see this // class InputFormat inputFormat = getInputFormatFromCache(part Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/FilePrunningPredicateHandler.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/FilePrunningPredicateHandler.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/FilePrunningPredicateHandler.java (working copy) @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.mapred.JobConf; + +/** + * Extracts file pruning expression (used by MapredWork) + */ +public class FilePrunningPredicateHandler implements HiveStoragePredicateHandler { + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) { + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + analyzer.allowColumnName(VirtualColumn.FILENAME.getName()); + analyzer.setAllowAllFunctions(true); + + List searchConditions = new ArrayList(); + ExprNodeDesc residualPredicate = analyzer.analyzePredicate(predicate, searchConditions); + + // there is no assertion that pushedPredicate to be applied always, so residue all of them. + DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); + decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions); + if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVEPPDFILESREMOVEFILTER)) { + // this is only for confirming test case + decomposedPredicate.residualPredicate = residualPredicate; + } else { + decomposedPredicate.residualPredicate = predicate; + } + return decomposedPredicate; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java (working copy) @@ -124,10 +124,6 @@ Map> pathToAliases, HashMap aliasToSize) throws SemanticException { try { - // go over all the input paths, and calculate a known total size, known - // size for each input alias. - Utilities.getInputSummary(context, currWork, null).getLength(); - // set alias to size mapping, this can be used to determine if one table // is chosen as big table, what's the total size of left tables, which // are going to be small tables. 
@@ -135,7 +131,7 @@
       for (Map.Entry<String, ArrayList<String>> entry : pathToAliases.entrySet()) {
         String path = entry.getKey();
         List<String> aliasList = entry.getValue();
-        ContentSummary cs = context.getCS(path);
+        ContentSummary cs = currWork.getSummaryFor(path, context.getConf());
         if (cs != null) {
           long size = cs.getLength();
           for (String alias : aliasList) {
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java (revision 1512555)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java (working copy)
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.optimizer.physical.index;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -33,7 +32,6 @@
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
@@ -204,14 +202,15 @@
     }
 
     // check the size
+    MapredWork work = task.getWork();
     try {
-      ContentSummary inputSummary = Utilities.getInputSummary(pctx.getContext(), task.getWork().getMapWork(), null);
+      ContentSummary inputSummary = work.getTotalSummary(pctx.getContext(), pctx.getConf());
       long inputSize = inputSummary.getLength();
       if (!indexHandler.checkQuerySize(inputSize, pctx.getConf())) {
        queryContext.setQueryTasks(null);
        return;
      }
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new SemanticException("Failed to get task size", e);
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (revision 1512555)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (working copy)
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -600,9 +599,9 @@
     boolean hasNonLocalJob = false;
     for (ExecDriver mrtask : mrtasks) {
       try {
-        ContentSummary inputSummary = Utilities.getInputSummary
-            (ctx, ((MapredWork) mrtask.getWork()).getMapWork(), p);
-        int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
+        MapredWork work = mrtask.getWork();
+        ContentSummary inputSummary = work.getTotalSummary(lCtx, ctx.getConf());
+        int numReducers = getNumberOfReducers(work, conf);
 
         long estimatedInput;
@@ -635,7 +634,7 @@
         } else {
           mrtask.setLocalMode(true);
         }
-      } catch (IOException e) {
+      } catch (Exception e) {
         throw new SemanticException(e);
       }
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (revision 1512555)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (working copy)
@@ -94,11 +94,22 @@
   /**
-   * bind two predicates by AND op
+   * bind two predicates by OR op
    */
-  public static ExprNodeDesc mergePredicates(ExprNodeDesc prev, ExprNodeDesc next) {
+  public static ExprNodeDesc orPredicates(ExprNodeDesc prev, ExprNodeDesc next) {
     List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>(2);
     children.add(prev);
     children.add(next);
return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getGenericUDFForOr(), children); + } + + /** + * bind two predicates by AND op + */ + public static ExprNodeDesc andPredicates(ExprNodeDesc prev, ExprNodeDesc next) { + List children = new ArrayList(2); + children.add(prev); + children.add(next); + return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, FunctionRegistry.getGenericUDFForAnd(), children); } @@ -112,7 +123,7 @@ prev = expr; continue; } - prev = mergePredicates(prev, expr); + prev = andPredicates(prev, expr); } return prev; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -30,16 +31,34 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol; import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SplitSample; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; /** @@ -479,4 +498,200 @@ PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job); } } + + private static final ObjectInspector VC_FILE_OI = + ObjectInspectorFactory.getStandardStructObjectInspector( + Arrays.asList(VirtualColumn.FILENAME.getName()), + Arrays.asList((ObjectInspector) PrimitiveObjectInspectorFactory.javaStringObjectInspector)); + + private static final BooleanObjectInspector EVAL_OI = + PrimitiveObjectInspectorFactory.writableBooleanObjectInspector; + + private transient Map evals; + private transient Map summaries; + + public ContentSummary getSummaryFor(String path, Configuration conf) + throws IOException, HiveException { + return getInputSummaryFor(path, conf).toContentSummary(); + } + + // if file 
pruning is applied, return file paths passed the filter. if not, return input path + public List getPathsFor(String path, Configuration conf) + throws IOException, HiveException { + return getInputSummaryFor(path, conf).paths; + } + + // HIVE-4843 changed strings to paths + public List getPathsFor(Path path, Configuration conf) + throws IOException, HiveException { + List paths = new ArrayList(); + for (String apath : getPathsFor(path.toString(), conf)) { + paths.add(new Path(apath)); + } + return paths; + } + + private InputSummary getInputSummaryFor(String path, Configuration conf) + throws IOException, HiveException { + path = HiveFileFormatUtils.getMatchingPath(pathToAliases, path); + if (path == null) { + throw new HiveException("Invalid path " + path); + } + InputSummary summary = summaries == null ? null : summaries.get(path); + if (summary == null) { + ExprNodeEvaluator evaluator = getEvals().get(path); + if (summaries == null) { + summaries = new HashMap(); + } + summaries.put(path, summary = summarize(path, conf, evaluator)); + } + return summary; + } + + public ContentSummary getTotalSummary(Context ctx, Configuration conf) + throws IOException, HiveException { + long[] summary = new long[3]; + for (String path : pathToAliases.keySet()) { + ContentSummary pathSummary = getSummaryFor(path, conf); + if (pathSummary != null) { + summary[0] += pathSummary.getLength(); + summary[1] += pathSummary.getFileCount(); + summary[2] += pathSummary.getDirectoryCount(); + ctx.addCS(path, pathSummary); + } + } + return new ContentSummary(summary[0], summary[1], summary[2]); + } + + private ExprNodeEvaluator toFilter(List> operators) throws HiveException { + ExprNodeDesc prev = null; + for (Operator operator : operators) { + if (operator instanceof TableScanOperator && operator.getConf() != null) { + ExprNodeDesc filterExpr = ((TableScanOperator) operator).getConf().getFileFilterExpr(); + if (filterExpr == null) { + continue; + } + if (prev == null) { + prev = filterExpr; + } else { + prev = ExprNodeDescUtils.orPredicates(prev, filterExpr); + } + } + } + if (prev == null) { + return null; + } + ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(prev); + evaluator.initialize(VC_FILE_OI); + return evaluator; + } + + private InputSummary summarize(String path, Configuration conf, ExprNodeEvaluator evaluator) + throws IOException, HiveException { + Path apath = new Path(path); + Class format = pathToPartitionInfo.get(path).getInputFileFormatClass(); + if (ContentSummaryInputFormat.class.isAssignableFrom(format)) { + JobConf jobConf = new JobConf(conf); + ContentSummaryInputFormat summaryInput = (ContentSummaryInputFormat) + HiveInputFormat.getInputFormatFromCache(format, jobConf); + ContentSummary content = summaryInput.getContentSummary(apath, jobConf); + return new InputSummary(path, content); + } + if (evaluator == null) { + FileSystem fs = apath.getFileSystem(conf); + if (fs.exists(apath)) { + return new InputSummary(path, fs.getContentSummary(apath)); + } + return new InputSummary(path); + } + InputSummary summary = new InputSummary(); + for (FileStatus inputStatus : getInputStatus(apath, conf)) { + String input = normalize(path, inputStatus.getPath().toString()); + Object evaluated = evaluator.evaluate(new String[]{input}); + if (EVAL_OI.get(evaluated)) { + summary.add(input, inputStatus); + } + } + if (summary.paths.isEmpty()) { + // not existing directory + summary.paths.add(path); + } + return summary; + } + + private String normalize(String path, String inputPath) { + int index = 
inputPath.lastIndexOf(path); + return path + inputPath.substring(index + path.length()); + } + + private FileStatus[] getInputStatus(Path path, Configuration conf) throws IOException { + FileSystem fs = path.getFileSystem(conf); + if (!fs.exists(path)) { + return new FileStatus[0]; + } + FileStatus status = fs.getFileStatus(path); + if (!status.isDir()) { + return new FileStatus[] {status}; + } + return fs.listStatus(path); + } + + private Map getEvals() throws HiveException { + if (evals == null) { + evals = new HashMap(); + } + for (Map.Entry> entry : pathToAliases.entrySet()) { + evals.put(entry.getKey(), toFilter(getWorkForAliases(entry.getValue()))); + } + return evals; + } + + private List> getWorkForAliases(List aliases) { + List> operators = new ArrayList>(); + for (String alias : aliases) { + Operator work = aliasToWork.get(alias); + if (work == null) { + throw new IllegalStateException("Invalid alias " + alias); + } + operators.add(work); + } + return operators; + } + + private static class InputSummary { + + private long[] summary; + private List paths; + + public InputSummary() { + paths = new ArrayList(); + } + + public InputSummary(String path) { + paths = Arrays.asList(path); + } + + public InputSummary(String path, ContentSummary content) { + this.paths = Arrays.asList(path); + this.summary = new long[] + {content.getLength(), content.getFileCount(), content.getDirectoryCount()}; + } + + public void add(String path, FileStatus status) { + if (summary == null) { + summary = new long[3]; + } + paths.add(path); + summary[0] += status.getLen(); + if (!status.isDir()) { + summary[1]++; + } else { + summary[2]++; + } + } + + public ContentSummary toContentSummary() { + return summary == null ? null : new ContentSummary(summary[0], summary[1], summary[2]); + } + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -19,11 +19,16 @@ package org.apache.hadoop.hive.ql.plan; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.mapred.JobConf; @@ -83,6 +88,11 @@ return ops; } + public ContentSummary getTotalSummary(Context ctx, Configuration conf) + throws IOException, HiveException { + return mapWork.getTotalSummary(ctx, conf); + } + public String toXML() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Utilities.serializeObject(this, baos); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (working copy) @@ -61,6 +61,7 @@ private int maxStatsKeyPrefixLength = -1; private ExprNodeDesc filterExpr; + private ExprNodeDesc fileFilterExpr; public static final String FILTER_EXPR_CONF_STR = "hive.io.filter.expr.serialized"; @@ -104,6 +105,15 @@ this.filterExpr = filterExpr; } + @Explain(displayName = "fileFilterExpr") + public 
ExprNodeDesc getFileFilterExpr() { + return fileFilterExpr; + } + + public void setFileFilterExpr(ExprNodeDesc fileFilterExpr) { + this.fileFilterExpr = fileFilterExpr; + } + public void setAlias(String alias) { this.alias = alias; } Index: ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (revision 1512555) +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (working copy) @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.metadata.FilePrunningPredicateHandler; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; import org.apache.hadoop.hive.ql.metadata.Table; @@ -783,11 +784,19 @@ // optimizations are applied tableScanDesc.setFilterExpr(originalPredicate); } - if (!tbl.isNonNative()) { + + HiveStoragePredicateHandler predicateHandler = null; + if (tbl.isNonNative()) { + HiveStorageHandler storageHandler = tbl.getStorageHandler(); + if (storageHandler instanceof HiveStoragePredicateHandler) { + predicateHandler = (HiveStoragePredicateHandler) storageHandler; + } + } else if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEPPDFILES)) { + predicateHandler = new FilePrunningPredicateHandler(); + } else { return originalPredicate; } - HiveStorageHandler storageHandler = tbl.getStorageHandler(); - if (!(storageHandler instanceof HiveStoragePredicateHandler)) { + if (predicateHandler == null) { // The storage handler does not provide predicate decomposition // support, so we'll implement the entire filter in Hive. 
However, // we still provide the full predicate to the storage handler in @@ -795,8 +804,6 @@ tableScanDesc.setFilterExpr(originalPredicate); return originalPredicate; } - HiveStoragePredicateHandler predicateHandler = - (HiveStoragePredicateHandler) storageHandler; JobConf jobConf = new JobConf(owi.getParseContext().getConf()); Utilities.setColumnNameList(jobConf, tableScanOp); Utilities.setColumnTypeList(jobConf, tableScanOp); @@ -831,7 +838,11 @@ + decomposed.residualPredicate.getExprString()); } } - tableScanDesc.setFilterExpr(decomposed.pushedPredicate); + if (predicateHandler instanceof FilePrunningPredicateHandler) { + tableScanDesc.setFileFilterExpr(decomposed.pushedPredicate); + } else { + tableScanDesc.setFilterExpr(decomposed.pushedPredicate); + } return decomposed.residualPredicate; } Index: ql/src/test/queries/clientpositive/file_pruning.q =================================================================== --- ql/src/test/queries/clientpositive/file_pruning.q (revision 0) +++ ql/src/test/queries/clientpositive/file_pruning.q (working copy) @@ -0,0 +1,18 @@ +set hive.optimize.ppd.vc.filename=true; +set hive.optimize.ppd.vc.filename.remove.filter=true; +set hive.auto.convert.join=false; + +-- HIVE-1662 File Prunning by filter on INPUT__FILE__NAME(VC) +-- srcbucket2 has 4 files (srcbucket20, srcbucket21, srcbucket22, srcbucket23) + +explain extended +select key, value, reflect2(INPUT__FILE__NAME, "replaceAll", ".*/", "") from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100; +select key, value, reflect2(INPUT__FILE__NAME, "replaceAll", ".*/", "") from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100; + +-- below query fails by HIVE-3926(PPD on virtual column of partitioned table is not working) +-- but confirmed input pruning is working + +explain extended +select reflect2(a.INPUT__FILE__NAME, "replaceAll", ".*/", ""), reflect2(b.INPUT__FILE__NAME, "replaceAll", ".*/", "") from + (select key,INPUT__FILE__NAME from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100) a join + (select key,INPUT__FILE__NAME from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[02].txt' AND key < 100) b on (a.key=b.key); Index: ql/src/test/results/clientpositive/file_pruning.q.out =================================================================== --- ql/src/test/results/clientpositive/file_pruning.q.out (revision 0) +++ ql/src/test/results/clientpositive/file_pruning.q.out (working copy) @@ -0,0 +1,350 @@ +PREHOOK: query: -- HIVE-1662 File Prunning by filter on INPUT__FILE__NAME(VC) +-- srcbucket2 has 4 files (srcbucket20, srcbucket21, srcbucket22, srcbucket23) + +explain extended +select key, value, reflect2(INPUT__FILE__NAME, "replaceAll", ".*/", "") from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100 +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-1662 File Prunning by filter on INPUT__FILE__NAME(VC) +-- srcbucket2 has 4 files (srcbucket20, srcbucket21, srcbucket22, srcbucket23) + +explain extended +select key, value, reflect2(INPUT__FILE__NAME, "replaceAll", ".*/", "") from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME srcbucket2))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value)) (TOK_SELEXPR (TOK_FUNCTION reflect2 (TOK_TABLE_OR_COL INPUT__FILE__NAME) 
"replaceAll" ".*/" ""))) (TOK_WHERE (AND (rlike (TOK_TABLE_OR_COL INPUT__FILE__NAME) '.*/srcbucket2[03].txt') (< (TOK_TABLE_OR_COL key) 100))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + srcbucket2 + TableScan + alias: srcbucket2 + fileFilterExpr: + expr: (INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt') + type: boolean + GatherStats: false + Filter Operator + isSamplingPred: false + predicate: + expr: (key < 100) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + expr: reflect2(INPUT__FILE__NAME,'replaceAll','.*/','') + type: string + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1,_col2 + columns.types int:string:string + escape.delim \ + hive.serialization.extend.nesting.levels true + serialization.format 1 + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: srcbucket2 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count 4 + bucket_field_name key + columns key,value + columns.types int:string +#### A masked pattern was here #### + name default.srcbucket2 + numFiles 4 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct srcbucket2 { i32 key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count 4 + bucket_field_name key + columns key,value + columns.types int:string +#### A masked pattern was here #### + name default.srcbucket2 + numFiles 4 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct srcbucket2 { i32 key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.srcbucket2 + name: default.srcbucket2 + Truncated Path -> Alias: + /srcbucket2 [srcbucket2] + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select key, value, reflect2(INPUT__FILE__NAME, "replaceAll", ".*/", "") from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket2 +#### A masked pattern was here #### +POSTHOOK: query: select key, value, reflect2(INPUT__FILE__NAME, "replaceAll", ".*/", "") from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket2 +#### A masked pattern was here #### +66 val_66 srcbucket20.txt +37 val_37 srcbucket20.txt +15 val_15 srcbucket20.txt +0 val_0 srcbucket20.txt +4 val_4 srcbucket20.txt +51 val_51 srcbucket20.txt +84 
val_84 srcbucket20.txt +8 val_8 srcbucket20.txt +0 val_0 srcbucket20.txt +26 val_26 srcbucket20.txt +51 val_51 srcbucket20.txt +95 val_95 srcbucket20.txt +77 val_77 srcbucket20.txt +0 val_0 srcbucket20.txt +15 val_15 srcbucket20.txt +19 val_19 srcbucket20.txt +95 val_95 srcbucket20.txt +11 val_11 srcbucket20.txt +33 val_33 srcbucket20.txt +80 val_80 srcbucket20.txt +44 val_44 srcbucket20.txt +26 val_26 srcbucket20.txt +84 val_84 srcbucket20.txt +37 val_37 srcbucket20.txt +98 val_98 srcbucket23.txt +47 val_47 srcbucket23.txt +72 val_72 srcbucket23.txt +54 val_54 srcbucket23.txt +65 val_65 srcbucket23.txt +83 val_83 srcbucket23.txt +58 val_58 srcbucket23.txt +43 val_43 srcbucket23.txt +98 val_98 srcbucket23.txt +87 val_87 srcbucket23.txt +72 val_72 srcbucket23.txt +90 val_90 srcbucket23.txt +10 val_10 srcbucket23.txt +58 val_58 srcbucket23.txt +76 val_76 srcbucket23.txt +76 val_76 srcbucket23.txt +69 val_69 srcbucket23.txt +90 val_90 srcbucket23.txt +83 val_83 srcbucket23.txt +18 val_18 srcbucket23.txt +18 val_18 srcbucket23.txt +90 val_90 srcbucket23.txt +PREHOOK: query: -- below query fails by HIVE-3926(PPD on virtual column of partitioned table is not working) +-- but confirmed input pruning is working + +explain extended +select reflect2(a.INPUT__FILE__NAME, "replaceAll", ".*/", ""), reflect2(b.INPUT__FILE__NAME, "replaceAll", ".*/", "") from + (select key,INPUT__FILE__NAME from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100) a join + (select key,INPUT__FILE__NAME from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[02].txt' AND key < 100) b on (a.key=b.key) +PREHOOK: type: QUERY +POSTHOOK: query: -- below query fails by HIVE-3926(PPD on virtual column of partitioned table is not working) +-- but confirmed input pruning is working + +explain extended +select reflect2(a.INPUT__FILE__NAME, "replaceAll", ".*/", ""), reflect2(b.INPUT__FILE__NAME, "replaceAll", ".*/", "") from + (select key,INPUT__FILE__NAME from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt' AND key < 100) a join + (select key,INPUT__FILE__NAME from srcbucket2 where INPUT__FILE__NAME rlike '.*/srcbucket2[02].txt' AND key < 100) b on (a.key=b.key) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME srcbucket2))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL INPUT__FILE__NAME))) (TOK_WHERE (AND (rlike (TOK_TABLE_OR_COL INPUT__FILE__NAME) '.*/srcbucket2[03].txt') (< (TOK_TABLE_OR_COL key) 100))))) a) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME srcbucket2))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL INPUT__FILE__NAME))) (TOK_WHERE (AND (rlike (TOK_TABLE_OR_COL INPUT__FILE__NAME) '.*/srcbucket2[02].txt') (< (TOK_TABLE_OR_COL key) 100))))) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION reflect2 (. (TOK_TABLE_OR_COL a) INPUT__FILE__NAME) "replaceAll" ".*/" "")) (TOK_SELEXPR (TOK_FUNCTION reflect2 (. 
(TOK_TABLE_OR_COL b) INPUT__FILE__NAME) "replaceAll" ".*/" ""))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a:srcbucket2 + TableScan + alias: srcbucket2 + fileFilterExpr: + expr: (INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt') + type: boolean + GatherStats: false + Filter Operator + isSamplingPred: false + predicate: + expr: (key < 100) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: INPUT__FILE__NAME + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: 0 + value expressions: + expr: _col1 + type: string + b:srcbucket2 + TableScan + alias: srcbucket2 + fileFilterExpr: + expr: (INPUT__FILE__NAME rlike '.*/srcbucket2[02].txt') + type: boolean + GatherStats: false + Filter Operator + isSamplingPred: false + predicate: + expr: (key < 100) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: INPUT__FILE__NAME + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: 1 + value expressions: + expr: _col1 + type: string + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: srcbucket2 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count 4 + bucket_field_name key + columns key,value + columns.types int:string +#### A masked pattern was here #### + name default.srcbucket2 + numFiles 4 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct srcbucket2 { i32 key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count 4 + bucket_field_name key + columns key,value + columns.types int:string +#### A masked pattern was here #### + name default.srcbucket2 + numFiles 4 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct srcbucket2 { i32 key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.srcbucket2 + name: default.srcbucket2 + Truncated Path -> Alias: + /srcbucket2 [b:srcbucket2, a:srcbucket2] + Needs Tagging: true + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col1} + 1 {VALUE._col1} + handleSkewJoin: false + outputColumnNames: _col1, _col3 + Select Operator + expressions: + expr: reflect2(_col1,'replaceAll','.*/','') + type: string + expr: reflect2(_col3,'replaceAll','.*/','') + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 +#### A masked pattern was here #### + table: + input format: 
org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1 + columns.types string:string + escape.delim \ + hive.serialization.extend.nesting.levels true + serialization.format 1 + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + +
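
Editor's sketch (not part of the patch): the change hinges on two small mechanisms — climbing an input file's path upward until it matches a pathToAliases entry (HiveFileFormatUtils.getMatchingPath), and keeping only the files whose INPUT__FILE__NAME satisfies the pushed filter before they are handed to the job (what MapWork builds an ExprNodeEvaluator for, and what Utilities.addInputPaths now consumes via getPathsFor). The standalone Java sketch below illustrates both ideas under simplified assumptions: the class name FilePruningSketch, the constants PATH_TO_ALIASES and FILE_FILTER, and the /warehouse/srcbucket2 paths are invented for the example, and the pushed "rlike" predicate is approximated with a plain java.util.regex pattern rather than a Hive expression evaluator.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

public class FilePruningSketch {

  // stand-in for the keys of MapWork.pathToAliases (hypothetical warehouse location)
  static final Set<String> PATH_TO_ALIASES =
      new HashSet<String>(Arrays.asList("/warehouse/srcbucket2"));

  // stand-in for the pushed predicate INPUT__FILE__NAME rlike '.*/srcbucket2[03].txt'
  static final Pattern FILE_FILTER = Pattern.compile(".*/srcbucket2[03]\\.txt");

  // climb one parent directory at a time until a registered path is found,
  // mirroring the loop in HiveFileFormatUtils.getMatchingPath
  static String getMatchingPath(String path) {
    while (!PATH_TO_ALIASES.contains(path)) {
      int index = path.lastIndexOf('/');
      if (index < 0) {
        return null;
      }
      path = path.substring(0, index);
    }
    return path;
  }

  // keep only the files that satisfy the file-name predicate, analogous to the
  // per-FileStatus evaluation in MapWork.summarize
  static List<String> prune(List<String> files) {
    List<String> passed = new ArrayList<String>();
    for (String file : files) {
      if (FILE_FILTER.matcher(file).matches()) {
        passed.add(file);
      }
    }
    return passed;
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        "/warehouse/srcbucket2/srcbucket20.txt",
        "/warehouse/srcbucket2/srcbucket21.txt",
        "/warehouse/srcbucket2/srcbucket22.txt",
        "/warehouse/srcbucket2/srcbucket23.txt");
    System.out.println(getMatchingPath(files.get(0))); // /warehouse/srcbucket2
    System.out.println(prune(files));                  // only srcbucket20/23 survive
  }
}

In the patch itself the filter is the fileFilterExpr pushed onto the TableScan (visible in the explain output above), combined across aliases with OR, and only the files that pass it contribute to the ContentSummary and to the input paths of the MapReduce job.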