diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java index 6894c81..e67996d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Set; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; @@ -46,12 +45,9 @@ */ public class AggregateIndexHandler extends CompactIndexHandler { - private static Index index = null; - @Override - public void analyzeIndexDefinition(Table baseTable, Index idx, + public void analyzeIndexDefinition(Table baseTable, Index index, Table indexTable) throws HiveException { - index = idx; StorageDescriptor storageDesc = index.getSd(); if (this.usesIndexTable() && indexTable != null) { StorageDescriptor indexTableSd = storageDesc.deepCopy(); @@ -92,10 +88,11 @@ private void createAggregationFunction(List indexTblCols, String pr @Override protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, - List indexField, boolean partitioned, + Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, PartitionDesc baseTablePartDesc, String baseTableName, String dbName) { + List indexField = index.getSd().getCols(); String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); //form a new insert overwrite query. diff --git ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java index 3c1362e..a019350 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -64,7 +65,7 @@ if (!baseTbl.isPartitioned()) { // the table does not have any partition, then create index for the // whole table - Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), false, + Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false, new PartitionDesc(desc, null), indexTbl.getTableName(), new PartitionDesc(Utilities.getTableDesc(baseTbl), null), baseTbl.getTableName(), indexTbl.getDbName()); @@ -88,7 +89,7 @@ "Partitions of base table and index table are inconsistent."); } // for each partition, spawn a map reduce task. 
- Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), true, + Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true, new PartitionDesc(indexPart), indexTbl.getTableName(), new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName()); indexBuilderTasks.add(indexBuilder); @@ -100,10 +101,20 @@ } } - abstract protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, + protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, + Index index, boolean partitioned, + PartitionDesc indexTblPartDesc, String indexTableName, + PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), + partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName); + } + + protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException; + PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + return null; + } protected void setStatsDir(HiveConf builderConf) { String statsDir; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index cb9e2f8..ec24333 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -365,52 +365,57 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, cppCtx.getPrunedColLists().put((Operator) nd, cols); - List neededColumnIds = new ArrayList(); - List neededColumnNames = new ArrayList(); - List referencedColumnNames = new ArrayList(); RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver(); - TableScanDesc desc = scanOp.getConf(); - List virtualCols = desc.getVirtualCols(); - List newVirtualCols = new ArrayList(); + setupNeededColumns(scanOp, inputRR, cols); + return null; + } + } - // add virtual columns for ANALYZE TABLE - if(scanOp.getConf().isGatherStats()) { - cols.add(VirtualColumn.RAWDATASIZE.getName()); - } + public static void setupNeededColumns(TableScanOperator scanOp, RowResolver inputRR, + List cols) throws SemanticException { + List neededColumnIds = new ArrayList(); + List neededColumnNames = new ArrayList(); + List referencedColumnNames = new ArrayList(); + TableScanDesc desc = scanOp.getConf(); + List virtualCols = desc.getVirtualCols(); + List newVirtualCols = new ArrayList(); + + // add virtual columns for ANALYZE TABLE + if(scanOp.getConf().isGatherStats()) { + cols.add(VirtualColumn.RAWDATASIZE.getName()); + } - for (String column : cols) { - String[] tabCol = inputRR.reverseLookup(column); - if (tabCol == null) { - continue; - } - referencedColumnNames.add(column); - ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]); - if (colInfo.getIsVirtualCol()) { - // part is also a virtual column, but part col should not in this - // list. 
- for (int j = 0; j < virtualCols.size(); j++) { - VirtualColumn vc = virtualCols.get(j); - if (vc.getName().equals(colInfo.getInternalName())) { - newVirtualCols.add(vc); - } + for (String column : cols) { + String[] tabCol = inputRR.reverseLookup(column); + if (tabCol == null) { + continue; + } + referencedColumnNames.add(column); + ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]); + if (colInfo.getIsVirtualCol()) { + // part is also a virtual column, but part col should not in this + // list. + for (int j = 0; j < virtualCols.size(); j++) { + VirtualColumn vc = virtualCols.get(j); + if (vc.getName().equals(colInfo.getInternalName())) { + newVirtualCols.add(vc); } - //no need to pass virtual columns to reader. - continue; - } - int position = inputRR.getPosition(column); - if (position >= 0) { - // get the needed columns by id and name - neededColumnIds.add(position); - neededColumnNames.add(column); } + //no need to pass virtual columns to reader. + continue; + } + int position = inputRR.getPosition(column); + if (position >= 0) { + // get the needed columns by id and name + neededColumnIds.add(position); + neededColumnNames.add(column); } - - desc.setVirtualCols(newVirtualCols); - scanOp.setNeededColumnIDs(neededColumnIds); - scanOp.setNeededColumns(neededColumnNames); - scanOp.setReferencedColumns(referencedColumnNames); - return null; } + + desc.setVirtualCols(newVirtualCols); + scanOp.setNeededColumnIDs(neededColumnIds); + scanOp.setNeededColumns(neededColumnNames); + scanOp.setReferencedColumns(referencedColumnNames); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java index cc94254..9ffa708 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java @@ -30,10 +30,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; -import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; @@ -45,7 +44,6 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** * RewriteCanApplyCtx class stores the context for the {@link RewriteCanApplyProcFactory} @@ -84,7 +82,9 @@ public static RewriteCanApplyCtx getInstance(ParseContext parseContext){ private Set aggFuncColList = new LinkedHashSet(); private final ParseContext parseContext; + private String alias; private String baseTableName; + private String indexTableName; private String aggFunction; void resetCanApplyCtx(){ @@ -230,6 +230,14 @@ public void setAggFuncCnt(int aggFuncCnt) { this.aggFuncCnt = aggFuncCnt; } + public String getAlias() { + return alias; + } + + public void setAlias(String alias) { + this.alias = alias; + } + public String getBaseTableName() { return baseTableName; } @@ -238,10 +246,26 @@ public void setBaseTableName(String baseTableName) { this.baseTableName = baseTableName; } + public 
String getIndexTableName() { + return indexTableName; + } + + public void setIndexTableName(String indexTableName) { + this.indexTableName = indexTableName; + } + public ParseContext getParseContext() { return parseContext; } + public Set getAllColumns() { + Set allColumns = new LinkedHashSet(selectColumnsList); + allColumns.addAll(predicateColumnsList); + allColumns.addAll(gbKeyNameList); + allColumns.addAll(aggFuncColList); + return allColumns; + } + /** * This method walks all the nodes starting from topOp TableScanOperator node @@ -255,15 +279,13 @@ public ParseContext getParseContext() { * @param topOp * @throws SemanticException */ - void populateRewriteVars(Operator topOp) + void populateRewriteVars(TableScanOperator topOp) throws SemanticException{ Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"), - RewriteCanApplyProcFactory.canApplyOnFilterOperator()); + RewriteCanApplyProcFactory.canApplyOnFilterOperator(topOp)); opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + "%"), - RewriteCanApplyProcFactory.canApplyOnGroupByOperator()); - opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + "%"), - RewriteCanApplyProcFactory.canApplyOnSelectOperator()); + RewriteCanApplyProcFactory.canApplyOnGroupByOperator(topOp)); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along @@ -364,5 +386,4 @@ boolean isIndexUsableForQueryBranchRewrite(Index index, Set indexKeyName } return true; } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java index 4a2f52f..02216de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java @@ -18,19 +18,9 @@ package org.apache.hadoop.hive.ql.optimizer.index; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; - -import org.apache.hadoop.hive.ql.exec.ColumnInfo; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.RowSchema; -import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -39,10 +29,13 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; + +import java.util.List; +import java.util.Stack; /** * Factory of methods used by {@link RewriteGBUsingIndex} @@ -50,43 +43,46 @@ * */ public final class RewriteCanApplyProcFactory { - private static RewriteCanApplyCtx canApplyCtx = null; - - private RewriteCanApplyProcFactory(){ - //this prevents the class from getting instantiated - } /** * Check for 
conditions in FilterOperator that do not meet rewrite criteria. */ private static class CheckFilterProc implements NodeProcessor { + + private TableScanOperator topOp; + + public CheckFilterProc(TableScanOperator topOp) { + this.topOp = topOp; + } + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { FilterOperator operator = (FilterOperator)nd; - canApplyCtx = (RewriteCanApplyCtx)ctx; - FilterDesc conf = (FilterDesc)operator.getConf(); + RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx; + FilterDesc conf = operator.getConf(); //The filter operator should have a predicate of ExprNodeGenericFuncDesc type. //This represents the comparison operator - ExprNodeGenericFuncDesc oldengfd = (ExprNodeGenericFuncDesc) conf.getPredicate(); + ExprNodeDesc oldengfd = conf.getPredicate(); if(oldengfd == null){ canApplyCtx.setWhrClauseColsFetchException(true); + return null; } - //The predicate should have valid left and right columns - List colList = oldengfd.getCols(); - if(colList == null || colList.size() == 0){ + ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(oldengfd, operator, topOp); + if (backtrack == null) { canApplyCtx.setWhrClauseColsFetchException(true); + return null; } //Add the predicate columns to RewriteCanApplyCtx's predColRefs list to check later //if index keys contain all filter predicate columns and vice-a-versa - for (String col : colList) { + for (String col : backtrack.getCols()) { canApplyCtx.getPredicateColumnsList().add(col); } return null; } } - public static CheckFilterProc canApplyOnFilterOperator() { - return new CheckFilterProc(); + public static CheckFilterProc canApplyOnFilterOperator(TableScanOperator topOp) { + return new CheckFilterProc(topOp); } /** @@ -95,10 +91,16 @@ public static CheckFilterProc canApplyOnFilterOperator() { */ private static class CheckGroupByProc implements NodeProcessor { + private TableScanOperator topOp; + + public CheckGroupByProc(TableScanOperator topOp) { + this.topOp = topOp; + } + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) throws SemanticException { GroupByOperator operator = (GroupByOperator)nd; - canApplyCtx = (RewriteCanApplyCtx)ctx; + RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx; //for each group-by clause in query, only one GroupByOperator of the //GBY-RS-GBY sequence is stored in getGroupOpToInputTables //we need to process only this operator @@ -107,7 +109,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, !canApplyCtx.isQueryHasGroupBy()){ canApplyCtx.setQueryHasGroupBy(true); - GroupByDesc conf = (GroupByDesc) operator.getConf(); + GroupByDesc conf = operator.getConf(); List aggrList = conf.getAggregators(); if(aggrList != null && aggrList.size() > 0){ for (AggregationDesc aggregationDesc : aggrList) { @@ -119,40 +121,39 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, String aggFunc = aggregationDesc.getGenericUDAFName(); if(!("count".equals(aggFunc))){ canApplyCtx.setAggFuncIsNotCount(true); - }else{ - List para = aggregationDesc.getParameters(); - //for a valid aggregation, it needs to have non-null parameter list - if(para == null){ - canApplyCtx.setAggFuncColsFetchException(true); - }else if(para.size() == 0){ - //count(*) case - canApplyCtx.setCountOnAllCols(true); - canApplyCtx.setAggFunction("_count_of_all"); - }else{ - assert para.size()==1; - for(int i=0; i< para.size(); i++){ - ExprNodeDesc expr = para.get(i); - if(expr instanceof ExprNodeColumnDesc){ - //Add the columns to RewriteCanApplyCtx's selectColumnsList list - //to check later if index keys contain all select clause columns - //and vice-a-versa. We get the select column 'actual' names only here - //if we have a agg func along with group-by - //SelectOperator has internal names in its colList data structure - canApplyCtx.getSelectColumnsList().add( - ((ExprNodeColumnDesc) expr).getColumn()); - //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later - //if columns contained in agg func are index key columns - canApplyCtx.getAggFuncColList().add( - ((ExprNodeColumnDesc) expr).getColumn()); - canApplyCtx.setAggFunction("_count_of_" + - ((ExprNodeColumnDesc) expr).getColumn() + ""); - }else if(expr instanceof ExprNodeConstantDesc){ - //count(1) case - canApplyCtx.setCountOfOne(true); - canApplyCtx.setAggFunction("_count_of_1"); - } - } + return false; + } + List para = aggregationDesc.getParameters(); + //for a valid aggregation, it needs to have non-null parameter list + if (para == null) { + canApplyCtx.setAggFuncColsFetchException(true); + } else if (para.size() == 0) { + //count(*) case + canApplyCtx.setCountOnAllCols(true); + canApplyCtx.setAggFunction("_count_of_all"); + } else if (para.size() == 1) { + ExprNodeDesc expr = ExprNodeDescUtils.backtrack(para.get(0), operator, topOp); + if (expr instanceof ExprNodeColumnDesc){ + //Add the columns to RewriteCanApplyCtx's selectColumnsList list + //to check later if index keys contain all select clause columns + //and vice-a-versa. 
We get the select column 'actual' names only here + //if we have a agg func along with group-by + //SelectOperator has internal names in its colList data structure + canApplyCtx.getSelectColumnsList().add( + ((ExprNodeColumnDesc) expr).getColumn()); + //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later + //if columns contained in agg func are index key columns + canApplyCtx.getAggFuncColList().add( + ((ExprNodeColumnDesc) expr).getColumn()); + canApplyCtx.setAggFunction("_count_of_" + + ((ExprNodeColumnDesc) expr).getColumn() + ""); + } else if(expr instanceof ExprNodeConstantDesc) { + //count(1) case + canApplyCtx.setCountOfOne(true); + canApplyCtx.setAggFunction("_count_of_1"); } + } else { + throw new SemanticException("Invalid number of arguments for count"); } } } @@ -163,13 +164,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, canApplyCtx.setGbyKeysFetchException(true); } for (ExprNodeDesc expr : keyList) { - checkExpression(expr); + checkExpression(canApplyCtx, expr); } } return null; } - private void checkExpression(ExprNodeDesc expr){ + private void checkExpression(RewriteCanApplyCtx canApplyCtx, ExprNodeDesc expr){ if(expr instanceof ExprNodeColumnDesc){ //Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to check later //if all keys are from index columns @@ -182,59 +183,14 @@ private void checkExpression(ExprNodeDesc expr){ canApplyCtx.getGbKeyNameList().addAll(expr.getCols()); canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn()); }else if(childExpr instanceof ExprNodeGenericFuncDesc){ - checkExpression(childExpr); + checkExpression(canApplyCtx, childExpr); } } } } } - - public static CheckGroupByProc canApplyOnGroupByOperator() { - return new CheckGroupByProc(); + public static CheckGroupByProc canApplyOnGroupByOperator(TableScanOperator topOp) { + return new CheckGroupByProc(topOp); } - - - /** - * Check for conditions in SelectOperator that do not meet rewrite criteria. - */ - private static class CheckSelectProc implements NodeProcessor { - public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, - Object... 
nodeOutputs) throws SemanticException { - SelectOperator operator = (SelectOperator)nd; - canApplyCtx = (RewriteCanApplyCtx)ctx; - - List> childrenList = operator.getChildOperators(); - Operator child = childrenList.get(0); - if(child instanceof FileSinkOperator){ - Map internalToAlias = new LinkedHashMap(); - RowSchema rs = operator.getSchema(); - //to get the internal to alias mapping - List sign = rs.getSignature(); - for (ColumnInfo columnInfo : sign) { - internalToAlias.put(columnInfo.getInternalName(), columnInfo.getAlias()); - } - - //if FilterOperator predicate has internal column names, - //we need to retrieve the 'actual' column names to - //check if index keys contain all filter predicate columns and vice-a-versa - Iterator predItr = canApplyCtx.getPredicateColumnsList().iterator(); - while(predItr.hasNext()){ - String predCol = predItr.next(); - String newPredCol = ""; - if(internalToAlias.get(predCol) != null){ - newPredCol = internalToAlias.get(predCol); - canApplyCtx.getPredicateColumnsList().remove(predCol); - canApplyCtx.getPredicateColumnsList().add(newPredCol); - } - } - } - return null; - } - } - - public static CheckSelectProc canApplyOnSelectOperator() { - return new CheckSelectProc(); - } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java index 11a6d07..664cbe0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.util.StringUtils; /** @@ -106,11 +107,6 @@ private final Map tsOpToProcess = new LinkedHashMap(); - //Name of the current table on which rewrite is being performed - private String baseTableName = null; - //Name of the current index which is used for rewrite - private String indexTableName = null; - //Index Validation Variables private static final String IDX_BUCKET_COL = "_bucketname"; private static final String IDX_OFFSETS_ARRAY_COL = "_offsets"; @@ -133,7 +129,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { /* Check if the input query passes all the tests to be eligible for a rewrite * If yes, rewrite original query; else, return the current parseContext */ - if(shouldApplyOptimization()){ + if (shouldApplyOptimization()) { LOG.info("Rewriting Original Query using " + getName() + " optimization."); rewriteOriginalQuery(); } @@ -155,59 +151,52 @@ private String getName() { * @return * @throws SemanticException */ - boolean shouldApplyOptimization() throws SemanticException{ - boolean canApply = false; - if(ifQueryHasMultipleTables()){ + boolean shouldApplyOptimization() throws SemanticException { + if (ifQueryHasMultipleTables()) { //We do not apply this optimization for this case as of now. return false; - }else{ + } + Map> tableToIndex = getIndexesForRewrite(); + if (tableToIndex.isEmpty()) { + LOG.debug("No Valid Index Found to apply Rewrite, " + + "skipping " + getName() + " optimization"); + return false; + } /* * This code iterates over each TableScanOperator from the topOps map from ParseContext. * For each operator tree originating from this top TableScanOperator, we determine * if the optimization can be applied. 
If yes, we add the name of the top table to * the tsOpToProcess to apply rewrite later on. * */ - Map topToTable = parseContext.getTopToTable(); - Iterator topOpItr = topToTable.keySet().iterator(); - while(topOpItr.hasNext()){ - - TableScanOperator topOp = topOpItr.next(); - Table table = topToTable.get(topOp); - baseTableName = table.getTableName(); - Map> indexes = getIndexesForRewrite(); - if(indexes == null){ - LOG.debug("Error getting valid indexes for rewrite, " + - "skipping " + getName() + " optimization"); - return false; - } + Map topToTable = parseContext.getTopToTable(); + Map> topOps = parseContext.getTopOps(); + + for (Map.Entry> entry : parseContext.getTopOps().entrySet()) { - if(indexes.size() == 0){ - LOG.debug("No Valid Index Found to apply Rewrite, " + + String alias = entry.getKey(); + TableScanOperator topOp = (TableScanOperator) entry.getValue(); + + Table table = topToTable.get(topOp); + List indexes = tableToIndex.get(table); + if (indexes.isEmpty()) { + continue; + } + + if (table.isPartitioned()) { + //if base table has partitions, we need to check if index is built for + //all partitions. If not, then we do not apply the optimization + if (!checkIfIndexBuiltOnAllTablePartitions(topOp, tableToIndex)) { + LOG.debug("Index is not built for all table partitions, " + "skipping " + getName() + " optimization"); - return false; - }else{ - //we need to check if the base table has confirmed or unknown partitions - if(parseContext.getOpToPartList() != null && parseContext.getOpToPartList().size() > 0){ - //if base table has partitions, we need to check if index is built for - //all partitions. If not, then we do not apply the optimization - if(checkIfIndexBuiltOnAllTablePartitions(topOp, indexes)){ - //check if rewrite can be applied for operator tree - //if partitions condition returns true - canApply = checkIfRewriteCanBeApplied(topOp, table, indexes); - }else{ - LOG.debug("Index is not built for all table partitions, " + - "skipping " + getName() + " optimization"); - return false; - } - }else{ - //check if rewrite can be applied for operator tree - //if there are no partitions on base table - canApply = checkIfRewriteCanBeApplied(topOp, table, indexes); - } + continue; } } + //check if rewrite can be applied for operator tree + //if there are no partitions on base table + checkIfRewriteCanBeApplied(alias, topOp, table, indexes); } - return canApply; + + return !tsOpToProcess.isEmpty(); } /** @@ -219,61 +208,36 @@ boolean shouldApplyOptimization() throws SemanticException{ * @return - true if rewrite can be applied on the current branch; false otherwise * @throws SemanticException */ - private boolean checkIfRewriteCanBeApplied(TableScanOperator topOp, Table baseTable, - Map> indexes) throws SemanticException{ - boolean canApply = false; + private boolean checkIfRewriteCanBeApplied(String alias, TableScanOperator topOp, + Table baseTable, List indexes) throws SemanticException{ //Context for checking if this optimization can be applied to the input query RewriteCanApplyCtx canApplyCtx = RewriteCanApplyCtx.getInstance(parseContext); - Map> topOps = parseContext.getTopOps(); - canApplyCtx.setBaseTableName(baseTableName); + canApplyCtx.setAlias(alias); + canApplyCtx.setBaseTableName(baseTable.getTableName()); canApplyCtx.populateRewriteVars(topOp); - Map> indexTableMap = getIndexToKeysMap(indexes.get(baseTable)); - Iterator indexMapItr = indexTableMap.keySet().iterator(); - Index index = null; - while(indexMapItr.hasNext()){ + Map> indexTableMap = 
getIndexToKeysMap(indexes); + for (Map.Entry> entry : indexTableMap.entrySet()) { //we rewrite the original query using the first valid index encountered //this can be changed if we have a better mechanism to //decide which index will produce a better rewrite - index = indexMapItr.next(); - canApply = canApplyCtx.isIndexUsableForQueryBranchRewrite(index, - indexTableMap.get(index)); - if(canApply){ - canApply = checkIfAllRewriteCriteriaIsMet(canApplyCtx); - //break here if any valid index is found to apply rewrite - if(canApply){ - //check if aggregation function is set. - //If not, set it using the only indexed column - if(canApplyCtx.getAggFunction() == null){ - //strip of the start and end braces [...] - String aggregationFunction = indexTableMap.get(index).toString(); - aggregationFunction = aggregationFunction.substring(1, - aggregationFunction.length() - 1); - canApplyCtx.setAggFunction("_count_of_" + aggregationFunction + ""); - } + Index index = entry.getKey(); + Set indexKeyNames = entry.getValue(); + //break here if any valid index is found to apply rewrite + if (canApplyCtx.isIndexUsableForQueryBranchRewrite(index, indexKeyNames) && + checkIfAllRewriteCriteriaIsMet(canApplyCtx)) { + //check if aggregation function is set. + //If not, set it using the only indexed column + if (canApplyCtx.getAggFunction() == null) { + canApplyCtx.setAggFunction("_count_of_" + StringUtils.join(",", indexKeyNames) + ""); } - break; + canApplyCtx.setIndexTableName(index.getIndexTableName()); + tsOpToProcess.put(alias, canApplyCtx); + return true; } } - indexTableName = index.getIndexTableName(); - - if(canApply && topOps.containsValue(topOp)) { - Iterator topOpNamesItr = topOps.keySet().iterator(); - while(topOpNamesItr.hasNext()){ - String topOpName = topOpNamesItr.next(); - if(topOps.get(topOpName).equals(topOp)){ - tsOpToProcess.put(topOpName, canApplyCtx); - } - } - } - - if(tsOpToProcess.size() == 0){ - canApply = false; - }else{ - canApply = true; - } - return canApply; + return false; } /** @@ -355,12 +319,11 @@ private boolean checkIfIndexBuiltOnAllTablePartitions(TableScanOperator tableSca * @throws SemanticException */ Map> getIndexToKeysMap(List indexTables) throws SemanticException{ - Index index = null; Hive hiveInstance = hiveDb; Map> indexToKeysMap = new LinkedHashMap>(); for (int idxCtr = 0; idxCtr < indexTables.size(); idxCtr++) { final Set indexKeyNames = new LinkedHashSet(); - index = indexTables.get(idxCtr); + Index index = indexTables.get(idxCtr); //Getting index key columns StorageDescriptor sd = index.getSd(); List idxColList = sd.getCols(); @@ -403,17 +366,17 @@ private boolean checkIfIndexBuiltOnAllTablePartitions(TableScanOperator tableSca */ @SuppressWarnings("unchecked") private void rewriteOriginalQuery() throws SemanticException { - Map> topOpMap = - (HashMap>) parseContext.getTopOps().clone(); + Map> topOpMap = parseContext.getTopOps(); Iterator tsOpItr = tsOpToProcess.keySet().iterator(); - while(tsOpItr.hasNext()){ - baseTableName = tsOpItr.next(); - RewriteCanApplyCtx canApplyCtx = tsOpToProcess.get(baseTableName); - TableScanOperator topOp = (TableScanOperator) topOpMap.get(baseTableName); + for (Map.Entry entry : tsOpToProcess.entrySet()) { + String alias = entry.getKey(); + RewriteCanApplyCtx canApplyCtx = entry.getValue(); + TableScanOperator topOp = (TableScanOperator) topOpMap.get(alias); RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb, - indexTableName, baseTableName, 
canApplyCtx.getAggFunction()); + canApplyCtx.getIndexTableName(), canApplyCtx.getAlias(), + canApplyCtx.getAllColumns(), canApplyCtx.getAggFunction()); rewriteQueryCtx.invokeRewriteQueryProc(topOp); parseContext = rewriteQueryCtx.getParseContext(); parseContext.setOpParseCtx((LinkedHashMap, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java index 1d8336f..74614f3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; @@ -68,7 +69,6 @@ */ public final class RewriteQueryUsingAggregateIndex { private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndex.class.getName()); - private static RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null; private RewriteQueryUsingAggregateIndex() { //this prevents the class from getting instantiated @@ -78,7 +78,7 @@ private RewriteQueryUsingAggregateIndex() { public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { SelectOperator operator = (SelectOperator)nd; - rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; + RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; List> childOps = operator.getChildOperators(); Operator childOp = childOps.iterator().next(); @@ -98,7 +98,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, List selRSSignature = selRS.getSignature(); //Need to create a new type for Column[_count_of_indexed_key_column] node - PrimitiveTypeInfo pti = (PrimitiveTypeInfo) TypeInfoFactory.getPrimitiveTypeInfo("bigint"); + PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo("bigint"); pti.setTypeName("bigint"); ColumnInfo newCI = new ColumnInfo(rewriteQueryCtx.getAggregateFunction(), pti, "", false); selRSSignature.add(newCI); @@ -117,19 +117,15 @@ public static NewQuerySelectSchemaProc getNewQuerySelectSchemaProc(){ /** * This processor replaces the original TableScanOperator with * the new TableScanOperator and metadata that scans over the - * index table rather than scanning over the orginal table. + * index table rather than scanning over the original table. * */ private static class ReplaceTableScanOpProc implements NodeProcessor { public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) throws SemanticException { TableScanOperator scanOperator = (TableScanOperator)nd; - rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; - String baseTableName = rewriteQueryCtx.getBaseTableName(); - String alias = null; - if(baseTableName.contains(":")){ - alias = (baseTableName.split(":"))[0]; - } + RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; + String alias = rewriteQueryCtx.getAlias(); //Need to remove the original TableScanOperators from these data structures // and add new ones @@ -144,8 +140,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, OpParseContext operatorContext = opParseContext.get(scanOperator); //remove original TableScanOperator + topOps.remove(alias); topToTable.remove(scanOperator); - topOps.remove(baseTableName); opParseContext.remove(scanOperator); //construct a new descriptor for the index table scan @@ -171,13 +167,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, try { StructObjectInspector rowObjectInspector = (StructObjectInspector) indexTableHandle.getDeserializer().getObjectInspector(); - List fields = rowObjectInspector - .getAllStructFieldRefs(); - for (int i = 0; i < fields.size(); i++) { - rr.put(indexTableName, fields.get(i).getFieldName(), new ColumnInfo(fields - .get(i).getFieldName(), TypeInfoUtils - .getTypeInfoFromObjectInspector(fields.get(i) - .getFieldObjectInspector()), indexTableName, false)); + for (String column : rewriteQueryCtx.getColumns()) { + StructField field = rowObjectInspector.getStructFieldRef(column); + rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(), + TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()), + indexTableName, false)); } } catch (SerDeException e) { LOG.error("Error while creating the RowResolver for new TableScanOperator."); @@ -187,18 +181,18 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, //Set row resolver for new table operatorContext.setRowResolver(rr); - String tabNameWithAlias = null; - if(alias != null){ - tabNameWithAlias = alias + ":" + indexTableName; - }else{ - tabNameWithAlias = indexTableName; - } + + String newAlias = indexTableName; + int index = alias.lastIndexOf(":"); + if (index >= 0) { + newAlias = alias.substring(0, index) + ":" + indexTableName; + } //Scan operator now points to other table topToTable.put(scanOperator, indexTableHandle); - scanOperator.getConf().setAlias(tabNameWithAlias); + scanOperator.getConf().setAlias(newAlias); scanOperator.setAlias(indexTableName); - topOps.put(tabNameWithAlias, scanOperator); + topOps.put(newAlias, scanOperator); opParseContext.put(scanOperator, operatorContext); rewriteQueryCtx.getParseContext().setTopToTable( (HashMap) topToTable); @@ -207,6 +201,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, rewriteQueryCtx.getParseContext().setOpParseCtx( (LinkedHashMap, OpParseContext>) opParseContext); + ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr, + new ArrayList(rewriteQueryCtx.getColumns())); + return null; } } @@ -228,7 +225,7 @@ public static ReplaceTableScanOpProc getReplaceTableScanProc(){ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) throws SemanticException { GroupByOperator operator = (GroupByOperator)nd; - rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; + RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; //We need to replace the GroupByOperator which is in //groupOpToInputTables map with the new GroupByOperator diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java index b5873a4..d699308 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java @@ -22,6 +22,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Stack; import org.apache.hadoop.hive.ql.exec.Operator; @@ -54,19 +55,21 @@ public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorCtx { private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb, - String indexTableName, String baseTableName, String aggregateFunction){ + String indexTableName, String alias, Set columns, String aggregateFunction) { this.parseContext = parseContext; this.hiveDb = hiveDb; this.indexTableName = indexTableName; - this.baseTableName = baseTableName; + this.alias = alias; this.aggregateFunction = aggregateFunction; + this.columns = columns; this.opc = parseContext.getOpParseCtx(); } public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext, - Hive hiveDb, String indexTableName, String baseTableName, String aggregateFunction){ + Hive hiveDb, String indexTableName, String alias, + Set columns, String aggregateFunction) { return new RewriteQueryUsingAggregateIndexCtx( - parseContext, hiveDb, indexTableName, baseTableName, aggregateFunction); + parseContext, hiveDb, indexTableName, alias, columns, aggregateFunction); } @@ -77,8 +80,9 @@ public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseC //We need the GenericUDAFEvaluator for GenericUDAF function "sum" private GenericUDAFEvaluator eval = null; private final String indexTableName; - private final String baseTableName; + private final String alias; private final String aggregateFunction; + private final Set columns; private ExprNodeColumnDesc aggrExprNode = null; public Map, OpParseContext> getOpc() { @@ -161,11 +165,15 @@ public Object process(Node nd, Stack stack, }; } - public String getBaseTableName() { - return baseTableName; + public String getAlias() { + return alias; } public String getAggregateFunction() { return aggregateFunction; } + + public Set getColumns() { + return columns; + } } diff --git ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out index d2885f4..97eb10e 100644 --- ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out +++ ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out @@ -474,9 +474,11 @@ STAGE PLANS: key expressions: _col0 (type: int), _col1 (type: int) sort order: ++ Statistics: Num rows: 60 Data size: 6049 Basic stats: COMPLETE Column stats: NONE - value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint) + value expressions: _col2 (type: bigint) Reduce Operator Tree: - Extract + Select Operator + expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: int), VALUE._col0 
(type: bigint) + outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 60 Data size: 6049 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false @@ -621,9 +623,11 @@ STAGE PLANS: key expressions: _col0 (type: int), _col1 (type: int) sort order: ++ Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE - value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint) + value expressions: _col2 (type: bigint) Reduce Operator Tree: - Extract + Select Operator + expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: int), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false @@ -765,24 +769,27 @@ STAGE PLANS: keys: year(l_shipdate) (type: int), month(l_shipdate) (type: int) mode: hash outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int), _col1 (type: int) - sort order: ++ - Map-reduce partition columns: _col0 (type: int), _col1 (type: int) - Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE - value expressions: _col2 (type: bigint) + Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: _col1 is not null (type: boolean) + Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: int) + sort order: ++ + Map-reduce partition columns: _col0 (type: int), _col1 (type: int) + Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: bigint) Reduce Operator Tree: Group By Operator aggregations: sum(VALUE._col0) keys: KEY._col0 (type: int), KEY._col1 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col1 (type: int), _col2 (type: bigint) outputColumnNames: _col1, _col2 - Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false table: @@ -798,31 +805,31 @@ STAGE PLANS: key expressions: _col1 (type: int) sort order: + Map-reduce partition columns: _col1 (type: int) - Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: int), _col2 (type: bigint) + Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: bigint) TableScan Reduce Output Operator key expressions: _col1 (type: int) sort order: + Map-reduce partition columns: _col1 (type: int) - Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: int), _col2 (type: bigint) + Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: bigint) Reduce Operator Tree: Join Operator condition map: Inner Join 0 to 1 condition expressions: - 0 {VALUE._col1} {VALUE._col2} - 1 {VALUE._col1} {VALUE._col2} + 0 {KEY.reducesinkkey0} {VALUE._col1} + 1 {KEY.reducesinkkey0} {VALUE._col1} 
outputColumnNames: _col1, _col2, _col4, _col5 - Statistics: Num rows: 25 Data size: 2308 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col1 (type: int), _col4 (type: int), ((_col5 - _col2) / _col2) (type: double) outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 25 Data size: 2308 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 25 Data size: 2308 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -846,24 +853,27 @@ STAGE PLANS: keys: year(l_shipdate) (type: int), month(l_shipdate) (type: int) mode: hash outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int), _col1 (type: int) - sort order: ++ - Map-reduce partition columns: _col0 (type: int), _col1 (type: int) - Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE - value expressions: _col2 (type: bigint) + Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: _col1 is not null (type: boolean) + Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: int) + sort order: ++ + Map-reduce partition columns: _col0 (type: int), _col1 (type: int) + Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: bigint) Reduce Operator Tree: Group By Operator aggregations: sum(VALUE._col0) keys: KEY._col0 (type: int), KEY._col1 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col1 (type: int), _col2 (type: bigint) outputColumnNames: _col1, _col2 - Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false table: @@ -890,16 +900,16 @@ select l_shipdate, l_orderkey as cnt from lineitem) dummy POSTHOOK: type: QUERY STAGE DEPENDENCIES: - Stage-1 is a root stage - Stage-2 depends on stages: Stage-1 + Stage-3 is a root stage + Stage-2 depends on stages: Stage-3 Stage-0 depends on stages: Stage-2 STAGE PLANS: - Stage: Stage-1 + Stage: Stage-3 Map Reduce Map Operator Tree: TableScan - alias: null-subquery1:default__lineitem_lineitem_lshipdate_idx__ + alias: null-subquery1:dummy-subquery1:default__lineitem_lineitem_lshipdate_idx__ Statistics: Num rows: 95 Data size: 8675 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: l_shipdate (type: string), _count_of_l_shipdate (type: bigint) @@ -939,20 +949,6 @@ STAGE PLANS: Map Reduce Map Operator Tree: TableScan - Union - Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: _col0 (type: string), _col1 (type: bigint) - outputColumnNames: _col0, _col1 - 
Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - TableScan alias: lineitem Statistics: Num rows: 116 Data size: 12099 Basic stats: COMPLETE Column stats: NONE Select Operator @@ -972,6 +968,20 @@ STAGE PLANS: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TableScan + Union + Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator @@ -2489,9 +2499,11 @@ STAGE PLANS: key expressions: _col0 (type: int) sort order: + Statistics: Num rows: 8 Data size: 32 Basic stats: COMPLETE Column stats: NONE - value expressions: _col0 (type: int), _col1 (type: bigint) + value expressions: _col1 (type: bigint) Reduce Operator Tree: - Extract + Select Operator + expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1 Statistics: Num rows: 8 Data size: 32 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false @@ -2579,9 +2591,11 @@ STAGE PLANS: key expressions: _col0 (type: int) sort order: + Statistics: Num rows: 3 Data size: 215 Basic stats: COMPLETE Column stats: NONE - value expressions: _col0 (type: int), _col1 (type: bigint) + value expressions: _col1 (type: bigint) Reduce Operator Tree: - Extract + Select Operator + expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1 Statistics: Num rows: 3 Data size: 215 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false
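
The core refactoring in AggregateIndexHandler and TableBasedIndexHandler — dropping the static `index` field and threading the `Index` object through `getIndexBuilderMapRedTask` — follows a backward-compatible overload pattern: the base handler gains a concrete `Index`-based method that delegates to the old column-list signature, and the old signature is demoted from abstract to a no-op default. Below is a minimal, self-contained sketch of that pattern; the class and method names (`Index` stand-in, `BaseIndexHandler`, `buildTask`, the aggregate subclass) are simplified illustrations for this note, not Hive's actual API.

import java.util.Arrays;
import java.util.List;

// Simplified stand-in for org.apache.hadoop.hive.metastore.api.Index.
class Index {
  private final List<String> columns;
  Index(List<String> columns) { this.columns = columns; }
  List<String> getColumns() { return columns; }
}

abstract class BaseIndexHandler {
  // New entry point: callers hand over the whole Index. The default
  // implementation delegates to the legacy column-based signature, so
  // subclasses written against the old contract keep working unchanged.
  protected String buildTask(Index index, String indexTableName) {
    return buildTask(index.getColumns(), indexTableName);
  }

  // Legacy hook kept for compatibility; no longer abstract, so subclasses
  // that override the Index-based variant need not implement it.
  protected String buildTask(List<String> indexColumns, String indexTableName) {
    return null;
  }
}

// Analogue of AggregateIndexHandler: overrides the Index-based variant and
// derives the column list from the Index itself instead of cached static state.
class AggregateHandler extends BaseIndexHandler {
  @Override
  protected String buildTask(Index index, String indexTableName) {
    List<String> cols = index.getColumns();
    return "INSERT OVERWRITE TABLE " + indexTableName + " SELECT " + String.join(", ", cols);
  }
}

public class HandlerRefactorSketch {
  public static void main(String[] args) {
    BaseIndexHandler handler = new AggregateHandler();
    Index idx = new Index(Arrays.asList("l_shipdate"));
    System.out.println(handler.buildTask(idx, "default__lineitem_idx__"));
  }
}

The payoff, as the patch shows, is that a subclass can pull whatever it needs (here the column list) from the Index passed down the call chain, rather than stashing it in a mutable static field shared across invocations.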
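
The second theme of the patch is removing CheckSelectProc's internal-name-to-alias fixup and instead resolving filter-predicate and aggregation columns with `ExprNodeDescUtils.backtrack(expr, operator, topOp)` against the top `TableScanOperator`. The sketch below models only the essential idea as it appears to be used here — a column name is translated hop by hop from the current operator back to the scan — and is an assumption-laden simplification: `Op`, `outToIn`, and this `backtrack` are illustrative stand-ins, not the real ExprNodeDesc-based implementation.

import java.util.Collections;
import java.util.Map;

public class BacktrackSketch {

  /** A node in a linear operator chain: maps its output names to its parent's names. */
  static class Op {
    final String name;
    final Op parent;                    // null for the top (table scan) operator
    final Map<String, String> outToIn;  // output column -> parent column

    Op(String name, Op parent, Map<String, String> outToIn) {
      this.name = name;
      this.parent = parent;
      this.outToIn = outToIn;
    }
  }

  /** Walk parent links from 'start' up to 'top', translating the column name at each hop. */
  static String backtrack(String column, Op start, Op top) {
    String current = column;
    for (Op op = start; op != top; op = op.parent) {
      current = op.outToIn.get(current);
      if (current == null) {
        return null;                    // not resolvable to a base-table column
      }
    }
    return current;
  }

  public static void main(String[] args) {
    Op scan = new Op("TS", null, Collections.emptyMap());
    Op select = new Op("SEL", scan, Map.of("_col0", "l_shipdate"));
    Op filter = new Op("FIL", select, Map.of("_col0", "_col0"));

    // "_col0" seen at the filter resolves to the real base-table column at the scan.
    System.out.println(backtrack("_col0", filter, scan));  // prints l_shipdate
  }
}

Resolving columns this way is what lets the rewrite check index keys against the base table's actual column names without the extra SelectOperator pass the old code needed.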