Index: hbase-handler/src/test/results/positive/hbase_stats_topk.q.out =================================================================== --- hbase-handler/src/test/results/positive/hbase_stats_topk.q.out (revision 0) +++ hbase-handler/src/test/results/positive/hbase_stats_topk.q.out (working copy) @@ -0,0 +1,396 @@ +PREHOOK: query: create table stats_src like src +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table stats_src like src +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@stats_src +PREHOOK: query: insert overwrite table stats_src select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@stats_src +POSTHOOK: query: insert overwrite table stats_src select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@stats_src +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table stats_src compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@stats_src +PREHOOK: Output: default@stats_src +POSTHOOK: query: analyze table stats_src compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@stats_src +POSTHOOK: Output: default@stats_src +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted stats_src +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted stats_src +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string default +value string default + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 1 + numPartitions 0 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: create table stats_part like srcpart +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table stats_part like srcpart +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@stats_part +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table stats_part partition (ds='2010-04-08', hr = '11') select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=11 +POSTHOOK: query: insert overwrite table stats_part partition (ds='2010-04-08', hr = '11') select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: 
Input: default@src +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=11 +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table stats_part partition (ds='2010-04-08', hr = '12') select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=12 +POSTHOOK: query: insert overwrite table stats_part partition (ds='2010-04-08', hr = '12') select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=12 +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table stats_part partition(ds='2010-04-08', hr='11') compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@stats_part@ds=2010-04-08/hr=11 +PREHOOK: Output: default@stats_part +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=11 +POSTHOOK: query: analyze table stats_part partition(ds='2010-04-08', hr='11') compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@stats_part@ds=2010-04-08/hr=11 +POSTHOOK: Output: default@stats_part +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=11 +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table stats_part partition(ds='2010-04-08', hr='12') compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@stats_part@ds=2010-04-08/hr=12 +PREHOOK: Output: default@stats_part +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=12 +POSTHOOK: query: analyze table stats_part partition(ds='2010-04-08', hr='12') compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: 
default@stats_part@ds=2010-04-08/hr=12 +POSTHOOK: Output: default@stats_part +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=12 +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table stats_part partition (ds='2010-04-08', hr = '13') select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=13 +POSTHOOK: query: insert overwrite table stats_part partition (ds='2010-04-08', hr = '13') select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=13 +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted stats_part +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted stats_part +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE 
[(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string default +value string default + +# Partition Information +# col_name data_type comment + +ds string None +hr string None + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 3 + numPartitions 3 + numRows 1500 + rawDataSize 15936 + totalSize 17436 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: desc formatted stats_part partition (ds='2010-04-08', hr = '11') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted stats_part partition (ds='2010-04-08', hr = '11') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string default +value string default + +# Partition Information +# col_name data_type comment + +ds string None +hr string None + +# Detailed Partition Information +Partition Value: [2010-04-08, 11] +Database: default +Table: stats_part +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: desc formatted stats_part partition (ds='2010-04-08', hr = '12') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted stats_part partition (ds='2010-04-08', hr = '12') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE 
[(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string default +value string default + +# Partition Information +# col_name data_type comment + +ds string None +hr string None + +# Detailed Partition Information +Partition Value: [2010-04-08, 12] +Database: default +Table: stats_part +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: analyze table stats_part partition(ds, hr) compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@stats_part@ds=2010-04-08/hr=11 +PREHOOK: Input: default@stats_part@ds=2010-04-08/hr=12 +PREHOOK: Input: default@stats_part@ds=2010-04-08/hr=13 +PREHOOK: Output: default@stats_part +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=11 +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=12 +PREHOOK: Output: default@stats_part@ds=2010-04-08/hr=13 +POSTHOOK: query: analyze table stats_part partition(ds, hr) compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@stats_part@ds=2010-04-08/hr=11 +POSTHOOK: Input: default@stats_part@ds=2010-04-08/hr=12 +POSTHOOK: Input: default@stats_part@ds=2010-04-08/hr=13 +POSTHOOK: Output: default@stats_part +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=11 +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=12 +POSTHOOK: Output: default@stats_part@ds=2010-04-08/hr=13 +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, 
comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted stats_part +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted stats_part +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string default +value string default + +# Partition Information +# col_name data_type comment + +ds string None +hr string None + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 3 + numPartitions 3 + numRows 1500 + rawDataSize 15936 + totalSize 17436 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table stats_src +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@stats_src +PREHOOK: Output: default@stats_src +POSTHOOK: query: drop table stats_src +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@stats_src +POSTHOOK: Output: default@stats_src +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE 
[(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: drop table stats_part +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@stats_part +PREHOOK: Output: default@stats_part +POSTHOOK: query: drop table stats_part +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@stats_part +POSTHOOK: Output: default@stats_part +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=11).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=12).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_part PARTITION(ds=2010-04-08,hr=13).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: stats_src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: hbase-handler/src/test/results/positive/hbase_stats_topk_empty_partition.q.out =================================================================== --- hbase-handler/src/test/results/positive/hbase_stats_topk_empty_partition.q.out (revision 0) +++ hbase-handler/src/test/results/positive/hbase_stats_topk_empty_partition.q.out (working copy) @@ -0,0 +1,61 @@ +PREHOOK: query: -- This test verifies that writing an empty partition succeeds when +-- hive.stats.reliable is set to true. + +create table tmptable(key string, value string) partitioned by (part string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- This test verifies that writing an empty partition succeeds when +-- hive.stats.reliable is set to true. 
+ +create table tmptable(key string, value string) partitioned by (part string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmptable +PREHOOK: query: insert overwrite table tmptable partition (part = '1') select * from src where key = 'no_such_value' +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmptable@part=1 +POSTHOOK: query: insert overwrite table tmptable partition (part = '1') select * from src where key = 'no_such_value' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmptable@part=1 +POSTHOOK: Lineage: tmptable PARTITION(part=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmptable PARTITION(part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: describe formatted tmptable partition (part = '1') +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe formatted tmptable partition (part = '1') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmptable PARTITION(part=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmptable PARTITION(part=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string None +value string None + +# Partition Information +# col_name data_type comment + +part string None + +# Detailed Partition Information +Partition Value: [1] +Database: default +Table: tmptable +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 0 + rawDataSize 0 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 Index: hbase-handler/src/test/queries/positive/hbase_stats_topk.q =================================================================== --- hbase-handler/src/test/queries/positive/hbase_stats_topk.q (revision 0) +++ hbase-handler/src/test/queries/positive/hbase_stats_topk.q (working copy) @@ -0,0 +1,31 @@ +set datanucleus.cache.collections=false; +set hive.stats.autogather=true; +set hive.stats.atomic=false; +set hive.stats.topk.collect=true; + +set hive.stats.dbclass=hbase; + +create table stats_src like src; +insert overwrite table stats_src select * from src; +analyze table stats_src compute statistics; +desc formatted stats_src; + +create table stats_part like srcpart; + +insert overwrite table stats_part partition (ds='2010-04-08', hr = '11') select key, value from src; +insert overwrite table stats_part partition (ds='2010-04-08', hr = '12') select key, value from src; + +analyze table stats_part partition(ds='2010-04-08', hr='11') compute statistics; +analyze table stats_part partition(ds='2010-04-08', hr='12') compute statistics; + +insert overwrite table stats_part partition (ds='2010-04-08', hr = '13') select key, value from src; + +desc formatted stats_part; +desc formatted stats_part partition (ds='2010-04-08', hr = '11'); +desc formatted stats_part partition (ds='2010-04-08', hr = '12'); + +analyze table stats_part partition(ds, hr) compute statistics; +desc formatted stats_part; + +drop table stats_src; +drop table stats_part; Index:
hbase-handler/src/test/queries/positive/hbase_stats_topk_empty_partition.q =================================================================== --- hbase-handler/src/test/queries/positive/hbase_stats_topk_empty_partition.q (revision 0) +++ hbase-handler/src/test/queries/positive/hbase_stats_topk_empty_partition.q (working copy) @@ -0,0 +1,14 @@ +-- This test verifies that writing an empty partition succeeds when +-- hive.stats.reliable is set to true. + +create table tmptable(key string, value string) partitioned by (part string); + +set hive.stats.autogather=true; +set hive.stats.reliable=true; +set hive.stats.topk.collect=true; + +set hive.stats.dbclass=hbase; + +insert overwrite table tmptable partition (part = '1') select * from src where key = 'no_such_value'; + +describe formatted tmptable partition (part = '1'); Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java =================================================================== --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java (revision 1377805) +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.ql.stats.StatsAggregator; +import org.apache.hadoop.hive.ql.stats.StatsSetupConst; /** @@ -65,6 +67,10 @@ public String aggregateStats(String rowID, String key) { byte[] family, column; + if (StatsSetupConst.TOP_K_VALUES.equals(key)) { + LOG.warn("Invalid statistic: " + key + "; it is only supported by aggregateStatsTopK."); + return null; + } if (!HBaseStatsUtils.isValidStatistic(key)) { LOG.warn("Warning. Invalid statistic: " + key + ", supported stats: " + HBaseStatsUtils.getSupportedStatistics()); @@ -77,6 +83,7 @@ try { long retValue = 0; + String retValueString = ""; Scan scan = new Scan(); scan.addColumn(family, column); // Filter the row by its ID @@ -88,10 +95,55 @@ scan.setFilter(filter); ResultScanner scanner = htable.getScanner(scan); + if (StatsSetupConst.getType(key) == StatsSetupConst.StatDataType.LONG_TYPE) { + for (Result result : scanner) { + retValue += Long.parseLong(Bytes.toString(result.getValue(family, column))); + } + return Long.toString(retValue); + } else if (StatsSetupConst.getType(key) == StatsSetupConst.StatDataType.STRING_TYPE) { + for (Result result : scanner) { + retValueString += Bytes.toString(result.getValue(family, column)); + } + } + return retValueString; + } catch (IOException e) { + LOG.error("Error during publishing aggregation. ", e); + return null; + } + } + + /** + * Aggregates temporary topk stats from HBase. + */ + public List aggregateStatsTopK(String rowID, String key) { + + byte[] family, column; + if (!StatsSetupConst.TOP_K_VALUES.equals(key)) { + LOG.warn("Invalid statistic: " + key + ", supported stat: " + + StatsSetupConst.TOP_K_VALUES); + return null; + } + + family = HBaseStatsUtils.getFamilyName(); + column = HBaseStatsUtils.getColumnName(key); + + try { + List retList = new ArrayList(); + Scan scan = new Scan(); + scan.addColumn(family, column); + // Filter the row by its ID + // The complete key is "tableName/PartSpecs/jobID/taskID" + // This is a prefix filter, the prefix is "tableName/PartSpecs/JobID", i.e. the taskID is + // ignored.
In SQL, this is equivalent to + // "Select * FROM tableName where ID LIKE 'tableName/PartSpecs/JobID%';" + PrefixFilter filter = new PrefixFilter(Bytes.toBytes(rowID)); + scan.setFilter(filter); + ResultScanner scanner = htable.getScanner(scan); + for (Result result : scanner) { - retValue += Long.parseLong(Bytes.toString(result.getValue(family, column))); + retList.add(Bytes.toString(result.getValue(family, column))); } - return Long.toString(retValue); + return retList; } catch (IOException e) { LOG.error("Error during publishing aggregation. ", e); return null; Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsSetupConstants.java =================================================================== --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsSetupConstants.java (revision 1377805) +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsSetupConstants.java (working copy) @@ -20,7 +20,7 @@ public final class HBaseStatsSetupConstants { - public static final String PART_STAT_TABLE_NAME = "PARTITION_STAT_TBL"; + public static final String PART_STAT_TABLE_NAME = "PARTITION_STAT_TBL_V2"; public static final String PART_STAT_COLUMN_FAMILY = "PARTITION_STAT_FAMILY"; @@ -30,5 +30,6 @@ public static final String PART_STAT_RAW_DATA_SIZE_COLUMN_NAME = "RAW_DATA_SIZE"; + public static final String PART_STAT_TOP_K_VALUES_COLUMN_NAME = "TOP_K_VALUES"; } Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsUtils.java =================================================================== --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsUtils.java (revision 1377805) +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsUtils.java (working copy) @@ -38,6 +38,7 @@ // supported statistics supportedStats.add(StatsSetupConst.ROW_COUNT); supportedStats.add(StatsSetupConst.RAW_DATA_SIZE); + supportedStats.add(StatsSetupConst.TOP_K_VALUES); // row count statistics columnNameMapping.put(StatsSetupConst.ROW_COUNT, @@ -47,6 +48,9 @@ columnNameMapping.put(StatsSetupConst.RAW_DATA_SIZE, HBaseStatsSetupConstants.PART_STAT_RAW_DATA_SIZE_COLUMN_NAME); + // top k values + columnNameMapping.put(StatsSetupConst.TOP_K_VALUES, + HBaseStatsSetupConstants.PART_STAT_TOP_K_VALUES_COLUMN_NAME); } /** Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1377805) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -26,8 +26,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Properties; import java.util.Map.Entry; -import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -505,6 +505,15 @@ "org.apache.derby.jdbc.EmbeddedDriver"), // JDBC driver specific to the dbclass HIVESTATSDBCONNECTIONSTRING("hive.stats.dbconnectionstring", "jdbc:derby:;databaseName=TempStatsStore;create=true"), // automatically create database + HIVE_STATS_TOPK_COLLECT("hive.stats.topk.collect", false), // not collect topk by default + HIVE_STATS_TOPK_NUM("hive.stats.topk.num", 5), // collect 5 top values by default + HIVE_STATS_TOPK_POOLSIZE("hive.stats.topk.poolsize", + 100), // monitor top 100 values by default while collecting topk + HIVE_STATS_TOPK_MINPERCENT("hive.stats.topk.minpercent", + 0), // if a value does not hold up to this percentage of rows, + // it will not be published even if it is within the 
top k. + HIVE_STATS_TOPK_COLUMN_TYPE("hive.stats.topk.column.type", + "LONG VARCHAR"), // the column data type to store topk info in the MySQL table. HIVE_STATS_DEFAULT_PUBLISHER("hive.stats.default.publisher", ""), // default stats publisher if none of JDBC/HBase is specified HIVE_STATS_DEFAULT_AGGREGATOR("hive.stats.default.aggregator", Index: ql/src/test/results/clientpositive/topk_empty_partition.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_empty_partition.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_empty_partition.q.out (working copy) @@ -0,0 +1,69 @@ +PREHOOK: query: -- varying topk related parameters + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- varying topk related parameters + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src where key='no_such_value' +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src where key='no_such_value' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='1') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='1') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [1] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 0 + rawDataSize 0 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: 
ql/src/test/results/clientpositive/topk_vary_parameters.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_vary_parameters.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_vary_parameters.q.out (working copy) @@ -0,0 +1,191 @@ +PREHOOK: query: -- varying topk related parameters + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- varying topk related parameters + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='1') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='1') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [1] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480], [val_406], [val_97], [val_462]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='2') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='2') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, 
type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='2') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='2') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [2] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_469], [val_401], [val_348], [val_230], [val_489], [val_468], [val_406], [val_277]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='3') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='3') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='3') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='3') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: 
tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [3] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_469], [val_401], [val_348], [val_230]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/results/clientpositive/topk_skewed_table.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_skewed_table.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_skewed_table.q.out (working copy) @@ -0,0 +1,71 @@ +PREHOOK: query: -- topk should not overwrite if user has specified skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string) skewed by (key) on (38, 49) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- topk should not overwrite if user has specified skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string) skewed by (key) on (38, 49) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE 
[(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='1') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='1') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [1] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [key] +Skewed Values: [[38], [49]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/results/clientpositive/topk_most_skewed_column.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_most_skewed_column.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_most_skewed_column.q.out (working copy) @@ -0,0 +1,180 @@ +PREHOOK: query: -- for now, topk should put the most skewed column into skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- for now, topk should put the most skewed column into skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select (1), value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select (1), value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='1') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='1') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test 
PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [1] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 4406 + totalSize 4906 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [key] +Skewed Values: [[1]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: -- topk is not calculated for null column/row + +insert overwrite table tmp_topk_test partition (ds='2') select (null), value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: query: -- topk is not calculated for null column/row + +insert overwrite table tmp_topk_test partition (ds='2') select (null), value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='2') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='2') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [2] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 4906 + totalSize 5406 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: -- topk is not available at table level if table is partitioned + +desc formatted tmp_topk_test +PREHOOK: type: DESCTABLE +POSTHOOK: query: -- topk is not available at table level if table is partitioned + +desc formatted tmp_topk_test 
+POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 2 + numPartitions 2 + numRows 1000 + rawDataSize 9312 + totalSize 10312 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/results/clientpositive/topk_not_enabled.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_not_enabled.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_not_enabled.q.out (working copy) @@ -0,0 +1,69 @@ +PREHOOK: query: -- topk is not enabled by default, insert overwirte should not set skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- topk is not enabled by default, insert overwirte should not set skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='1') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='1') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [1] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/results/clientpositive/topk_analyze_table.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_analyze_table.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_analyze_table.q.out (working copy) @@ -0,0 +1,315 @@ +PREHOOK: query: -- test topk with analyze queries + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- test topk with analyze queries + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table tmp_topk_test partition(ds='1') compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_topk_test@ds=1 +PREHOOK: Output: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: analyze table tmp_topk_test partition(ds='1') compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_topk_test@ds=1 +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE 
[(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='1') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='1') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [1] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='2') select * from src where key='no_such_value' +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='2') select * from src where key='no_such_value' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table tmp_topk_test partition(ds='2') compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_topk_test@ds=2 +PREHOOK: Output: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: query: analyze table tmp_topk_test partition(ds='2') compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_topk_test@ds=2 +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='2') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='2') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test 
PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [2] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 0 + rawDataSize 0 + totalSize 0 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='3') select (1), value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='3') select (1), value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table tmp_topk_test partition(ds='3') compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_topk_test@ds=3 +PREHOOK: Output: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: query: analyze table tmp_topk_test partition(ds='3') compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_topk_test@ds=3 +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, 
comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='3') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='3') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [3] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 4406 + totalSize 4906 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [key] +Skewed Values: [[1]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='4') select (null), value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=4 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='4') select (null), value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=4 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table tmp_topk_test partition(ds='4') compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_topk_test@ds=4 +PREHOOK: Output: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test@ds=4 +POSTHOOK: query: analyze table tmp_topk_test partition(ds='4') compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_topk_test@ds=4 +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test@ds=4 +POSTHOOK: 
Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test partition (ds='4') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test partition (ds='4') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +ds string None + +# Detailed Partition Information +Partition Value: [4] +Database: default +Table: tmp_topk_test +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 4906 + totalSize 5406 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, 
comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key SIMPLE [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).key EXPRESSION [] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=4).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/results/clientpositive/topk_dynamic_partitions.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_dynamic_partitions.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_dynamic_partitions.q.out (working copy) @@ -0,0 +1,213 @@ +PREHOOK: query: -- topk for a table with dynamic partitions + + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- topk for a table with dynamic partitions + + +create table tmp_topk_test (key int, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='1') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=1 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='2') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='2') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=2 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table tmp_topk_test partition (ds='3') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: query: insert overwrite table tmp_topk_test partition (ds='3') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test@ds=3 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), 
] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: create table tmp_topk_test_2 (key int, value string) partitioned by (hr string, ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tmp_topk_test_2 (key int, value string) partitioned by (hr string, ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test_2 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table tmp_topk_test_2 partition (hr='1', ds) select * from tmp_topk_test where ds>'1' +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_topk_test@ds=2 +PREHOOK: Input: default@tmp_topk_test@ds=3 +PREHOOK: Output: default@tmp_topk_test_2@hr=1 +POSTHOOK: query: insert overwrite table tmp_topk_test_2 partition (hr='1', ds) select * from tmp_topk_test where ds>'1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_topk_test@ds=2 +POSTHOOK: Input: default@tmp_topk_test@ds=3 +POSTHOOK: Output: default@tmp_topk_test_2@hr=1/ds=2 +POSTHOOK: Output: default@tmp_topk_test_2@hr=1/ds=3 +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: desc formatted tmp_topk_test_2 partition (hr='1', 
ds='2') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test_2 partition (hr='1', ds='2') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +hr string None +ds string None + +# Detailed Partition Information +Partition Value: [1, 2] +Database: default +Table: tmp_topk_test_2 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: desc formatted tmp_topk_test_2 partition (hr='1', ds='3') +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test_2 partition (hr='1', ds='3') +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, 
comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +# col_name data_type comment + +key int None +value string None + +# Partition Information +# col_name data_type comment + +hr string None +ds string None + +# Detailed Partition Information +Partition Value: [1, 3] +Database: default +Table: tmp_topk_test_2 +#### A masked pattern was here #### +Protect Mode: None +#### A masked pattern was here #### +Partition Parameters: + numFiles 1 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_feng_2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tmp_topk_feng_2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test PARTITION(ds=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=2).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).key SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: tmp_topk_test_2 PARTITION(hr=1,ds=3).value SIMPLE [(tmp_topk_test)tmp_topk_test.FieldSchema(name:value, type:string, comment:null), ] Index: ql/src/test/results/clientpositive/topk_non_partitioned_table.q.out =================================================================== --- ql/src/test/results/clientpositive/topk_non_partitioned_table.q.out (revision 0) +++ ql/src/test/results/clientpositive/topk_non_partitioned_table.q.out (working copy) @@ -0,0 +1,63 @@ +PREHOOK: query: create table tmp_topk_test (key int, value string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tmp_topk_test (key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_topk_test +PREHOOK: query: insert overwrite table tmp_topk_test select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: insert overwrite table tmp_topk_test select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: desc formatted tmp_topk_test +PREHOOK: type: DESCTABLE +POSTHOOK: query: desc formatted tmp_topk_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Lineage: tmp_topk_test.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key int None +value string None + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 1 + numPartitions 0 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Skewed Columns: [value] +Skewed Values: [[val_348], [val_230], [val_401], [val_70], [val_480]] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: drop table tmp_topk_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_topk_test +PREHOOK: Output: default@tmp_topk_test +POSTHOOK: query: drop table 
tmp_topk_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_topk_test +POSTHOOK: Output: default@tmp_topk_test +POSTHOOK: Lineage: tmp_topk_test.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_topk_test.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java (revision 1377805) +++ ql/src/test/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.stats; +import java.util.List; import org.apache.hadoop.conf.Configuration; /** @@ -45,6 +46,10 @@ return null; } + public List aggregateStatsTopK(String keyPrefix, String statType) { + return null; + } + public boolean closeConnection() { if (errorMethod.equalsIgnoreCase("closeConnection")) { return false; Index: ql/src/test/queries/clientpositive/topk_dynamic_partitions.q =================================================================== --- ql/src/test/queries/clientpositive/topk_dynamic_partitions.q (revision 0) +++ ql/src/test/queries/clientpositive/topk_dynamic_partitions.q (working copy) @@ -0,0 +1,27 @@ +-- topk for a table with dynamic partitions + + +create table tmp_topk_test (key int, value string) partitioned by (ds string); + +insert overwrite table tmp_topk_test partition (ds='1') select * from src; +insert overwrite table tmp_topk_test partition (ds='2') select * from src; +insert overwrite table tmp_topk_test partition (ds='3') select * from src; + + +create table tmp_topk_test_2 (key int, value string) partitioned by (hr string, ds string); + +set hive.stats.topk.collect=true; + +insert overwrite table tmp_topk_test_2 partition (hr='1', ds) select * from tmp_topk_test where ds>'1'; + + +desc formatted tmp_topk_test_2 partition (hr='1', ds='2'); + + +desc formatted tmp_topk_test_2 partition (hr='1', ds='3'); + + +drop table tmp_topk_feng_2; + + +drop table tmp_topk_test; Index: ql/src/test/queries/clientpositive/topk_vary_parameters.q =================================================================== --- ql/src/test/queries/clientpositive/topk_vary_parameters.q (revision 0) +++ ql/src/test/queries/clientpositive/topk_vary_parameters.q (working copy) @@ -0,0 +1,23 @@ +-- varying topk related parameters + +create table tmp_topk_test (key int, value string) partitioned by (ds string); + + +set hive.stats.topk.collect=true; +set hive.stats.topk.num=8; +set hive.stats.topk.minpercent=0; +insert overwrite table tmp_topk_test partition (ds='1') select * from src; +desc formatted tmp_topk_test partition (ds='1'); + + +set hive.stats.topk.poolsize=500; +insert overwrite table tmp_topk_test partition (ds='2') select * from src; +desc formatted tmp_topk_test partition (ds='2'); + + +set hive.stats.topk.minpercent=1; +insert overwrite table tmp_topk_test partition (ds='3') select * from src; +desc formatted tmp_topk_test partition (ds='3'); + + +drop table tmp_topk_test; Index: ql/src/test/queries/clientpositive/topk_analyze_table.q =================================================================== --- ql/src/test/queries/clientpositive/topk_analyze_table.q (revision 0) +++ ql/src/test/queries/clientpositive/topk_analyze_table.q (working copy) @@ -0,0 +1,43 @@ +-- test topk with analyze queries + +create table tmp_topk_test (key int, value string) partitioned by (ds 
string); + + +set hive.stats.topk.collect=true; +set hive.stats.topk.num=4; +set hive.stats.topk.minpercent=0; +insert overwrite table tmp_topk_test partition (ds='1') select * from src; + +analyze table tmp_topk_test partition(ds='1') compute statistics; + +desc formatted tmp_topk_test partition (ds='1'); + + + +insert overwrite table tmp_topk_test partition (ds='2') select * from src where key='no_such_value'; + +analyze table tmp_topk_test partition(ds='2') compute statistics; + +desc formatted tmp_topk_test partition (ds='2'); + + + +insert overwrite table tmp_topk_test partition (ds='3') select (1), value from src; + +analyze table tmp_topk_test partition(ds='3') compute statistics; + + +desc formatted tmp_topk_test partition (ds='3'); + + + +insert overwrite table tmp_topk_test partition (ds='4') select (null), value from src; + +analyze table tmp_topk_test partition(ds='4') compute statistics; + + +desc formatted tmp_topk_test partition (ds='4'); + + + +drop table tmp_topk_test; Index: ql/src/test/queries/clientpositive/topk_not_enabled.q =================================================================== --- ql/src/test/queries/clientpositive/topk_not_enabled.q (revision 0) +++ ql/src/test/queries/clientpositive/topk_not_enabled.q (working copy) @@ -0,0 +1,11 @@ +-- topk is not enabled by default, insert overwirte should not set skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string); + +insert overwrite table tmp_topk_test partition (ds='1') select * from src; + +desc formatted tmp_topk_test partition (ds='1'); + + +drop table tmp_topk_test; + Index: ql/src/test/queries/clientpositive/topk_most_skewed_column.q =================================================================== --- ql/src/test/queries/clientpositive/topk_most_skewed_column.q (revision 0) +++ ql/src/test/queries/clientpositive/topk_most_skewed_column.q (working copy) @@ -0,0 +1,31 @@ +-- for now, topk should put the most skewed column into skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string); + + +set hive.stats.topk.collect=true; + +insert overwrite table tmp_topk_test partition (ds='1') select (1), value from src; + + +desc formatted tmp_topk_test partition (ds='1'); + + + +-- topk is not calculated for null column/row + +insert overwrite table tmp_topk_test partition (ds='2') select (null), value from src; + + +desc formatted tmp_topk_test partition (ds='2'); + + + +-- topk is not available at table level if table is partitioned + +desc formatted tmp_topk_test; + + + +drop table tmp_topk_test; + Index: ql/src/test/queries/clientpositive/topk_empty_partition.q =================================================================== --- ql/src/test/queries/clientpositive/topk_empty_partition.q (revision 0) +++ ql/src/test/queries/clientpositive/topk_empty_partition.q (working copy) @@ -0,0 +1,15 @@ +-- varying topk related parameters + +create table tmp_topk_test (key int, value string) partitioned by (ds string); + + +set hive.stats.topk.collect=true; +set hive.stats.topk.num=8; +set hive.stats.topk.minpercent=0; +insert overwrite table tmp_topk_test partition (ds='1') select * from src where key='no_such_value'; + + +desc formatted tmp_topk_test partition (ds='1'); + + +drop table tmp_topk_test; Index: ql/src/test/queries/clientpositive/topk_non_partitioned_table.q =================================================================== --- ql/src/test/queries/clientpositive/topk_non_partitioned_table.q (revision 0) +++ 
ql/src/test/queries/clientpositive/topk_non_partitioned_table.q (working copy) @@ -0,0 +1,13 @@ +create table tmp_topk_test (key int, value string); + + +set hive.stats.topk.collect=true; + +insert overwrite table tmp_topk_test select * from src; + + +desc formatted tmp_topk_test; + + +drop table tmp_topk_test; + Index: ql/src/test/queries/clientpositive/topk_skewed_table.q =================================================================== --- ql/src/test/queries/clientpositive/topk_skewed_table.q (revision 0) +++ ql/src/test/queries/clientpositive/topk_skewed_table.q (working copy) @@ -0,0 +1,18 @@ +set hive.internal.ddl.list.bucketing.enable=true; + + +-- topk should not overwrite if user has specified skewed info + +create table tmp_topk_test (key int, value string) partitioned by (ds string) skewed by (key) on (38, 49); + + +set hive.stats.topk.collect=true; + +insert overwrite table tmp_topk_test partition (ds='1') select * from src; + + +desc formatted tmp_topk_test partition (ds='1'); + + +drop table tmp_topk_test; + Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy) @@ -52,16 +52,17 @@ import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; /** * File Sink operator implementation. 
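The FileSinkOperator changes that follow collect the per-row values that feed these statistics, driven by the three settings exercised in the tests above: hive.stats.topk.num (how many frequent values to report), hive.stats.topk.poolsize (how many distinct values a task tracks at once) and hive.stats.topk.minpercent (the frequency cutoff below which a value is dropped). The TopKStorage/ItemCount classes that actually hold the counts are not part of this excerpt, so the sketch below is only a minimal illustration of a counter shaped by those three knobs; the class name, method names and the behaviour when the pool fills up are assumptions, not the patch's implementation.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only; not the TopKStorage/ItemCount classes used by the patch.
public class BoundedTopKSketch {
  private final int k;            // cf. hive.stats.topk.num
  private final int poolSize;     // cf. hive.stats.topk.poolsize
  private final float minPercent; // cf. hive.stats.topk.minpercent
  private final Map<String, Long> counts = new HashMap<String, Long>();
  private long rows = 0;

  public BoundedTopKSketch(int k, int poolSize, float minPercent) {
    this.k = k;
    this.poolSize = poolSize;
    this.minPercent = minPercent;
  }

  // Called once per non-null column value, e.g. from a per-row hook.
  public void add(String value) {
    rows++;
    Long current = counts.get(value);
    if (current != null) {
      counts.put(value, current + 1);
    } else if (counts.size() < poolSize) {
      counts.put(value, 1L); // in this sketch, new values are simply ignored once the pool is full
    }
  }

  // Returns at most k values, each seen in at least minPercent of the rows.
  public List<Map.Entry<String, Long>> topK() {
    List<Map.Entry<String, Long>> entries =
        new ArrayList<Map.Entry<String, Long>>(counts.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<String, Long>>() {
      public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
        return b.getValue().compareTo(a.getValue());
      }
    });
    List<Map.Entry<String, Long>> result = new ArrayList<Map.Entry<String, Long>>();
    for (Map.Entry<String, Long> e : entries) {
      if (result.size() == k) {
        break;
      }
      if (rows > 0 && 100.0 * e.getValue() / rows >= minPercent) {
        result.add(e);
      }
    }
    return result;
  }
}

With hive.stats.topk.num=4 and hive.stats.topk.minpercent=0, a counter of this shape fed the 500 rows of src would report four values per column, which lines up with the four skewed values recorded for partition ds='1' in topk_analyze_table.q.out above.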
@@ -82,6 +83,16 @@ protected transient List dpWritables; protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters protected transient int maxPartitions; + // topk related variables + protected transient Map colVals; + protected transient List colNames; + protected transient List vals; + protected transient List writables; + protected transient int numCols; + protected transient int topKNum; + protected transient int topKPoolSize; + protected transient float topKMinPercent; + protected transient boolean collectTopK; private static final transient String[] FATAL_ERR_MSG = { null, // counter value 0 means no error @@ -299,6 +310,15 @@ hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); isCompressed = conf.getCompressed(); parent = Utilities.toTempPath(conf.getDirName()); + // initialize topk variables + colVals = new HashMap(); + colNames = new ArrayList(); + vals = new ArrayList(); + writables = new ArrayList(); + topKNum = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVE_STATS_TOPK_NUM); + topKPoolSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVE_STATS_TOPK_POOLSIZE); + topKMinPercent = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVE_STATS_TOPK_MINPERCENT); + collectTopK = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_STATS_TOPK_COLLECT); serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance(); serializer.initialize(null, conf.getTableInfo().getProperties()); @@ -578,6 +598,16 @@ } } fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1); + if (collectTopK) { + // get column names and values for topk calculation + getColNamesAndValues(row); + // put topk related parameters into the current stat + fpaths.stat.setTopKNum((short)topKNum); + fpaths.stat.setTopKPoolSize((short)topKPoolSize); + fpaths.stat.setTopKMinPercent(topKMinPercent); + // add topk stats into the current stat + fpaths.stat.addToStat(StatsSetupConst.TOP_K_VALUES, colVals); + } } @@ -830,7 +860,17 @@ } Map statsToPublish = new HashMap(); for (String statType : fspValue.stat.getStoredStats()) { - statsToPublish.put(statType, Long.toString(fspValue.stat.getStat(statType))); + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + statsToPublish.put(statType, Long.toString(fspValue.stat.getLongStat(statType))); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + // topk will be published as a string type stat + try { + statsToPublish.put(statType, fspValue.stat.getStringStat(statType)); + } catch (IOException e) { + LOG.warn("FileSinkOperator failed to publish topk statistics. " + + StringUtils.stringifyException(e)); + } + } } if (!statsPublisher.publishStat(key, statsToPublish)) { // The original exception is lost. 
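The hunk above publishes the top-k result through the same string key/value channel as the numeric stats, and StatsTask (further down in this patch) imports org.codehaus.jackson.map.ObjectMapper, which suggests a JSON encoding for that string. The exact format produced by getStringStat is not visible in this excerpt, so the snippet below is only an assumed illustration of such an encoding; the class name and the per-column map layout are invented for the example.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.codehaus.jackson.map.ObjectMapper;

// Illustration only: serialize a per-column (value -> count) map into one string stat.
public class TopKStringStatSketch {
  public static void main(String[] args) throws IOException {
    ObjectMapper mapper = new ObjectMapper();

    Map<String, Map<String, Long>> topKPerColumn = new HashMap<String, Map<String, Long>>();
    Map<String, Long> valueCounts = new HashMap<String, Long>();
    valueCounts.put("val_348", 5L);
    topKPerColumn.put("value", valueCounts);

    // This string is what a publisher could hand to publishStat(key, statsToPublish).
    String encoded = mapper.writeValueAsString(topKPerColumn);
    System.out.println(encoded); // {"value":{"val_348":5}}

    // The aggregation side can decode it back into a map.
    Map<?, ?> decoded = mapper.readValue(encoded, Map.class);
    System.out.println(decoded);
  }
}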
@@ -848,4 +888,38 @@ } } } + + private void getColNamesAndValues(Object row) { + // retrieve column values + StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0]; + List fieldOI = soi.getAllStructFieldRefs(); + numCols = fieldOI.size(); + if (dpCtx != null) { + numCols = fieldOI.size() - dpCtx.getDPColNames().size(); + } + vals.clear(); + writables.clear(); + ObjectInspectorUtils.partialCopyToStandardObject(writables, row, 0, numCols, + (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); + for (Object o : writables) { + if (o == null || o.toString().length() == 0) { + vals.add(null); + } else { + vals.add(o.toString()); + } + } + + // retrieve column names + String tblPropCols = conf.getTableInfo().getProperties().getProperty("columns"); + String[] columnNames = tblPropCols.split(","); + for (int i = 0; i < columnNames.length; i ++) { + colNames.add(columnNames[i]); + } + + for (int i = 0; i < numCols; i++) { + if (vals.get(i) != null) { + colVals.put(colNames.get(i), vals.get(i)); + } + } + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; @@ -42,6 +44,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.StringUtils; /** * Table Scan Operator If the data is coming from the map-reduce framework, just @@ -59,6 +62,7 @@ private transient Stat currentStat; private transient Map stats; + private transient boolean collectTopK; public TableDesc getTableDesc() { return tableDesc; @@ -91,6 +95,9 @@ private void gatherStats(Object row) { // first row/call or a new partition + + Map colVals = new HashMap(); + if ((currentStat == null) || inputFileChanged) { String partitionSpecs; inputFileChanged = false; @@ -159,11 +166,27 @@ currentStat.addToStat(StatsSetupConst.RAW_DATA_SIZE, (((LongWritable)rdSize.get(0)).get())); } + if (collectTopK) { + // get column names and values for topk statistics + getColNamesAndValues(row, colVals); + // update topk stats + if (!colVals.isEmpty()) { + int topKNum = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVE_STATS_TOPK_NUM); + int topKPoolSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVE_STATS_TOPK_POOLSIZE); + int topKMinPercent = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVE_STATS_TOPK_MINPERCENT); + currentStat.setTopKNum((short)topKNum); + currentStat.setTopKPoolSize((short)topKPoolSize); + currentStat.setTopKMinPercent((short)topKMinPercent); + currentStat.addToStat(StatsSetupConst.TOP_K_VALUES, colVals); + } + } } @Override protected void initializeOp(Configuration hconf) throws HiveException { initializeChildren(hconf); + collectTopK = HiveConf.getBoolVar(hconf, 
HiveConf.ConfVars.HIVE_STATS_TOPK_COLLECT); inputFileChanged = false; if (conf == null) { @@ -259,7 +282,16 @@ key = conf.getStatsAggPrefix() + pspecs + Path.SEPARATOR + taskID; } for(String statType : stats.get(pspecs).getStoredStats()) { - statsToPublish.put(statType, Long.toString(stats.get(pspecs).getStat(statType))); + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + statsToPublish.put(statType, Long.toString(stats.get(pspecs).getLongStat(statType))); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + try { + statsToPublish.put(statType, stats.get(pspecs).getStringStat(statType)); + } catch (IOException e) { + LOG.warn("TableScanOperator failed to publish topk statistics. " + + StringUtils.stringifyException(e)); + } + } } if (!statsPublisher.publishStat(key, statsToPublish)) { if (isStatsReliable) { @@ -274,4 +306,29 @@ } } } + + private void getColNamesAndValues(Object row, Map colVals) { + // retrieve column names + List colNames = conf.getColNames(); + int numCols = colNames.size(); + + // retrieve column values + List vals = new ArrayList(); + List writables = new ArrayList(); + ObjectInspectorUtils.partialCopyToStandardObject(writables, row, 0, numCols, + (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); + for (Object o : writables) { + if (o == null || o.toString().length() == 0) { + vals.add(null); + } else { + vals.add(o.toString()); + } + } + + for (int k = 0; k < numCols; k++) { + if (vals.get(k) != null) { + colVals.put(colNames.get(k), vals.get(k)); + } + } + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (working copy) @@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -47,8 +49,11 @@ import org.apache.hadoop.hive.ql.stats.StatsAggregator; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsSetupConst; +import org.apache.hadoop.hive.ql.stats.topk.ItemCount; +import org.apache.hadoop.hive.ql.stats.topk.TopKStorage; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.util.StringUtils; +import org.codehaus.jackson.map.ObjectMapper; /** * StatsTask implementation. 
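StatsTask, whose changes follow, is where the values published per task are folded into table and partition level stats, presumably through the StatsAggregator.aggregateStatsTopK hook stubbed out in DummyStatsAggregator earlier in this patch. Neither that aggregator nor TopKStorage is shown here, so the sketch below is only a rough illustration under the assumption that each task ships a partial (value -> count) map and the aggregator sums the partials before keeping the k largest; if tasks ship already truncated lists, the merged counts are approximate rather than exact.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough sketch only; the real merge lives in the StatsAggregator implementations.
public class TopKMergeSketch {

  static List<Map.Entry<String, Long>> merge(List<Map<String, Long>> partials, int k) {
    Map<String, Long> merged = new HashMap<String, Long>();
    for (Map<String, Long> partial : partials) {
      for (Map.Entry<String, Long> e : partial.entrySet()) {
        Long current = merged.get(e.getKey());
        merged.put(e.getKey(), (current == null ? 0L : current) + e.getValue());
      }
    }
    List<Map.Entry<String, Long>> entries =
        new ArrayList<Map.Entry<String, Long>>(merged.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<String, Long>>() {
      public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
        return b.getValue().compareTo(a.getValue());
      }
    });
    return entries.subList(0, Math.min(k, entries.size()));
  }

  public static void main(String[] args) {
    Map<String, Long> task1 = new HashMap<String, Long>();
    task1.put("val_348", 3L);
    task1.put("val_230", 2L);
    Map<String, Long> task2 = new HashMap<String, Long>();
    task2.put("val_348", 2L);
    task2.put("val_401", 4L);
    List<Map<String, Long>> partials = new ArrayList<Map<String, Long>>();
    partials.add(task1);
    partials.add(task2);
    System.out.println(merge(partials, 2)); // prints [val_348=5, val_401=4]
  }
}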
@@ -57,6 +62,7 @@ private static final long serialVersionUID = 1L; private static transient final Log LOG = LogFactory.getLog(StatsTask.class); + private static final ObjectMapper mapper = new org.codehaus.jackson.map.ObjectMapper(); private Table table; private List> dpPartSpecs; @@ -70,15 +76,18 @@ supportedStats.add(StatsSetupConst.ROW_COUNT); supportedStats.add(StatsSetupConst.TOTAL_SIZE); supportedStats.add(StatsSetupConst.RAW_DATA_SIZE); + supportedStats.add(StatsSetupConst.TOP_K_VALUES); // statistics that need to be collected throughout the execution collectableStats.add(StatsSetupConst.ROW_COUNT); collectableStats.add(StatsSetupConst.RAW_DATA_SIZE); + collectableStats.add(StatsSetupConst.TOP_K_VALUES); nameMapping.put(StatsSetupConst.NUM_FILES, "num_files"); nameMapping.put(StatsSetupConst.ROW_COUNT, "num_rows"); nameMapping.put(StatsSetupConst.TOTAL_SIZE, "total_size"); nameMapping.put(StatsSetupConst.RAW_DATA_SIZE, "raw_data_size"); + nameMapping.put(StatsSetupConst.TOP_K_VALUES, "top_k_values"); } public StatsTask() { @@ -92,37 +101,96 @@ * */ class PartitionStatistics { - Map stats; + // separating numeric and string stats + // keeps it generic for future use + Map statsLong; + Map statsString; + Map> topK; public PartitionStatistics() { - stats = new HashMap(); + statsLong = new HashMap(); + statsString = new HashMap(); + topK = new HashMap>(); for (String statType : supportedStats) { - stats.put(statType, new LongWritable(0L)); + if (statType == StatsSetupConst.TOP_K_VALUES) { + continue; + } + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + statsLong.put(statType, new LongWritable(0L)); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + statsString.put(statType, ""); + } } } - public PartitionStatistics(Map st) { - stats = new HashMap(); - for (String statType : st.keySet()) { - Long stValue = st.get(statType) == null ? 0L : st.get(statType); - stats.put(statType, new LongWritable(stValue)); + public PartitionStatistics(Map stLong, Map stString, + Map> topK) { + statsLong = new HashMap(); + statsString = new HashMap(); + for (String statType : stLong.keySet()) { + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + Long stValue = stLong.get(statType) == null ? 0L : stLong.get(statType); + statsLong.put(statType, new LongWritable(stValue)); + } } + for (String statType : stString.keySet()) { + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + String stValue = stString.get(statType) == null ? "" : stString.get(statType); + statsString.put(statType, new String(stValue)); + } + } + for (String key : topK.keySet()) { + this.topK.put(key, topK.get(key)); + } } - public long getStat(String statType) { - return stats.get(statType) == null ? 0L : stats.get(statType).get(); + public long getLongStat(String statType) { + return statsLong.get(statType) == null ? 0L : statsLong.get(statType).get(); } + public String getStringStat(String statType) { + return statsString.get(statType) == null ? 
"" : statsString.get(statType); + } + + public Map> getTopKStat() { + return topK; + } + public void setStat(String statType, long value) { - stats.put(statType, new LongWritable(value)); + statsLong.put(statType, new LongWritable(value)); } + public void setStat(String statType, String value) { + statsString.put(statType, new String(value)); + } + public void setStat(Map> topK) { + this.topK = topK; + } + + public void clearStat(String statType) { + if (statType == StatsSetupConst.TOP_K_VALUES) { + topK.clear(); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + statsLong.put(statType, new LongWritable(0L)); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + statsString.put(statType, ""); + } + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); for (String statType : supportedStats) { - sb.append(nameMapping.get(statType)).append(": ").append(stats.get(statType)).append(", "); + if (statType == StatsSetupConst.TOP_K_VALUES) { + sb.append(nameMapping.get(statType)).append(": ").append(topK.toString()).append(", "); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + sb.append(nameMapping.get(statType)).append(": ").append( + statsLong.get(statType)).append(", "); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + sb.append(nameMapping.get(statType)).append(": ").append( + statsString.get(statType)).append(", "); + } } sb.delete(sb.length() - 2, sb.length()); return sb.toString(); @@ -170,13 +238,19 @@ */ public void addPartitionStats(PartitionStatistics newStats) { for (String statType : supportedStats) { - LongWritable value = stats.get(statType); - if (value == null) { - stats.put(statType, new LongWritable(newStats.getStat(statType))); - } else { - value.set(value.get() + newStats.getStat(statType)); + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + LongWritable value = statsLong.get(statType); + if (value == null) { + statsLong.put(statType, new LongWritable(newStats.getLongStat(statType))); + } else { + value.set(value.get() + newStats.getLongStat(statType)); + } } + // TODO: add String type stats. } + // New topk stats are not added to existing topk, + // because table level topk is not supported for partitioned tables. + // Non-partitioned tables inherit topk stats from PartitionStatistics. this.numPartitions++; } @@ -188,9 +262,13 @@ */ public void deletePartitionStats(PartitionStatistics oldStats) { for (String statType : supportedStats) { - LongWritable value = stats.get(statType); - value.set(value.get() - oldStats.getStat(statType)); + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + LongWritable value = statsLong.get(statType); + value.set(value.get() - oldStats.getLongStat(statType)); + } + // TODO: add String type stats. } + // table level topk stats are not supported for partitioned tables. this.numPartitions--; } @@ -198,7 +276,24 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("num_partitions: ").append(numPartitions).append(", "); - sb.append(super.toString()); + if (numPartitions == 0) { + sb.append(super.toString()); + } else { // For partitioned tables, remove topk from table stats string. 
+ for (String statType : supportedStats) { + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + sb.append(nameMapping.get(statType)).append(": ").append( + statsLong.get(statType)).append(", "); + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + if (statType == StatsSetupConst.TOP_K_VALUES) { + continue; + } + sb.append(nameMapping.get(statType)).append(": ").append( + statsString.get(statType)).append(", "); + } + } + sb.delete(sb.length() - 2, sb.length()); + } return sb.toString(); } } @@ -281,6 +376,7 @@ String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); StatsFactory.setImplementation(statsImplementationClass, conf); statsAggregator = StatsFactory.getStatsAggregator(); + // manufacture a StatsAggregator if (!statsAggregator.connect(conf)) { throw new HiveException("StatsAggregator connect failed " + statsImplementationClass); @@ -295,8 +391,13 @@ boolean tableStatsExist = this.existStats(parameters); for (String statType : supportedStats) { - if (parameters.containsKey(statType)) { + if (!parameters.containsKey(statType)) { + continue; + } + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { tblStats.setStat(statType, Long.parseLong(parameters.get(statType))); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + tblStats.setStat(statType, parameters.get(statType)); } } @@ -334,7 +435,7 @@ else if (work.isClearAggregatorStats()) { for (String statType : collectableStats) { if (parameters.containsKey(statType)) { - tblStats.setStat(statType, 0L); + tblStats.clearStat(statType); } } } @@ -354,11 +455,22 @@ continue; } - Map currentValues = new HashMap(); + Map currentLongValues = new HashMap(); + Map currentStringValues = new HashMap(); + Map> currentTopK = new HashMap>(); for (String statType : supportedStats) { - Long val = parameters.containsKey(statType) ? Long.parseLong(parameters.get(statType)) - : 0L; - currentValues.put(statType, val); + if (statType == StatsSetupConst.TOP_K_VALUES) { + continue; + } + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + Long val = parameters.containsKey(statType) ? Long.parseLong(parameters.get(statType)) + : 0L; + currentLongValues.put(statType, val); + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + String val = parameters.containsKey(statType) ? parameters.get(statType) : ""; + currentStringValues.put(statType, val); + } } // @@ -381,11 +493,19 @@ // For eg. 
if a file is being loaded, the old number of rows are not valid if (work.isClearAggregatorStats()) { if (parameters.containsKey(statType)) { - newPartStats.setStat(statType, 0L); + newPartStats.clearStat(statType); } } else { - newPartStats.setStat(statType, currentValues.get(statType)); + if (statType == StatsSetupConst.TOP_K_VALUES) { + newPartStats.setStat(currentTopK); + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + newPartStats.setStat(statType, currentLongValues.get(statType)); + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + newPartStats.setStat(statType, currentStringValues.get(statType)); + } } } } @@ -401,7 +521,8 @@ newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, partitionSize); if (hasStats) { - PartitionStatistics oldPartStats = new PartitionStatistics(currentValues); + PartitionStatistics oldPartStats = new PartitionStatistics(currentLongValues, + currentStringValues, currentTopK); tblStats.updateStats(oldPartStats, newPartStats); } else { tblStats.addPartitionStats(newPartStats); @@ -411,13 +532,58 @@ // update the metastore // for (String statType : supportedStats) { - long statValue = newPartStats.getStat(statType); - if (statValue >= 0) { - parameters.put(statType, Long.toString(newPartStats.getStat(statType))); + if (statType == StatsSetupConst.TOP_K_VALUES) { + continue; + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + long statValue = newPartStats.getLongStat(statType); + if (statValue >= 0) { + parameters.put(statType, Long.toString(newPartStats.getLongStat(statType))); + } + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + String statValue = newPartStats.getStringStat(statType); + if (!statValue.isEmpty()) { + parameters.put(statType, newPartStats.getStringStat(statType)); + } } } + // set skewed column names and values to be the most skewed column from topk + // if they are not specified by user tPart.setParameters(parameters); + if (!newPartStats.getTopKStat().isEmpty()) { + Map> currTopK = newPartStats.getTopKStat(); + if (tPart.getSd().getSkewedInfo().getSkewedColNamesSize() == 0) { + List colNames = new ArrayList(); + List> colValues = new ArrayList>(); + for (String key : currTopK.keySet()) { + colNames.add(key); + for (String val : currTopK.get(key)) { + List listVal = new ArrayList(); + listVal.add(val); + colValues.add(listVal); + } + break; + } + tPart.getSd().getSkewedInfo().setSkewedColNames(colNames); + tPart.getSd().getSkewedInfo().setSkewedColValues(colValues); + } else if (tPart.getSd().getSkewedInfo().getSkewedColNamesSize() == 1 && + tPart.getSd().getSkewedInfo().getSkewedColValuesSize() == 0) { + // if user specified skewed column name but not values, + // set the skewed values to be those determined by topk + String colName = tPart.getSd().getSkewedInfo().getSkewedColNames().get(0); + if (currTopK.containsKey(colName)) { + List> colValues = new ArrayList>(); + for (String val : currTopK.get(colName)) { + List listVal = new ArrayList(); + listVal.add(val); + colValues.add(listVal); + } + tPart.getSd().getSkewedInfo().setSkewedColValues(colValues); + } + } + } String tableFullName = table.getDbName() + "." 
+ table.getTableName(); db.alterPartition(tableFullName, new Partition(table, tPart)); @@ -436,11 +602,49 @@ // parameters = tTable.getParameters(); for (String statType : supportedStats) { - parameters.put(statType, Long.toString(tblStats.getStat(statType))); + if (statType == StatsSetupConst.TOP_K_VALUES) { + continue; + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + parameters.put(statType, Long.toString(tblStats.getLongStat(statType))); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + parameters.put(statType, tblStats.getStringStat(statType)); + } } parameters.put(StatsSetupConst.NUM_PARTITIONS, Integer.toString(tblStats.getNumPartitions())); tTable.setParameters(parameters); + // set skewed info using topk, if skewed info not specified by user + if (!tblStats.getTopKStat().isEmpty() && partitions == null) { + Map> currTopK = tblStats.getTopKStat(); + if (tTable.getSd().getSkewedInfo().getSkewedColNamesSize() == 0) { + List colNames = new ArrayList(); + List> colValues = new ArrayList>(); + for (String key : currTopK.keySet()) { + colNames.add(key); + for (String val : currTopK.get(key)) { + List listVal = new ArrayList(); + listVal.add(val); + colValues.add(listVal); + } + break; + } + tTable.getSd().getSkewedInfo().setSkewedColNames(colNames); + tTable.getSd().getSkewedInfo().setSkewedColValues(colValues); + } else if (tTable.getSd().getSkewedInfo().getSkewedColNamesSize() == 1 && + tTable.getSd().getSkewedInfo().getSkewedColValuesSize() == 0) { + String colName = tTable.getSd().getSkewedInfo().getSkewedColNames().get(0); + if (currTopK.containsKey(colName)) { + List> colValues = new ArrayList>(); + for (String val : currTopK.get(colName)) { + List listVal = new ArrayList(); + listVal.add(val); + colValues.add(listVal); + } + tTable.getSd().getSkewedInfo().setSkewedColValues(colValues); + } + } + } + String tableFullName = table.getDbName() + "." + table.getTableName(); db.alterTable(tableFullName, new Table(tTable)); @@ -471,7 +675,8 @@ || parameters.containsKey(StatsSetupConst.NUM_FILES) || parameters.containsKey(StatsSetupConst.TOTAL_SIZE) || parameters.containsKey(StatsSetupConst.RAW_DATA_SIZE) - || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS); + || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS) + || parameters.containsKey(StatsSetupConst.TOP_K_VALUES); } private void updateStats(List statsList, PartitionStatistics stats, @@ -480,22 +685,53 @@ String value; Long longValue; + List listValue; for (String statType : statsList) { - value = statsAggregator.aggregateStats(aggKey, statType); - if (value != null) { - longValue = Long.parseLong(value); - - if (work.getLoadTableDesc() != null && - !work.getLoadTableDesc().getReplace()) { - String originalValue = parameters.get(statType); - if (originalValue != null) { - longValue += Long.parseLong(originalValue); + if (statType == StatsSetupConst.TOP_K_VALUES) { + listValue = statsAggregator.aggregateStatsTopK(aggKey, statType); + if (listValue != null) { + // update topk values + try { + Map> topKFormatted = getFinalTopK(listValue); + stats.setStat(topKFormatted); + } catch (IOException e) { + throw new HiveException("StatsAggregator failed to get topk statistics. 
" + + StringUtils.stringifyException(e)); } + } else { + if (atomic) { + throw new HiveException("StatsAggregator failed to get statistics."); + } } - stats.setStat(statType, longValue); } else { - if (atomic) { - throw new HiveException("StatsAggregator failed to get statistics."); + value = statsAggregator.aggregateStats(aggKey, statType); + if (value != null) { + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + longValue = Long.parseLong(value); + + if (work.getLoadTableDesc() != null && + !work.getLoadTableDesc().getReplace()) { + String originalValue = parameters.get(statType); + if (originalValue != null) { + longValue += Long.parseLong(originalValue); + } + } + stats.setStat(statType, longValue); + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + if (work.getLoadTableDesc() != null && + !work.getLoadTableDesc().getReplace()) { + String originalValue = parameters.get(statType); + if (originalValue != null) { + value += originalValue; + } + } + stats.setStat(statType, value); + } + } else { + if (atomic) { + throw new HiveException("StatsAggregator failed to get statistics."); + } } } } @@ -568,4 +804,123 @@ statsAggregator.closeConnection(); } } + + /** + * This method parses a list topk stats string, + * which are from different reducers, + * and returns a map of column names and top k values. + */ + public Map> getFinalTopK(List allTopK) throws IOException { + // the map to be returned + Map> retMap = new LinkedHashMap>(); + // this big map maps a column name to a small map + // the small map stores the column's top values and the ItemCount of each value + Map>> bigMap + = new HashMap>>(); + // this maps a column name to a list of ItemCount + // which stores the column's top value information + Map>> bigQueue = new HashMap>>(); + + int k = 0; // k as in topk + float minPercent = 0; // minimal row percentage for a value to be in topk + long rowCount = 0; // number of rows in the newly added data, + // to help determine each value's row percentage + + // integrate topk from all reducers + for (String topk : allTopK) { + if (topk.isEmpty()) { + continue; + } + TopKStorage topKSt = new TopKStorage(); + try { + topKSt = mapper.readValue(topk, TopKStorage.class); + } catch (IOException e) { + throw new IOException("Failed to retrieve topk stats from JSON object. 
" + + StringUtils.stringifyException(e)); + } + Map>> itemsMap = topKSt.getAllItems(); + k = topKSt.getK(); + minPercent = topKSt.getMinPercent(); + rowCount += topKSt.getRowCount(); + for (String colName : itemsMap.keySet()) { + // combine all reducer's topk for a column + Map> items = bigMap.get(colName); + if (items == null) { + items = new HashMap>(); + } + List> colVals = itemsMap.get(colName); + for (ItemCount valCount : colVals) { + String value = valCount.getItem(); + int count = valCount.getCount(); + int possibleError = valCount.getPossibleError(); + if (items.containsKey(value)) { + // update this value's count and possible error + int oldCount = items.get(value).getCount(); + items.get(value).setCount(oldCount + count); + int oldPossibleError = items.get(value).getPossibleError(); + items.get(value).setPossibleError(oldPossibleError + possibleError); + } else { + items.put(value, valCount); + } + } + bigMap.put(colName, items); + } + } + if (bigMap.isEmpty()) { + return retMap; // no topk is collected + } + + // sort the top values for each column + for (String colName : bigMap.keySet()) { + Map> items = bigMap.get(colName); + List> countList = new ArrayList>(); + countList.addAll(items.values()); + Collections.sort(countList, ItemCount.DESCENDING); + bigQueue.put(colName, countList); + } + + // get final topk, + // and put the most skewed column at the beginning of retMap, + // so it can be put into table/partition skewed info. + // if skewed info supports multiple independent columns in the future, + // then all topk can be get utilized. + float totalPercent = 0; + String mostSkewedColName = new String(); + List mostSkewedColValues = new ArrayList(); + Map> tmpMap = new HashMap>(); + for (String colName : bigQueue.keySet()) { + List finalList = new ArrayList(); + int i = 0; + float currentTotalPercent = 0; + for (ItemCount itemCount : bigQueue.get(colName)) { + // use the lower bound of the count estimate + // to compute the percentage of rows that a value holds + float percent = 100 * ( + (float) itemCount.getCount() - (float) itemCount.getPossibleError()) / (float) rowCount; + // a value can be in topk if it ranks smaller than k, + // and its row percentage is higher than minPercent. 
+ if (i >= k || percent < minPercent) { + break; + } + finalList.add(itemCount.getItem()); + i ++; + currentTotalPercent += percent; + } + // update most skewed column + if (currentTotalPercent > totalPercent) { + mostSkewedColName = colName; + totalPercent = currentTotalPercent; + mostSkewedColValues.clear(); + mostSkewedColValues.addAll(finalList); + } + // only non-empty topk gets returned + if (!finalList.isEmpty()) { + tmpMap.put(colName, finalList); + } + } + retMap.put(mostSkewedColName, mostSkewedColValues); + tmpMap.remove(mostSkewedColName); + retMap.putAll(tmpMap); + return retMap; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Stat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Stat.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Stat.java (working copy) @@ -18,48 +18,146 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.stats.StatsSetupConst; +import org.apache.hadoop.hive.ql.stats.topk.ItemCount; +import org.apache.hadoop.hive.ql.stats.topk.TopKAggregator; +import org.apache.hadoop.hive.ql.stats.topk.TopKStorage; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.util.StringUtils; +import org.codehaus.jackson.map.ObjectMapper; public class Stat { + public static final ObjectMapper mapper = new ObjectMapper(); + // stored stats - private final Map statsMap; + // numeric and string stats are stored separately + private final Map statsMapLong; + private final Map statsMapString; + // topk aggregation storage + private final Map> topKAggr; + // topk paratemers set by FileSinkOperator or TableScanOperator + private short topKNum; + private short topKPoolSize; + private float topKMinPercent; // additional bookkeeping info for the stored stats private final Map bookkeepingInfo; public Stat() { - statsMap = new HashMap(); + statsMapLong = new HashMap(); + statsMapString = new HashMap(); bookkeepingInfo = new HashMap(); + topKAggr = new HashMap>(); + topKNum = 0; + topKPoolSize = 0; + topKMinPercent = 0; } public void addToStat(String statType, long amount) { - LongWritable currentValue = statsMap.get(statType); + LongWritable currentValue = statsMapLong.get(statType); if (currentValue == null) { - statsMap.put(statType, new LongWritable(amount)); + statsMapLong.put(statType, new LongWritable(amount)); } else { currentValue.set(currentValue.get() + amount); } } - public long getStat(String statType) { - LongWritable currValue = statsMap.get(statType); + // add topk into the stat + public void addToStat(String statType, Map colVals) { + if (topKNum == 0) { + return; + } + for (String colName : colVals.keySet()) { + if (!topKAggr.containsKey(colName)) { + TopKAggregator colTopKAggr; + colTopKAggr = (new TopKAggregator.Factory(topKNum, topKPoolSize)).create(); + colTopKAggr.add(colVals.get(colName), 1); + topKAggr.put(colName, colTopKAggr); + } + else { + topKAggr.get(colName).add(colVals.get(colName), 1); + } + } + } + + public long getLongStat(String statType) { + LongWritable currValue = statsMapLong.get(statType); if (currValue == null) { return 0; } return currValue.get(); } + // get topk from the stat + public String getStringStat(String statType) throws IOException { + if (statType == StatsSetupConst.TOP_K_VALUES) { + if (topKAggr.isEmpty()) { + 
return ""; + } + // get topk for each column + Map>> topKAll = new HashMap>>(); + for (String colName : topKAggr.keySet()) { + List> colValRecord = topKAggr.get(colName).get(); + topKAll.put(colName, colValRecord); + } + String retval = new String(); + // wrap the topk stats into a JSON string + // so that StatsPublisher can publish it to the temporary table + TopKStorage topKSt = new TopKStorage(topKAll, topKNum, topKMinPercent, + statsMapLong.get(StatsSetupConst.ROW_COUNT).get()); + try { + StringWriter sw = new StringWriter(); + mapper.writeValue(sw, topKSt); + retval = sw.toString(); + } catch (IOException e) { + throw new IOException("Failed to convert topK stats into JSON object. " + + StringUtils.stringifyException(e)); + } + return retval; + } else { + String currValue = statsMapString.get(statType); + if (currValue == null) { + return ""; + } + return currValue; + } + } + + public void setTopKNum(short k) { + this.topKNum = k; + } + + public void setTopKPoolSize(short l) { + this.topKPoolSize = l; + } + + public void setTopKMinPercent(float p) { + this.topKMinPercent = p; + } + public Collection getStoredStats() { - return statsMap.keySet(); + Collection storedStats = new ArrayList(); + storedStats.addAll(statsMapLong.keySet()); + storedStats.addAll(statsMapString.keySet()); + if (!topKAggr.isEmpty()) { + storedStats.add(StatsSetupConst.TOP_K_VALUES); + } + return storedStats; } public void clear() { - statsMap.clear(); + statsMapLong.clear(); + statsMapString.clear(); + topKAggr.clear(); } // additional information about stats (e.g., virtual column number Index: ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (working copy) @@ -44,6 +44,12 @@ private List partColumns; /** + * A list of the column names of the table. + * Set by the semantic analyzer only in case of the analyze command. + */ + private List colNames; + + /** * A boolean variable set to true by the semantic analyzer only in case of the analyze command. 
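Stepping back to the Stat.getStringStat and StatsTask.getFinalTopK changes above: the published string is simply Jackson's default bean serialization of TopKStorage, and the aggregation step sums counts and possible errors per value before applying the percentage cutoff. The sketch below is illustrative only; the counts, the column name, and the reconstructed generic parameters are assumptions, and the exact JSON property order depends on Jackson.

import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.ql.stats.topk.ItemCount;
import org.apache.hadoop.hive.ql.stats.topk.TopKStorage;
import org.codehaus.jackson.map.ObjectMapper;

// What one task publishes for column "value" (made-up counts).
Map<String, List<ItemCount<String>>> allItems = new HashMap<String, List<ItemCount<String>>>();
allItems.put("value", Arrays.asList(new ItemCount<String>("val_348", 60, 0)));
TopKStorage storage = new TopKStorage(allItems, (short) 2, 10f, 250L);

StringWriter sw = new StringWriter();
new ObjectMapper().writeValue(sw, storage);
// sw.toString() is roughly:
// {"k":2,"minPercent":10.0,"rowCount":250,
//  "allItems":{"value":[{"item":"val_348","count":60,"possibleError":0}]}}

// getFinalTopK merges the per-task payloads: counts and possible errors for the
// same value are summed, and the lower bound of the estimate drives the cutoff.
ItemCount<String> merged = new ItemCount<String>("val_348", 60, 0);
merged.add(new ItemCount<String>("val_348", 40, 5));   // count = 100, possibleError = 5

long totalRows = 250 + 250;                            // rowCount summed over all payloads
float percent = 100f * (merged.getCount() - merged.getPossibleError()) / totalRows;
// 100 * 95 / 500 = 19.0 >= minPercent (10), and within the first k, so val_348 is kept.
// The most skewed column ends up first in the returned map, and StatsTask copies it into
// the table/partition SkewedInfo when the user has not set skewed columns explicitly.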
* */ @@ -89,6 +95,14 @@ this.alias = alias; } + public void setColNames (List colNames) { + this.colNames = colNames; + } + + public List getColNames () { + return colNames; + } + public void setPartColumns (List partColumns) { this.partColumns = partColumns; } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -6777,6 +6777,13 @@ tsDesc.setGatherStats(true); tsDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE)); + // get column names to put in tableScanDesc + List colNames = new ArrayList(); + for (FieldSchema col : tab.getCols()) { + colNames.add(col.getName()); + } + tsDesc.setColNames(colNames); + // append additional virtual columns for storing statistics Iterator vcs = VirtualColumn.getStatsRegistry(conf).iterator(); List vcList = new ArrayList(); Index: ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java (working copy) @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.stats; +import java.util.List; + import org.apache.hadoop.conf.Configuration; /** @@ -59,6 +61,12 @@ public String aggregateStats(String keyPrefix, String statType); /** + * This method aggregates topk statistics from all tasks. + * @return a list of strings for the topk from all tasks, or null if there is an error/exception. + */ + public List aggregateStatsTopK(String keyPrefix, String statType); + + /** * This method closes the connection to the temporary storage. * * @return true if close connection is successful, false otherwise. Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsSetupConstants.java (working copy) @@ -26,7 +26,7 @@ // NOTE: // For all table names past and future, Hive will not drop old versions of this table, it is up // to the administrator - public static final String PART_STAT_TABLE_NAME = "PARTITION_STATS_V2"; + public static final String PART_STAT_TABLE_NAME = "PARTITION_STATS_V3"; // supported statistics - column names @@ -34,4 +34,6 @@ public static final String PART_STAT_RAW_DATA_SIZE_COLUMN_NAME = "RAW_DATA_SIZE"; + public static final String PART_STAT_TOP_K_VALUES_COLUMN_NAME = "TOP_K_VALUES"; + } Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (working copy) @@ -155,7 +155,6 @@ Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries); return true; } catch (SQLIntegrityConstraintViolationException e) { - // We assume that the table used for partial statistics has a primary key declared on the // "fileID". The exception will be thrown if two tasks report results for the same fileID. 
// In such case, we either update the row, or abandon changes depending on which statistic @@ -272,7 +271,8 @@ ResultSet rs = dbm.getTables(null, null, JDBCStatsUtils.getStatTableName(), null); boolean tblExists = rs.next(); if (!tblExists) { // Table does not exist, create it - String createTable = JDBCStatsUtils.getCreate(""); + String createTable = JDBCStatsUtils.getCreate("", + HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_STATS_TOPK_COLUMN_TYPE)); stmt.executeUpdate(createTable); stmt.close(); } Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java (working copy) @@ -33,6 +33,7 @@ // supported statistics supportedStats.add(StatsSetupConst.ROW_COUNT); supportedStats.add(StatsSetupConst.RAW_DATA_SIZE); + supportedStats.add(StatsSetupConst.TOP_K_VALUES); // row count statistics columnNameMapping.put(StatsSetupConst.ROW_COUNT, @@ -41,6 +42,10 @@ // raw data size columnNameMapping.put(StatsSetupConst.RAW_DATA_SIZE, JDBCStatsSetupConstants.PART_STAT_RAW_DATA_SIZE_COLUMN_NAME); + + // top k values + columnNameMapping.put(StatsSetupConst.TOP_K_VALUES, + JDBCStatsSetupConstants.PART_STAT_TOP_K_VALUES_COLUMN_NAME); } /** @@ -123,11 +128,15 @@ /** * Prepares CREATE TABLE query */ - public static String getCreate(String comment) { + public static String getCreate(String comment, String topKColType) { String create = "CREATE TABLE /* " + comment + " */ " + JDBCStatsUtils.getStatTableName() + " (" + getTimestampColumnName() + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + JDBCStatsUtils.getIdColumnName() + " VARCHAR(255) PRIMARY KEY "; for (int i = 0; i < supportedStats.size(); i++) { + if (supportedStats.get(i) == StatsSetupConst.TOP_K_VALUES) { + create += ", " + getStatColumnName(supportedStats.get(i)) + " " + topKColType + " "; + continue; + } create += ", " + getStatColumnName(supportedStats.get(i)) + " BIGINT "; } create += ")"; @@ -175,9 +184,16 @@ * @return aggregated value for the given statistic */ public static String getSelectAggr(String statType, String comment) { - String select = "SELECT /* " + comment + " */ " + "SUM( " - + getStatColumnName(statType) + " ) " + " FROM " - + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + " LIKE ? ESCAPE ?"; + String select = new String(); + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + select = "SELECT /* " + comment + " */ " + "SUM( " + + getStatColumnName(statType) + " ) " + " FROM " + + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + " LIKE ? ESCAPE ?"; + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + select = "SELECT /* " + comment + " */ " + + getStatColumnName(statType) + " FROM " + + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + " LIKE ? 
ESCAPE ?"; + } return select; } Index: ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (working copy) @@ -24,7 +24,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLRecoverableException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; @@ -34,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.stats.StatsAggregator; +import org.apache.hadoop.hive.ql.stats.StatsSetupConst; public class JDBCStatsAggregator implements StatsAggregator { @@ -114,6 +117,10 @@ @Override public String aggregateStats(String fileID, String statType) { + if (statType == StatsSetupConst.TOP_K_VALUES) { + LOG.warn("Invalid statistic: " + statType + "is supported by aggregateStatsTopK."); + return null; + } if (!JDBCStatsUtils.isValidStatistic(statType)) { LOG.warn("Invalid statistic: " + statType + ", supported stats: " + JDBCStatsUtils.getSupportedStatistics()); @@ -130,20 +137,32 @@ String keyPrefix = Utilities.escapeSqlLike(fileID) + "%"; for (int failures = 0;; failures++) { try { - long retval = 0; + long retvallong = 0; + // return value for String type stats + String retvalstring = new String(); + PreparedStatement selStmt = columnMapping.get(statType); selStmt.setString(1, keyPrefix); selStmt.setString(2, Character.toString(Utilities.sqlEscapeChar)); ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries); if (result.next()) { - retval = result.getLong(1); + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + retvallong = result.getLong(1); + } else if (StatsSetupConst.getType( + statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + retvalstring = result.getString(1); + } } else { LOG.warn("Warning. Nothing published. 
Nothing aggregated."); return null; } - return Long.toString(retval); + if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.LONG_TYPE) { + return Long.toString(retvallong); + } else if (StatsSetupConst.getType(statType) == StatsSetupConst.StatDataType.STRING_TYPE) { + return retvalstring; + } } catch (SQLRecoverableException e) { // need to start from scratch (connection) if (failures >= maxRetries) { @@ -172,6 +191,70 @@ } @Override + public List aggregateStatsTopK(String fileID, String statType) { + + if (statType != StatsSetupConst.TOP_K_VALUES) { + LOG.warn("Invalid statistic: " + statType + ", supported stat: " + + StatsSetupConst.TOP_K_VALUES); + return null; + } + + Utilities.SQLCommand execQuery = new Utilities.SQLCommand() { + @Override + public ResultSet run(PreparedStatement stmt) throws SQLException { + return stmt.executeQuery(); + } + }; + + String keyPrefix = Utilities.escapeSqlLike(fileID) + "%"; + for (int failures = 0;; failures++) { + try { + // return value list for topk stats + List retlist = new ArrayList(); + + PreparedStatement selStmt = columnMapping.get(statType); + selStmt.setString(1, keyPrefix); + selStmt.setString(2, Character.toString(Utilities.sqlEscapeChar)); + + ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries); + if (result.next()) { + retlist.add(result.getString(1)); + while (result.next()) { + retlist.add(result.getString(1)); + } + } else { + LOG.warn("Warning. Nothing published. Nothing aggregated."); + return null; + } + return retlist; + } catch (SQLRecoverableException e) { + // need to start from scratch (connection) + if (failures >= maxRetries) { + return null; + } + // close the current connection + closeConnection(); + long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r); + try { + Thread.sleep(waitTime); + } catch (InterruptedException iex) { + } + // getting a new connection + if (!connect(hiveconf)) { + // if cannot reconnect, just fail because connect() already handles retries. + LOG.error("Error during publishing aggregation. " + e); + return null; + } + } catch (SQLException e) { + // for SQLTransientException (already handled by Utilities.*WithRetries() functions + // and SQLNonTransientException, just declare failure. + LOG.error("Error during publishing aggregation. " + e); + return null; + } + } + } + + @Override public boolean closeConnection() { if (conn == null) { Index: ql/src/java/org/apache/hadoop/hive/ql/stats/StatsSetupConst.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsSetupConst.java (revision 1377805) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsSetupConst.java (working copy) @@ -61,4 +61,39 @@ */ public static final String RAW_DATA_SIZE = "rawDataSize"; + /** + * The top k values of each column, to be published or gathered. + */ + public static final String TOP_K_VALUES = "topKValues"; + + /** + * The enumeration consists of data types for the stats. + */ + public static enum StatDataType { + LONG_TYPE, + STRING_TYPE, + TYPE_NOT_EXIST + }; + + /** + * This method returns the data type of a given statType. + * @param statType + * : a string whose data type is to be returned. + * @return the data type as a constant string in StatsSetupConst. 
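Tying the JDBC pieces above together: the intermediate statistics table gains a string-typed TOP_K_VALUES column, and top-k rows are projected rather than summed. The statements below are only an approximation of the SQL produced by getCreate and getSelectAggr, reformatted for readability; TS and ID stand in for the configured timestamp and id column names, the row-count column name is assumed to be ROW_COUNT, and VARCHAR(4000) is an assumed value for the configured top-k column type.

-- from JDBCStatsPublisher.init() via getCreate("", topKColType)
CREATE TABLE /* ... */ PARTITION_STATS_V3 (
  TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  ID VARCHAR(255) PRIMARY KEY,
  ROW_COUNT BIGINT,
  RAW_DATA_SIZE BIGINT,
  TOP_K_VALUES VARCHAR(4000))

-- from getSelectAggr: SUM() for long-typed stats, a plain projection for the top-k column
SELECT /* ... */ SUM( ROW_COUNT ) FROM PARTITION_STATS_V3 WHERE ID LIKE ? ESCAPE ?
SELECT /* ... */ TOP_K_VALUES FROM PARTITION_STATS_V3 WHERE ID LIKE ? ESCAPE ?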
+ */ + public static StatDataType getType(String statType) { + StatDataType retval = StatDataType.TYPE_NOT_EXIST; + if (statType == ROW_COUNT || + statType == RAW_DATA_SIZE || + statType == NUM_FILES || + statType == NUM_PARTITIONS|| + statType == TOTAL_SIZE) { + retval = StatDataType.LONG_TYPE; + } + if (statType == TOP_K_VALUES) { + retval = StatDataType.STRING_TYPE; + } + return retval; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopKAggregator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopKAggregator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopKAggregator.java (working copy) @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats.topk; + +import java.util.List; + +/** + * Top-K count aggregation. Since the algorithm doesn't keep all the data, there's no guarantee the + * results are correct. See documention of TopK.java for details. In cases where only an estimate + * of the actual value is known, StatsAggregation always uses the lower-bound of that estimate. + * + * @see TopK + */ +public class TopKAggregator> { + + private final TopK topK; + + public TopKAggregator(TopK topK) { + this.topK = topK; + } + + /** + * Adds the value to the aggregation. + * + * @param input a list + * @return true if the aggregation was modified. + */ + public synchronized boolean add(T item, int count) { + boolean changed = false; + + if (count > 0) { + topK.add(item, count); + changed = true; + } + return changed; + } + + public List> get() { + return topK.getTopCounts(); + } + + public static class Factory> { + private final short k; + private final short max; + + public Factory( + short k, + short max + ) { + this.k = k; + this.max = max; + } + + public TopKAggregator create() { + return new TopKAggregator(new TopK(k, max)); + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopKStorage.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopKStorage.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopKStorage.java (working copy) @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
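A minimal usage sketch for the aggregator above, mirroring how Stat.addToStat keeps one TopKAggregator per column (the values and sizes are made up):

import java.util.List;

import org.apache.hadoop.hive.ql.stats.topk.ItemCount;
import org.apache.hadoop.hive.ql.stats.topk.TopKAggregator;

// Report the top 2 values per column while tracking at most 4 candidates.
TopKAggregator<String> agg = new TopKAggregator.Factory<String>((short) 2, (short) 4).create();
agg.add("val_348", 1);
agg.add("val_230", 1);
agg.add("val_348", 1);

List<ItemCount<String>> top = agg.get();
// top is [val_348:2(0), val_230:1(0)] in ItemCount.toString() notation,
// i.e. item:count(possibleError), sorted by ItemCount.DESCENDING.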
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats.topk; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * + * TopKStorage. + * a class that wraps the String type topk stats into a JavaBean. + * + */ +public class TopKStorage { + private short k; + private float minPercent; + private long rowCount; + private Map>> allItems; + + public TopKStorage() { + this.allItems = new HashMap>>(); + this.k = 0; + this.minPercent = 0; + this.rowCount = 0; + } + + public TopKStorage(Map>> allItems, + short k, + float minPercent, + long rowCount) { + this.allItems = allItems; + this.k = k; + this.minPercent = minPercent; + this.rowCount = rowCount; + } + + public Map>> getAllItems() { + return this.allItems; + } + + public void setAllItems(Map>> allItems) { + this.allItems = allItems; + } + + public short getK() { + return this.k; + } + + public void setK(short k) { + this.k = k; + } + + public float getMinPercent() { + return this.minPercent; + } + + public void setMinPercent(float minPercent) { + this.minPercent = minPercent; + } + + + public long getRowCount() { + return this.rowCount; + } + + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/stats/topk/ItemCount.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/topk/ItemCount.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/topk/ItemCount.java (working copy) @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats.topk; + +import java.util.Collections; +import java.util.Comparator; + +import org.apache.commons.lang.builder.CompareToBuilder; + +import com.google.common.base.Preconditions; + +/** + * Keeps track of the estimated count of the number of times an item has been seen and the possible + * overestimation of the count. 
+ * + * @param the type of item being counted + */ +public class ItemCount> { + private T item; + + private int count; + private int possibleError; + + public ItemCount() { + this.item = null; + this.count = 0; + this.possibleError = 0; + } + + public ItemCount(T item, int count, int possibleError) { + this.item = item; + this.count = count; + this.possibleError = possibleError; + } + + /** + * Adds a count value for this item + * + * @param count the count value + */ + public void add(int count) { + this.count = overflowSafeAdd(this.count, count); + } + + public boolean add(ItemCount itemCount) { + Preconditions.checkArgument( + itemCount.getItem().equals(getItem()), + "Can't merge " + this + " with " + itemCount + ); + + int count = itemCount.getCount(); + int possibleError = itemCount.getPossibleError(); + + this.count = overflowSafeAdd(this.count, count); + this.possibleError = overflowSafeAdd(this.possibleError, possibleError); + + return count != 0 || possibleError != 0; + } + + /** + * Fetches the current count value. + */ + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + /** + * Fetches the possible overestimation of count. + */ + public int getPossibleError() { + return possibleError; + } + + public void setPossibleError(int possibleError) { + this.possibleError = possibleError; + } + + /** + * Returns the item contained. + */ + public T getItem() { + return item; + } + + public void setItem(T item) { + this.item = item; + } + + /** + * Returns a clone of this instance. + */ + public ItemCount copy() { + return new ItemCount(item, count, possibleError); + } + + @Override + public String toString() { + return item + ":" + count + "(" + possibleError + ")"; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + ItemCount that = (ItemCount) object; + + return count == that.count && + item.equals(that.item) && + possibleError == that.possibleError; + } + + @Override + public int hashCode() { + int result = item == null ? 0 : item.hashCode(); + + result = 31 * result + count; + result = 31 * result + possibleError; + + return result; + } + + private int overflowSafeAdd(int value1, int value2) { + long result = (long) value1 + (long) value2; + + if (result > Integer.MAX_VALUE) { + result = Integer.MAX_VALUE; + } + return (int) result; + } + + public static final Comparator> ASCENDING = new Comparator>() { + @Override + public int compare(ItemCount lhs, ItemCount rhs) { + return new CompareToBuilder() + .append(lhs.getCount(), rhs.getCount()) + // we want the ones with the smallest counts first, but if there's a tie, then we want + // the ones with the biggest errors first (since it's more likely the true values is + // smaller), hence the reversal of lhs and rhs here + .append(rhs.getPossibleError(), lhs.getPossibleError()) + .append(lhs.getItem(), rhs.getItem()) + .toComparison(); + } + }; + public static final Comparator> DESCENDING = Collections.reverseOrder(ASCENDING); + +} Index: ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopK.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopK.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/stats/topk/TopK.java (working copy) @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
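To make the ordering defined above concrete (counts are made up): with equal counts, ASCENDING places the entry with the larger possible error first, since its true count is more likely to be the smaller one; DESCENDING, used for reporting, is simply the reverse.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hive.ql.stats.topk.ItemCount;

List<ItemCount<String>> counts = Arrays.asList(
    new ItemCount<String>("val_401", 7, 0),
    new ItemCount<String>("val_70", 7, 3),    // same count, larger possible error
    new ItemCount<String>("val_480", 2, 0));

Collections.sort(counts, ItemCount.ASCENDING);
// Resulting order: val_480:2(0), val_70:7(3), val_401:7(0)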
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.stats.topk; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +/** + * A class that keeps track of top K items. + *
+ * Note that this implementation is loosely based on + * this paper. + * Sections 3 and 4.2 are useful reads. + *
+ * Since the algorithm doesn't keep all the data, it can't always guarantee that the results will + * be correct. It's possible to tell when the results are guaranteed top-K and are in the right + * order. However, since we do best-effort computation here, we simply report all the values along + * with a possible error amount. (If there's a need for guaranteed results, refer to Section 4.2 + * of the paper.) + *
+ * That is, the counts presented for each items are approximate when the + * {@link org.apache.hadoop.hive.ql.stats.topk.ItemCount#getPossibleError()} value is + * non-zero. + * + * @param the type of the item value + */ +public class TopK> { + private final Map> items = new HashMap>(); + private final PriorityQueue> countQueue = new PriorityQueue>( + // TODO that if space/perf becomes an issue, use a different initial size than the default + 11, // 11 is just the default value PriorityQueue uses internally + ItemCount.ASCENDING + ); + private final short k; + private final short maxItems; + + /** + * Creates an instance. + * + * @param k the number of top items to report + * @param maxItems the maximum number of items to track + */ + public TopK(short k, short maxItems) { + this(k, maxItems, ImmutableList.>of()); + } + + /** + * Creates an instance. + * + * @param k the number of top items to report + * @param maxItems the maximum number of items to track + * @param counts an initial set of item counts to add immediately + */ + public TopK( + short k, + short maxItems, + List> counts + ) { + Preconditions.checkArgument( + maxItems > 0, + "Max must be greater than zero, but got: " + maxItems + ); + Preconditions.checkArgument( + k <= maxItems, + "K can't be greater than max, but got: " + k + " > " + maxItems + ); + this.k = k; + this.maxItems = maxItems; + this.countQueue.addAll(counts); + + for (ItemCount itemCount : counts) { + items.put(itemCount.getItem(), itemCount); + } + } + + /** + * Adds or updates the count of the given item in the list. + * + * @param item the item + * @param count the count + */ + public synchronized void add(T item, int count) { + Preconditions.checkArgument(count >= 0, "Count must be non-negative, but got: " + count); + + ItemCount itemCount = items.get(item); + + if (itemCount == null) { + if (items.size() >= maxItems) { + // swap the item with the item having lowest count + ItemCount minItem = countQueue.remove(); + // at most, the new item could have had the same count as the one we're removing, i.e., it + // could have been tied for last but got bumped out of the queue + int possibleError = minItem.getCount(); + + items.remove(minItem.getItem()); + itemCount = new ItemCount(item, count + possibleError, possibleError); + countQueue.add(itemCount); + items.put(item, itemCount); + } else { + // add the new item + itemCount = new ItemCount(item, count, 0); + items.put(item, itemCount); + countQueue.add(itemCount); + } + } else { + // update the count (remove and re-add to ensure correct ordering is maintained) + countQueue.remove(itemCount); + itemCount.add(count); + countQueue.add(itemCount); + } + } + + /** + * Returns the top counts. The counts are in descending order of the count values. + * The maximum size of is determined by the k parameter to the constructor. + * + * @return + */ + public List> getTopCounts() { + List> values = getAllCounts(); + + Collections.sort(values, ItemCount.DESCENDING); + + if (values.size() > k) { + // prune extra elements + values.subList(k, values.size()).clear(); + } + + return values; + } + + /** + * Returns a copy of all the counts. The counts are in no particular order + * + * @return + */ + synchronized List> getAllCounts() { + List> values = new ArrayList>(countQueue.size()); + + for (ItemCount itemCount : countQueue) { + values.add(itemCount.copy()); + } + + return values; + } + + /** + * Merges the contents of the supplied topK instance to this one. + * The supplied topK is unmodified. 
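Finally, a short worked example (made-up values) of the space-saving behaviour in TopK.add: once the candidate pool is full, the evicted minimum's count is carried over as the newcomer's possible error, which is the error bound later exposed through ItemCount.getPossibleError().

import java.util.List;

import org.apache.hadoop.hive.ql.stats.topk.ItemCount;
import org.apache.hadoop.hive.ql.stats.topk.TopK;

// Report the top 1 value while tracking at most 2 candidates.
TopK<String> topK = new TopK<String>((short) 1, (short) 2);
topK.add("val_348", 5);
topK.add("val_230", 2);

// The pool is full, so adding a new value evicts the current minimum (val_230, count 2)
// and the newcomer inherits that count as its possible error: count = 1 + 2, possibleError = 2.
topK.add("val_401", 1);

List<ItemCount<String>> top = topK.getTopCounts();
// top contains a single entry, val_348:5(0); val_401:3(2) is still tracked internally
// but pruned from the report because k = 1.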
+ * + * @param topK + */ + public synchronized boolean addAll(List> counts) { + boolean updated = false; + + for (ItemCount count : counts) { + T item = count.getItem(); + ItemCount existingCount = items.get(item); + + if (existingCount == null) { + items.put(item, count); + countQueue.add(count); + updated = true; + } else if (existingCount.add(count)) { + // remove and re-add to ensure correct ordering is maintained + countQueue.remove(existingCount); + countQueue.add(existingCount); + updated = true; + } + } + + while (countQueue.size() > maxItems) { + ItemCount itemCount = countQueue.remove(); + + items.remove(itemCount.getItem()); + } + + return updated; + } + + /** + * Returns a deep copy of this instance + */ + public TopK copy() { + return new TopK(k, maxItems, getAllCounts()); + } +}