Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1572127) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -623,8 +623,8 @@ // Statistics HIVESTATSAUTOGATHER("hive.stats.autogather", true), - HIVESTATSDBCLASS("hive.stats.dbclass", "counter", - new PatternValidator("jdbc(:.*)", "hbase", "counter", "custom")), // StatsSetupConst.StatDB + HIVESTATSDBCLASS("hive.stats.dbclass", "fs", + new PatternValidator("jdbc(:.*)", "hbase", "counter", "custom", "fs")), // StatsSetupConst.StatDB HIVESTATSJDBCDRIVER("hive.stats.jdbcdriver", "org.apache.derby.jdbc.EmbeddedDriver"), // JDBC driver specific to the dbclass HIVESTATSDBCONNECTIONSTRING("hive.stats.dbconnectionstring", Index: common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java (revision 1572127) +++ common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java (working copy) @@ -32,29 +32,48 @@ public enum StatDB { hbase { + @Override public String getPublisher(Configuration conf) { return "org.apache.hadoop.hive.hbase.HBaseStatsPublisher"; } + @Override public String getAggregator(Configuration conf) { return "org.apache.hadoop.hive.hbase.HBaseStatsAggregator"; } }, jdbc { + @Override public String getPublisher(Configuration conf) { return "org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsPublisher"; } + @Override public String getAggregator(Configuration conf) { return "org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsAggregator"; } }, counter { + @Override public String getPublisher(Configuration conf) { return "org.apache.hadoop.hive.ql.stats.CounterStatsPublisher"; } + @Override public String getAggregator(Configuration conf) { if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregatorTez"; } return "org.apache.hadoop.hive.ql.stats.CounterStatsAggregator"; } }, + fs { + @Override + public String getPublisher(Configuration conf) { + return "org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher"; + } + + @Override + public String getAggregator(Configuration conf) { + return "org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator"; + } + }, custom { + @Override public String getPublisher(Configuration conf) { return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER); } + @Override public String getAggregator(Configuration conf) { return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR); } }; @@ -89,9 +108,15 @@ public static final String RAW_DATA_SIZE = "rawDataSize"; /** + * Temp dir for writing stats from tasks. + */ + public static final String STATS_TMP_LOC = "hive.stats.tmp.loc"; + + public static final String STATS_FILE_PREFIX = "tmpstats-"; + /** * @return List of all supported statistics */ - public static final String[] supportedStats = new String[] + public static final String[] supportedStats = new String[] {NUM_FILES,ROW_COUNT,TOTAL_SIZE,RAW_DATA_SIZE}; /** @@ -104,7 +129,7 @@ * @return List of statistics that can be collected quickly without requiring a scan of the data. 
*/ public static final String[] fastStats = new String[] {NUM_FILES,TOTAL_SIZE}; - + // This string constant is used by stats task to indicate to AlterHandler that // alterPartition/alterTable is happening via statsTask. public static final String STATS_GENERATED_VIA_STATS_TASK = "STATS_GENERATED_VIA_STATS_TASK"; @@ -112,11 +137,11 @@ // This string constant will be persisted in metastore to indicate whether corresponding // table or partition's statistics are accurate or not. public static final String COLUMN_STATS_ACCURATE = "COLUMN_STATS_ACCURATE"; - + public static final String TRUE = "true"; - + public static final String FALSE = "false"; - + public static boolean areStatsUptoDate(Map params) { String statsAcc = params.get(COLUMN_STATS_ACCURATE); return statsAcc == null ? false : statsAcc.equals(TRUE); Index: ql/src/test/results/clientpositive/statsfs.q.out =================================================================== --- ql/src/test/results/clientpositive/statsfs.q.out (revision 0) +++ ql/src/test/results/clientpositive/statsfs.q.out (revision 0) @@ -0,0 +1,616 @@ +PREHOOK: query: -- stats computation on partitioned table with analyze command + +create table t1 (key string, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- stats computation on partitioned table with analyze command + +create table t1 (key string, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' into table t1 partition (ds = '2010') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1 +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' into table t1 partition (ds = '2010') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1 +POSTHOOK: Output: default@t1@ds=2010 +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' into table t1 partition (ds = '2011') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1 +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' into table t1 partition (ds = '2011') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1 +POSTHOOK: Output: default@t1@ds=2011 +PREHOOK: query: analyze table t1 partition (ds) compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +PREHOOK: Input: default@t1@ds=2010 +PREHOOK: Input: default@t1@ds=2011 +PREHOOK: Output: default@t1 +PREHOOK: Output: default@t1@ds=2010 +PREHOOK: Output: default@t1@ds=2011 +POSTHOOK: query: analyze table t1 partition (ds) compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +POSTHOOK: Input: default@t1@ds=2010 +POSTHOOK: Input: default@t1@ds=2011 +POSTHOOK: Output: default@t1 +POSTHOOK: Output: default@t1@ds=2010 +POSTHOOK: Output: default@t1@ds=2011 +PREHOOK: query: describe formatted t1 partition (ds='2010') +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 partition (ds='2010') +POSTHOOK: type: DESCTABLE +# col_name data_type comment + +key string None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [2010] +Database: default +Table: t1 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + COLUMN_STATS_ACCURATE 
true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: describe formatted t1 partition (ds='2011') +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 partition (ds='2011') +POSTHOOK: type: DESCTABLE +# col_name data_type comment + +key string None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [2011] +Database: default +Table: t1 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table t1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: drop table t1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +PREHOOK: query: -- stats computation on partitioned table with autogather on insert query + +create table t1 (key string, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- stats computation on partitioned table with autogather on insert query + +create table t1 (key string, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +PREHOOK: query: insert into table t1 partition (ds='2010') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@t1@ds=2010 +POSTHOOK: query: insert into table t1 partition (ds='2010') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@t1@ds=2010 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert into table t1 partition (ds='2011') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@t1@ds=2011 +POSTHOOK: query: insert into table t1 partition (ds='2011') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@t1@ds=2011 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] 
+PREHOOK: query: describe formatted t1 partition (ds='2010') +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 partition (ds='2010') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [2010] +Database: default +Table: t1 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: describe formatted t1 partition (ds='2011') +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 partition (ds='2011') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [2011] +Database: default +Table: t1 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table t1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: drop table t1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: 
Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- analyze stmt on unpartitioned table + +create table t1 (key string, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- analyze stmt on unpartitioned table + +create table t1 (key string, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' into table t1 +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1 +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' into table t1 +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table t1 compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: analyze table t1 compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: describe formatted t1 +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string None +value string None + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage 
Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table t1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: drop table t1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- stats computation on unpartitioned table with autogather on insert query + +create table t1 (key string, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- stats computation on unpartitioned table with autogather on insert query + +create table t1 (key string, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert into table t1 select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@t1 +POSTHOOK: query: insert into table t1 select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: describe formatted t1 +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE 
[(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string None +value string None + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table t1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: drop table t1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- stats computation on partitioned table with autogather on insert query with dynamic partitioning + + +create table t1 (key string, value string) partitioned by (ds string, hr string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- stats computation on partitioned table with autogather on insert query with dynamic partitioning + + +create table t1 (key string, value string) partitioned by (ds string, hr string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert into table t1 partition (ds,hr) select * from srcpart +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: Output: default@t1 +POSTHOOK: query: insert into table t1 partition (ds,hr) select * from srcpart +POSTHOOK: 
type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +POSTHOOK: Output: default@t1@ds=2008-04-08/hr=11 +POSTHOOK: Output: default@t1@ds=2008-04-08/hr=12 +POSTHOOK: Output: default@t1@ds=2008-04-09/hr=11 +POSTHOOK: Output: default@t1@ds=2008-04-09/hr=12 +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: describe formatted t1 partition (ds='2008-04-08',hr='11') +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 partition (ds='2008-04-08',hr='11') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 
PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string None +value string None + +# Partition Information +# col_name data_type comment + +ds string None +hr string None + +# Detailed Partition Information +Partition Value: [2008-04-08, 11] +Database: default +Table: t1 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: describe formatted t1 partition (ds='2008-04-09',hr='12') +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted t1 partition (ds='2008-04-09',hr='12') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, 
type:string, comment:default), ] +# col_name data_type comment + +key string None +value string None + +# Partition Information +# col_name data_type comment + +ds string None +hr string None + +# Detailed Partition Information +Partition Value: [2008-04-09, 12] +Database: default +Table: t1 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + COLUMN_STATS_ACCURATE true + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table t1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1 +PREHOOK: Output: default@t1 +POSTHOOK: query: drop table t1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1 +POSTHOOK: Output: default@t1 +POSTHOOK: Lineage: t1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=12).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2008-04-09,hr=12).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2010).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: t1 PARTITION(ds=2011).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/queries/clientpositive/statsfs.q =================================================================== --- ql/src/test/queries/clientpositive/statsfs.q (revision 0) +++ ql/src/test/queries/clientpositive/statsfs.q (revision 0) @@ -0,0 +1,63 @@ +set hive.stats.dbclass=fs; + +-- stats computation on partitioned table with analyze command + +create table t1 (key string, value string) partitioned by (ds string); +load data local inpath '../../data/files/kv1.txt' into table t1 partition (ds = '2010'); +load data local inpath '../../data/files/kv1.txt' into table t1 partition (ds = '2011'); + +analyze table t1 partition 
(ds) compute statistics; + +describe formatted t1 partition (ds='2010'); +describe formatted t1 partition (ds='2011'); + +drop table t1; + +-- stats computation on partitioned table with autogather on insert query + +create table t1 (key string, value string) partitioned by (ds string); + +insert into table t1 partition (ds='2010') select * from src; +insert into table t1 partition (ds='2011') select * from src; + +describe formatted t1 partition (ds='2010'); +describe formatted t1 partition (ds='2011'); + +drop table t1; + +-- analyze stmt on unpartitioned table + +create table t1 (key string, value string); +load data local inpath '../../data/files/kv1.txt' into table t1; + +analyze table t1 compute statistics; + +describe formatted t1 ; + +drop table t1; + +-- stats computation on unpartitioned table with autogather on insert query + +create table t1 (key string, value string); + +insert into table t1 select * from src; + +describe formatted t1 ; + +drop table t1; + +-- stats computation on partitioned table with autogather on insert query with dynamic partitioning + + +create table t1 (key string, value string) partitioned by (ds string, hr string); + +set hive.exec.dynamic.partition.mode=nonstrict; +insert into table t1 partition (ds,hr) select * from srcpart; + +describe formatted t1 partition (ds='2008-04-08',hr='11'); +describe formatted t1 partition (ds='2008-04-09',hr='12'); + +drop table t1; +set hive.exec.dynamic.partition.mode=strict; + +reset set hive.stats.dbclass; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -44,6 +46,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.StatsSetupConst.StatDB; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -5837,6 +5841,11 @@ // can be changed in the optimizer but the key should not be changed // it should be the same as the MoveWork's sourceDir. 
fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString()); + if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { + String statsTmpLoc = ctx.getExternalTmpPath(queryTmpdir.toUri()).toString(); + LOG.info("Set stats collection dir : " + statsTmpLoc); + conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc); + } if (dest_part != null) { try { @@ -8716,6 +8725,11 @@ if (!qbp.isAnalyzeCommand()) { tsDesc.setGatherStats(false); } else { + if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { + String statsTmpLoc = ctx.getExternalTmpPath(tab.getPath().toUri()).toString(); + LOG.info("Set stats collection dir : " + statsTmpLoc); + conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc); + } tsDesc.setGatherStats(true); tsDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE)); tsDesc.setMaxStatsKeyPrefixLength(StatsFactory.getMaxPrefixLength(conf)); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy) @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher; +import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -901,7 +902,7 @@ String spSpec = conf.getStaticSpec(); int maxKeyLength = conf.getMaxStatsKeyPrefixLength(); - boolean counterStats = statsPublisher instanceof CounterStatsPublisher; + boolean taskIndependent = statsPublisher instanceof StatsCollectionTaskIndependent; for (Map.Entry entry : valToPaths.entrySet()) { String fspKey = entry.getKey(); // DP/LB @@ -914,7 +915,7 @@ String prefix; String postfix; - if (counterStats) { + if (taskIndependent) { // key = "database.table/SP/DP/"LB/ prefix = conf.getTableInfo().getTableName(); postfix = Utilities.join(lbSpec); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (working copy) @@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher; +import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -293,8 +293,8 @@ int maxKeyLength = conf.getMaxStatsKeyPrefixLength(); String key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength); - if (!(statsPublisher instanceof CounterStatsPublisher)) { - // stats publisher except counter type needs postfix 'taskID' + if (!(statsPublisher instanceof StatsCollectionTaskIndependent)) { + // stats publisher except counter or fs type needs postfix 'taskID' key = Utilities.join(prefix, taskID); } for(String statType : 
stats.get(pspecs).getStoredStats()) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (working copy) @@ -43,9 +43,8 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.stats.CounterStatsAggregator; -import org.apache.hadoop.hive.ql.stats.CounterStatsAggregatorTez; import org.apache.hadoop.hive.ql.stats.StatsAggregator; +import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.util.StringUtils; @@ -152,11 +151,11 @@ boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC); String tableFullName = table.getDbName() + "." + table.getTableName(); + int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf); - // "counter" type does not need to collect stats per task - boolean counterStat = statsAggregator instanceof CounterStatsAggregator - || statsAggregator instanceof CounterStatsAggregatorTez; + // "counter" or "fs" type does not need to collect stats per task + boolean taskIndependent = statsAggregator instanceof StatsCollectionTaskIndependent; if (partitions == null) { org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable(); Map parameters = tTable.getParameters(); @@ -172,7 +171,7 @@ } if (statsAggregator != null) { - String prefix = getAggregationPrefix(counterStat, table, null); + String prefix = getAggregationPrefix(taskIndependent, table, null); updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic); } @@ -206,7 +205,7 @@ } if (statsAggregator != null) { - String prefix = getAggregationPrefix(counterStat, table, partn); + String prefix = getAggregationPrefix(taskIndependent, table, partn); updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -857,6 +857,7 @@ private static void serializeObjectByJavaXML(Object plan, OutputStream out) { XMLEncoder e = new XMLEncoder(out); e.setExceptionListener(new ExceptionListener() { + @Override public void exceptionThrown(Exception e) { LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e)); throw new RuntimeException("Cannot serialize object", e); @@ -913,7 +914,7 @@ // Kryo is not thread-safe, // Also new Kryo() is expensive, so we want to do it just once. 
- private static ThreadLocal runtimeSerializationKryo = new ThreadLocal() { + public static ThreadLocal runtimeSerializationKryo = new ThreadLocal() { @Override protected synchronized Kryo initialValue() { Kryo kryo = new Kryo(); @@ -1250,7 +1251,7 @@ if (isCompressed) { Class codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); - CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc); + CompressionCodec codec = ReflectionUtils.newInstance(codecClass, jc); return codec.createOutputStream(out); } else { return (out); @@ -1299,7 +1300,7 @@ if ((hiveOutputFormat instanceof HiveIgnoreKeyTextOutputFormat) && isCompressed) { Class codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); - CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc); + CompressionCodec codec = ReflectionUtils.newInstance(codecClass, jc); return codec.getDefaultExtension(); } return ""; @@ -2180,6 +2181,7 @@ final PartitionDesc partDesc = work.getPathToPartitionInfo().get( p.toString()); Runnable r = new Runnable() { + @Override public void run() { try { Class inputFormatCls = partDesc @@ -2324,7 +2326,7 @@ private static void getTezTasks(List> tasks, List tezTasks) { for (Task task : tasks) { - if (task instanceof TezTask && !tezTasks.contains((TezTask) task)) { + if (task instanceof TezTask && !tezTasks.contains(task)) { tezTasks.add((TezTask) task); } @@ -2344,7 +2346,7 @@ private static void getMRTasks(List> tasks, List mrTasks) { for (Task task : tasks) { - if (task instanceof ExecDriver && !mrTasks.contains((ExecDriver) task)) { + if (task instanceof ExecDriver && !mrTasks.contains(task)) { mrTasks.add((ExecDriver) task); } @@ -2969,7 +2971,7 @@ pathsProcessed.add(path); LOG.info("Adding input file " + path); - if (!HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") + if (!HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") && isEmptyPath(job, path, ctx)) { path = createDummyFileForEmptyPartition(path, job, work, hiveScratchDir, alias, sequenceNumber++); @@ -2987,7 +2989,7 @@ // T2) x; // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 // rows) - if (path == null + if (path == null && !HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { path = createDummyFileForEmptyTable(job, work, hiveScratchDir, alias, sequenceNumber++); @@ -3218,7 +3220,7 @@ /** * Returns true if a plan is both configured for vectorized execution * and vectorization is allowed. The plan may be configured for vectorization - * but vectorization dissalowed eg. for FetchOperator execution. + * but vectorization dissalowed eg. for FetchOperator execution. 
*/ public static boolean isVectorMode(Configuration conf) { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && @@ -3228,11 +3230,11 @@ } return false; } - + public static void clearWorkMap() { gWorkMap.clear(); } - + /** * Create a temp dir in specified baseDir * This can go away once hive moves to support only JDK 7 Index: ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java (working copy) @@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; -public class CounterStatsAggregator implements StatsAggregator { +public class CounterStatsAggregator implements StatsAggregator, StatsCollectionTaskIndependent { private static final Log LOG = LogFactory.getLog(CounterStatsAggregator.class.getName()); Index: ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java (working copy) @@ -33,12 +33,12 @@ * using hadoop counters. They will be published using special keys and * then retrieved on the client after the insert/ctas statement ran. */ -public class CounterStatsAggregatorTez implements StatsAggregator { +public class CounterStatsAggregatorTez implements StatsAggregator, StatsCollectionTaskIndependent { private static final Log LOG = LogFactory.getLog(CounterStatsAggregatorTez.class.getName()); private TezCounters counters; - private CounterStatsAggregator mrAggregator; + private final CounterStatsAggregator mrAggregator; private boolean delegate; public CounterStatsAggregatorTez() { Index: ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java (working copy) @@ -42,9 +42,14 @@ private Class publisherImplementation; private Class aggregatorImplementation; - private Configuration jobConf; + private final Configuration jobConf; public static int getMaxPrefixLength(Configuration conf) { + + if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { + // no limit on prefix for fs. + return -1; + } int maxPrefixLength = HiveConf.getIntVar(conf, HIVE_STATS_KEY_PREFIX_MAX_LENGTH); if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.counter.name())) { // see org.apache.hadoop.mapred.Counter or org.apache.hadoop.mapreduce.MRJobConfig Index: ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java (revision 0) @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats.fs; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; +import org.apache.hadoop.hive.ql.stats.StatsPublisher; + +import com.esotericsoftware.kryo.io.Output; + +public class FSStatsPublisher implements StatsPublisher, StatsCollectionTaskIndependent { + + private Configuration conf; + private final Log LOG = LogFactory.getLog(this.getClass().getName()); + private Map> statsMap; // map from partID -> (statType->value) + + @Override + public boolean init(Configuration hconf) { + Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC)); + LOG.debug("Initing FSStatsPublisher with : " + statsDir); + try { + statsDir.getFileSystem(hconf).mkdirs(statsDir); + LOG.info("created : " + statsDir); + return true; + } catch (IOException e) { + LOG.error(e); + return false; + } + } + + @Override + public boolean connect(Configuration hconf) { + conf = hconf; + Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC)); + LOG.debug("Connecting to : " + statsDir); + statsMap = new HashMap>(); + try { + return statsDir.getFileSystem(conf).exists(statsDir); + } catch (IOException e) { + LOG.error(e); + return false; + } + } + + @Override + public boolean publishStat(String partKV, Map stats) { + statsMap.put(partKV, stats); + return true; + } + + @Override + public boolean closeConnection() { + Path statsDir = new Path(conf.get(StatsSetupConst.STATS_TMP_LOC)); + try { + Path statsFile = new Path(statsDir,StatsSetupConst.STATS_FILE_PREFIX +conf.getInt("mapred.task.partition",0)); + LOG.debug("About to create stats file for this task : " + statsFile); + Output output = new Output(statsFile.getFileSystem(conf).create(statsFile,true)); + LOG.info("Created file : " + statsFile); + Utilities.runtimeSerializationKryo.get().writeObject(output, statsMap); + output.close(); + return true; + } catch (IOException e) { + LOG.error(e); + return false; + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java (revision 0) @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats.fs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.stats.StatsAggregator; +import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; + +import com.esotericsoftware.kryo.io.Input; + +public class FSStatsAggregator implements StatsAggregator, StatsCollectionTaskIndependent { + private final Log LOG = LogFactory.getLog(this.getClass().getName()); + private List>> statsList; + private Map> statsMap; + private FileSystem fs; + private Configuration conf; + + @Override + public boolean connect(Configuration hconf, Task sourceTask) { + conf = hconf; + Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC)); + LOG.debug("About to read stats from : " + statsDir); + statsMap = new HashMap>(); + + try { + fs = statsDir.getFileSystem(hconf); + statsList = new ArrayList>>(); + FileStatus[] status = fs.listStatus(statsDir, new PathFilter() { + @Override + public boolean accept(Path file) { + return file.getName().startsWith(StatsSetupConst.STATS_FILE_PREFIX); + } + }); + for (FileStatus file : status) { + Input in = new Input(fs.open(file.getPath())); + statsMap = Utilities.runtimeSerializationKryo.get().readObject(in, statsMap.getClass()); + LOG.info("Read stats : " +statsMap); + statsList.add(statsMap); + in.close(); + } + return true; + } catch (IOException e) { + LOG.error(e); + return false; + } + } + + + @Override + public String aggregateStats(String partID, String statType) { + long counter = 0; + for (Map> statsMap : statsList) { + String statVal = statsMap.get(partID).get(statType); + counter += Long.valueOf(statVal == null ? 
"0" : statVal); + } + LOG.info("Read stats for : " + partID + "\t" + statType + "\t" + counter); + + return String.valueOf(counter); + } + + @Override + public boolean closeConnection() { + LOG.debug("About to delete stats tmp dir"); + + try { + fs.delete(new Path(conf.get(StatsSetupConst.STATS_TMP_LOC)),true); + return true; + } catch (IOException e) { + LOG.error(e); + return true; + } + } + + @Override + public boolean cleanUp(String keyPrefix) { + return true; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionTaskIndependent.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionTaskIndependent.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionTaskIndependent.java (revision 0) @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.stats; + +/** This is just a marker interface to differentiate between stats publisher / aggregator + * which don't track stats per task, as oppose to others which do. + */ +public interface StatsCollectionTaskIndependent { + +} Index: ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java (revision 1572127) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java (working copy) @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.mapred.Reporter; -public class CounterStatsPublisher implements StatsPublisher { +public class CounterStatsPublisher implements StatsPublisher, StatsCollectionTaskIndependent { private static final Log LOG = LogFactory.getLog(CounterStatsPublisher.class.getName()); @@ -37,6 +37,7 @@ return true; } + @Override public boolean connect(Configuration hconf) { MapredContext context = MapredContext.get(); if (context == null || context.getReporter() == null) {