Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 987314)
+++ conf/hive-default.xml	(working copy)
@@ -273,6 +273,12 @@
 </property>

 <property>
+  <name>hive.optimize.ppd.storage</name>
+  <value>false</value>
+  <description>Whether to push predicates down into storage handlers.  Ignored when hive.optimize.ppd is false.</description>
+</property>
+
+<property>
   <name>hive.optimize.pruner</name>
   <value>true</value>
   <description>Whether to enable the new partition pruner which depends on predicate pushdown. If this is disabled,
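The new flag defaults to false and only takes effect when hive.optimize.ppd is also enabled. As a minimal, hypothetical sketch (not part of the patch; the class name is made up), this is how the flag can be read programmatically through HiveConf, mirroring the check added to OpProcFactory further down:

import org.apache.hadoop.hive.conf.HiveConf;

public class PpdStorageFlagSketch {
  public static void main(String[] args) {
    // Loads hive-default.xml / hive-site.xml from the classpath.
    HiveConf conf = new HiveConf(PpdStorageFlagSketch.class);
    boolean pushToStorage =
        conf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE);
    boolean ppdEnabled =
        conf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD);
    // Storage pushdown is effective only when both flags are true.
    System.out.println("push filters to storage handlers: "
        + (ppdEnabled && pushToStorage));
  }
}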
Index: hbase-handler/src/test/results/hbase_pushdown.q.out
===================================================================
--- hbase-handler/src/test/results/hbase_pushdown.q.out	(revision 0)
+++ hbase-handler/src/test/results/hbase_pushdown.q.out	(revision 0)
@@ -0,0 +1,127 @@
+PREHOOK: query: CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_pushdown
+POSTHOOK: query: INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_pushdown
+PREHOOK: query: -- without pushdown
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- without pushdown
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown
+          TableScan
+            alias: hbase_pushdown
+            Filter Operator
+              predicate:
+                  expr: (key = 90)
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: (key = 90)
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: -- with pushdown
+explain select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+POSTHOOK: query: -- with pushdown
+explain select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF hbase_pushdown)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 90))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        hbase_pushdown
+          TableScan
+            alias: hbase_pushdown
+            filterExpr:
+                expr: (key = 90)
+                type: boolean
+            Filter Operator
+              predicate:
+                  expr: (key = 90)
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: (key = 90)
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: key
+                        type: int
+                        expr: value
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: select * from hbase_pushdown where key=90
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_pushdown
+PREHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-08-19_14-24-01_759_143813178377646105/-mr-10000
+POSTHOOK: query: select * from hbase_pushdown where key=90
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_pushdown
+POSTHOOK: Output: file:/var/folders/7P/7PeC14kXFIWq0PIYyexGbmKuXUk/-Tmp-/jsichi/hive_2010-08-19_14-24-01_759_143813178377646105/-mr-10000
+90	val_90
Index: hbase-handler/src/test/queries/hbase_pushdown.q
===================================================================
--- hbase-handler/src/test/queries/hbase_pushdown.q	(revision 0)
+++ hbase-handler/src/test/queries/hbase_pushdown.q	(revision 0)
@@ -0,0 +1,17 @@
+CREATE TABLE hbase_pushdown(key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string");
+
+INSERT OVERWRITE TABLE hbase_pushdown
+SELECT *
+FROM src;
+
+-- without pushdown
+explain select * from hbase_pushdown where key=90;
+
+set hive.optimize.ppd.storage=true;
+
+-- with pushdown
+explain select * from hbase_pushdown where key=90;
+
+select * from hbase_pushdown where key=90;
Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
===================================================================
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java	(revision 987314)
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java	(working copy)
@@ -20,6 +20,7 @@

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;

 import org.apache.commons.logging.Log;
@@ -29,12 +30,27 @@
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -63,6 +79,7 @@
     final Reporter reporter) throws IOException {

     HBaseSplit hbaseSplit = (HBaseSplit) split;
+    TableSplit tableSplit = hbaseSplit.getSplit();
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(
       hbaseTableName)));
@@ -74,6 +91,8 @@
         "Cannot read more columns than the given table contains.");
     }

+    Scan scan = new Scan();
+
     List<byte []> scanColumns = new ArrayList<byte []>();
     boolean addAll = (readColIDs.size() == 0);
     if (!addAll) {
@@ -96,10 +115,67 @@
         }
       }
     }
+    scan.addColumns(scanColumns.toArray(new byte[0][]));

-    setScan(new Scan().addColumns(scanColumns.toArray(new byte[0][])));
-    org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit();
+    String filterExprSerialized =
+      jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    if (filterExprSerialized != null) {
+      ExprNodeDesc filterExpr =
+        Utilities.deserializeExpression(filterExprSerialized, jobConf);
+      IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+      List<IndexPredicateAnalyzer.SearchCondition> searchConditions =
+        new ArrayList<IndexPredicateAnalyzer.SearchCondition>();
+      ExprNodeDesc residualExpr =
+        analyzer.analyzePredicate(filterExpr, searchConditions);
+      // TODO: deal with residual
+
+      String columnNameProperty = jobConf.get(Constants.LIST_COLUMNS);
+      List<String> columnNames =
+        Arrays.asList(columnNameProperty.split(","));
+
+      for (IndexPredicateAnalyzer.SearchCondition sc : searchConditions) {
+        // TODO: the real thing, and assert stuff about table aliases
+        int iCol = columnNames.indexOf(sc.getColumnDesc().getColumn());
+        if (HBaseSerDe.HBASE_KEY_COL.equals(columns.get(iCol))) {
+          ExprNodeConstantEvaluator eval =
+            new ExprNodeConstantEvaluator(sc.getConstantDesc());
+          byte [] bytes;
+          try {
+            ObjectInspector objInspector = eval.initialize(null);
+            Object writable = eval.evaluate(null);
+            ByteStream.Output serializeStream = new ByteStream.Output();
+            // REVIEW: escaped etc
+            LazyUtils.writePrimitiveUTF8(
+              serializeStream,
+              writable,
+              (PrimitiveObjectInspector) objInspector,
+              false,
+              (byte) 0,
+              null);
+            bytes = new byte[serializeStream.getCount()];
+            System.arraycopy(
+              serializeStream.getData(), 0,
+              bytes, 0, serializeStream.getCount());
+          } catch (HiveException ex) {
+            // FIXME
+            throw new IOException(ex);
+          }
+          tableSplit = new TableSplit(
+            tableSplit.getTableName(),
+            bytes,
+            tableSplit.getEndRow(),
+            tableSplit.getRegionLocation());
+          scan.setFilter(
+            new RowFilter(
+              CompareFilter.CompareOp.EQUAL,
+              new BinaryComparator(bytes)));
+        }
+      }
+    }
+
+    setScan(scan);
+
     Job job = new Job(jobConf);
     TaskAttemptContext tac =
       new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) {
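For reference, a minimal standalone sketch (not part of the patch) of what the pushdown in HiveHBaseTableInputFormat amounts to on the HBase client side for WHERE key = 90: the scan is started at the serialized key and a RowFilter keeps only the exact row. The class name is made up; the row key is shown as the UTF-8 text "90", which approximates the LazyUtils.writePrimitiveUTF8 serialization used above, and the column family/qualifier come from the test's "cf:string" mapping.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class PushdownScanSketch {
  public static void main(String[] args) throws Exception {
    byte[] rowKey = Bytes.toBytes("90");     // key serialized as UTF-8 text
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("string"));
    scan.setStartRow(rowKey);                // start the scan at the requested key
    scan.setFilter(new RowFilter(            // and keep only the exact row
        CompareFilter.CompareOp.EQUAL, new BinaryComparator(rowKey)));
    HTable table = new HTable(new HBaseConfiguration(), "hbase_pushdown");
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result result : scanner) {
        System.out.println(Bytes.toString(result.getRow()) + "\t"
            + Bytes.toString(result.getValue(
                Bytes.toBytes("cf"), Bytes.toBytes("string"))));
      }
    } finally {
      scanner.close();
    }
  }
}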
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 987314)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -258,6 +258,8 @@
     // Optimizer
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
+    // push predicates down to storage handlers
+    HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", false),
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
Index: ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java	(revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java	(working copy)
@@ -29,6 +29,7 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -39,6 +40,8 @@
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -47,6 +50,7 @@
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

 /**
@@ -378,6 +382,20 @@
         return null;
       }

+      if (op instanceof TableScanOperator) {
+        boolean pushFilterToStorage;
+        try {
+          pushFilterToStorage =
+            Hive.get().getConf().getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_STORAGE);
+        } catch (HiveException ex) {
+          throw new RuntimeException(ex);
+        }
+        if (pushFilterToStorage) {
+          TableScanDesc tableScanDesc = ((TableScanOperator) op).getConf();
+          tableScanDesc.setFilterExpr(condn);
+        }
+      }
+
      // add new filter op
      List<Operator<? extends Serializable>> originalChilren = op
          .getChildOperators();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -26,6 +26,8 @@
 import java.beans.XMLDecoder;
 import java.beans.XMLEncoder;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.EOFException;
 import java.io.File;
@@ -37,6 +39,7 @@
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -81,6 +84,7 @@
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -316,6 +320,40 @@
     return null;
   }

+  public static String serializeExpression(ExprNodeDesc expr) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    XMLEncoder encoder = new XMLEncoder(baos);
+    try {
+      encoder.writeObject(expr);
+    } finally {
+      encoder.close();
+    }
+    try {
+      return baos.toString("UTF-8");
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+  }
+
+  public static ExprNodeDesc deserializeExpression(
+    String s, Configuration conf) {
+    byte [] bytes;
+    try {
+      bytes = s.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("UTF-8 support required", ex);
+    }
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+    XMLDecoder decoder = new XMLDecoder(
+      bais, null, null, conf.getClassLoader());
+    try {
+      ExprNodeDesc expr = (ExprNodeDesc) decoder.readObject();
+      return expr;
+    } finally {
+      decoder.close();
+    }
+  }
+
   /**
    * Serialize a single Task.
    */
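A quick usage sketch of the two helpers added above (not part of the patch; the class name and the constant expression are made up): serialize an expression tree to the XML form that HiveInputFormat later puts into the JobConf, then decode it again as a task-side consumer would.

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.JobConf;

public class ExprSerializationSketch {
  public static void main(String[] args) {
    // Stand-in for a real predicate tree produced by the optimizer.
    ExprNodeDesc expr =
        new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 90);
    String xml = Utilities.serializeExpression(expr);    // XMLEncoder output, UTF-8
    JobConf jobConf = new JobConf();
    ExprNodeDesc roundTripped = Utilities.deserializeExpression(xml, jobConf);
    System.out.println(roundTripped.getExprString());    // prints the constant back
  }
}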
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java	(revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java	(working copy)
@@ -36,6 +36,14 @@

   private List<VirtualColumn> virtualCols;

+  private ExprNodeDesc filterExpr;
+
+  public static final String FILTER_EXPR_CONF_STR =
+    "hive.io.filter.expr.serialized";
+
+  public static final String FILTER_TEXT_CONF_STR =
+    "hive.io.filter.text";
+
   @SuppressWarnings("nls")
   public TableScanDesc() {
   }
@@ -54,6 +62,15 @@
     return alias;
   }

+  @Explain(displayName = "filterExpr")
+  public ExprNodeDesc getFilterExpr() {
+    return filterExpr;
+  }
+
+  public void setFilterExpr(ExprNodeDesc filterExpr) {
+    this.filterExpr = filterExpr;
+  }
+
   public void setAlias(String alias) {
     this.alias = alias;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java	(revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java	(working copy)
@@ -337,7 +337,8 @@
       throw new IOException("cannot find class " + inputFormatClassName);
     }

-    initColumnsNeeded(job, inputFormatClass, hsplit.getPath(0).toString(),
+    pushProjectionsAndFilters(job, inputFormatClass,
+      hsplit.getPath(0).toString(),
       hsplit.getPath(0).toUri().getPath());

     return ShimLoader.getHadoopShims().getCombineFileInputFormat()
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java	(revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java	(working copy)
@@ -33,11 +33,16 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
@@ -215,7 +220,7 @@

     // clone a jobConf for setting needed columns for reading
     JobConf cloneJobConf = new JobConf(job);
-    initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+    pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
       .toString(), hsplit.getPath().toUri().getPath());

     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
@@ -309,7 +314,7 @@
     return partDesc;
   }

-  protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass,
+  protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass,
     String splitPath, String splitPathWithNoSchema) {
     if (this.mrwork == null) {
       init(job);
@@ -335,12 +340,55 @@
           alias);
       if (op != null && op instanceof TableScanOperator) {
         TableScanOperator tableScan = (TableScanOperator) op;
+
+        // push down projections
         ArrayList<Integer> list = tableScan.getNeededColumnIDs();
         if (list != null) {
           ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
         } else {
           ColumnProjectionUtils.setFullyReadColumns(jobConf);
         }
+
+        // construct column name list for reference by filter push down
+        TableScanDesc scanDesc = tableScan.getConf();
+        if (scanDesc != null) {
+          RowSchema rowSchema = op.getSchema();
+          StringBuilder columnNames = new StringBuilder();
+          if (rowSchema != null) {
+            for (ColumnInfo colInfo : rowSchema.getSignature()) {
+              if (columnNames.length() > 0) {
+                columnNames.append(",");
+              }
+              columnNames.append(colInfo.getInternalName());
+            }
+            String columnNamesString = columnNames.toString();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Table scan column list = " + columnNamesString);
+            }
+            jobConf.set(
+              Constants.LIST_COLUMNS,
+              columnNamesString);
+          }
+
+          // push down filters
+          ExprNodeDesc filterExpr = scanDesc.getFilterExpr();
+          if (filterExpr != null) {
+            String filterText =
+              filterExpr.getExprString();
+            String filterExprSerialized =
+              Utilities.serializeExpression(filterExpr);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Filter text = " + filterText);
+              LOG.debug("Filter expression = " + filterExprSerialized);
+            }
+            jobConf.set(
+              TableScanDesc.FILTER_TEXT_CONF_STR,
+              filterText);
+            jobConf.set(
+              TableScanDesc.FILTER_EXPR_CONF_STR,
+              filterExprSerialized);
+          }
+        }
       }
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java	(revision 987314)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java	(working copy)
@@ -68,7 +68,7 @@

     // clone a jobConf for setting needed columns for reading
     JobConf cloneJobConf = new JobConf(job);
-    initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+    pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
       .toString(), hsplit.getPath().toUri().getPath());

     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
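To make the contract of pushProjectionsAndFilters() concrete, here is a minimal consumer-side sketch (a hypothetical storage-handler input format, not part of this patch) that reads back the column list and the pushed filter from the JobConf keys populated above.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.mapred.JobConf;

public class FilterAwareReaderSketch {

  /** Returns the pushed-down filter, or null when nothing was pushed. */
  static ExprNodeDesc getPushedFilter(JobConf jobConf) {
    String serialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    if (serialized == null) {
      return null;   // hive.optimize.ppd.storage is off, or no filter applies
    }
    return Utilities.deserializeExpression(serialized, jobConf);
  }

  /** Column names for the scan, in row-schema order. */
  static List<String> getScanColumnNames(JobConf jobConf) {
    String names = jobConf.get(Constants.LIST_COLUMNS);
    return (names == null)
        ? Collections.<String>emptyList()
        : Arrays.asList(names.split(","));
  }

  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    ExprNodeDesc filter = getPushedFilter(jobConf);
    System.out.println("columns=" + getScanColumnNames(jobConf)
        + " filter=" + (filter == null ? "none" : filter.getExprString()));
  }
}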
Index: ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java	(revision 0)
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+
+/**
+ * IndexPredicateAnalyzer decomposes predicates, separating the parts
+ * which can be satisfied by an index from the parts which cannot.
+ */
+public class IndexPredicateAnalyzer
+{
+  public IndexPredicateAnalyzer() {
+  }
+
+  public ExprNodeDesc analyzePredicate(
+    ExprNodeDesc predicate,
+    List<SearchCondition> searchConditions) {
+
+    if (predicate instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) predicate;
+      if (funcDesc.getGenericUDF() instanceof GenericUDFBridge) {
+        GenericUDFBridge func = (GenericUDFBridge) funcDesc.getGenericUDF();
+        if (func.getUdfName().equals("=")) {
+          ExprNodeDesc child1 = funcDesc.getChildren().get(0);
+          ExprNodeDesc child2 = funcDesc.getChildren().get(1);
+          ExprNodeColumnDesc columnDesc = null;
+          ExprNodeConstantDesc constantDesc = null;
+          if ((child1 instanceof ExprNodeColumnDesc)
+            && (child2 instanceof ExprNodeConstantDesc)) {
+            // COL = CONSTANT
+            columnDesc = (ExprNodeColumnDesc) child1;
+            constantDesc = (ExprNodeConstantDesc) child2;
+          } else if ((child2 instanceof ExprNodeColumnDesc)
+            && (child1 instanceof ExprNodeConstantDesc)) {
+            // CONSTANT = COL
+            columnDesc = (ExprNodeColumnDesc) child2;
+            constantDesc = (ExprNodeConstantDesc) child1;
+          }
+          if (columnDesc != null) {
+            searchConditions.add(
+              new SearchCondition(
+                columnDesc,
+                SearchComparison.EQ,
+                constantDesc));
+            return null;
+          }
+        }
+      }
+    }
+    return predicate;
+  }
+
+  public enum SearchComparison {
+    LT, LE, GT, GE, EQ, NE
+  }
+
+  public static class SearchCondition {
+    private ExprNodeColumnDesc columnDesc;
+    private SearchComparison comparison;
+    private ExprNodeConstantDesc constantDesc;
+
+    SearchCondition(
+      ExprNodeColumnDesc columnDesc,
+      SearchComparison comparison,
+      ExprNodeConstantDesc constantDesc) {
+
+      this.columnDesc = columnDesc;
+      this.comparison = comparison;
+      this.constantDesc = constantDesc;
+    }
+
+    public ExprNodeColumnDesc getColumnDesc() {
+      return columnDesc;
+    }
+
+    public SearchComparison getComparison() {
+      return comparison;
+    }
+
+    public ExprNodeConstantDesc getConstantDesc() {
+      return constantDesc;
+    }
+  }
+}
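Finally, a hedged driver for the new analyzer (not part of the patch): it builds key = 90 by hand, assuming "=" is still registered as a bridged UDF so that the GenericUDFBridge check above matches, and expects a null residual plus a single EQ search condition on column key. The class name and table alias are made up.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class IndexPredicateAnalyzerSketch {
  public static void main(String[] args) throws Exception {
    ExprNodeDesc col = new ExprNodeColumnDesc(
        TypeInfoFactory.intTypeInfo, "key", "hbase_pushdown", false);
    ExprNodeDesc constant =
        new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 90);
    // key = 90, wrapped the way the planner bridges the "=" UDF.
    ExprNodeDesc predicate = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.booleanTypeInfo,
        FunctionRegistry.getFunctionInfo("=").getGenericUDF(),
        new ArrayList<ExprNodeDesc>(Arrays.asList(col, constant)));

    List<IndexPredicateAnalyzer.SearchCondition> conditions =
        new ArrayList<IndexPredicateAnalyzer.SearchCondition>();
    ExprNodeDesc residual =
        new IndexPredicateAnalyzer().analyzePredicate(predicate, conditions);

    // Expected: residual == null and one EQ condition on "key".
    System.out.println("residual=" + residual
        + ", conditions=" + conditions.size());
  }
}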