diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a091b95..20d5b3b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -305,6 +305,7 @@ public class HiveConf extends Configuration { // Optimizer HIVEOPTCP("hive.optimize.cp", true), // column pruner + HIVEOPTAUTOINDEX("hive.optimize.autoindex", true), // automatically use indexes HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown // push predicates down to storage handlers HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true), diff --git conf/hive-default.xml conf/hive-default.xml index f85f3ee..ce0bfc9 100644 --- conf/hive-default.xml +++ conf/hive-default.xml @@ -319,6 +319,12 @@ + hive.optimize.autoindex + true + Whether to enable automatic use of indexes + + + hive.optimize.ppd true Whether to enable predicate pushdown diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java index ea62578..1624bf4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java @@ -260,9 +260,13 @@ public class ExecDriver extends Task implements Serializable, Hadoop job.setNumReduceTasks(work.getNumReduceTasks().intValue()); job.setReducerClass(ExecReducer.class); + // Set hive input format, and input format file if necessary. if (work.getInputformat() != null) { HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat()); } + if (work.getInputFormatFile() != null) { + job.set("hive.index.compact.file", work.getInputFormatFile()); + } // Turn on speculative execution for reducers boolean useSpeculativeExecReducers = HiveConf.getBoolVar(job, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index c02d90b..68c5d22 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -53,6 +54,8 @@ public class TableScanOperator extends Operator implements private transient String partitionSpecs; private transient boolean inputFileChanged = false; private TableDesc tableDesc; + private String indexClassName; + private String indexTmpFile; public TableDesc getTableDesc() { @@ -63,6 +66,26 @@ public class TableScanOperator extends Operator implements this.tableDesc = tableDesc; } + public String getIndexClassName() { + return indexClassName; + } + + public void setIndexClassName(String indexClassName) { + this.indexClassName = indexClassName; + } + + public String getIndexTmpFile() { + return indexTmpFile; + } + + public void setIndexTmpFile(String indexTmpFile) { + this.indexTmpFile = indexTmpFile; + } + + public boolean usesIndex() { + return StringUtils.isNotEmpty(indexClassName) && StringUtils.isNotEmpty(indexTmpFile); + } + /** * Other than gathering statistics for the ANALYZE command, the table scan operator * does not do anything special other than just forwarding the row. 
Since the table diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java index dd0186d..4630eb2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java @@ -19,9 +19,15 @@ package org.apache.hadoop.hive.ql.index; import java.util.List; +import java.util.Set; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; /** * Abstract base class for index handlers. This is provided as insulation @@ -42,4 +48,9 @@ public abstract class AbstractIndexHandler implements HiveIndexHandler { return sb.toString(); } + public List> generateIndexQuery(Index index, ExprNodeDesc predicate, + ParseContext pctx, Set inputs) { + return null; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java index 411b78f..f318979 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java @@ -22,12 +22,14 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; /** * HiveIndexHandler defines a pluggable interface for adding new index handlers @@ -114,4 +116,12 @@ public interface HiveIndexHandler extends Configurable { Set inputs, Set outputs) throws HiveException; + /** + * Generate the tasks that run a reentrant query against the index table for the given predicate, writing the matching index entries to a temporary file. + * @param index + * @param predicate + * @return the root tasks of the generated index query + */ + List> generateIndexQuery(Index index, ExprNodeDesc predicate, + ParseContext pctx, Set inputs); } \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 1f01446..20cf629 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -19,34 +19,41 @@ package org.apache.hadoop.hive.ql.index.compact; import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Set; import java.util.Map.Entry; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Driver; +import 
org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.index.AbstractIndexHandler; -import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; +import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; public class CompactIndexHandler extends AbstractIndexHandler { @@ -104,9 +111,10 @@ public class CompactIndexHandler extends AbstractIndexHandler { break; } } - if (basePart == null) + if (basePart == null) { throw new RuntimeException( "Partitions of base table and index table are inconsistent."); + } // for each partition, spawn a map reduce task. Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), true, new PartitionDesc(indexPart), indexTbl.getTableName(), @@ -138,9 +146,10 @@ public class CompactIndexHandler extends AbstractIndexHandler { for (int i = 0; i < ret.size(); i++) { String partKV = ret.get(i); command.append(partKV); - if (i < ret.size() - 1) + if (i < ret.size() - 1) { command.append(","); } + } command.append(" ) "); } @@ -161,10 +170,11 @@ public class CompactIndexHandler extends AbstractIndexHandler { for (int i = 0; i < pkv.size(); i++) { String partKV = pkv.get(i); command.append(partKV); - if (i < pkv.size() - 1) + if (i < pkv.size() - 1) { command.append(" AND "); } } + } command.append(" GROUP BY "); command.append(indexCols + ", " + VirtualColumn.FILENAME.getName()); @@ -201,6 +211,80 @@ public class CompactIndexHandler extends AbstractIndexHandler { } @Override + public List> generateIndexQuery(Index index, ExprNodeDesc predicate, + ParseContext pctx, Set inputs) { + + DecomposedPredicate decomposedPredicate = decomposePredicate(predicate, index); + + // Build reentrant QL for index query + StringBuilder qlCommand = new StringBuilder("INSERT OVERWRITE DIRECTORY "); + + String tmpFile = pctx.getContext().getMRTmpFileURI(); + qlCommand.append( "\"" + tmpFile + "\" "); // QL includes " around file name + qlCommand.append("SELECT `_bucketname` , `_offsets` FROM "); + qlCommand.append(index.getIndexTableName()); + qlCommand.append(" WHERE "); + + // TODO HMC Must be a better way to turn the predicate into a QL string + String predicateString = decomposedPredicate.pushedPredicate.getExprString(); + String commandString = StringUtils.remove(StringUtils.remove(predicateString, '('), ')'); + qlCommand.append(commandString); + + // generate tasks from index query string + Driver driver = new Driver(pctx.getConf()); + driver.compile(qlCommand.toString()); + + // setup TableScanOperator to change 
input format for original query + TableScanOperator originalTblScan = (TableScanOperator) pctx.getTopOps().get(index.getOrigTableName()); + originalTblScan.setIndexClassName(index.getIndexHandlerClass()); + originalTblScan.setIndexTmpFile(tmpFile); + + /* + * see 192: rootTask.addDependentTask(indexMetaChangeTsk); for how we make dependencies... + */ + inputs.addAll(driver.getPlan().getInputs()); + return driver.getPlan().getRootTasks(); + } + + private DecomposedPredicate decomposePredicate(ExprNodeDesc predicate, Index index) { + IndexPredicateAnalyzer analyzer = getIndexPredicateAnalyzer(index); + List searchConditions = new ArrayList(); + // analyze the predicate, looking for TODO HMC: what are we looking for? + // store results? in searchConditions + ExprNodeDesc residualPredicate = analyzer.analyzePredicate(predicate, searchConditions); + + DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); + decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions); + decomposedPredicate.residualPredicate = residualPredicate; + + return decomposedPredicate; + } + + /** + * Instantiate a new predicate analyzer suitable for determining + * whether we can use an index, based on rules for indexes in + * WHERE clauses that we support + * + * @return preconfigured predicate analyzer for WHERE queries + */ + private IndexPredicateAnalyzer getIndexPredicateAnalyzer(Index index) { + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + + // TODO HMC are these the comparisons we should be supporting? + analyzer.addComparisonOp(GenericUDFOPEqual.class.getName()); + analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName()); + analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName()); + + // only return results for columns in this index + List columnSchemas = index.getSd().getCols(); + for (FieldSchema column : columnSchemas) { + analyzer.allowColumnName(column.getName()); + } + + return analyzer; + } + + @Override public boolean usesIndexTable() { return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index 50db44c..f06a8de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.ProtectMode; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SerDeInfo; @@ -813,4 +814,14 @@ public class Table implements Serializable { public String getCompleteName() { return getDbName() + "@" + getTableName(); } + + /** + * @return List of the indexes that exist on this table, + * up to max entries + * @throws HiveException + **/ + public List getAllIndexes(short max) throws HiveException { + Hive hive = Hive.get(); + return hive.getIndexes(getTTable().getDbName(), getTTable().getTableName(), max); + } }; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java index 6162676..d71d753 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java @@ -33,11 +33,11 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; -import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.QBParseInfo; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.StatsWork; /** @@ -69,6 +69,19 @@ public class GenMRTableScan1 implements NodeProcessor { ctx.setCurrTask(currTask); ctx.setCurrTopOp(currTopOp); + // Reset the inputFormat and inputFormatFile if the table scan needs a different one. + String indexClassName = op.getIndexClassName(); + String inputFormatFile = op.getIndexTmpFile(); + if (indexClassName != null) { + ((MapredWork)currTask.getWork()).setInputformat("org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat"); + ((MapredWork)currTask.getWork()).setInputFormatFile(inputFormatFile); + System.out.println("GenMRTableScan1: indexClassName: " + indexClassName + ", inputFormatFile: " + inputFormatFile); + } + else { + System.out.println("GenMRTableScan1: Nothing."); + } + + for (String alias : parseCtx.getTopOps().keySet()) { Operator currOp = parseCtx.getTopOps().get(alias); if (currOp == op) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index 590d69a..f1be21a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -72,6 +72,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) { transformations.add(new ReduceSinkDeDuplication()); } + } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteParseContextGenerator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteParseContextGenerator.java new file mode 100644 index 0000000..3d97a4a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteParseContextGenerator.java @@ -0,0 +1,100 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.ParseDriver; +import org.apache.hadoop.hive.ql.parse.ParseException; +import org.apache.hadoop.hive.ql.parse.ParseUtils; +import org.apache.hadoop.hive.ql.parse.QB; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +/** + * RewriteParseContextGenerator is a class that offers methods to generate operator tree + * for input queries. It is implemented on lines of the analyzeInternal(..) 
method + * of {@link SemanticAnalyzer} but it creates only the ParseContext for the input query command. + * It does not optimize or generate map-reduce tasks for the input query. + * This can be used when you need to create operator tree for an internal query. + * For example, {@link RewriteGBUsingIndex} uses the {@link RewriteIndexSubqueryProcFactory} methods to + * generate subquery that scans over index table rather than original table. + * + */ +public final class RewriteParseContextGenerator { + protected static Log LOG = LogFactory.getLog(RewriteParseContextGenerator.class.getName()); + + /** + * Parse the input {@link String} command and generate a ASTNode tree + * @param conf + * @param command + * @return + */ + public static ParseContext generateOperatorTree(HiveConf conf, String command){ + Context ctx; + ParseContext subPCtx = null; + try { + ctx = new Context(conf); + ParseDriver pd = new ParseDriver(); + ASTNode tree = pd.parse(command, ctx); + tree = ParseUtils.findRootNonNullToken(tree); + + BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree); + doSemanticAnalysis(sem, tree, ctx); + + subPCtx = ((SemanticAnalyzer) sem).getParseContext(); + LOG.info("Sub-query Semantic Analysis Completed"); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (ParseException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (SemanticException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return subPCtx; + + } + + /** + * For the input ASTNode tree, perform a semantic analysis and check metadata + * Generate a operator tree and return the {@link ParseContext} instance for the operator tree + * + * @param ctx + * @param sem + * @param ast + * @return + * @throws SemanticException + */ + private static void doSemanticAnalysis(BaseSemanticAnalyzer sem, ASTNode ast, Context ctx) throws SemanticException { + + if(sem instanceof SemanticAnalyzer){ + QB qb = new QB(null, null, false); + ASTNode child = ast; + ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext(); + subPCtx.setContext(ctx); + ((SemanticAnalyzer) sem).init(subPCtx); + + LOG.info("Starting Sub-query Semantic Analysis"); + ((SemanticAnalyzer) sem).doPhase1(child, qb, ((SemanticAnalyzer) sem).initPhase1Ctx()); + LOG.info("Completed phase 1 of Sub-query Semantic Analysis"); + + ((SemanticAnalyzer) sem).getMetaData(qb); + LOG.info("Completed getting MetaData in Sub-query Semantic Analysis"); + + LOG.info("Sub-query Abstract syntax tree: " + ast.toStringTree()); + ((SemanticAnalyzer) sem).genPlan(qb); + + LOG.info("Sub-query Completed plan generation"); + } + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java new file mode 100644 index 0000000..72cbcbd --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.optimizer.physical.index.IndexWhereTaskDispatcher; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +public class 
IndexWhereResolver implements PhysicalPlanResolver { + + private static final Log LOG = LogFactory.getLog(IndexWhereResolver.class.getName()); + + @Override + public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException { + //Dispatcher dispatcher = getDispatcher(physicalContext); + Dispatcher dispatcher = new IndexWhereTaskDispatcher(physicalContext); + GraphWalker opGraphWalker = new DefaultGraphWalker(dispatcher); + ArrayList topNodes = new ArrayList(); + topNodes.addAll(physicalContext.rootTasks); + opGraphWalker.startWalking(topNodes, null); + + return physicalContext; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index 0ae9fa2..8b3db28 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -52,6 +52,9 @@ public class PhysicalOptimizer { if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { resolvers.add(new CommonJoinResolver()); } + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTAUTOINDEX)) { + resolvers.add(new IndexWhereResolver()); + } resolvers.add(new MapJoinResolver()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcCtx.java new file mode 100644 index 0000000..50852a5 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcCtx.java @@ -0,0 +1,30 @@ +package org.apache.hadoop.hive.ql.optimizer.physical.index; + +import java.io.Serializable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.ParseContext; + +public class IndexWhereProcCtx implements NodeProcessorCtx { + + private static final Log LOG = LogFactory.getLog(IndexWhereProcCtx.class.getName()); + + private final Task currentTask; + private final ParseContext parseCtx; + + public IndexWhereProcCtx(Task task, ParseContext parseCtx) { + this.currentTask = task; + this.parseCtx = parseCtx; + } + + public ParseContext getParseContext() { + return parseCtx; + } + + public Task getCurrentTask() { + return currentTask; + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java new file mode 100644 index 0000000..4d2fe11 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java @@ -0,0 +1,144 @@ +package org.apache.hadoop.hive.ql.optimizer.physical.index; + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.index.HiveIndexHandler; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.lib.Node; +import 
org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; + +/** +* +* IndexWhereProcessor. +* Processes Operator Nodes to look for WHERE queries with a predicate column +* on which we have an index. Creates an index subquery Task for these +* WHERE queries to use the index automatically. +*/ +public class IndexWhereProcessor implements NodeProcessor { + + private static final Log LOG = LogFactory.getLog(IndexWhereProcessor.class.getName()); + private final List indexes; + + public IndexWhereProcessor(List indexes) { + super(); + this.indexes = indexes; + } + + @Override + /** + * TODO HMC + */ + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + LOG.info("Processing for " + nd.getName() + "(" + ((Operator) nd).getIdentifier() + ")"); + + FilterOperator operator = (FilterOperator) nd; + FilterDesc operatorDesc = operator.getConf(); + ExprNodeDesc predicate = operatorDesc.getPredicate(); + + ParseContext pctx = ((IndexWhereProcCtx) procCtx).getParseContext(); + + // TODO HMC somehow, we're picking up 2 Filters within one select query: SELECT key, value FROM src WHERE key=86 + // why would we run this code twice? + + + // get potential reentrant index queries from each index we have + // TODO HMC check if using all indexQuery tasks is good + List>> indexQueryTaskList = new ArrayList>>(); + for (Index index : indexes) { + List> indexQueryTasks = rewriteForIndex(predicate, index, pctx); + if (indexQueryTasks != null) + { + indexQueryTaskList.add(indexQueryTasks); + } + } + + + // choose an index rewrite to use + // TODO HMC place for cost based choice? + if (indexQueryTaskList.size() > 0) { + List> chosenRewrite = indexQueryTaskList.get(0); + // add dependencies so index query runs first + Task currentTask = ((IndexWhereProcCtx) procCtx).getCurrentTask(); + for (Task task : chosenRewrite) { + task.addDependentTask(currentTask); + } + pctx.getSemanticAnalyzer().getRootTasks().addAll(chosenRewrite); + } + + return null; + } + + /** + * Get a list of Tasks to activate use of indexes. 
+ * Generate the tasks for the index query (where we store results of + * querying the index in a tmp file) by using reentrant QL + */ + private List> rewriteForIndex(ExprNodeDesc predicate, Index index, + ParseContext pctx) + throws SemanticException { + HiveIndexHandler indexHandler; + try { + indexHandler = HiveUtils.getIndexHandler(pctx.getConf(), index.getIndexHandlerClass()); + } catch (HiveException e) { + LOG.error("Exception while loading IndexHandler: " + index.getIndexHandlerClass()); + throw new SemanticException("Failed to load indexHandler: " + index.getIndexHandlerClass(), e); + } + + LOG.info("Found indexHandler " + indexHandler.toString()); + + // use the IndexHandler to generate the index query + List> indexQueryTasks = indexHandler.generateIndexQuery(index, predicate, pctx, pctx.getSemanticAnalyzer().getInputs()); + + return indexQueryTasks; + } + + /** + * Instantiate a new predicate analyzer suitable for determining + * whether we can use an index, based on rules for indexes in + * WHERE clauses that we support + * + * @return preconfigured predicate analyzer for WHERE queries + */ + private IndexPredicateAnalyzer getIndexPredicateAnalyzer() { + // TODO HMC make sure that List is the correct type of + // a list of columns + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + + // TODO HMC are these the comparisons we should be supporting? + analyzer.addComparisonOp(GenericUDFOPEqual.class.getName()); + analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName()); + analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName()); + + // only return results for columns on which we have indexes + for (Index index : indexes) { + List columnSchemas = index.getSd().getCols(); + for (FieldSchema column : columnSchemas) { + analyzer.allowColumnName(column.getName()); + } + } + + return analyzer; + } + +} + diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java new file mode 100644 index 0000000..27b92f1 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java @@ -0,0 +1,173 @@ +package org.apache.hadoop.hive.ql.optimizer.physical.index; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; +import 
org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +/** + * + * IndexWhereTaskDispatcher. Walks a Task tree, and for the right kind of Task, + * walks the operator tree to create an index subquery. Then attaches the + * subquery task to the task tree. + * + */ +public class IndexWhereTaskDispatcher implements Dispatcher { + + private static final Log LOG = LogFactory.getLog(IndexWhereTaskDispatcher.class.getName()); + + private final PhysicalContext physicalContext; + + public IndexWhereTaskDispatcher(PhysicalContext context) { + super(); + physicalContext = context; + } + + @Override + public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) + throws SemanticException { + + Task task = (Task) nd; //TODO HMC checks on the task? + ParseContext pctx = physicalContext.getParseContext(); + + // all of our tasks are MapReduce + if (! task.isMapRedTask()) { //TODO HMC further checks? + return null; + } + + // if all table scans are not topOps, skip this task + List tblScanOps = findTableScanOps(task); + Collection> topOps = pctx.getTopOps().values(); + for (TableScanOperator op : tblScanOps) { + if (! topOps.contains(op)) { + return null; + } + } + + // create the regex's so the walker can recognize our WHERE queries + Map operatorRules = createOperatorRules(pctx); + + // create context so the walker can carry the current task with it. + IndexWhereProcCtx indexWhereOptimizeCtx = new IndexWhereProcCtx(task, pctx); + + // create the dispatcher, which fires the processor according to the rule that + // best matches + Dispatcher dispatcher = new DefaultRuleDispatcher(getDefaultProcessor(), + operatorRules, + indexWhereOptimizeCtx); + + // walk the mapper operator(not task) tree + GraphWalker ogw = new DefaultGraphWalker(dispatcher); + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + + return null; + } + + List findTableScanOps(Task task) { + return new ArrayList(); + } + + + + private NodeProcessor getDefaultProcessor() { + return new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + return null; + } + }; + } + + /** + * Create a set of rules that only matches WHERE predicates on columns we have + * an index on. + * @return + */ + private Map createOperatorRules(ParseContext pctx) { + Map operatorRules = new LinkedHashMap(); + + List idxType = new ArrayList(); + idxType.add(CompactIndexHandler.class.getName()); //TODO HMC don't hardcode supported indexes + + // query the metastore to know what columns we have indexed + Collection topTables = pctx.getTopToTable().values(); + List indexesOnTable = new ArrayList(); + for (Table topOp : topTables) + { + indexesOnTable.addAll(getIndexes(topOp, idxType)); + } + LOG.info("indexes " + indexesOnTable.toString()); + + + // FIL% is a filter operator, which is what WHERE's end up as + operatorRules.put(new RuleRegExp("RULEWhere", "FIL%"), new IndexWhereProcessor(indexesOnTable)); + // TODO HMC better regex to match only 1 search condition + + return operatorRules; + } + + /** + * Get a list of indexes on a table that match given types. 
+ * Copied from https://github.com/prafullat/hive/blob/ddc89c33a1d4541e1687ad8b89c4d3f73d35d477/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RewriteGBUsingIndex.java + */ + private List getIndexes(Table baseTableMetaData, List matchIndexTypes) { + List matchingIndexes = new ArrayList(); + List indexesOnTable = null; + + try { + short maxNumOfIndexes = 1024; // XTODO: Hardcoding. Need to know if + // there's a limit (and what is it) on + // # of indexes that can be created + // on a table. If not, why is this param + // required by metastore APIs? + indexesOnTable = baseTableMetaData.getAllIndexes(maxNumOfIndexes); + + } catch (HiveException e) { + return matchingIndexes; // Return empty list (trouble doing rewrite + // shouldn't stop regular query execution, + // if there's serious problem with metadata + // or anything else, it's assumed to be + // checked & handled in core hive code itself. + } + + for (Index index : indexesOnTable) { + // The handler class implies the type of the index (e.g. compact + // summary index would be: + // "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler"). + String indexType = index.getIndexHandlerClass(); + if (matchIndexTypes.contains(indexType)) { + matchingIndexes.add(index); + } + } + return matchingIndexes; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 937a7b3..c870192 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -91,6 +91,8 @@ public class ParseContext { // a map-reduce job private boolean hasNonPartCols; + private SemanticAnalyzer semanticAnalyzer; + public ParseContext() { } @@ -486,4 +488,12 @@ public class ParseContext { public void setMapJoinContext(Map mapJoinContext) { this.mapJoinContext = mapJoinContext; } + + public SemanticAnalyzer getSemanticAnalyzer() { + return semanticAnalyzer; + } + + public void setSemanticAnalyzer(SemanticAnalyzer semanticAnalyzer){ + this.semanticAnalyzer = semanticAnalyzer; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 281930a..c90c16d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -258,11 +258,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } public ParseContext getParseContext() { - return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps, + ParseContext pctx = new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner); + pctx.setSemanticAnalyzer(this); + return pctx; } @SuppressWarnings("nls") @@ -6624,6 +6626,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner); + pCtx.setSemanticAnalyzer(this); Optimizer optm = new Optimizer(); optm.setPctx(pCtx); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java index ceebc7d..a60c983 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java @@ -29,9 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.QBJoinTree; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; /** * MapredWork. @@ -70,6 +67,7 @@ public class MapredWork implements Serializable { private MapredLocalWork mapLocalWork; private String inputformat; + private String inputFormatFile; private boolean gatheringStats; private String tmpHDFSFileURI; @@ -335,10 +333,18 @@ public class MapredWork implements Serializable { return inputformat; } + public String getInputFormatFile() { + return inputFormatFile; + } + public void setInputformat(String inputformat) { this.inputformat = inputformat; } + public void setInputFormatFile(String inputFormatFile) { + this.inputFormatFile = inputFormatFile; + } + public void setGatheringStats(boolean gatherStats) { this.gatheringStats = gatherStats; } diff --git ql/src/test/queries/clientpositive/index_opt_where.q ql/src/test/queries/clientpositive/index_opt_where.q new file mode 100644 index 0000000..fe20a51 --- /dev/null +++ ql/src/test/queries/clientpositive/index_opt_where.q @@ -0,0 +1,31 @@ +-- try the queries without indexing +-- TODO HMC put ORDER BY into the select queries to keep test results consistent +SELECT * FROM src WHERE key=86 ORDER BY key; +SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key; + +CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD; +ALTER INDEX src_index ON src REBUILD; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +INSERT OVERWRITE DIRECTORY "/tmp/index_result_where1" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key=86; +SET hive.index.compact.file=/tmp/index_result_where1; +SET hive.optimize.autoindex=false; +SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat; +SELECT key, value FROM src WHERE key=86 ORDER BY key; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; + +INSERT OVERWRITE DIRECTORY "/tmp/index_where2" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key > 80 AND key < 100; +SET hive.index.compact.file=/tmp/index_where2; +SET hive.optimize.autoindex=false; +SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat; +SELECT key, value FROM src WHERE key > 80 AND key < 100 ORDER BY key; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +SET hive.optimize.autoindex=true; + +-- test automatic usage of index in queries +SELECT * FROM src WHERE key=86 ORDER BY key; +SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key; + +DROP INDEX src_index on src; \ No newline at end of file diff --git ql/src/test/queries/clientpositive/index_opt_where_simple.q ql/src/test/queries/clientpositive/index_opt_where_simple.q new file mode 100644 index 0000000..0b9c675 --- /dev/null +++ ql/src/test/queries/clientpositive/index_opt_where_simple.q @@ -0,0 +1,9 @@ +CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD; +ALTER INDEX src_index ON src REBUILD; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; + +EXPLAIN SELECT * FROM src WHERE key=86 ORDER BY key; +SELECT * FROM src WHERE key=86 ORDER BY key; + +DROP INDEX src_index on src; diff --git 
ql/src/test/results/clientpositive/index_opt_where.q.out ql/src/test/results/clientpositive/index_opt_where.q.out new file mode 100644 index 0000000..23e5bde --- /dev/null +++ ql/src/test/results/clientpositive/index_opt_where.q.out @@ -0,0 +1,174 @@ +PREHOOK: query: -- try the queries without indexing +-- TODO HMC put ORDER BY into the select queries to keep test results consistent +SELECT * FROM src WHERE key=86 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-36-48_032_1552068944701514214/-mr-10000 +POSTHOOK: query: -- try the queries without indexing +-- TODO HMC put ORDER BY into the select queries to keep test results consistent +SELECT * FROM src WHERE key=86 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-36-48_032_1552068944701514214/-mr-10000 +86 val_86 +PREHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-36-51_761_5361566409467507398/-mr-10000 +POSTHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-36-51_761_5361566409467507398/-mr-10000 +82 val_82 +83 val_83 +83 val_83 +84 val_84 +84 val_84 +85 val_85 +86 val_86 +87 val_87 +90 val_90 +90 val_90 +90 val_90 +92 val_92 +95 val_95 +95 val_95 +96 val_96 +97 val_97 +97 val_97 +98 val_98 +98 val_98 +PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +PREHOOK: type: CREATEINDEX +POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +POSTHOOK: type: CREATEINDEX +PREHOOK: query: ALTER INDEX src_index ON src REBUILD +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@default__src_src_index__ +POSTHOOK: query: ALTER INDEX src_index ON src REBUILD +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@default__src_src_index__ +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result_where1" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key=86 +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Output: /tmp/index_result_where1 +POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result_where1" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key=86 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@default__src_src_index__ +POSTHOOK: Output: /tmp/index_result_where1 +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT key, value FROM src WHERE key=86 ORDER BY key +PREHOOK: 
type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-09_417_6200896558779494589/-mr-10000 +POSTHOOK: query: SELECT key, value FROM src WHERE key=86 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-09_417_6200896558779494589/-mr-10000 +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +86 val_86 +PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_where2" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key > 80 AND key < 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Output: /tmp/index_where2 +POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_where2" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key > 80 AND key < 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@default__src_src_index__ +POSTHOOK: Output: /tmp/index_where2 +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT key, value FROM src WHERE key > 80 AND key < 100 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-15_449_1824377624347660447/-mr-10000 +POSTHOOK: query: SELECT key, value FROM src WHERE key > 80 AND key < 100 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-15_449_1824377624347660447/-mr-10000 +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +82 val_82 +83 val_83 +83 val_83 +84 val_84 +84 val_84 +85 val_85 +86 val_86 +87 val_87 +90 val_90 +90 val_90 +90 val_90 +92 val_92 +95 val_95 +95 val_95 +96 val_96 +97 val_97 +97 val_97 +98 val_98 +98 val_98 +PREHOOK: query: -- test automatic usage of index in queries +SELECT * FROM src WHERE key=86 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-18_688_3106422946290227400/-mr-10000 +POSTHOOK: query: -- test automatic usage of index in queries +SELECT * FROM src WHERE key=86 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@default__src_src_index__ +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-18_688_3106422946290227400/-mr-10000 +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: 
default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +86 val_86 +PREHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-27_525_6268230026400940727/-mr-10000 +POSTHOOK: query: SELECT * FROM src WHERE key > 80 AND key < 100 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@default__src_src_index__ +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-37-27_525_6268230026400940727/-mr-10000 +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +82 val_82 +83 val_83 +83 val_83 +84 val_84 +84 val_84 +85 val_85 +86 val_86 +87 val_87 +90 val_90 +90 val_90 +90 val_90 +92 val_92 +95 val_95 +95 val_95 +96 val_96 +97 val_97 +97 val_97 +98 val_98 +98 val_98 +PREHOOK: query: DROP INDEX src_index on src +PREHOOK: type: DROPINDEX +POSTHOOK: query: DROP INDEX src_index on src +POSTHOOK: type: DROPINDEX +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] diff --git ql/src/test/results/clientpositive/index_opt_where_simple.q.out ql/src/test/results/clientpositive/index_opt_where_simple.q.out new file mode 100644 index 0000000..0959b68 --- /dev/null +++ ql/src/test/results/clientpositive/index_opt_where_simple.q.out @@ -0,0 +1,215 @@ +PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +PREHOOK: type: CREATEINDEX +POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +POSTHOOK: type: CREATEINDEX +PREHOOK: query: ALTER INDEX src_index ON src REBUILD +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@default__src_src_index__ +POSTHOOK: query: ALTER INDEX src_index ON src REBUILD +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@default__src_src_index__ +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: EXPLAIN SELECT * FROM src WHERE key=86 ORDER BY key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT * FROM src WHERE key=86 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: 
Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (TOK_TABLE_OR_COL key) 86)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key))))) + +STAGE DEPENDENCIES: + Stage-1 depends on stages: Stage-1, Stage-1 + Stage-1 is a root stage + Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2 + Stage-3 + Stage-0 depends on stages: Stage-3, Stage-2 + Stage-2 + Stage-1 is a root stage + Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2 + Stage-3 + Stage-0 depends on stages: Stage-3, Stage-2 + Stage-2 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Filter Operator + predicate: + expr: (key = 86) + type: boolean + Filter Operator + predicate: + expr: (key = 86) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + tag: -1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + Reduce Operator Tree: + Extract + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + default__src_src_index__ + TableScan + alias: default__src_src_index__ + Filter Operator + predicate: + expr: (key = 86) + type: boolean + Filter Operator + predicate: + expr: (key = 86) + type: boolean + Select Operator + expressions: + expr: _bucketname + type: string + expr: _offsets + type: array + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-4 + Conditional Operator + + Stage: Stage-3 + Move Operator + files: + hdfs directory: true + destination: file:/home/rmelick/hive/build/ql/scratchdir/hive_2011-03-05_19-18-18_381_7081566602326725410/-ext-10000 + + Stage: Stage-0 + Move Operator + files: + hdfs directory: true + destination: file:/tmp/rmelick/hive_2011-03-05_19-18-18_303_3512702493348670914/-mr-10002 + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + file:/home/rmelick/hive/build/ql/scratchdir/hive_2011-03-05_19-18-18_381_7081566602326725410/-ext-10001 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + default__src_src_index__ + TableScan + alias: default__src_src_index__ + Filter Operator + predicate: + expr: (key = 86) + type: boolean + Filter Operator + predicate: + expr: (key = 86) + type: boolean + Select Operator + expressions: + expr: _bucketname + type: string + expr: _offsets + type: array + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + 
table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-4 + Conditional Operator + + Stage: Stage-3 + Move Operator + files: + hdfs directory: true + destination: file:/home/rmelick/hive/build/ql/scratchdir/hive_2011-03-05_19-18-18_449_752264577584941085/-ext-10000 + + Stage: Stage-0 + Move Operator + files: + hdfs directory: true + destination: file:/tmp/rmelick/hive_2011-03-05_19-18-18_303_3512702493348670914/-mr-10003 + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + file:/home/rmelick/hive/build/ql/scratchdir/hive_2011-03-05_19-18-18_449_752264577584941085/-ext-10001 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT * FROM src WHERE key=86 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-18-18_567_5526366873818637176/-mr-10000 +POSTHOOK: query: SELECT * FROM src WHERE key=86 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@default__src_src_index__ +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/rmelick/hive_2011-03-05_19-18-18_567_5526366873818637176/-mr-10000 +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +86 val_86 +PREHOOK: query: DROP INDEX src_index on src +PREHOOK: type: DROPINDEX +POSTHOOK: query: DROP INDEX src_index on src +POSTHOOK: type: DROPINDEX +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
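For reference, the reentrant index query that CompactIndexHandler.generateIndexQuery builds has the same shape as the manual INSERT OVERWRITE DIRECTORY statements in index_opt_where.q. A rough sketch for the test query SELECT key, value FROM src WHERE key=86 follows; the output directory is a placeholder for whatever pctx.getContext().getMRTmpFileURI() returns at run time, and the WHERE clause is the pushed predicate's expression string with its parentheses stripped.

-- Sketch of the generated reentrant QL; the directory is a placeholder for the MR tmp file URI
INSERT OVERWRITE DIRECTORY "<mr-tmp-file-uri>"
SELECT `_bucketname` , `_offsets` FROM default__src_src_index__
WHERE key = 86;

GenMRTableScan1 then points the original query's MapredWork at org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat and records the temporary path, which ExecDriver passes to the job as hive.index.compact.file; this mirrors the SET statements in the manual portion of index_opt_where.q.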