diff --git i/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java w/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cef43a4..ca31b0b 100644 --- i/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ w/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -275,7 +275,8 @@ public class HiveConf extends Configuration { HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), // Statistics - HIVESTATSAUTOGATHER("hive.stats.autogather", true), + HIVESTATSAUTOGATHER("hive.stats.autogather", true), // gather stats automatically on write (on by default) + HIVESTATSAUTOGATHERREAD("hive.stats.autogather.read", false), // gather stats automatically on read, i.e. during SELECT (off by default) HIVESTATSDBCLASS("hive.stats.dbclass", "jdbc:derby"), // other options are jdbc:mysql and hbase as defined in StatsSetupConst.java HIVESTATSJDBCDRIVER("hive.stats.jdbcdriver", diff --git i/conf/hive-default.xml w/conf/hive-default.xml index f31250e..a5069a3 100644 --- i/conf/hive-default.xml +++ w/conf/hive-default.xml @@ -625,6 +625,12 @@ + hive.stats.autogather.read + false + A flag to gather statistics automatically during the SELECT command. + + + hive.stats.jdbcdriver org.apache.derby.jdbc.EmbeddedDriver The JDBC driver for the database that stores temporary hive statistics. 
diff --git i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java index 2e1a6a9..f84b99a 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java @@ -199,7 +199,7 @@ public class BucketMapJoinOptimizer implements Transform { PrunedPartitionList prunedParts = null; try { prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso), pGraphContext.getConf(), alias, - pGraphContext.getPrunedPartitions()); + pGraphContext.getPrunedPartitions(), pGraphContext); } catch (HiveException e) { // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils diff --git i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java index 6162676..e008824 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java @@ -76,6 +76,10 @@ public class GenMRTableScan1 implements NodeProcessor { ctx.setCurrAliasId(currAliasId); mapCurrCtx.put(op, new GenMapRedCtx(currTask, currTopOp, currAliasId)); + // Flag the work as stats-gathering only when this scan's descriptor asks for it (ANALYZE or autogather-on-read); the fully qualified cast avoids needing a new import in this file + if (((org.apache.hadoop.hive.ql.plan.TableScanDesc) op.getConf()).isGatherStats()) { + currWork.setGatheringStats(true); + } QBParseInfo parseInfo = parseCtx.getQB().getParseInfo(); if (parseInfo.isAnalyzeCommand()) { @@ -88,7 +92,6 @@ public class GenMRTableScan1 implements NodeProcessor { Task statsTask = TaskFactory.get(statsWork, parseCtx.getConf()); currTask.addDependentTask(statsTask); ctx.getRootTasks().add(currTask); - currWork.setGatheringStats(true); // NOTE: here we should use the new partition predicate pushdown API to get a list of pruned list, // and pass it to setTaskPlan as the last parameter Set confirmedPartns = new HashSet(); diff --git i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 397492e..8b86f45 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -547,7 +547,7 @@ public final class GenMapRedUtils { try { partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(topOp), parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(), - alias_id, parseCtx.getPrunedPartitions()); + alias_id, parseCtx.getPrunedPartitions(), parseCtx); } catch (SemanticException e) { throw e; } catch (HiveException e) { diff --git i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java index 7f6099a..25daa58 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java @@ -205,7 +205,7 @@ public class GroupByOptimizer implements Transform { try { partsList = PartitionPruner.prune(destTable, pGraphContext .getOpToPartPruner().get(ts), pGraphContext.getConf(), table, - pGraphContext.getPrunedPartitions()); + pGraphContext.getPrunedPartitions(), pGraphContext); } catch (HiveException e) { // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils diff --git i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java index f975243..42ff9d5 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java @@ -219,7 +219,7 @@ public class SortedMergeBucketMapJoinOptimizer implements Transform { try { prunedParts = PartitionPruner.prune(tbl, pGraphContext .getOpToPartPruner().get(tso), pGraphContext.getConf(), alias, - 
pGraphContext.getPrunedPartitions()); + pGraphContext.getPrunedPartitions(), pGraphContext); } catch (HiveException e) { LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); throw new SemanticException(e.getMessage(), e); diff --git i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java index e7f7f72..df08c2a 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec; /** * The transformation step that does partition pruning. 
@@ -157,7 +158,8 @@ public class PartitionPruner implements Transform { */ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, HiveConf conf, String alias, - Map prunedPartitionsMap) throws HiveException { + Map prunedPartitionsMap, + ParseContext parseCtx) throws HiveException { LOG.trace("Started pruning partiton"); LOG.trace("tabname = " + tab.getTableName()); LOG.trace("prune Expression = " + prunerExpr); @@ -266,6 +268,13 @@ public class PartitionPruner implements Transform { // Now return the set of partitions ret = new PrunedPartitionList(true_parts, unkn_parts, denied_parts); prunedPartitionsMap.put(key, ret); + // Register the confirmed and unknown partitions (denied ones are left out) under this alias in the parse context + List partitions = new ArrayList(); + partitions.addAll(true_parts); + partitions.addAll(unkn_parts); + tableSpec ts = new tableSpec(tab, tab.getTableName(), partitions); + parseCtx.setInputTableSpecs(alias, ts); + return ret; } diff --git i/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java w/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 3bddb05..1e4282b 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -583,6 +583,13 @@ public abstract class BaseSemanticAnalyzer { public static enum SpecType {TABLE_ONLY, STATIC_PARTITION, DYNAMIC_PARTITION}; public SpecType specType; + /* Builds a "dummy" tableSpec for stats publishing from an already-resolved table handle, name and partition list; no AST is parsed and no other field (e.g. specType) is assigned here */ + public tableSpec(Table tableHandle, String tableName, List partitions) { + this.tableName = tableName; + this.tableHandle = tableHandle; + this.partitions = partitions; + } + public tableSpec(Hive db, HiveConf conf, ASTNode ast) throws SemanticException { diff --git i/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java w/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 0622844..f84601b 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ 
w/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec; +import org.apache.hadoop.hive.ql.metadata.Partition; /** * Parse Context: The current parse context. This is passed to the optimizer @@ -76,6 +78,18 @@ public class ParseContext { private Map> groupOpToInputTables; private Map prunedPartitions; + private Map<String, tableSpec> aliasToInputTableSpecs = new HashMap<String, tableSpec>(); + + /** Records the input table spec for a table alias, replacing any earlier entry for that alias. */ + public void setInputTableSpecs(String key, tableSpec ts) { + aliasToInputTableSpecs.put(key, ts); + } + + /** Returns the input table spec recorded for the alias, or null if none was recorded. */ + public tableSpec getInputTableSpecs(String key) { + return aliasToInputTableSpecs.get(key); + } + /** * The lineage information. 
*/ @@ -160,6 +174,7 @@ public class ParseContext { this.listMapJoinOpsNoReducer = listMapJoinOpsNoReducer; hasNonPartCols = false; this.groupOpToInputTables = new HashMap>(); + this.aliasToInputTableSpecs = new HashMap(); this.groupOpToInputTables = groupOpToInputTables; this.prunedPartitions = prunedPartitions; this.opToSamplePruner = opToSamplePruner; diff --git i/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java w/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 7a7e4c7..8f8cb5e 100644 --- i/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ w/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -28,6 +28,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.LinkedList; +import java.util.Queue; import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -41,8 +43,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -91,6 +93,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; import org.apache.hadoop.hive.ql.optimizer.GenMROperator; +import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2; @@ -101,12 +104,12 @@ import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1; 
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.Optimizer; -import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; @@ -121,6 +124,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.ForwardDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; @@ -139,27 +143,27 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UDTFDesc; import org.apache.hadoop.hive.ql.plan.UnionDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; +import 
org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -783,17 +787,28 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { qb.getMetaData().setSrcForAlias(alias, tab); + tableSpec ts = null; if (qb.getParseInfo().isAnalyzeCommand()) { - tableSpec ts = new tableSpec(db, conf, (ASTNode) ast.getChild(0)); - if (ts.specType == SpecType.DYNAMIC_PARTITION) { // dynamic partitions - try { - ts.partitions = db.getPartitionsByNames(ts.tableHandle, ts.partSpec); - } catch (HiveException e) { - throw new SemanticException("Cannot get partitions for " + ts.partSpec, e); - } + ts = new tableSpec(db, conf, (ASTNode) ast.getChild(0)); + } else { + // we are piggybacking stats tracking on a TableScanOperator; NOTE(review): the two specs below are read from the statement root (ast), not from this alias - confirm this is right for joins and subqueries + if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) { + ts = new 
tableSpec(db, conf, (ASTNode) ast.getChild(2).getChild(0).getChild(0)); + } else if (ast.getToken().getType() == HiveParser.TOK_QUERY) { + ts = new tableSpec(db, conf, (ASTNode) ast.getChild(0).getChild(0)); + } else { + // Any other statement type: fall back to a spec built from this alias's own table so that ts is never null when it is dereferenced below (assert is a no-op unless the JVM runs with -ea) + ts = new tableSpec(tab, tab.getTableName(), null); + } + } + if (ts.specType == SpecType.DYNAMIC_PARTITION) { // dynamic partitions + try { + ts.partitions = db.getPartitionsByNames(ts.tableHandle, ts.partSpec); + } catch (HiveException e) { + throw new SemanticException("Cannot get partitions for " + ts.partSpec, e); } - qb.getParseInfo().addTableSpec(alias, ts); } + qb.getParseInfo().addTableSpec(alias, ts); } LOG.info("Get metadata for subqueries"); @@ -5817,29 +5832,47 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String alias) throws SemanticException { - if (!qbp.isAnalyzeCommand()) { - tsDesc.setGatherStats(false); - } else { + if (qbp.isAnalyzeCommand()) { + tsDesc.setGatherStats(true); + } else if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHERREAD) && + qbp.getOuterQueryLimit() == -1) { + // if we are autogathering stats on read and this query is NOT limited, + // we gather stats on this TableScanOperator + // TODO: is getOuterQueryLimit the right method here? tsDesc.setGatherStats(true); + } else { + tsDesc.setGatherStats(false); + return; + } - String tblName = tab.getTableName(); - tableSpec tblSpec = qbp.getTableSpec(alias); - Map partSpec = tblSpec.getPartSpec(); + String tblName = tab.getTableName(); + tableSpec tblSpec = qbp.getTableSpec(alias); + Map partSpec = tblSpec.getPartSpec(); - if (partSpec != null) { - List cols = new ArrayList(); - cols.addAll(partSpec.keySet()); - tsDesc.setPartColumns(cols); - } + // Theoretically the key prefix could be any unique string shared + // between TableScanOperator (when publishing) and StatsTask (when aggregating). 
+ // Here we use + // table_name + partitionSpec + // as the prefix for ease of reading during explain and debugging. + // Currently, partition spec can only be static partition. + String k = tblName + Path.SEPARATOR; + tsDesc.setStatsAggPrefix(k); - // Theoretically the key prefix could be any unique string shared - // between TableScanOperator (when publishing) and StatsTask (when aggregating). - // Here we use - // table_name + partitionSec - // as the prefix for easy of read during explain and debugging. - // Currently, partition spec can only be static partition. - String k = tblName + Path.SEPARATOR; - tsDesc.setStatsAggPrefix(k); + if (!qbp.isAnalyzeCommand()) { + // Get partition names from the table handle + List partitions = tab.getPartCols(); + List partNames = new ArrayList(); + for (FieldSchema fs : partitions) { + partNames.add(fs.getName()); + } + tsDesc.setPartColumns(partNames); + } else { + // Get partition names from the partition spec + if (partSpec != null) { + List cols = new ArrayList(); + cols.addAll(partSpec.keySet()); + tsDesc.setPartColumns(cols); + } // set up WritenEntity for replication outputs.add(new WriteEntity(tab)); @@ -6064,6 +6097,56 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } + /** + * Attaches a StatsTask to every MapRedTask in the given list for each stats-gathering + * TableScanOperator found in its operator trees; dependent tasks are not visited. 
+ * + * @param tasks the tasks to inspect + * @param conf configuration passed to TaskFactory when creating each StatsTask + * @param pctx parse context holding the per-alias input table specs + */ + private void addStatsTask(List<Task<? extends Serializable>> tasks, HiveConf conf, ParseContext pctx) { + for (Task<? extends Serializable> task : tasks) { + if (task instanceof MapRedTask) { + MapRedTask mrTask = (MapRedTask) task; + MapredWork mrWork = (MapredWork) mrTask.getWork(); + + LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork = mrWork.getAliasToWork(); + + for (String key : aliasToWork.keySet()) { + Queue<Operator<? extends Serializable>> opsToProcess = new LinkedList<Operator<? extends Serializable>>(); + Operator<? extends Serializable> op = aliasToWork.get(key); + opsToProcess.add(op); + while(!opsToProcess.isEmpty()) { + op = opsToProcess.remove(); + if (op instanceof TableScanOperator) { + TableScanDesc tsd = (TableScanDesc) op.getConf(); + if (tsd.isGatherStats()) { + tableSpec ts = pctx.getInputTableSpecs(key); + if (ts == null) { + LOG.warn("No input table spec recorded for alias " + key + "; not adding a StatsTask"); + } else { + StatsWork statsWork = new StatsWork(ts); + // Aggregate under the prefix the scan publishes with (set in setupStats from the table name), not the alias + statsWork.setAggKey(tsd.getStatsAggPrefix()); + Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, conf); + task.addDependentTask(statsTask); + } + } + } else { + List<Node> children = (List<Node>) op.getChildren(); + if (children != null) { + for (Node child : children) { + opsToProcess.add((Operator<? extends Serializable>) child); + } + } + } + } + } + } + } + } + @SuppressWarnings("nls") private void genMapRedTasks(QB qb) throws SemanticException { FetchWork fetch = null; @@ -6102,7 +6185,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { try { partsList = PartitionPruner.prune(topToTable.get(ts), opToPartPruner.get(ts), conf, (String) topOps.keySet() - .toArray()[0], prunedPartitions); + .toArray()[0], prunedPartitions, getParseContext()); } catch (HiveException e) { // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils @@ -6244,6 +6327,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { topNodes.addAll(topOps.values()); ogw.startWalking(topNodes, null); + ParseContext pctx = procCtx.getParseCtx(); + if 
(!qb.getParseInfo().isAnalyzeCommand() && conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHERREAD)) { + // StatsTask for ANALYZE is added elsewhere + addStatsTask(rootTasks, conf, pctx); + } + // reduce sink does not have any kids - since the plan by now has been // broken up into multiple // tasks, iterate over all tasks.