diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java index deb0f76..b32e04a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java @@ -23,6 +23,9 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ColStatistics; +import org.apache.hadoop.hive.ql.plan.Statistics; +import org.apache.hadoop.hive.ql.plan.Statistics.State; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -41,6 +44,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.sql.Timestamp; +import java.util.List; /** * Generic UDF to generate Bloom Filter @@ -245,10 +249,30 @@ public Object terminatePartial(AggregationBuffer agg) throws HiveException { } public long getExpectedEntries() { + long expectedEntries = -1; if (sourceOperator != null && sourceOperator.getStatistics() != null) { - return sourceOperator.getStatistics().getNumRows(); + Statistics stats = sourceOperator.getStatistics(); + expectedEntries = stats.getNumRows(); + + // Use NumDistinctValues if possible + switch (stats.getColumnStatsState()) { + case COMPLETE: + case PARTIAL: + // There should only be column stats for one column, use if that is the case. 
+      List<ColStatistics> colStats = stats.getColumnStats(); +      if (colStats.size() == 1) { +        long ndv = colStats.get(0).getCountDistint(); +        if (ndv > 0) { +          expectedEntries = ndv; +        } +      } +      break; +    default: +      break; +    } } - return -1; + + return expectedEntries; } public Operator getSourceOperator() { diff --git a/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q index e1eefff..0f66f6b 100644 --- a/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q +++ b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q @@ -39,5 +39,11 @@ select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2'); select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2'); +-- bloomfilter expectedEntries should use ndv if available. 
Compare to first query +analyze table dsrv_small compute statistics; +analyze table dsrv_small compute statistics for columns; +set hive.stats.fetch.column.stats=true; +EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int); + drop table dsrv_big; drop table dsrv_small; diff --git a/ql/src/test/results/clientpositive/llap/mergejoin.q.out b/ql/src/test/results/clientpositive/llap/mergejoin.q.out index 6114548..71614ff 100644 --- a/ql/src/test/results/clientpositive/llap/mergejoin.q.out +++ b/ql/src/test/results/clientpositive/llap/mergejoin.q.out @@ -1876,7 +1876,7 @@ STAGE PLANS: outputColumnNames: _col0 Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator - aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=25) + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=14) mode: hash outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE @@ -1952,7 +1952,7 @@ STAGE PLANS: Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator - aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25) + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=14) mode: final outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE @@ -2569,7 +2569,7 @@ STAGE PLANS: outputColumnNames: _col0 Statistics: Num rows: 25 Data size: 2225 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator - aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=25) + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=14) mode: hash outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE @@ -2645,7 +2645,7 @@ STAGE PLANS: Execution mode: vectorized, 
llap Reduce Operator Tree: Group By Operator - aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=25) + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=14) mode: final outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out index 29f2391..e3c416b 100644 --- a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out +++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out @@ -914,6 +914,147 @@ POSTHOOK: Input: default@dsrv_big POSTHOOK: Input: default@dsrv_small #### A masked pattern was here #### 0 +PREHOOK: query: analyze table dsrv_small compute statistics +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_small +PREHOOK: Output: default@dsrv_small +POSTHOOK: query: analyze table dsrv_small compute statistics +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_small +POSTHOOK: Output: default@dsrv_small +PREHOOK: query: analyze table dsrv_small compute statistics for columns +PREHOOK: type: QUERY +PREHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +POSTHOOK: query: analyze table dsrv_small compute statistics for columns +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dsrv_small +#### A masked pattern was here #### +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 
(BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 57 Data size: 228 
Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=47) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: 
min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=47) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: drop table dsrv_big PREHOOK: type: DROPTABLE PREHOOK: Input: default@dsrv_big