Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 996701) +++ conf/hive-default.xml (working copy) @@ -273,6 +273,12 @@ + hive.optimize.ppd.storage + true + Whether to push predicates down into storage handlers. Ignored when hive.optimize.ppd is false. + + + hive.optimize.pruner true Whether to enable the new partition pruner which depends on predicate pushdown. If this is disabled, Index: hbase-handler/src/test/results/hbase_pushdown.q.out =================================================================== --- hbase-handler/src/test/results/hbase_pushdown.q.out (revision 0) +++ hbase-handler/src/test/results/hbase_pushdown.q.out (revision 0) @@ -0,0 +1,427 @@ +PREHOOK: query: CREATE TABLE hbase_pushdown(key int, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string") +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE hbase_pushdown(key int, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@hbase_pushdown +PREHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown +SELECT * +FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_pushdown +POSTHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown +SELECT * +FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_pushdown +PREHOOK: query: -- with full pushdown +explain select * from hbase_pushdown where key=90 +PREHOOK: type: QUERY +POSTHOOK: query: -- with full pushdown +explain select * from hbase_pushdown where key=90 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + hbase_pushdown + TableScan + alias: hbase_pushdown + filterExpr: + expr: (key = 90) + type: boolean + Filter Operator + predicate: + expr: (key = 90) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select * from hbase_pushdown where key=90 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_pushdown +PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-05-54_049_1244315391309244934/-mr-10000 +POSTHOOK: query: select * from hbase_pushdown where key=90 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_pushdown +POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-05-54_049_1244315391309244934/-mr-10000 +90 val_90 +PREHOOK: query: -- with partial pushdown + +explain select * from hbase_pushdown where key=90 and value like '%90%' +PREHOOK: type: QUERY +POSTHOOK: query: -- with partial pushdown + +explain select * from hbase_pushdown where key=90 and value like '%90%' +POSTHOOK: type: QUERY +ABSTRACT SYNTAX 
TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (= (TOK_TABLE_OR_COL key) 90) (like (TOK_TABLE_OR_COL value) '%90%'))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + hbase_pushdown + TableScan + alias: hbase_pushdown + filterExpr: + expr: (key = 90) + type: boolean + Filter Operator + predicate: + expr: (value like '%90%') + type: boolean + Filter Operator + predicate: + expr: ((key = 90) and (value like '%90%')) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select * from hbase_pushdown where key=90 and value like '%90%' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_pushdown +PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-00_089_9169062048458581014/-mr-10000 +POSTHOOK: query: select * from hbase_pushdown where key=90 and value like '%90%' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_pushdown +POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-00_089_9169062048458581014/-mr-10000 +90 val_90 +PREHOOK: query: -- with two residuals + +explain select * from hbase_pushdown +where key=90 and value like '%90%' and key=cast(value as int) +PREHOOK: type: QUERY +POSTHOOK: query: -- with two residuals + +explain select * from hbase_pushdown +where key=90 and value like '%90%' and key=cast(value as int) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (and (= (TOK_TABLE_OR_COL key) 90) (like (TOK_TABLE_OR_COL value) '%90%')) (= (TOK_TABLE_OR_COL key) (TOK_FUNCTION TOK_INT (TOK_TABLE_OR_COL value))))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + hbase_pushdown + TableScan + alias: hbase_pushdown + filterExpr: + expr: (key = 90) + type: boolean + Filter Operator + predicate: + expr: ((value like '%90%') and (key = UDFToInteger(value))) + type: boolean + Filter Operator + predicate: + expr: (((key = 90) and (value like '%90%')) and (key = UDFToInteger(value))) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: -- with contradictory pushdowns + +explain select * from hbase_pushdown +where key=80 and key=90 and value like '%90%' +PREHOOK: type: QUERY +POSTHOOK: query: -- with contradictory pushdowns + +explain select * from hbase_pushdown +where key=80 and key=90 and value like '%90%' +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT 
(TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (and (and (= (TOK_TABLE_OR_COL key) 80) (= (TOK_TABLE_OR_COL key) 90)) (like (TOK_TABLE_OR_COL value) '%90%'))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + hbase_pushdown + TableScan + alias: hbase_pushdown + Filter Operator + predicate: + expr: (((key = 80) and (key = 90)) and (value like '%90%')) + type: boolean + Filter Operator + predicate: + expr: (((key = 80) and (key = 90)) and (value like '%90%')) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select * from hbase_pushdown +where key=80 and key=90 and value like '%90%' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_pushdown +PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-05_982_8346366828445837832/-mr-10000 +POSTHOOK: query: select * from hbase_pushdown +where key=80 and key=90 and value like '%90%' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_pushdown +POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-09-09_17-06-05_982_8346366828445837832/-mr-10000 +PREHOOK: query: -- with nothing to push down + +explain select * from hbase_pushdown +PREHOOK: type: QUERY +POSTHOOK: query: -- with nothing to push down + +explain select * from hbase_pushdown +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: -- with a predicate which is not actually part of the filter, so +-- it should be ignored by pushdown + +explain select * from hbase_pushdown +where (case when key=90 then 2 else 4 end) > 3 +PREHOOK: type: QUERY +POSTHOOK: query: -- with a predicate which is not actually part of the filter, so +-- it should be ignored by pushdown + +explain select * from hbase_pushdown +where (case when key=90 then 2 else 4 end) > 3 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (> (TOK_FUNCTION when (= (TOK_TABLE_OR_COL key) 90) 2 4) 3)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + hbase_pushdown + TableScan + alias: hbase_pushdown + Filter Operator + predicate: + expr: (CASE WHEN ((key = 90)) THEN (2) ELSE (4) END > 3) + type: boolean + Filter Operator + predicate: + expr: (CASE WHEN ((key = 90)) THEN (2) ELSE (4) END > 3) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: 
Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: -- with a predicate which is under an OR, so it should +-- be ignored by pushdown + +explain select * from hbase_pushdown +where key=80 or value like '%90%' +PREHOOK: type: QUERY +POSTHOOK: query: -- with a predicate which is under an OR, so it should +-- be ignored by pushdown + +explain select * from hbase_pushdown +where key=80 or value like '%90%' +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (or (= (TOK_TABLE_OR_COL key) 80) (like (TOK_TABLE_OR_COL value) '%90%'))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + hbase_pushdown + TableScan + alias: hbase_pushdown + Filter Operator + predicate: + expr: ((key = 80) or (value like '%90%')) + type: boolean + Filter Operator + predicate: + expr: ((key = 80) or (value like '%90%')) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: -- with pushdown disabled + +explain select * from hbase_pushdown where key=90 +PREHOOK: type: QUERY +POSTHOOK: query: -- with pushdown disabled + +explain select * from hbase_pushdown where key=90 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + hbase_pushdown + TableScan + alias: hbase_pushdown + Filter Operator + predicate: + expr: (key = 90) + type: boolean + Filter Operator + predicate: + expr: (key = 90) + type: boolean + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + Index: hbase-handler/src/test/queries/hbase_pushdown.q =================================================================== --- hbase-handler/src/test/queries/hbase_pushdown.q (revision 0) +++ hbase-handler/src/test/queries/hbase_pushdown.q (revision 0) @@ -0,0 +1,53 @@ +CREATE TABLE hbase_pushdown(key int, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string"); + +INSERT OVERWRITE TABLE hbase_pushdown +SELECT * +FROM src; + +-- with full pushdown +explain select * from hbase_pushdown where key=90; + +select * from hbase_pushdown where key=90; + +-- with partial pushdown + +explain select * from hbase_pushdown where key=90 and value like '%90%'; + +select * from hbase_pushdown where key=90 and value like '%90%'; + +-- with two residuals + +explain select * from hbase_pushdown +where key=90 and value like '%90%' and key=cast(value as int); 
+ +-- with contradictory pushdowns + +explain select * from hbase_pushdown +where key=80 and key=90 and value like '%90%'; + +select * from hbase_pushdown +where key=80 and key=90 and value like '%90%'; + +-- with nothing to push down + +explain select * from hbase_pushdown; + +-- with a predicate which is not actually part of the filter, so +-- it should be ignored by pushdown + +explain select * from hbase_pushdown +where (case when key=90 then 2 else 4 end) > 3; + +-- with a predicate which is under an OR, so it should +-- be ignored by pushdown + +explain select * from hbase_pushdown +where key=80 or value like '%90%'; + +set hive.optimize.ppd.storage=false; + +-- with pushdown disabled + +explain select * from hbase_pushdown where key=90; Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java =================================================================== --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (revision 996701) +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.commons.logging.Log; @@ -29,13 +30,31 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -50,7 +69,7 @@ /** * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler * tables, decorating an underlying HBase TableInputFormat with extra Hive logic - * such as column pruning. + * such as column pruning and filter pushdown. 
*/ public class HiveHBaseTableInputFormat extends TableInputFormatBase implements InputFormat { @@ -64,6 +83,7 @@ final Reporter reporter) throws IOException { HBaseSplit hbaseSplit = (HBaseSplit) split; + TableSplit tableSplit = hbaseSplit.getSplit(); String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName))); String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); @@ -105,7 +125,7 @@ } } - // The HBase table's row key maps to an Hive table column. In the corner case when only the + // The HBase table's row key maps to a Hive table column. In the corner case when only the // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/ // column qualifier will have been added to the scan. We arbitrarily add at least one column // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive @@ -128,8 +148,11 @@ } } + // If Hive's optimizer gave us a filter to process, convert it to the + // HBase scan form now. + tableSplit = convertFilter(jobConf, scan, tableSplit, iKey); + setScan(scan); - org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit(); Job job = new Job(jobConf); TaskAttemptContext tac = @@ -200,6 +223,137 @@ }; } + /** + * Converts a filter (which has been pushed down from Hive's optimizer) + * into corresponding restrictions on the HBase scan. The + * filter should already be in a form which can be fully converted. + * + * @param jobConf configuration for the scan + * + * @param scan the HBase scan object to restrict + * + * @param tableSplit the HBase table split to restrict, or null + * if calculating splits + * + * @param iKey 0-based offset of key column within Hive table + * + * @return converted table split if any + */ + private TableSplit convertFilter( + JobConf jobConf, + Scan scan, + TableSplit tableSplit, + int iKey) + throws IOException { + + String filterExprSerialized = + jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filterExprSerialized == null) { + return tableSplit; + } + ExprNodeDesc filterExpr = + Utilities.deserializeExpression(filterExprSerialized, jobConf); + + String columnNameProperty = jobConf.get(Constants.LIST_COLUMNS); + List columnNames = + Arrays.asList(columnNameProperty.split(",")); + + IndexPredicateAnalyzer analyzer = + newIndexPredicateAnalyzer(columnNames.get(iKey)); + + List searchConditions = + new ArrayList(); + ExprNodeDesc residualPredicate = + analyzer.analyzePredicate(filterExpr, searchConditions); + + // There should be no residual since we already negotiated + // that earlier in HBaseStorageHandler.decomposePredicate. + if (residualPredicate != null) { + throw new RuntimeException( + "Unexpected residual predicate " + residualPredicate.getExprString()); + } + + // There should be exactly one predicate since we already + // negotiated that also. 
+ if (searchConditions.size() != 1) { + throw new RuntimeException( + "Exactly one search condition expected in push down"); + } + + // Convert the search condition into a restriction on the HBase scan + IndexSearchCondition sc = searchConditions.get(0); + ExprNodeConstantEvaluator eval = + new ExprNodeConstantEvaluator(sc.getConstantDesc()); + byte [] startRow; + try { + ObjectInspector objInspector = eval.initialize(null); + Object writable = eval.evaluate(null); + ByteStream.Output serializeStream = new ByteStream.Output(); + LazyUtils.writePrimitiveUTF8( + serializeStream, + writable, + (PrimitiveObjectInspector) objInspector, + false, + (byte) 0, + null); + startRow = new byte[serializeStream.getCount()]; + System.arraycopy( + serializeStream.getData(), 0, + startRow, 0, serializeStream.getCount()); + } catch (HiveException ex) { + throw new IOException(ex); + } + + // stopRow is exclusive, so pad it with a trailing 0 byte to + // make it compare as the very next value after startRow + byte [] stopRow = new byte[startRow.length + 1]; + System.arraycopy(startRow, 0, stopRow, 0, startRow.length); + + if (tableSplit != null) { + tableSplit = new TableSplit( + tableSplit.getTableName(), + startRow, + stopRow, + tableSplit.getRegionLocation()); + } + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + + // Add a WhileMatchFilter to make the scan terminate as soon + // as we see a non-matching key. This is probably redundant + // since the stopRow above should already take care of it for us. + scan.setFilter( + new WhileMatchFilter( + new RowFilter( + CompareFilter.CompareOp.EQUAL, + new BinaryComparator(startRow)))); + return tableSplit; + } + + /** + * Instantiates a new predicate analyzer suitable for + * determining how to push a filter down into the HBase scan, + * based on the rules for what kinds of pushdown we currently support. + * + * @param keyColumnName name of the Hive column mapped to the HBase row key + * + * @return preconfigured predicate analyzer + */ + static IndexPredicateAnalyzer newIndexPredicateAnalyzer( + String keyColumnName) { + + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + + // for now, we only support equality comparisons + analyzer.addComparisonOp("="); + + // and only on the key column + analyzer.clearAllowedColumnNames(); + analyzer.allowColumnName(keyColumnName); + + return analyzer; + } + @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { @@ -226,6 +380,14 @@ Scan scan = new Scan(); + // Take filter pushdown into account while calculating splits; this + // allows us to prune off regions immediately. Note that although + // the Javadoc for the superclass getSplits says that it returns one + // split per region, the implementation actually takes the scan + // definition into account and excludes regions which don't satisfy + // the start/stop row conditions (HBASE-1829). + convertFilter(jobConf, scan, null, iKey); + // REVIEW: are we supposed to be applying the getReadColumnIDs // same as in getRecordReader? 
for (int i = 0; i < hbaseColumnFamilies.size(); i++) { @@ -244,7 +406,9 @@ Job job = new Job(jobConf); JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID()); Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); - List splits = getSplits(jobContext); + + List splits = + super.getSplits(jobContext); InputSplit [] results = new InputSplit[splits.size()]; for (int i = 0; i < splits.size(); i++) { Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java =================================================================== --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (revision 996701) +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (working copy) @@ -582,4 +582,11 @@ public void setUseJSONSerialize(boolean useJSONSerialize) { this.useJSONSerialize = useJSONSerialize; } + + /** + * @return 0-based offset of the key column within the table + */ + int getKeyColumnOffset() { + return iKey; + } } Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java =================================================================== --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (revision 996701) +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -39,11 +40,17 @@ import org.apache.hadoop.hive.metastore.api.Constants; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.util.StringUtils; @@ -51,7 +58,8 @@ * HBaseStorageHandler provides a HiveStorageHandler implementation for * HBase. 
*/ -public class HBaseStorageHandler implements HiveStorageHandler, HiveMetaHook { +public class HBaseStorageHandler extends DefaultStorageHandler + implements HiveMetaHook, HiveStoragePredicateHandler { private HBaseConfiguration hbaseConf; private HBaseAdmin admin; @@ -260,4 +268,38 @@ } jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName); } + + @Override + public DecomposedPredicate decomposePredicate( + JobConf jobConf, + Deserializer deserializer, + ExprNodeDesc predicate) + { + String columnNameProperty = jobConf.get( + org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS); + List columnNames = + Arrays.asList(columnNameProperty.split(",")); + HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer; + IndexPredicateAnalyzer analyzer = + HiveHBaseTableInputFormat.newIndexPredicateAnalyzer( + columnNames.get(hbaseSerde.getKeyColumnOffset())); + List searchConditions = + new ArrayList(); + ExprNodeDesc residualPredicate = + analyzer.analyzePredicate(predicate, searchConditions); + if (searchConditions.size() != 1) { + // Either there was nothing which could be pushed down (size = 0), + // or more than one predicate (size > 1); in the latter case, + // we bail out for now since multiple lookups on the key are + // either contradictory or redundant. We'll need to handle + // this better later when we support more interesting predicates. + return null; + } + + DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); + decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions( + searchConditions); + decomposedPredicate.residualPredicate = residualPredicate; + return decomposedPredicate; + } } Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 996701) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -262,6 +262,8 @@ // Optimizer HIVEOPTCP("hive.optimize.cp", true), // column pruner HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown + // push predicates down to storage handlers + HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true), HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java (revision 0) @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.Deserializer; + +/** + * HiveStoragePredicateHandler is an optional companion to {@link + * HiveStorageHandler}; it should only be implemented by handlers which + * support decomposition of predicates being pushed down into table scans. + */ +public interface HiveStoragePredicateHandler { + + /** + * Gives the storage handler a chance to decompose a predicate. The storage + * handler should analyze the predicate and return the portion of it which + * cannot be evaluated during table access. For example, if the original + * predicate is x = 2 AND upper(y)='YUM', the storage handler + * might be able to handle x = 2 but leave the "residual" + * upper(y)='YUM' for Hive to deal with. The breakdown + * need not be non-overlapping; for example, given the + * predicate x LIKE 'a%b', the storage handler might + * be able to evaluate the prefix search x LIKE 'a%', leaving + * x LIKE '%b' as the residual. + * + * @param jobConf contains a job configuration matching the one that + * will later be passed to getRecordReader and getSplits + * + * @param deserializer deserializer which will be used when + * fetching rows + * + * @param predicate predicate to be decomposed + * + * @return decomposed form of predicate, or null if no pushdown is + * possible at all + */ + public DecomposedPredicate decomposePredicate( + JobConf jobConf, + Deserializer deserializer, + ExprNodeDesc predicate); + + /** + * Struct class for returning multiple values from decomposePredicate. + */ + public static class DecomposedPredicate { + /** + * Portion of predicate to be evaluated by storage handler. Hive + * will pass this into the storage handler's input format. + */ + public ExprNodeDesc pushedPredicate; + + /** + * Portion of predicate to be post-evaluated by Hive for any rows + * which are returned by storage handler. 
+ */ + public ExprNodeDesc residualPredicate; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java (working copy) @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; /** @@ -37,10 +38,11 @@ */ private final HashMap, ExprWalkerInfo> opToPushdownPredMap; private final Map, OpParseContext> opToParseCtxMap; + private final ParseContext pGraphContext; - public OpWalkerInfo( - HashMap, OpParseContext> opToParseCtxMap) { - this.opToParseCtxMap = opToParseCtxMap; + public OpWalkerInfo(ParseContext pGraphContext) { + this.pGraphContext = pGraphContext; + opToParseCtxMap = pGraphContext.getOpParseCtx(); opToPushdownPredMap = new HashMap, ExprWalkerInfo>(); } @@ -62,4 +64,7 @@ return opToParseCtxMap.put(key, value); } + public ParseContext getParseContext() { + return pGraphContext; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (working copy) @@ -81,7 +81,7 @@ opToParseCtxMap = pGraphContext.getOpParseCtx(); // create a the context for walking operators - OpWalkerInfo opWalkerInfo = new OpWalkerInfo(opToParseCtxMap); + OpWalkerInfo opWalkerInfo = new OpWalkerInfo(pGraphContext); Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp("R1", "FIL%"), OpProcFactory.getFilterProc()); Index: ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (working copy) @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.JoinOperator; @@ -36,9 +37,14 @@ import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -47,7 +53,10 @@ import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.JoinCondDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import 
org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.JobConf; /** * Operator factory for predicate pushdown processing of operator graph Each @@ -63,6 +72,9 @@ */ public final class OpProcFactory { + protected static final Log LOG = LogFactory.getLog(OpProcFactory.class + .getName()); + /** * Processor for Script Operator Prevents any predicates being pushed. */ @@ -266,9 +278,6 @@ */ public static class DefaultPPD implements NodeProcessor { - protected static final Log LOG = LogFactory.getLog(OpProcFactory.class - .getName()); - @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -377,6 +386,24 @@ if (condn == null) { return null; } + + if (op instanceof TableScanOperator) { + boolean pushFilterToStorage; + HiveConf hiveConf = owi.getParseContext().getConf(); + pushFilterToStorage = + hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE); + if (pushFilterToStorage) { + condn = pushFilterToStorageHandler( + (TableScanOperator) op, + condn, + owi, + hiveConf); + if (condn == null) { + // we pushed the whole thing down + return null; + } + } + } // add new filter op List> originalChilren = op @@ -399,6 +426,81 @@ return output; } + /** + * Attempts to push a predicate down into a storage handler. For + * native tables, this is a no-op. + * + * @param tableScanOp table scan against which predicate applies + * + * @param originalPredicate predicate to be pushed down + * + * @param owi object walk info + * + * @param hiveConf Hive configuration + * + * @return portion of predicate which needs to be evaluated + * by Hive as a post-filter, or null if it was possible + * to push down the entire predicate + */ + private static ExprNodeDesc pushFilterToStorageHandler( + TableScanOperator tableScanOp, + ExprNodeDesc originalPredicate, + OpWalkerInfo owi, + HiveConf hiveConf) { + + TableScanDesc tableScanDesc = tableScanOp.getConf(); + Table tbl = owi.getParseContext().getTopToTable().get(tableScanOp); + if (!tbl.isNonNative()) { + return originalPredicate; + } + HiveStorageHandler storageHandler = tbl.getStorageHandler(); + if (!(storageHandler instanceof HiveStoragePredicateHandler)) { + // The storage handler does not provide predicate decomposition + // support, so we'll implement the entire filter in Hive. However, + // we still provide the full predicate to the storage handler in + // case it wants to do any of its own prefiltering. 
+ tableScanDesc.setFilterExpr(originalPredicate); + return originalPredicate; + } + HiveStoragePredicateHandler predicateHandler = + (HiveStoragePredicateHandler) storageHandler; + JobConf jobConf = new JobConf(owi.getParseContext().getConf()); + Utilities.setColumnNameList(jobConf, tableScanOp); + Utilities.copyTableJobPropertiesToConf( + Utilities.getTableDesc(tbl), + jobConf); + Deserializer deserializer = tbl.getDeserializer(); + HiveStoragePredicateHandler.DecomposedPredicate decomposed = + predicateHandler.decomposePredicate( + jobConf, + deserializer, + originalPredicate); + if (decomposed == null) { + // not able to push anything down + if (LOG.isDebugEnabled()) { + LOG.debug("No pushdown possible for predicate: " + + originalPredicate.getExprString()); + } + return originalPredicate; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Original predicate: " + + originalPredicate.getExprString()); + if (decomposed.pushedPredicate != null) { + LOG.debug( + "Pushed predicate: " + + decomposed.pushedPredicate.getExprString()); + } + if (decomposed.residualPredicate != null) { + LOG.debug( + "Residual predicate: " + + decomposed.residualPredicate.getExprString()); + } + } + tableScanDesc.setFilterExpr(decomposed.pushedPredicate); + return decomposed.residualPredicate; + } + public static NodeProcessor getFilterProc() { return new FilterPPD(); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -26,6 +26,8 @@ import java.beans.XMLDecoder; import java.beans.XMLEncoder; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.EOFException; import java.io.File; @@ -37,6 +39,7 @@ import java.io.OutputStream; import java.io.PrintStream; import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URL; import java.net.URLClassLoader; @@ -81,6 +84,7 @@ import org.apache.hadoop.hive.ql.parse.ErrorMsg; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -317,6 +321,40 @@ return null; } + public static String serializeExpression(ExprNodeDesc expr) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + XMLEncoder encoder = new XMLEncoder(baos); + try { + encoder.writeObject(expr); + } finally { + encoder.close(); + } + try { + return baos.toString("UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + + public static ExprNodeDesc deserializeExpression( + String s, Configuration conf) { + byte [] bytes; + try { + bytes = s.getBytes("UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + XMLDecoder decoder = new XMLDecoder( + bais, null, null, conf.getClassLoader()); + try { + ExprNodeDesc expr = (ExprNodeDesc) decoder.readObject(); + return expr; + } finally { + decoder.close(); + } + } + /** * Serialize a single Task. 
*/ @@ -1426,4 +1464,22 @@ public static boolean supportCombineFileInputFormat() { return ShimLoader.getHadoopShims().getCombineFileInputFormat() != null; } + + public static void setColumnNameList(JobConf jobConf, Operator op) { + RowSchema rowSchema = op.getSchema(); + if (rowSchema == null) { + return; + } + StringBuilder columnNames = new StringBuilder(); + for (ColumnInfo colInfo : rowSchema.getSignature()) { + if (columnNames.length() > 0) { + columnNames.append(","); + } + columnNames.append(colInfo.getInternalName()); + } + String columnNamesString = columnNames.toString(); + jobConf.set( + Constants.LIST_COLUMNS, + columnNamesString); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (working copy) @@ -36,6 +36,14 @@ private List virtualCols; + private ExprNodeDesc filterExpr; + + public static final String FILTER_EXPR_CONF_STR = + "hive.io.filter.expr.serialized"; + + public static final String FILTER_TEXT_CONF_STR = + "hive.io.filter.text"; + @SuppressWarnings("nls") public TableScanDesc() { } @@ -54,6 +62,15 @@ return alias; } + @Explain(displayName = "filterExpr") + public ExprNodeDesc getFilterExpr() { + return filterExpr; + } + + public void setFilterExpr(ExprNodeDesc filterExpr) { + this.filterExpr = filterExpr; + } + public void setAlias(String alias) { this.alias = alias; } Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (working copy) @@ -340,7 +340,8 @@ throw new IOException("cannot find class " + inputFormatClassName); } - initColumnsNeeded(job, inputFormatClass, hsplit.getPath(0).toString(), + pushProjectionsAndFilters(job, inputFormatClass, + hsplit.getPath(0).toString(), hsplit.getPath(0).toUri().getPath()); return ShimLoader.getHadoopShims().getCombineFileInputFormat() Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy) @@ -33,11 +33,15 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; @@ -215,7 +219,7 @@ // clone a jobConf for setting needed columns for reading JobConf cloneJobConf = new JobConf(job); - initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath() + pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath() 
.toString(), hsplit.getPath().toUri().getPath()); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, @@ -260,6 +264,17 @@ InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob); + // Make filter pushdown information available to getSplits. + ArrayList aliases = + mrwork.getPathToAliases().get(dir.toUri().toString()); + if ((aliases != null) && (aliases.size() == 1)) { + Operator op = mrwork.getAliasToWork().get(aliases.get(0)); + if ((op != null) && (op instanceof TableScanOperator)) { + TableScanOperator tableScan = (TableScanOperator) op; + pushFilters(newjob, tableScan); + } + } + FileInputFormat.setInputPaths(newjob, dir); newjob.setInputFormat(inputFormat.getClass()); InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length); @@ -309,7 +324,37 @@ return partDesc; } - protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass, + protected void pushFilters(JobConf jobConf, TableScanOperator tableScan) { + + TableScanDesc scanDesc = tableScan.getConf(); + if (scanDesc == null) { + return; + } + + // construct column name list for reference by filter push down + Utilities.setColumnNameList(jobConf, tableScan); + + // push down filters + ExprNodeDesc filterExpr = scanDesc.getFilterExpr(); + if (filterExpr == null) { + return; + } + + String filterText = filterExpr.getExprString(); + String filterExprSerialized = Utilities.serializeExpression(filterExpr); + if (LOG.isDebugEnabled()) { + LOG.debug("Filter text = " + filterText); + LOG.debug("Filter expression = " + filterExprSerialized); + } + jobConf.set( + TableScanDesc.FILTER_TEXT_CONF_STR, + filterText); + jobConf.set( + TableScanDesc.FILTER_EXPR_CONF_STR, + filterExprSerialized); + } + + protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass, String splitPath, String splitPathWithNoSchema) { if (this.mrwork == null) { init(job); @@ -335,12 +380,16 @@ alias); if (op != null && op instanceof TableScanOperator) { TableScanOperator tableScan = (TableScanOperator) op; + + // push down projections ArrayList list = tableScan.getNeededColumnIDs(); if (list != null) { ColumnProjectionUtils.appendReadColumnIDs(jobConf, list); } else { ColumnProjectionUtils.setFullyReadColumns(jobConf); } + + pushFilters(jobConf, tableScan); } } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (revision 996701) +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (working copy) @@ -68,7 +68,7 @@ // clone a jobConf for setting needed columns for reading JobConf cloneJobConf = new JobConf(job); - initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath() + pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath() .toString(), hsplit.getPath().toUri().getPath()); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java (revision 0) @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; + +/** + * IndexSearchCondition represents an individual search condition + * found by {@link IndexPredicateAnalyzer}. + * + * @author John Sichi + * @version $Id:$ + */ +public class IndexSearchCondition +{ + private ExprNodeColumnDesc columnDesc; + private String comparisonOp; + private ExprNodeConstantDesc constantDesc; + private ExprNodeDesc comparisonExpr; + + /** + * Constructs a search condition, which takes the form + *
<pre>column-ref comparison-op constant-value</pre>
. + * + * @param columnDesc column being compared + * + * @param comparisonOp comparison operator, e.g. "=" + * (taken from GenericUDFBridge.getUdfName()) + * + * @param constantDesc constant value to search for + * + * @Param comparisonExpr the original comparison expression + */ + public IndexSearchCondition( + ExprNodeColumnDesc columnDesc, + String comparisonOp, + ExprNodeConstantDesc constantDesc, + ExprNodeDesc comparisonExpr) { + + this.columnDesc = columnDesc; + this.comparisonOp = comparisonOp; + this.constantDesc = constantDesc; + this.comparisonExpr = comparisonExpr; + } + + public void setColumnDesc(ExprNodeColumnDesc columnDesc) { + this.columnDesc = columnDesc; + } + + public ExprNodeColumnDesc getColumnDesc() { + return columnDesc; + } + + public void setComparisonOp(String comparisonOp) { + this.comparisonOp = comparisonOp; + } + + public String getComparisonOp() { + return comparisonOp; + } + + public void setConstantDesc(ExprNodeConstantDesc constantDesc) { + this.constantDesc = constantDesc; + } + + public ExprNodeConstantDesc getConstantDesc() { + return constantDesc; + } + + public void setComparisonExpr(ExprNodeDesc comparisonExpr) { + this.comparisonExpr = comparisonExpr; + } + + public ExprNodeDesc getComparisonExpr() { + return comparisonExpr; + } + + @Override + public String toString() { + return comparisonExpr.getExprString(); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (revision 0) @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.index; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; + +/** + * IndexPredicateAnalyzer decomposes predicates, separating the parts + * which can be satisfied by an index from the parts which cannot. + * Currently, it only supports pure conjunctions over binary expressions + * comparing a column reference with a constant value. It is assumed + * that all column aliases encountered refer to the same table. + */ +public class IndexPredicateAnalyzer +{ + private Set udfNames; + + private Set allowedColumnNames; + + public IndexPredicateAnalyzer() { + udfNames = new HashSet(); + } + + /** + * Registers a comparison operator as one which can be satisfied + * by an index search. Unless this is called, analyzePredicate + * will never find any indexable conditions. + * + * @param udfName name of comparison operator as returned + * by either {@link GenericUDFBridge#getUdfName} (for simple UDF's) + * or udf.getClass().getName() (for generic UDF's). + */ + public void addComparisonOp(String udfName) { + udfNames.add(udfName); + } + + /** + * Clears the set of column names allowed in comparisons. (Initially, all + * column names are allowed.) + */ + public void clearAllowedColumnNames() { + allowedColumnNames = new HashSet(); + } + + /** + * Adds a column name to the set of column names allowed. + * + * @param columnName name of column to be allowed + */ + public void allowColumnName(String columnName) { + if (allowedColumnNames == null) { + clearAllowedColumnNames(); + } + allowedColumnNames.add(columnName); + } + + /** + * Analyzes a predicate. + * + * @param predicate predicate to be analyzed + * + * @param searchConditions receives conditions produced by analysis + * + * @return residual predicate which could not be translated to + * searchConditions + */ + public ExprNodeDesc analyzePredicate( + ExprNodeDesc predicate, + final List searchConditions) { + + Map opRules = new LinkedHashMap(); + NodeProcessor nodeProcessor = new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { + + // We can only push down stuff which appears as part of + // a pure conjunction: reject OR, CASE, etc. 
+ for (Node ancestor : stack) { + if (nd == ancestor) { + break; + } + if (!FunctionRegistry.isOpAnd((ExprNodeDesc) ancestor)) { + return nd; + } + } + + return analyzeExpr((ExprNodeDesc) nd, searchConditions, nodeOutputs); + } + }; + + Dispatcher disp = new DefaultRuleDispatcher( + nodeProcessor, opRules, null); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.add(predicate); + HashMap nodeOutput = new HashMap(); + try { + ogw.startWalking(topNodes, nodeOutput); + } catch (SemanticException ex) { + throw new RuntimeException(ex); + } + ExprNodeDesc residualPredicate = (ExprNodeDesc) nodeOutput.get(predicate); + return residualPredicate; + } + + private ExprNodeDesc analyzeExpr( + ExprNodeDesc expr, + List searchConditions, + Object... nodeOutputs) { + + if (!(expr instanceof ExprNodeGenericFuncDesc)) { + return expr; + } + if (FunctionRegistry.isOpAnd(expr)) { + assert(nodeOutputs.length == 2); + ExprNodeDesc residual1 = (ExprNodeDesc) nodeOutputs[0]; + ExprNodeDesc residual2 = (ExprNodeDesc) nodeOutputs[1]; + if (residual1 == null) { + return residual2; + } + if (residual2 == null) { + return residual1; + } + List residuals = new ArrayList(); + residuals.add(residual1); + residuals.add(residual2); + return new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getGenericUDFForAnd(), + residuals); + } + + String udfName; + ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr; + if (funcDesc.getGenericUDF() instanceof GenericUDFBridge) { + GenericUDFBridge func = (GenericUDFBridge) funcDesc.getGenericUDF(); + udfName = func.getUdfName(); + } else { + udfName = funcDesc.getGenericUDF().getClass().getName(); + } + if (!udfNames.contains(udfName)) { + return expr; + } + + ExprNodeDesc child1 = (ExprNodeDesc) nodeOutputs[0]; + ExprNodeDesc child2 = (ExprNodeDesc) nodeOutputs[1]; + ExprNodeColumnDesc columnDesc = null; + ExprNodeConstantDesc constantDesc = null; + if ((child1 instanceof ExprNodeColumnDesc) + && (child2 instanceof ExprNodeConstantDesc)) { + // COL CONSTANT + columnDesc = (ExprNodeColumnDesc) child1; + constantDesc = (ExprNodeConstantDesc) child2; + } else if ((child2 instanceof ExprNodeColumnDesc) + && (child1 instanceof ExprNodeConstantDesc)) { + // CONSTANT COL + columnDesc = (ExprNodeColumnDesc) child2; + constantDesc = (ExprNodeConstantDesc) child1; + } + if (columnDesc == null) { + return expr; + } + if (allowedColumnNames != null) { + if (!allowedColumnNames.contains(columnDesc.getColumn())) { + return expr; + } + } + searchConditions.add( + new IndexSearchCondition( + columnDesc, + udfName, + constantDesc, + expr)); + + // we converted the expression to a search condition, so + // remove it from the residual predicate + return null; + } + + /** + * Translates search conditions back to ExprNodeDesc form (as + * a left-deep conjunction). + * + * @param searchConditions (typically produced by analyzePredicate) + * + * @return ExprNodeDesc form of search conditions + */ + public ExprNodeDesc translateSearchConditions( + List searchConditions) { + + ExprNodeDesc expr = null; + for (IndexSearchCondition searchCondition : searchConditions) { + if (expr == null) { + expr = searchCondition.getComparisonExpr(); + continue; + } + List children = new ArrayList(); + children.add(expr); + children.add(searchCondition.getComparisonExpr()); + expr = new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getGenericUDFForAnd(), + children); + } + return expr; + } +}
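For storage handler authors wiring up this new machinery, the sketch below (not part of the patch; ExampleStorageHandler and the key column name "ts" are invented for illustration) shows the intended way to implement the new HiveStoragePredicateHandler interface using IndexPredicateAnalyzer: register the comparison operators the storage layer can evaluate, restrict the allowed columns, split the incoming predicate, and return the pushed and residual parts in a DecomposedPredicate. It follows the same pattern as HBaseStorageHandler.decomposePredicate above.

// Hypothetical storage handler illustrating the HiveStoragePredicateHandler
// contract; a real handler would also implement HiveStorageHandler (for
// example by extending DefaultStorageHandler, as the HBase handler does).
package com.example.hive;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.JobConf;

public class ExampleStorageHandler implements HiveStoragePredicateHandler {

  @Override
  public DecomposedPredicate decomposePredicate(
      JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) {

    // Accept only equality comparisons against the hypothetical key
    // column "ts"; everything else remains residual for Hive.
    IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
    analyzer.addComparisonOp("=");
    analyzer.clearAllowedColumnNames();
    analyzer.allowColumnName("ts");

    List<IndexSearchCondition> searchConditions =
        new ArrayList<IndexSearchCondition>();
    ExprNodeDesc residual =
        analyzer.analyzePredicate(predicate, searchConditions);
    if (searchConditions.size() != 1) {
      // Nothing pushable (or more than one key lookup); decline pushdown
      // and let Hive evaluate the entire predicate, as the HBase handler does.
      return null;
    }

    DecomposedPredicate decomposed = new DecomposedPredicate();
    decomposed.pushedPredicate =
        analyzer.translateSearchConditions(searchConditions);
    decomposed.residualPredicate = residual;
    return decomposed;
  }
}

Returning null signals that no pushdown is possible; in that case the optimizer keeps the entire predicate in Hive's own filter operator.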
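On the execution side, the pushed predicate travels to the storage handler's input format through two new job configuration keys that HiveInputFormat.pushFilters sets: a human-readable form under TableScanDesc.FILTER_TEXT_CONF_STR and an XML-serialized ExprNodeDesc under TableScanDesc.FILTER_EXPR_CONF_STR. A minimal sketch of the reading side is shown below (PushedFilterConf is a hypothetical helper, not part of the patch); it mirrors the first step of HiveHBaseTableInputFormat.convertFilter.

// Hypothetical helper showing how an input format recovers the filter
// that HiveInputFormat.pushFilters serialized into the job configuration.
package com.example.hive;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

public class PushedFilterConf {

  /** Returns the pushed filter expression, or null if nothing was pushed. */
  static ExprNodeDesc getPushedFilter(JobConf jobConf) {
    // Human-readable form of the filter, intended for logging only.
    String filterText = jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR);
    // XML-serialized ExprNodeDesc produced by Utilities.serializeExpression.
    String filterExprSerialized =
        jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    if (filterExprSerialized == null) {
      return null;
    }
    ExprNodeDesc filterExpr =
        Utilities.deserializeExpression(filterExprSerialized, jobConf);
    System.out.println("pushed filter: " + filterText
        + " -> " + filterExpr.getExprString());
    return filterExpr;
  }
}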