Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 987314)
+++ conf/hive-default.xml (working copy)
@@ -273,6 +273,12 @@
+<property>
+  <name>hive.optimize.ppd.storage</name>
+  <value>false</value>
+  <description>Whether to push predicates down into storage handlers. Ignored when hive.optimize.ppd is false.</description>
+</property>
+
 <property>
   <name>hive.optimize.pruner</name>
   <value>true</value>
   <description>Whether to enable the new partition pruner which depends on predicate pushdown. If this is disabled,
Index: hbase-handler/src/test/results/hbase_pushdown.q.out
===================================================================
--- hbase-handler/src/test/results/hbase_pushdown.q.out (revision 0)
+++ hbase-handler/src/test/results/hbase_pushdown.q.out (revision 0)
@@ -0,0 +1,127 @@
+PREHOOK: query: CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_pushdown
+POSTHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: -- without pushdown
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- without pushdown
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ Filter Operator
+ predicate:
+ expr: (key = 90)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key = 90)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: -- with pushdown
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with pushdown
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ hbase_pushdown
+ TableScan
+ alias: hbase_pushdown
+ filterExpr:
+ expr: (key = 90)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key = 90)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key = 90)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-08-19_14-24-01_759_143813178377646105/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-08-19_14-24-01_759_143813178377646105/-mr-10000
+90 val_90
Index: hbase-handler/src/test/queries/hbase_pushdown.q
===================================================================
--- hbase-handler/src/test/queries/hbase_pushdown.q (revision 0)
+++ hbase-handler/src/test/queries/hbase_pushdown.q (revision 0)
@@ -0,0 +1,17 @@
+CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string");
+
+INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src;
+
+-- without pushdown
+explain select * from hbase_pushdown where key=90;
+
+set hive.optimize.ppd.storage=true;
+
+-- with pushdown
+explain select * from hbase_pushdown where key=90;
+
+select * from hbase_pushdown where key=90;
Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
===================================================================
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (revision 987314)
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (working copy)
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -29,12 +30,27 @@
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -63,6 +79,7 @@
final Reporter reporter) throws IOException {
HBaseSplit hbaseSplit = (HBaseSplit) split;
+ TableSplit tableSplit = hbaseSplit.getSplit();
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
@@ -74,6 +91,8 @@
throw new IOException("Cannot read more columns than the given table contains.");
}
+ Scan scan = new Scan();
+
 List<byte []> scanColumns = new ArrayList<byte []>();
boolean addAll = (readColIDs.size() == 0);
if (!addAll) {
@@ -96,10 +115,67 @@
}
}
}
+ scan.addColumns(scanColumns.toArray(new byte[0][]));
- setScan(new Scan().addColumns(scanColumns.toArray(new byte[0][])));
- org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit();
+ String filterExprSerialized =
+ jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ if (filterExprSerialized != null) {
+ ExprNodeDesc filterExpr =
+ Utilities.deserializeExpression(filterExprSerialized, jobConf);
+ IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+ List<IndexPredicateAnalyzer.SearchCondition> searchConditions =
+ new ArrayList<IndexPredicateAnalyzer.SearchCondition>();
+ ExprNodeDesc residualExpr =
+ analyzer.analyzePredicate(filterExpr, searchConditions);
+ // TODO: deal with residual
+
+ String columnNameProperty = jobConf.get(Constants.LIST_COLUMNS);
+ List<String> columnNames =
+ Arrays.asList(columnNameProperty.split(","));
+
+ for (IndexPredicateAnalyzer.SearchCondition sc : searchConditions) {
+ // TODO: the real thing, and assert stuff about table aliases
+ int iCol = columnNames.indexOf(sc.getColumnDesc().getColumn());
+ if (HBaseSerDe.HBASE_KEY_COL.equals(columns.get(iCol))) {
+ ExprNodeConstantEvaluator eval =
+ new ExprNodeConstantEvaluator(sc.getConstantDesc());
+ byte [] bytes;
+ try {
+ ObjectInspector objInspector = eval.initialize(null);
+ Object writable = eval.evaluate(null);
+ ByteStream.Output serializeStream = new ByteStream.Output();
+ // REVIEW: escaped etc
+ LazyUtils.writePrimitiveUTF8(
+ serializeStream,
+ writable,
+ (PrimitiveObjectInspector) objInspector,
+ false,
+ (byte) 0,
+ null);
+ bytes = new byte[serializeStream.getCount()];
+ System.arraycopy(
+ serializeStream.getData(), 0,
+ bytes, 0, serializeStream.getCount());
+ } catch (HiveException ex) {
+ // FIXME
+ throw new IOException(ex);
+ }
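+ // Narrow the split to start at the serialized key value (keeping the
+ // original end row) and add a row filter so that only the exact
+ // matching row is returned by the region server.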
+ tableSplit = new TableSplit(
+ tableSplit.getTableName(),
+ bytes,
+ tableSplit.getEndRow(),
+ tableSplit.getRegionLocation());
+ scan.setFilter(
+ new RowFilter(
+ CompareFilter.CompareOp.EQUAL,
+ new BinaryComparator(bytes)));
+ }
+ }
+ }
+
+ setScan(scan);
+
Job job = new Job(jobConf);
TaskAttemptContext tac =
new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) {
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 987314)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -258,6 +258,8 @@
// Optimizer
HIVEOPTCP("hive.optimize.cp", true), // column pruner
HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
+ // push predicates down to storage handlers
+ HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", false),
HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
Index: ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (working copy)
@@ -29,6 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -39,6 +40,8 @@
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -47,6 +50,7 @@
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
@@ -378,6 +382,20 @@
return null;
}
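+ // When hive.optimize.ppd.storage is enabled, also record the pushed
+ // predicate on the table scan itself so that HiveInputFormat can pass it
+ // down to the storage handler's input format.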
+ if (op instanceof TableScanOperator) {
+ boolean pushFilterToStorage;
+ try {
+ pushFilterToStorage =
+ Hive.get().getConf().getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE);
+ } catch (HiveException ex) {
+ throw new RuntimeException(ex);
+ }
+ if (pushFilterToStorage) {
+ TableScanDesc tableScanDesc = ((TableScanOperator) op).getConf();
+ tableScanDesc.setFilterExpr(condn);
+ }
+ }
+
// add new filter op
 List<Operator<? extends Serializable>> originalChilren = op
 .getChildOperators();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -26,6 +26,8 @@
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.EOFException;
import java.io.File;
@@ -37,6 +39,7 @@
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
@@ -81,6 +84,7 @@
import org.apache.hadoop.hive.ql.parse.ErrorMsg;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -316,6 +320,40 @@
return null;
}
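+
+ /**
+ * Serializes an expression to an XML string (via XMLEncoder) so that it can
+ * be shipped to tasks in the job configuration, e.g. under
+ * TableScanDesc.FILTER_EXPR_CONF_STR.
+ */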
+ public static String serializeExpression(ExprNodeDesc expr) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ XMLEncoder encoder = new XMLEncoder(baos);
+ try {
+ encoder.writeObject(expr);
+ } finally {
+ encoder.close();
+ }
+ try {
+ return baos.toString("UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ }
+
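+ /**
+ * Reconstructs an expression previously produced by serializeExpression,
+ * resolving classes through the supplied configuration's class loader.
+ */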
+ public static ExprNodeDesc deserializeExpression(
+ String s, Configuration conf) {
+ byte [] bytes;
+ try {
+ bytes = s.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException("UTF-8 support required", ex);
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ XMLDecoder decoder = new XMLDecoder(
+ bais, null, null, conf.getClassLoader());
+ try {
+ ExprNodeDesc expr = (ExprNodeDesc) decoder.readObject();
+ return expr;
+ } finally {
+ decoder.close();
+ }
+ }
+
/**
* Serialize a single Task.
*/
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (working copy)
@@ -36,6 +36,14 @@
 private List<VirtualColumn> virtualCols;
+ private ExprNodeDesc filterExpr;
+
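+ // Job conf key under which HiveInputFormat publishes the XML-serialized
+ // filter expression for consumption by storage handler input formats.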
+ public static final String FILTER_EXPR_CONF_STR =
+ "hive.io.filter.expr.serialized";
+
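+ // Job conf key for the human-readable form of the same filter; set
+ // alongside the serialized form and used for debug logging.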
+ public static final String FILTER_TEXT_CONF_STR =
+ "hive.io.filter.text";
+
@SuppressWarnings("nls")
public TableScanDesc() {
}
@@ -54,6 +62,15 @@
return alias;
}
+ @Explain(displayName = "filterExpr")
+ public ExprNodeDesc getFilterExpr() {
+ return filterExpr;
+ }
+
+ public void setFilterExpr(ExprNodeDesc filterExpr) {
+ this.filterExpr = filterExpr;
+ }
+
public void setAlias(String alias) {
this.alias = alias;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (working copy)
@@ -337,7 +337,8 @@
throw new IOException("cannot find class " + inputFormatClassName);
}
- initColumnsNeeded(job, inputFormatClass, hsplit.getPath(0).toString(),
+ pushProjectionsAndFilters(job, inputFormatClass,
+ hsplit.getPath(0).toString(),
hsplit.getPath(0).toUri().getPath());
return ShimLoader.getHadoopShims().getCombineFileInputFormat()
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy)
@@ -33,11 +33,16 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Writable;
@@ -215,7 +220,7 @@
// clone a jobConf for setting needed columns for reading
JobConf cloneJobConf = new JobConf(job);
- initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+ pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
.toString(), hsplit.getPath().toUri().getPath());
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
@@ -309,7 +314,7 @@
return partDesc;
}
- protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass,
+ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass,
String splitPath, String splitPathWithNoSchema) {
if (this.mrwork == null) {
init(job);
@@ -335,12 +340,55 @@
alias);
if (op != null && op instanceof TableScanOperator) {
TableScanOperator tableScan = (TableScanOperator) op;
+
+ // push down projections
 ArrayList<Integer> list = tableScan.getNeededColumnIDs();
if (list != null) {
ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
} else {
ColumnProjectionUtils.setFullyReadColumns(jobConf);
}
+
+ // construct column name list for reference by filter push down
+ TableScanDesc scanDesc = tableScan.getConf();
+ if (scanDesc != null) {
+ RowSchema rowSchema = op.getSchema();
+ StringBuilder columnNames = new StringBuilder();
+ if (rowSchema != null) {
+ for (ColumnInfo colInfo : rowSchema.getSignature()) {
+ if (columnNames.length() > 0) {
+ columnNames.append(",");
+ }
+ columnNames.append(colInfo.getInternalName());
+ }
+ String columnNamesString = columnNames.toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table scan column list = " + columnNamesString);
+ }
+ jobConf.set(
+ Constants.LIST_COLUMNS,
+ columnNamesString);
+ }
+
+ // push down filters
+ ExprNodeDesc filterExpr = scanDesc.getFilterExpr();
+ if (filterExpr != null) {
+ String filterText =
+ filterExpr.getExprString();
+ String filterExprSerialized =
+ Utilities.serializeExpression(filterExpr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Filter text = " + filterText);
+ LOG.debug("Filter expression = " + filterExprSerialized);
+ }
+ jobConf.set(
+ TableScanDesc.FILTER_TEXT_CONF_STR,
+ filterText);
+ jobConf.set(
+ TableScanDesc.FILTER_EXPR_CONF_STR,
+ filterExprSerialized);
+ }
+ }
}
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (working copy)
@@ -68,7 +68,7 @@
// clone a jobConf for setting needed columns for reading
JobConf cloneJobConf = new JobConf(job);
- initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+ pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
.toString(), hsplit.getPath().toUri().getPath());
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (revision 0)
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+
+/**
+ * IndexPredicateAnalyzer decomposes predicates, separating the parts
+ * which can be satisfied by an index from the parts which cannot.
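+ *
+ * <p>Usage sketch (as in HiveHBaseTableInputFormat): pass the deserialized
+ * filter expression together with an empty list; the list receives the
+ * pushable COL = CONSTANT conditions, and any residual predicate (possibly
+ * null) is returned for Hive to evaluate itself:
+ *
+ * <pre>
+ * IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+ * List&lt;SearchCondition&gt; conditions = new ArrayList&lt;SearchCondition&gt;();
+ * ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, conditions);
+ * </pre>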
+ */
+public class IndexPredicateAnalyzer
+{
+ public IndexPredicateAnalyzer() {
+ }
+
+ public ExprNodeDesc analyzePredicate(
+ ExprNodeDesc predicate,
+ List<SearchCondition> searchConditions) {
+
+ if (predicate instanceof ExprNodeGenericFuncDesc) {
+ ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) predicate;
+ if (funcDesc.getGenericUDF() instanceof GenericUDFBridge) {
+ GenericUDFBridge func = (GenericUDFBridge) funcDesc.getGenericUDF();
+ if (func.getUdfName().equals("=")) {
+ ExprNodeDesc child1 = funcDesc.getChildren().get(0);
+ ExprNodeDesc child2 = funcDesc.getChildren().get(1);
+ ExprNodeColumnDesc columnDesc = null;
+ ExprNodeConstantDesc constantDesc = null;
+ if ((child1 instanceof ExprNodeColumnDesc)
+ && (child2 instanceof ExprNodeConstantDesc)) {
+ // COL = CONSTANT
+ columnDesc = (ExprNodeColumnDesc) child1;
+ constantDesc = (ExprNodeConstantDesc) child2;
+ } else if ((child2 instanceof ExprNodeColumnDesc)
+ && (child1 instanceof ExprNodeConstantDesc)) {
+ // CONSTANT = COL
+ columnDesc = (ExprNodeColumnDesc) child2;
+ constantDesc = (ExprNodeConstantDesc) child1;
+ }
+ if (columnDesc != null) {
+ searchConditions.add(
+ new SearchCondition(
+ columnDesc,
+ SearchComparison.EQ,
+ constantDesc));
+ return null;
+ }
+ }
+ }
+ }
+ return predicate;
+ }
+
+ public enum SearchComparison {
+ LT, LE, GT, GE, EQ, NE
+ }
+
+ public static class SearchCondition {
+ private ExprNodeColumnDesc columnDesc;
+ private SearchComparison comparison;
+ private ExprNodeConstantDesc constantDesc;
+
+ SearchCondition(
+ ExprNodeColumnDesc columnDesc,
+ SearchComparison comparison,
+ ExprNodeConstantDesc constantDesc) {
+
+ this.columnDesc = columnDesc;
+ this.comparison = comparison;
+ this.constantDesc = constantDesc;
+ }
+
+ public ExprNodeColumnDesc getColumnDesc() {
+ return columnDesc;
+ }
+
+ public SearchComparison getComparison() {
+ return comparison;
+ }
+
+ public ExprNodeConstantDesc getConstantDesc() {
+ return constantDesc;
+ }
+ }
+}