diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index cbc3cd2..b1e37e7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -413,6 +413,13 @@
       + "for partition columns"),
   STATISTICS_CLONING_FAILED(30013, "Cloning of statistics failed"),
+
+  STATSAGGREGATOR_SOURCETASK_NULL(30014, "SourceTask for StatsAggregator should not be null"),
+
+  STATSAGGREGATOR_CONNECTION_ERROR(30015, "StatsAggregator cannot be connected to. "
+      + "There was an error while connecting to the StatsAggregator, and retrying "
+      + "might help. If you don't want the query to fail because accurate statistics "
+      + "could not be collected, set hive.stats.reliable=false"),
   ;

   private int errorCode;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 142af10..45298a9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -174,25 +175,12 @@ private int aggregateStats() {
       // Stats setup:
       Warehouse wh = new Warehouse(conf);
-      if (!this.getWork().getNoStatsAggregator()) {
-        String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-        StatsFactory factory = StatsFactory.newFactory(statsImplementationClass, conf);
-        if (factory != null && work.isNoScanAnalyzeCommand()){
-          // initialize stats publishing table for noscan which has only stats task
-          // the rest of MR task following stats task initializes it in ExecDriver.java
-          StatsPublisher statsPublisher = factory.getStatsPublisher();
-          if (!statsPublisher.init(conf)) { // creating stats table if not exists
-            if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
-              throw
-                new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
-            }
-          }
-        }
-        if (factory != null) {
-          statsAggregator = factory.getStatsAggregator();
-          // manufacture a StatsAggregator
-          if (!statsAggregator.connect(conf, getWork().getSourceTask())) {
-            throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+      if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
+        try {
+          statsAggregator = createStatsAggregator(conf);
+        } catch (HiveException e) {
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+            throw e;
           }
         }
       }
@@ -212,8 +200,7 @@ private int aggregateStats() {
       List<Partition> partitions = getPartitionsList();
       boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
-      int maxPrefixLength = HiveConf.getIntVar(conf,
-          HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH);
+      int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);

       if (partitions == null) {
         // non-partitioned tables:
@@ -355,6 +342,30 @@ else if (work.isClearAggregatorStats()) {
     return ret;
   }

+  private StatsAggregator createStatsAggregator(HiveConf conf) throws HiveException {
+    String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+    StatsFactory factory = StatsFactory.newFactory(statsImplementationClass, conf);
+    if (factory == null) {
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
+    }
+    // initialize stats publishing table for noscan which has only stats task
+    // the rest of MR task following stats task initializes it in ExecDriver.java
+    StatsPublisher statsPublisher = factory.getStatsPublisher();
+    if (!statsPublisher.init(conf)) { // creating stats table if not exists
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+    }
+    MapRedTask sourceTask = getWork().getSourceTask();
+    if (sourceTask == null) {
+      throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
+    }
+    // manufacture a StatsAggregator
+    StatsAggregator statsAggregator = factory.getStatsAggregator();
+    if (!statsAggregator.connect(conf, getWork().getSourceTask())) {
+      throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg());
+    }
+    return statsAggregator;
+  }
+
   private boolean existStats(Map<String, String> parameters) {
     return parameters.containsKey(StatsSetupConst.ROW_COUNT)
         || parameters.containsKey(StatsSetupConst.NUM_FILES)
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 91a9e6c..ea2b18a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2295,6 +2295,7 @@ public static String getHashedStatsPrefix(String statsPrefix, int maxPrefixLengt
         throw new RuntimeException(e);
       }
     }
+    assert maxPrefixLength < 0 || ret.length() < maxPrefixLength;
     return ret;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index 7e701f4..456d051 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.CombineHiveKey;
 import org.apache.hadoop.mapred.JobConf;
@@ -142,7 +143,7 @@ private void publishStats() throws HiveException {
     // construct key used to store stats in intermediate db
     String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
     String keyPrefix = Utilities.getHashedStatsPrefix(
-        statsAggKeyPrefix, HiveConf.getIntVar(jc, ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
+        statsAggKeyPrefix, StatsFactory.getMaxPrefixLength(jc));
     String key = keyPrefix + taskID;

     // construct statistics to be stored
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index cca8481..d866cde 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -65,6 +65,7 @@
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
@@ -247,8 +248,7 @@ private void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
       mrWork.getReduceWork().setGatheringStats(true);
     }
     nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
-    nd.getConf().setMaxStatsKeyPrefixLength(
-        hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
+    nd.getConf().setMaxStatsKeyPrefixLength(StatsFactory.getMaxPrefixLength(hconf));
     // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());

     // subscribe feeds from the MoveTask so that MoveTask can forward the list
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index 0268f98..8d3dc56 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -74,7 +75,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
     // create a dummy MapReduce task
     MapredWork currWork = GenMapRedUtils.getMapRedWork(parseCtx);
-    Task currTask = TaskFactory.get(currWork, parseCtx.getConf());
+    MapRedTask currTask = (MapRedTask) TaskFactory.get(currWork, parseCtx.getConf());
     Operator currTopOp = op;
     ctx.setCurrTask(currTask);
     ctx.setCurrTopOp(currTopOp);
@@ -95,6 +96,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
       StatsWork statsWork = new StatsWork(parseCtx.getQB().getParseInfo().getTableSpec());
       statsWork.setAggKey(op.getConf().getStatsAggPrefix());
+      statsWork.setSourceTask(currTask);
       statsWork.setStatsReliable(
           parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
       Task statsTask = TaskFactory.get(statsWork, parseCtx.getConf());
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e9d9ee7..c2a46a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -154,6 +154,7 @@
 import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
@@ -8481,8 +8482,7 @@ private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String
     } else {
       tsDesc.setGatherStats(true);
       tsDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
-      tsDesc.setMaxStatsKeyPrefixLength(
-          conf.getIntVar(HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
+      tsDesc.setMaxStatsKeyPrefixLength(StatsFactory.getMaxPrefixLength(conf));

       // append additional virtual columns for storing statistics
       Iterator<VirtualColumn> vcs = VirtualColumn.getStatsRegistry(conf).iterator();
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
index 2cc2519..fa1dcc9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
@@ -60,7 +60,8 @@ public String aggregateStats(String keyPrefix, String statType) {
     long value = 0;
     for (String groupName : counters.getGroupNames()) {
       if (groupName.startsWith(keyPrefix)) {
-        value += counters.getGroup(groupName).getCounter(statType);
+        long counter = counters.getGroup(groupName).getCounter(statType);
+        value += counter;
       }
     }
     return String.valueOf(value);
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
index 8ae32f0..2fb880d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
@@ -28,6 +28,9 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.util.ReflectionUtils;

+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH;
+
 /**
  * A factory of stats publisher and aggregator implementations of the
  * StatsPublisher and StatsAggregator interfaces.
@@ -40,8 +43,19 @@
   private Class aggregatorImplementation;
   private Configuration jobConf;

+  public static int getMaxPrefixLength(Configuration conf) {
+    int maxPrefixLength = HiveConf.getIntVar(conf, HIVE_STATS_KEY_PREFIX_MAX_LENGTH);
+    if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.counter.name())) {
+      // see org.apache.hadoop.mapred.Counter or org.apache.hadoop.mapreduce.MRJobConfig
+      int groupNameMax = conf.getInt("mapreduce.job.counters.group.name.max", 128);
+      maxPrefixLength = maxPrefixLength < 0 ? groupNameMax :
+          Math.min(maxPrefixLength, groupNameMax);
+    }
+    return maxPrefixLength;
+  }
+
   public static StatsFactory newFactory(Configuration conf) {
-    return newFactory(HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS), conf);
+    return newFactory(HiveConf.getVar(conf, HIVESTATSDBCLASS), conf);
   }

   /**
diff --git ql/src/test/queries/clientpositive/stats_counter.q ql/src/test/queries/clientpositive/stats_counter.q
index 20769e4..3c1f132 100644
--- ql/src/test/queries/clientpositive/stats_counter.q
+++ ql/src/test/queries/clientpositive/stats_counter.q
@@ -1,6 +1,16 @@
-set hive.stats.autogather=true;
 set hive.stats.dbclass=counter;
+set hive.stats.autogather=false;
+
+-- by analyze
+create table dummy1 as select * from src;
+
+analyze table dummy1 compute statistics;
+desc formatted dummy1;
+
+set hive.stats.dbclass=counter;
+set hive.stats.autogather=true;

-create table dummy as select * from src;
+-- by autogather
+create table dummy2 as select * from src;

-desc formatted dummy;
+desc formatted dummy2;
diff --git ql/src/test/results/clientpositive/stats_counter.q.out ql/src/test/results/clientpositive/stats_counter.q.out
index f15d8c5..40d8656 100644
--- ql/src/test/results/clientpositive/stats_counter.q.out
+++ ql/src/test/results/clientpositive/stats_counter.q.out
@@ -1,13 +1,66 @@
-PREHOOK: query: create table dummy as select * from src
+PREHOOK: query: -- by analyze
+create table dummy1 as select * from src
 PREHOOK: type: CREATETABLE_AS_SELECT
 PREHOOK: Input: default@src
-POSTHOOK: query: create table dummy as select * from src
+POSTHOOK: query: -- by analyze
+create table dummy1 as select * from src
 POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@src
-POSTHOOK: Output: default@dummy
-PREHOOK: query: desc formatted dummy
+POSTHOOK: Output: default@dummy1
+PREHOOK: query: analyze table dummy1 compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dummy1
+PREHOOK: Output: default@dummy1
+POSTHOOK: query: analyze table dummy1 compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dummy1
+POSTHOOK: Output: default@dummy1
+PREHOOK: query: desc formatted dummy1
 PREHOOK: type: DESCTABLE
-POSTHOOK: query: desc formatted dummy
+POSTHOOK: query: desc formatted dummy1
+POSTHOOK: type: DESCTABLE
+# col_name	data_type	comment
+
+key	string	None
+value	string	None
+
+# Detailed Table Information
+Database:	default
+#### A masked pattern was here ####
+Protect Mode:	None
+Retention:	0
+#### A masked pattern was here ####
+Table Type:	MANAGED_TABLE
+Table Parameters:
+	COLUMN_STATS_ACCURATE	true
+	numFiles	1
+	numRows	500
+	rawDataSize	5312
+	totalSize	5812
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat:	org.apache.hadoop.mapred.TextInputFormat
+OutputFormat:	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed:	No
+Num Buckets:	-1
+Bucket Columns:	[]
+Sort Columns:	[]
+Storage Desc Params:
+	serialization.format	1
+PREHOOK: query: -- by autogather
+create table dummy2 as select * from src
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+POSTHOOK: query: -- by autogather
+create table dummy2 as select * from src
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dummy2
+PREHOOK: query: desc formatted dummy2
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: desc formatted dummy2
 POSTHOOK: type: DESCTABLE
 # col_name	data_type	comment