Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1074743)
+++ conf/hive-default.xml (working copy)
@@ -313,6 +313,12 @@
</property>

<property>
+  <name>hive.optimize.autoindex</name>
+  <value>true</value>
+  <description>Whether to enable automatic use of indexes</description>
+</property>
+
+<property>
  <name>hive.optimize.ppd</name>
  <value>true</value>
  <description>Whether to enable predicate pushdown</description>
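Since hive.optimize.autoindex defaults to true, the rewrite can be disabled per session with standard SET syntax (a sketch; only the property name comes from this patch):

    -- turn off automatic index use for the current session
    SET hive.optimize.autoindex=false;
    -- print the current value
    SET hive.optimize.autoindex;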
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1074743)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -305,6 +305,7 @@
// Optimizer
HIVEOPTCP("hive.optimize.cp", true), // column pruner
+ HIVEOPTAUTOINDEX("hive.optimize.autoindex", true), // automatically use indexes
HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
// push predicates down to storage handlers
HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true),
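As a usage sketch, the new flag is read through the usual HiveConf accessor (this mirrors the check added in Optimizer.java further down):

    // Returns the boolean value of hive.optimize.autoindex (defaults to true)
    boolean useAutoIndex =
        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTAUTOINDEX);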
Index: ql/src/test/results/clientpositive/index_opt_where_simple.q.out
===================================================================
--- ql/src/test/results/clientpositive/index_opt_where_simple.q.out (revision 0)
+++ ql/src/test/results/clientpositive/index_opt_where_simple.q.out (revision 0)
@@ -0,0 +1,36 @@
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT * FROM src WHERE key=86 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/rmelick/hive_2011-02-20_14-25-36_662_316250227718976882/-mr-10000
+POSTHOOK: query: SELECT * FROM src WHERE key=86 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__src_src_index__
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/rmelick/hive_2011-02-20_14-25-36_662_316250227718976882/-mr-10000
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+86 val_86
+PREHOOK: query: DROP INDEX src_index on src
+PREHOOK: type: DROPINDEX
+POSTHOOK: query: DROP INDEX src_index on src
+POSTHOOK: type: DROPINDEX
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
Index: ql/src/test/results/clientpositive/index_opt_where.q.out
===================================================================
--- ql/src/test/results/clientpositive/index_opt_where.q.out (revision 0)
+++ ql/src/test/results/clientpositive/index_opt_where.q.out (revision 0)
@@ -0,0 +1,170 @@
+PREHOOK: query: -- try the queries without indexing
+-- TODO HMC put ORDER BY into the select queries to keep test results consistent
+SELECT * FROM src WHERE key=86 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-24_460_2243632729227398370/-mr-10000
+POSTHOOK: query: -- try the queries without indexing
+-- TODO HMC put ORDER BY into the select queries to keep test results consistent
+SELECT * FROM src WHERE key=86 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-24_460_2243632729227398370/-mr-10000
+86 val_86
+PREHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-29_321_8583072376226541567/-mr-10000
+POSTHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-29_321_8583072376226541567/-mr-10000
+82 val_82
+83 val_83
+83 val_83
+84 val_84
+84 val_84
+85 val_85
+86 val_86
+87 val_87
+90 val_90
+90 val_90
+90 val_90
+92 val_92
+95 val_95
+95 val_95
+96 val_96
+97 val_97
+97 val_97
+98 val_98
+98 val_98
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result_where1" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key=86
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Output: /tmp/index_result_where1
+POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result_where1" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key=86
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__src_src_index__
+POSTHOOK: Output: /tmp/index_result_where1
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT key, value FROM src WHERE key=86 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-52_248_1402456502169248212/-mr-10000
+POSTHOOK: query: SELECT key, value FROM src WHERE key=86 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-52_248_1402456502169248212/-mr-10000
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+86 val_86
+PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_where2" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key > 80 AND key < 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Output: /tmp/index_where2
+POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_where2" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key > 80 AND key < 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__src_src_index__
+POSTHOOK: Output: /tmp/index_where2
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT key, value FROM src WHERE key > 80 AND key < 100 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-58_886_5164613094498566262/-mr-10000
+POSTHOOK: query: SELECT key, value FROM src WHERE key > 80 AND key < 100 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-08-58_886_5164613094498566262/-mr-10000
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+82 val_82
+83 val_83
+83 val_83
+84 val_84
+84 val_84
+85 val_85
+86 val_86
+87 val_87
+90 val_90
+90 val_90
+90 val_90
+92 val_92
+95 val_95
+95 val_95
+96 val_96
+97 val_97
+97 val_97
+98 val_98
+98 val_98
+PREHOOK: query: -- test automatic usage of index in queries
+SELECT * FROM src WHERE key=86 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-09-02_338_7323660045243271625/-mr-10000
+POSTHOOK: query: -- test automatic usage of index in queries
+SELECT * FROM src WHERE key=86 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-09-02_338_7323660045243271625/-mr-10000
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+86 val_86
+PREHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-09-05_745_1914338222235282341/-mr-10000
+POSTHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/rmelick/hive_2011-02-10_21-09-05_745_1914338222235282341/-mr-10000
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+82 val_82
+83 val_83
+83 val_83
+84 val_84
+84 val_84
+85 val_85
+86 val_86
+87 val_87
+90 val_90
+90 val_90
+90 val_90
+92 val_92
+95 val_95
+95 val_95
+96 val_96
+97 val_97
+97 val_97
+98 val_98
+98 val_98
+PREHOOK: query: DROP INDEX src_index on src
+PREHOOK: type: DROPINDEX
+POSTHOOK: query: DROP INDEX src_index on src
+POSTHOOK: type: DROPINDEX
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
Index: ql/src/test/queries/clientpositive/index_opt_where_simple.q
===================================================================
--- ql/src/test/queries/clientpositive/index_opt_where_simple.q (revision 0)
+++ ql/src/test/queries/clientpositive/index_opt_where_simple.q (revision 0)
@@ -0,0 +1,8 @@
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+SELECT * FROM src WHERE key=86 ORDER BY key;
+
+DROP INDEX src_index on src;
Index: ql/src/test/queries/clientpositive/index_opt_where.q
===================================================================
--- ql/src/test/queries/clientpositive/index_opt_where.q (revision 0)
+++ ql/src/test/queries/clientpositive/index_opt_where.q (revision 0)
@@ -0,0 +1,28 @@
+-- try the queries without indexing
+-- TODO HMC put ORDER BY into the select queries to keep test results consistent
+SELECT * FROM src WHERE key=86 ORDER BY key;
+SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+INSERT OVERWRITE DIRECTORY "/tmp/index_result_where1" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key=86;
+SET hive.index.compact.file=/tmp/index_result_where1;
+SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
+SELECT key, value FROM src WHERE key=86 ORDER BY key;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+INSERT OVERWRITE DIRECTORY "/tmp/index_where2" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key > 80 AND key < 100;
+SET hive.index.compact.file=/tmp/index_where2;
+SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
+SELECT key, value FROM src WHERE key > 80 AND key < 100 ORDER BY key;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+-- test automatic usage of index in queries
+SELECT * FROM src WHERE key=86 ORDER BY key;
+SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key;
+
+DROP INDEX src_index on src;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (working copy)
@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.metastore.ProtectMode;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -812,4 +813,14 @@
public String getCompleteName() {
return getDbName() + "@" + getTableName();
}
+
+ /**
+ * @return the indexes defined on this table (at most max entries)
+ * @throws HiveException
+ **/
+ public List<Index> getAllIndexes(short max) throws HiveException {
+ Hive hive = Hive.get();
+ return hive.getIndexesOnTable(getTTable().getDbName(), getTTable().getTableName(), max);
+ }
};
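A minimal calling sketch for the new accessor, assuming a resolved ql.metadata.Table handle tbl; the short cap of 1024 matches the hardcoded limit used by IndexWhereOptimizer below:

    // Fetch up to 1024 index descriptors for this table from the metastore
    List<Index> indexes = tbl.getAllIndexes((short) 1024);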
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -681,6 +681,18 @@
}
}
+
+ public List<Index> getIndexesOnTable(String db_name, String tbl_name,
+ short max) throws HiveException {
+ try {
+ return getMSC().listIndexes(db_name, tbl_name, max);
+ } catch (NoSuchObjectException e) {
+ throw new HiveException("Partition or table doesn't exist.", e);
+ } catch (Exception e) {
+ throw new HiveException("Unknow error. Please check logs.", e);
+ }
+ }
+
public boolean dropIndex(String db_name, String tbl_name, String index_name, boolean deleteData) throws HiveException {
try {
return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (working copy)
@@ -33,11 +33,11 @@
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
/**
@@ -69,6 +69,19 @@
ctx.setCurrTask(currTask);
ctx.setCurrTopOp(currTopOp);
+ // Reset the inputFormat and inputFormatFile if the table scan needs a different one.
+ String indexClassName = op.getIndexClassName();
+ String inputFormatFile = op.getIndexTmpFile();
+ if (indexClassName != null) {
+ ((MapredWork)currTask.getWork()).setInputformat("org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat");
+ ((MapredWork)currTask.getWork()).setInputFormatFile(inputFormatFile);
+ System.out.println("GenMRTableScan1: indexClassName: " + indexClassName + ", inputFormatFile: " + inputFormatFile);
+ } else {
+ System.out.println("GenMRTableScan1: no index input format override needed.");
+ }
+
for (String alias : parseCtx.getTopOps().keySet()) {
Operator<? extends Serializable> currOp = parseCtx.getTopOps().get(alias);
if (currOp == op) {
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy)
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.index.IndexWhereOptimizer;
import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
import org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -72,6 +73,11 @@
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
transformations.add(new ReduceSinkDeDuplication());
}
+
+ //TODO use the hiveConf to figure out whether to use this optimization
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTAUTOINDEX)) {
+ transformations.add(new IndexWhereOptimizer());
+ }
}
/**
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereProcessor.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereProcessor.java (revision 0)
@@ -0,0 +1,141 @@
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+
+/**
+* IndexWhereProcessor.
+* Processes filter (WHERE clause) operator nodes, rewriting qualifying
+* queries to make use of available indexes.
+* TODO finish implementing this class
+*/
+public class IndexWhereProcessor implements NodeProcessor {
+
+ private static final Log LOG = LogFactory.getLog(IndexWhereProcessor.class.getName());
+ private final List<Index> indexes;
+
+ public IndexWhereProcessor(List<Index> indexes) {
+ super();
+ this.indexes = indexes;
+ }
+
+ /**
+ * TODO HMC
+ */
+ @Override
+ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ LOG.info("Processing for " + nd.getName() + "(" + ((Operator) nd).getIdentifier() + ")");
+
+ FilterOperator operator = (FilterOperator) nd;
+ FilterDesc operatorDesc = operator.getConf();
+ ExprNodeDesc predicate = operatorDesc.getPredicate();
+
+ ParseContext pctx = ((IndexWhereProcCtx) procCtx).getParseContext();
+
+ // TODO HMC somehow, we're picking up 2 Filters within one select query: SELECT key, value FROM src WHERE key=86
+ // why would we run this code twice?
+
+
+ // get potential reentrant index queries from each index we have
+ List<Task<? extends Serializable>> indexQueryTaskList = new ArrayList<Task<? extends Serializable>>();
+ for (Index index : indexes) {
+ List<Task<? extends Serializable>> indexQueryTasks = rewriteForIndex(predicate, index, pctx);
+ if (indexQueryTasks != null) {
+ indexQueryTaskList.addAll(indexQueryTasks);
+ }
+ }
+
+ // TODO HMC place for cost based choice?
+ /*
+ ParseContext chosenRewrite = indexQueries.get(0);
+ if (chosenRewrite != null)
+ {
+ // Attach index query to original query (using Operator trees)
+ pctx.getTopOps().putAll(chosenRewrite.getTopOps());
+ pctx.getTopToTable().putAll(chosenRewrite.getTopToTable());
+ }
+ */
+ pctx.getSemanticAnalyzer().getRootTasks().addAll(indexQueryTaskList);
+
+ return null;
+ }
+
+ /**
+ * Rewrite the operator tree to activate use of indexes.
+ * Mainly, generate the tree for the index query (where we store results of
+ * querying the index in a tmp file) by using reentrant QL
+ */
+ private List<Task<? extends Serializable>> rewriteForIndex(ExprNodeDesc predicate, Index index,
+ ParseContext pctx)
+ throws SemanticException {
+ HiveIndexHandler indexHandler;
+ try {
+ indexHandler = HiveUtils.getIndexHandler(pctx.getConf(), index.getIndexHandlerClass());
+ } catch (HiveException e) {
+ LOG.error("Exception while loading IndexHandler: " + index.getIndexHandlerClass());
+ throw new SemanticException("Failed to load indexHandler: " + index.getIndexHandlerClass(), e);
+ }
+
+ LOG.info("Found indexHandler " + indexHandler.toString());
+
+ // use the IndexHandler to generate the index query
+ List<Task<? extends Serializable>> indexQueryTasks = indexHandler.generateIndexQuery(index, predicate, pctx, pctx.getSemanticAnalyzer().getInputs());
+
+ return indexQueryTasks;
+ }
+
+ /**
+ * Instantiate a new predicate analyzer suitable for determining
+ * whether we can use an index, based on rules for indexes in
+ * WHERE clauses that we support
+ *
+ * @return preconfigured predicate analyzer for WHERE queries
+ */
+ private IndexPredicateAnalyzer getIndexPredicateAnalyzer() {
+ // TODO HMC make sure that List<FieldSchema> is the correct type for
+ // a list of columns
+ IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+
+ // TODO HMC are these the comparisons we should be supporting?
+ analyzer.addComparisonOp(GenericUDFOPEqual.class.getName());
+ analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName());
+ analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName());
+
+ // only return results for columns on which we have indexes
+ for (Index index : indexes) {
+ List<FieldSchema> columnSchemas = index.getSd().getCols();
+ for (FieldSchema column : columnSchemas) {
+ analyzer.allowColumnName(column.getName());
+ }
+ }
+
+ return analyzer;
+ }
+
+}
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereOptimizer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereOptimizer.java (revision 0)
@@ -0,0 +1,118 @@
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+public class IndexWhereOptimizer implements Transform {
+
+ private static final Log LOG = LogFactory.getLog(IndexWhereOptimizer.class.getName());
+
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ List<String> idxType = new ArrayList<String>();
+ idxType.add(CompactIndexHandler.class.getName()); //TODO HMC don't hardcode supported indexes
+
+ // query the metastore to know what columns we have indexed
+ Collection<Table> topTables = pctx.getTopToTable().values();
+ List<Index> indexesOnTable = new ArrayList<Index>();
+ for (Table topTable : topTables) {
+ indexesOnTable.addAll(getIndexes(topTable, idxType));
+ }
+ LOG.info("indexes " + indexesOnTable.toString());
+
+ // create the regex's so the walker can recognize our WHERE queries
+ Map<Rule, NodeProcessor> operatorRules = new LinkedHashMap<Rule, NodeProcessor>();
+ // FIL% is a filter operator, which is what WHERE's end up as
+ // TODO HMC better regex to match only 1 search condition
+ operatorRules.put(new RuleRegExp("RULEWhere", "FIL%"), new IndexWhereProcessor(indexesOnTable));
+
+ // create context the walker can carry with it
+ IndexWhereProcCtx indexWhereOptimizeCtx = new IndexWhereProcCtx(pctx);
+
+
+ // create the dispatcher, which fires the processor according to the rule that
+ // best matches
+ Dispatcher dispatcher = new DefaultRuleDispatcher(getDefaultProcessor(),
+ operatorRules,
+ indexWhereOptimizeCtx);
+ GraphWalker opGraphWalker = new DefaultGraphWalker(dispatcher);
+
+
+ // start walking on the top operations (head of the tree)
+ // TODO HMC why do we need to copy values?
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pctx.getTopOps().values());
+ opGraphWalker.startWalking(topNodes, null);
+
+ return pctx;
+ }
+
+ // Copied from
+ // https://github.com/prafullat/hive/blob/ddc89c33a1d4541e1687ad8b89c4d3f73d35d477/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteGBUsingIndex.java
+ List<Index> getIndexes(Table baseTableMetaData, List<String> matchIndexTypes) {
+ List<Index> matchingIndexes = new ArrayList<Index>();
+ List<Index> indexesOnTable = null;
+
+ try {
+ short maxNumOfIndexes = 1024; // XTODO: Hardcoding. Need to know if
+ // there's a limit (and what is it) on
+ // # of indexes that can be created
+ // on a table. If not, why is this param
+ // required by metastore APIs?
+ indexesOnTable = baseTableMetaData.getAllIndexes(maxNumOfIndexes);
+
+ } catch (HiveException e) {
+ return matchingIndexes; // Return empty list (trouble doing rewrite
+ // shouldn't stop regular query execution,
+ // if there's serious problem with metadata
+ // or anything else, it's assumed to be
+ // checked & handled in core hive code itself.
+ }
+
+ for (Index index : indexesOnTable) {
+ // The handler class implies the type of the index (e.g. compact
+ // summary index would be:
+ // "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler").
+ String indexType = index.getIndexHandlerClass();
+ if (matchIndexTypes.contains(indexType)) {
+ matchingIndexes.add(index);
+ }
+ }
+ return matchingIndexes;
+ }
+
+ private NodeProcessor getDefaultProcessor() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ return null;
+ }
+ };
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereProcCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereProcCtx.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/IndexWhereProcCtx.java (revision 0)
@@ -0,0 +1,21 @@
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+public class IndexWhereProcCtx implements NodeProcessorCtx {
+
+ private static final Log LOG = LogFactory.getLog(IndexWhereProcCtx.class.getName());
+
+ private final ParseContext parseCtx;
+
+ public IndexWhereProcCtx(ParseContext parseCtx) {
+ this.parseCtx = parseCtx;
+ }
+
+ public ParseContext getParseContext() {
+ return parseCtx;
+ }
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteParseContextGenerator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteParseContextGenerator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteParseContextGenerator.java (revision 0)
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * RewriteParseContextGenerator offers methods to generate an operator tree
+ * for an input query. It is implemented along the lines of the analyzeInternal(..) method
+ * of {@link SemanticAnalyzer} but creates only the ParseContext for the input query command.
+ * It does not optimize or generate map-reduce tasks for the input query.
+ * This can be used when you need to create the operator tree for an internal (reentrant) query;
+ * for example, the compact index handler uses it to generate a sub-query that scans
+ * over the index table rather than the original table.
+ *
+ */
+public final class RewriteParseContextGenerator {
+ protected static Log LOG = LogFactory.getLog(RewriteParseContextGenerator.class.getName());
+
+ /**
+ * Parse the input {@link String} command and generate a ASTNode tree
+ * @param conf
+ * @param command
+ * @return
+ */
+ public static ParseContext generateOperatorTree(HiveConf conf, String command) {
+ Context ctx;
+ ParseContext subPCtx = null;
+ try {
+ ctx = new Context(conf);
+ ParseDriver pd = new ParseDriver();
+ ASTNode tree = pd.parse(command, ctx);
+ tree = ParseUtils.findRootNonNullToken(tree);
+
+ BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
+ doSemanticAnalysis(sem, tree, ctx);
+
+ subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+ LOG.info("Sub-query Semantic Analysis Completed");
+ } catch (IOException e) {
+ LOG.error("IOException while generating operator tree for: " + command, e);
+ } catch (ParseException e) {
+ LOG.error("ParseException while generating operator tree for: " + command, e);
+ } catch (SemanticException e) {
+ LOG.error("SemanticException while generating operator tree for: " + command, e);
+ }
+ return subPCtx;
+
+ }
+
+ /**
+ * For the input ASTNode tree, perform semantic analysis, check metadata,
+ * and generate an operator tree; the caller retrieves the resulting
+ * {@link ParseContext} from the analyzer.
+ *
+ * @param sem
+ * @param ast
+ * @param ctx
+ * @throws SemanticException
+ */
+ private static void doSemanticAnalysis(BaseSemanticAnalyzer sem, ASTNode ast, Context ctx) throws SemanticException {
+
+ if (sem instanceof SemanticAnalyzer) {
+ QB qb = new QB(null, null, false);
+ ASTNode child = ast;
+ ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+ subPCtx.setContext(ctx);
+ ((SemanticAnalyzer) sem).init(subPCtx);
+
+ LOG.info("Starting Sub-query Semantic Analysis");
+ ((SemanticAnalyzer) sem).doPhase1(child, qb, ((SemanticAnalyzer) sem).initPhase1Ctx());
+ LOG.info("Completed phase 1 of Sub-query Semantic Analysis");
+
+ ((SemanticAnalyzer) sem).getMetaData(qb);
+ LOG.info("Completed getting MetaData in Sub-query Semantic Analysis");
+
+ LOG.info("Sub-query Abstract syntax tree: " + ast.toStringTree());
+ ((SemanticAnalyzer) sem).genPlan(qb);
+
+ LOG.info("Sub-query Completed plan generation");
+ }
+ }
+
+}
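A usage sketch, assuming a HiveConf handle conf; CompactIndexHandler (below) drives this with the reentrant index query it builds:

    // Parse and semantically analyze the reentrant query, yielding an
    // operator tree (ParseContext) that has not been optimized or compiled
    ParseContext subPCtx = RewriteParseContextGenerator.generateOperatorTree(
        conf, "SELECT `_bucketname`, `_offsets` FROM default__src_src_index__ WHERE key = 86");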
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (working copy)
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
@@ -53,6 +54,8 @@
private transient String partitionSpecs;
private transient boolean inputFileChanged = false;
private TableDesc tableDesc;
+ private String indexClassName;
+ private String indexTmpFile;
public TableDesc getTableDesc() {
@@ -63,6 +66,26 @@
this.tableDesc = tableDesc;
}
+ public String getIndexClassName() {
+ return indexClassName;
+ }
+
+ public void setIndexClassName(String indexClassName) {
+ this.indexClassName = indexClassName;
+ }
+
+ public String getIndexTmpFile() {
+ return indexTmpFile;
+ }
+
+ public void setIndexTmpFile(String indexTmpFile) {
+ this.indexTmpFile = indexTmpFile;
+ }
+
+ public boolean usesIndex() {
+ return StringUtils.isNotEmpty(indexClassName) && StringUtils.isNotEmpty(indexTmpFile);
+ }
+
/**
* Other than gathering statistics for the ANALYZE command, the table scan operator
* does not do anything special other than just forwarding the row. Since the table
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -260,9 +260,13 @@
job.setNumReduceTasks(work.getNumReduceTasks().intValue());
job.setReducerClass(ExecReducer.class);
+ // Set hive input format, and input format file if necessary.
if (work.getInputformat() != null) {
HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
}
+ if (work.getInputFormatFile() != null) {
+ job.set("hive.index.compact.file", work.getInputFormatFile());
+ }
// Turn on speculative execution for reducers
boolean useSpeculativeExecReducers = HiveConf.getBoolVar(job,
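The net effect on the JobConf, sketched with the literal property keys used above (hive.input.format is the key behind HiveConf.ConfVars.HIVEINPUTFORMAT; the index file path is whatever MapredWork carries):

    // Equivalent JobConf settings produced by the two branches above
    job.set("hive.input.format", work.getInputformat());
    job.set("hive.index.compact.file", work.getInputFormatFile());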
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -29,9 +29,6 @@
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
/**
* MapredWork.
@@ -70,6 +67,7 @@
private MapredLocalWork mapLocalWork;
private String inputformat;
+ private String inputFormatFile;
private boolean gatheringStats;
private String tmpHDFSFileURI;
@@ -335,10 +333,18 @@
return inputformat;
}
+ public String getInputFormatFile() {
+ return inputFormatFile;
+ }
+
public void setInputformat(String inputformat) {
this.inputformat = inputformat;
}
+ public void setInputFormatFile(String inputFormatFile) {
+ this.inputFormatFile = inputFormatFile;
+ }
+
public void setGatheringStats(boolean gatherStats) {
this.gatheringStats = gatherStats;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java (working copy)
@@ -22,12 +22,14 @@
+import java.io.Serializable;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
/**
* HiveIndexHandler defines a pluggable interface for adding new index handlers
@@ -114,4 +116,12 @@
Set<ReadEntity> inputs, Set<WriteEntity> outputs)
throws HiveException;
-}
\ No newline at end of file
+ /**
+ * Generate the tasks that query the index for the given predicate.
+ *
+ * @param index
+ * @param predicate
+ * @param pctx
+ * @param inputs
+ * @return tasks whose execution runs the index query
+ */
+ List<Task<? extends Serializable>> generateIndexQuery(Index index, ExprNodeDesc predicate,
+ ParseContext pctx, Set<ReadEntity> inputs);
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (working copy)
@@ -19,37 +19,45 @@
package org.apache.hadoop.hive.ql.index.compact;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.Map.Entry;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.index.AbstractIndexHandler;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
+import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
+import org.apache.hadoop.hive.ql.optimizer.RewriteParseContextGenerator;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
public class CompactIndexHandler extends AbstractIndexHandler {
-
+
private Configuration configuration;
@Override
@@ -201,6 +209,80 @@
}
@Override
+ public List<Task<? extends Serializable>> generateIndexQuery(Index index, ExprNodeDesc predicate,
+ ParseContext pctx, Set<ReadEntity> inputs) {
+
+ DecomposedPredicate decomposedPredicate = decomposePredicate(predicate, index);
+
+ // Build reentrant QL for index query
+ String qlCommand = "INSERT OVERWRITE DIRECTORY ";
+
+ // TODO HMC come up with good tmp file name
+ String tmpFile = "/tmp/index_opt_tmp";
+ qlCommand += "\"" + tmpFile + "\" "; // QL includes " around file name
+ qlCommand += "SELECT `_bucketname` , `_offsets` FROM ";
+ qlCommand += index.getIndexTableName();
+ qlCommand += " WHERE ";
+
+ // TODO HMC Must be a better way to turn the predicate into a QL string
+ String predicateString = decomposedPredicate.pushedPredicate.getExprString();
+ String commandString = StringUtils.remove(StringUtils.remove(predicateString, '('), ')');
+ qlCommand += commandString;
+
+ // generate tasks from index query string
+ Driver driver = new Driver(pctx.getConf());
+ driver.compile(qlCommand);
+
+ ParseContext indexQueryPctx = RewriteParseContextGenerator.generateOperatorTree(pctx.getConf(), qlCommand);
+
+ // setup TableScanOperator to change input format for original query
+ TableScanOperator originalTblScan = (TableScanOperator) pctx.getTopOps().get(index.getOrigTableName());
+ originalTblScan.setIndexClassName(index.getIndexHandlerClass());
+ originalTblScan.setIndexTmpFile(tmpFile);
+
+ inputs.addAll(driver.getPlan().getInputs());
+ return driver.getPlan().getRootTasks();
+ }
+
+ private DecomposedPredicate decomposePredicate(ExprNodeDesc predicate, Index index) {
+ IndexPredicateAnalyzer analyzer = getIndexPredicateAnalyzer(index);
+ List<IndexSearchCondition> searchConditions = new ArrayList<IndexSearchCondition>();
+ // analyze the predicate: conditions this index can serve are collected into
+ // searchConditions; whatever cannot be pushed is returned as the residual
+ ExprNodeDesc residualPredicate = analyzer.analyzePredicate(predicate, searchConditions);
+
+ DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
+ decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions);
+ decomposedPredicate.residualPredicate = residualPredicate;
+
+ return decomposedPredicate;
+ }
+
+ /**
+ * Instantiate a new predicate analyzer suitable for determining
+ * whether we can use an index, based on rules for indexes in
+ * WHERE clauses that we support
+ *
+ * @return preconfigured predicate analyzer for WHERE queries
+ */
+ private IndexPredicateAnalyzer getIndexPredicateAnalyzer(Index index) {
+ IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
+
+ // TODO HMC are these the comparisons we should be supporting?
+ analyzer.addComparisonOp(GenericUDFOPEqual.class.getName());
+ analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName());
+ analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName());
+
+ // only return results for columns in this index
+ List<FieldSchema> columnSchemas = index.getSd().getCols();
+ for (FieldSchema column : columnSchemas) {
+ analyzer.allowColumnName(column.getName());
+ }
+
+ return analyzer;
+ }
+
+ @Override
public boolean usesIndexTable() {
return true;
}
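For the test query SELECT * FROM src WHERE key=86, generateIndexQuery assembles a reentrant command roughly like the following (the tmp path is the hardcoded placeholder above; the exact predicate text depends on ExprNodeDesc.getExprString()):

    INSERT OVERWRITE DIRECTORY "/tmp/index_opt_tmp"
    SELECT `_bucketname` , `_offsets` FROM default__src_src_index__
    WHERE key = 86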
Index: ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java (working copy)
@@ -19,9 +19,15 @@
package org.apache.hadoop.hive.ql.index;
import java.util.List;
+import java.io.Serializable;
+import java.util.Set;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
/**
* Abstract base class for index handlers. This is provided as insulation
@@ -42,4 +48,9 @@
return sb.toString();
}
+ public List<Task<? extends Serializable>> generateIndexQuery(Index index, ExprNodeDesc predicate,
+ ParseContext pctx, Set<ReadEntity> inputs) {
+ return null;
+ }
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -6621,6 +6621,7 @@
loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner);
+ pCtx.setSemanticAnalyzer(this);
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 1074743)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy)
@@ -91,6 +91,8 @@
// a map-reduce job
private boolean hasNonPartCols;
+ private SemanticAnalyzer semanticAnalyzer;
+
public ParseContext() {
}
@@ -486,4 +488,12 @@
public void setMapJoinContext(Map<MapJoinOperator, QBJoinTree> mapJoinContext) {
this.mapJoinContext = mapJoinContext;
}
+
+ public SemanticAnalyzer getSemanticAnalyzer() {
+ return semanticAnalyzer;
+ }
+
+ public void setSemanticAnalyzer(SemanticAnalyzer semanticAnalyzer) {
+ this.semanticAnalyzer = semanticAnalyzer;
+ }
}