diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6e16200..9302617 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1731,6 +1731,13 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "uses column statistics to estimate the number of rows flowing out of it and hence the data size.\n" +
         "In the absence of column statistics, this factor determines the amount of rows that flows out\n" +
         "of JOIN operator."),
+    // When estimating output rows for a join involving multiple columns, the default behavior assumes
+    // the columns are independent. Setting this flag to true will cause the estimator to assume
+    // the columns are correlated.
+    HIVE_STATS_CORRELATED_MULTI_KEY_JOINS("hive.stats.correlated.multi.key.joins", false,
+        "When estimating output rows for a join involving multiple columns, the default behavior assumes\n" +
+        "the columns are independent. Setting this flag to true will cause the estimator to assume\n" +
+        "the columns are correlated."),
     // in the absence of uncompressed/raw data size, total file size will be used for statistics
     // annotation. But the file may be compressed, encoded and serialized which may be lesser in size
     // than the actual uncompressed/raw data size. This factor will be multiplied to file size to estimate
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index d9f70a7..3996b2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -1506,7 +1506,9 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         perAttrDVs.clear();
       }
 
-      if (numAttr > numParent) {
+      if (numAttr > 1 && conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_CORRELATED_MULTI_KEY_JOINS)) {
+        denom = Collections.max(distinctVals);
+      } else if (numAttr > numParent) {
        // To avoid denominator getting larger and aggressively reducing
        // number of rows, we will ease out denominator.
        denom = StatsUtils.addWithExpDecay(distinctVals);
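For context, the hunk above selects between two denominators for a multi-key join: the maximum per-key NDV when the keys are assumed correlated, and an eased-out combination of the NDVs otherwise. A minimal, self-contained sketch of that choice follows. The exponential-decay formula is an assumption based on the surrounding comments (the real StatsUtils.addWithExpDecay may differ), the NDV values are hypothetical, and the branch actually taken when numAttr <= numParent is not shown in this hunk:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class JoinDenomSketch {

  // Stand-in for StatsUtils.addWithExpDecay: assumed to combine per-key NDVs
  // as ndv1 * ndv2^(1/2) * ndv3^(1/4) * ..., largest first, so each extra
  // join key grows the denominator with diminishing weight.
  static long addWithExpDecay(List<Long> distinctVals) {
    List<Long> sorted = new ArrayList<>(distinctVals);
    sorted.sort(Collections.reverseOrder());
    double denom = sorted.get(0);
    for (int i = 1; i < sorted.size(); i++) {
      denom *= Math.pow(sorted.get(i), 1.0 / (1L << i));
    }
    return (long) denom;
  }

  public static void main(String[] args) {
    // Hypothetical per-key NDVs for a two-key join, e.g. (zip, state).
    List<Long> distinctVals = Arrays.asList(25L, 20L);

    // hive.stats.correlated.multi.key.joins=true: the keys are assumed to
    // move together, so the join discriminates only as much as its most
    // selective key.
    long correlatedDenom = Collections.max(distinctVals);   // 25

    // Default: keys treated as (eased-out) independent, so the denominator
    // grows with every additional key and the row estimate shrinks.
    long independentDenom = addWithExpDecay(distinctVals);  // 25 * sqrt(20) ~ 111

    long inputRows = 20L * 20L; // |a| x |b| from the table stats
    System.out.println("correlated estimate:  " + inputRows / correlatedDenom);
    System.out.println("independent estimate: " + Math.max(inputRows / independentDenom, 1L));
  }
}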
diff --git a/ql/src/test/queries/clientpositive/correlated_join_keys.q b/ql/src/test/queries/clientpositive/correlated_join_keys.q
new file mode 100644
index 0000000..4c801de
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/correlated_join_keys.q
@@ -0,0 +1,34 @@
+
+drop table customer_address;
+
+create table customer_address
+(
+    ca_address_sk int,
+    ca_address_id string,
+    ca_street_number string,
+    ca_street_name string,
+    ca_street_type string,
+    ca_suite_number string,
+    ca_city string,
+    ca_county string,
+    ca_state string,
+    ca_zip string,
+    ca_country string,
+    ca_gmt_offset float,
+    ca_location_type string
+)
+row format delimited fields terminated by '|';
+
+load data local inpath '../../data/files/customer_address.txt' overwrite into table customer_address;
+analyze table customer_address compute statistics;
+analyze table customer_address compute statistics for columns ca_state, ca_zip;
+
+set hive.stats.fetch.column.stats=true;
+
+set hive.stats.correlated.multi.key.joins=false;
+explain select count(*) from customer_address a join customer_address b on (a.ca_zip = b.ca_zip and a.ca_state = b.ca_state);
+
+set hive.stats.correlated.multi.key.joins=true;
+explain select count(*) from customer_address a join customer_address b on (a.ca_zip = b.ca_zip and a.ca_state = b.ca_state);
+
+drop table customer_address;
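Reading the expected output below: both explains scan 20 rows per side, the Join Operator estimate is 1 row with the flag off and 16 rows with it on. A quick consistency check of those two numbers; the per-key NDV of 25 is inferred from the plan, not read from the column stats:

public class EstimateCheck {
  public static void main(String[] args) {
    // Both scans report 20 rows, so the cross product is 400.
    long inputRows = 20L * 20L;

    // Flag on: plan 2 reports 16 join rows, consistent with a denominator of
    // max per-key NDV = 25 (inferred: 400 / 25 = 16), i.e. the wider key
    // (presumably ca_zip) carries about 25 distinct values.
    System.out.println(inputRows / 25);                  // 16

    // Flag off: the per-key NDVs are combined, the denominator reaches or
    // exceeds 400, and the estimate bottoms out at 1 row, as in plan 1.
    System.out.println(Math.max(inputRows / 400, 1L));   // 1
  }
}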
diff --git a/ql/src/test/results/clientpositive/correlated_join_keys.q.out b/ql/src/test/results/clientpositive/correlated_join_keys.q.out
new file mode 100644
index 0000000..ec5d008
--- /dev/null
+++ b/ql/src/test/results/clientpositive/correlated_join_keys.q.out
@@ -0,0 +1,258 @@
+PREHOOK: query: drop table customer_address
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table customer_address
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table customer_address
+(
+    ca_address_sk int,
+    ca_address_id string,
+    ca_street_number string,
+    ca_street_name string,
+    ca_street_type string,
+    ca_suite_number string,
+    ca_city string,
+    ca_county string,
+    ca_state string,
+    ca_zip string,
+    ca_country string,
+    ca_gmt_offset float,
+    ca_location_type string
+)
+row format delimited fields terminated by '|'
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@customer_address
+POSTHOOK: query: create table customer_address
+(
+    ca_address_sk int,
+    ca_address_id string,
+    ca_street_number string,
+    ca_street_name string,
+    ca_street_type string,
+    ca_suite_number string,
+    ca_city string,
+    ca_county string,
+    ca_state string,
+    ca_zip string,
+    ca_country string,
+    ca_gmt_offset float,
+    ca_location_type string
+)
+row format delimited fields terminated by '|'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@customer_address
+PREHOOK: query: load data local inpath '../../data/files/customer_address.txt' overwrite into table customer_address
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@customer_address
+POSTHOOK: query: load data local inpath '../../data/files/customer_address.txt' overwrite into table customer_address
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@customer_address
+PREHOOK: query: analyze table customer_address compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@customer_address
+PREHOOK: Output: default@customer_address
+POSTHOOK: query: analyze table customer_address compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@customer_address
+POSTHOOK: Output: default@customer_address
+PREHOOK: query: analyze table customer_address compute statistics for columns ca_state, ca_zip
+PREHOOK: type: QUERY
+PREHOOK: Input: default@customer_address
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table customer_address compute statistics for columns ca_state, ca_zip
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@customer_address
+#### A masked pattern was here ####
+PREHOOK: query: explain select count(*) from customer_address a join customer_address b on (a.ca_zip = b.ca_zip and a.ca_state = b.ca_state)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(*) from customer_address a join customer_address b on (a.ca_zip = b.ca_zip and a.ca_state = b.ca_state)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: a
+            Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+            Filter Operator
+              predicate: (ca_state is not null and ca_zip is not null) (type: boolean)
+              Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+              Select Operator
+                expressions: ca_state (type: string), ca_zip (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: string)
+                  sort order: ++
+                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                  Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+          TableScan
+            alias: b
+            Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+            Filter Operator
+              predicate: (ca_state is not null and ca_zip is not null) (type: boolean)
+              Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+              Select Operator
+                expressions: ca_state (type: string), ca_zip (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: string)
+                  sort order: ++
+                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                  Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          keys:
+            0 _col0 (type: string), _col1 (type: string)
+            1 _col0 (type: string), _col1 (type: string)
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+          Group By Operator
+            aggregations: count()
+            mode: hash
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              sort order:
+              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+              value expressions: _col0 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain select count(*) from customer_address a join customer_address b on (a.ca_zip = b.ca_zip and a.ca_state = b.ca_state)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(*) from customer_address a join customer_address b on (a.ca_zip = b.ca_zip and a.ca_state = b.ca_state)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: a
+            Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+            Filter Operator
+              predicate: (ca_state is not null and ca_zip is not null) (type: boolean)
+              Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+              Select Operator
+                expressions: ca_state (type: string), ca_zip (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: string)
+                  sort order: ++
+                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                  Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+          TableScan
+            alias: b
+            Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+            Filter Operator
+              predicate: (ca_state is not null and ca_zip is not null) (type: boolean)
+              Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+              Select Operator
+                expressions: ca_state (type: string), ca_zip (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: string)
+                  sort order: ++
+                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                  Statistics: Num rows: 20 Data size: 3500 Basic stats: COMPLETE Column stats: COMPLETE
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          keys:
+            0 _col0 (type: string), _col1 (type: string)
+            1 _col0 (type: string), _col1 (type: string)
+          Statistics: Num rows: 16 Data size: 128 Basic stats: COMPLETE Column stats: COMPLETE
+          Group By Operator
+            aggregations: count()
+            mode: hash
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              sort order:
+              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+              value expressions: _col0 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: drop table customer_address
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@customer_address
+PREHOOK: Output: default@customer_address
+POSTHOOK: query: drop table customer_address
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@customer_address
+POSTHOOK: Output: default@customer_address
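Beyond the per-session set commands the test exercises, the new flag is reachable through the standard HiveConf accessors used by the estimator hunk above (a minimal sketch; assumes a client with hive-common on the classpath):

import org.apache.hadoop.hive.conf.HiveConf;

public class CorrelatedJoinsToggle {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Opt in to the correlated-keys assumption for multi-key join estimates.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_CORRELATED_MULTI_KEY_JOINS, true);
    System.out.println(
        conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_CORRELATED_MULTI_KEY_JOINS)); // true
  }
}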