diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 53b9b0c..0676229 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2872,7 +2872,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     TEZ_DYNAMIC_SEMIJOIN_REDUCTION("hive.tez.dynamic.semijoin.reduction", true,
         "When dynamic semijoin is enabled, shuffle joins will perform a leaky semijoin before shuffle. This " +
         "requires hive.tez.dynamic.partition.pruning to be enabled."),
-    TEZ_MAX_BLOOM_FILTER_ENTRIES("hive.tez.max.bloom.filter.entries", 100000000L,
+    TEZ_MAX_BLOOM_FILTER_ENTRIES("hive.tez.max.bloom.filter.entries", 10000000L,
         "Bloom filter should be of at max certain size to be effective"),
     TEZ_SMB_NUMBER_WAVES(
         "hive.tez.smb.number.waves",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
index deb0f76..2eab2d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java
@@ -23,6 +23,9 @@
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.Statistics.State;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -41,6 +44,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.sql.Timestamp;
+import java.util.List;
 
 /**
  * Generic UDF to generate Bloom Filter
@@ -98,12 +102,8 @@ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveExc
   static class BloomFilterBuf extends AbstractAggregationBuffer {
     BloomFilter bloomFilter;
 
-    public BloomFilterBuf(long expectedEntries, long maxEntries) {
-      if (expectedEntries > maxEntries) {
-        bloomFilter = new BloomFilter(1);
-      } else {
-        bloomFilter = new BloomFilter(expectedEntries);
-      }
+    public BloomFilterBuf(long expectedEntries) {
+      bloomFilter = new BloomFilter(expectedEntries);
     }
 
     @Override
@@ -124,7 +124,7 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
       throw new IllegalStateException("BloomFilter expectedEntries not initialized");
     }
 
-    BloomFilterBuf buf = new BloomFilterBuf(expectedEntries, maxEntries);
+    BloomFilterBuf buf = new BloomFilterBuf(expectedEntries);
     reset(buf);
     return buf;
   }
@@ -245,10 +245,31 @@ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
   }
 
   public long getExpectedEntries() {
+    long expectedEntries = -1;
     if (sourceOperator != null && sourceOperator.getStatistics() != null) {
-      return sourceOperator.getStatistics().getNumRows();
+      Statistics stats = sourceOperator.getStatistics();
+      expectedEntries = stats.getNumRows();
+
+      // Use NumDistinctValues if possible
+      switch (stats.getColumnStatsState()) {
+      case COMPLETE:
+      case PARTIAL:
+        // There should only be column stats for one column, use if that is the case.
+        List<ColStatistics> colStats = stats.getColumnStats();
+        if (colStats.size() == 1) {
+          long ndv = colStats.get(0).getCountDistint();
+          if (ndv > 0) {
+            expectedEntries = ndv;
+          }
+        }
+        break;
+      default:
+        break;
+      }
     }
-    return -1;
+
+    // Do not exceed maxEntries
+    return expectedEntries > maxEntries ? maxEntries : expectedEntries;
   }
 
   public Operator getSourceOperator() {
diff --git a/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
index e1eefff..5eec4e4 100644
--- a/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
+++ b/ql/src/test/queries/clientpositive/vectorized_dynamic_semijoin_reduction.q
@@ -39,5 +39,14 @@ select count(*) from dsrv_big a join dsrv_small b on (a.key_str = b.key_str and
 EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2');
 select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) where b.value in ('nonexistent1', 'nonexistent2');
 
+-- bloomfilter expectedEntries should use ndv if available. Compare to first query
+analyze table dsrv_small compute statistics;
+analyze table dsrv_small compute statistics for columns;
+set hive.stats.fetch.column.stats=true;
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int);
+set hive.tez.max.bloom.filter.entries=10;
+-- bloomfilter expectedEntries should be capped at hive.tez.max.bloom.filter.entries
+EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int);
+
 drop table dsrv_big;
 drop table dsrv_small;
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
index 29f2391..829e7e3 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_dynamic_semijoin_reduction.q.out
@@ -914,6 +914,272 @@ POSTHOOK: Input: default@dsrv_big
 POSTHOOK: Input: default@dsrv_small
 #### A masked pattern was here ####
 0
+PREHOOK: query: analyze table dsrv_small compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_small
+PREHOOK: Output: default@dsrv_small
+POSTHOOK: query: analyze table dsrv_small compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_small
+POSTHOOK: Output: default@dsrv_small
+PREHOOK: query: analyze table dsrv_small compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table dsrv_small compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dsrv_small
+#### A masked pattern was here ####
+PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  filterExpr: 
(key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=47) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=47) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from dsrv_big a join dsrv_small b on (a.key_int = b.key_int) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Reducer 5 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key_int is not null and key_int BETWEEN DynamicValue(RS_7_b_key_int_min) AND DynamicValue(RS_7_b_key_int_max) and in_bloom_filter(key_int, DynamicValue(RS_7_b_key_int_bloom_filter))) (type: boolean) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 500 Data size: 90000 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + filterExpr: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key_int is not null (type: boolean) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key_int (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 57 Data size: 228 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=10) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + 
Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Statistics: Num rows: 550 Data size: 99000 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=10) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: drop table dsrv_big PREHOOK: type: DROPTABLE PREHOOK: Input: default@dsrv_big
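
Reviewer note on the HiveConf hunk: lowering hive.tez.max.bloom.filter.entries from 100000000 to 10000000 matters because the cap now genuinely bounds filter size. Previously, BloomFilterBuf silently fell back to a useless 1-entry filter whenever expectedEntries exceeded maxEntries; with the cap applied inside getExpectedEntries(), the filter is simply built at maxEntries. A back-of-the-envelope size check, assuming the textbook optimal-size formula and a 5% false-positive probability (believed to be Hive's BloomFilter default; the FPP is an assumption for this note, not something the patch sets):

public class BloomSizeEstimate {
  // Standard optimal bloom filter size: m = -n * ln(p) / (ln 2)^2 bits.
  static long optimalBits(long n, double p) {
    return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

  public static void main(String[] args) {
    double fpp = 0.05; // assumed false-positive probability; not set by this patch
    for (long n : new long[] {100000000L, 10000000L}) {
      long bits = optimalBits(n, fpp);
      System.out.printf("n=%d -> %d bits (~%.1f MB)%n", n, bits, bits / 8.0 / 1e6);
    }
    // Roughly ~78 MB per broadcast filter at the old 100M cap vs ~7.8 MB at the new 10M cap.
  }
}

Under those assumptions, the old default permitted filters of roughly 78 MB each, which is a lot of data to broadcast per semijoin edge; the new cap keeps the worst case under about 8 MB.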
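Reviewer note on the GenericUDAFBloomFilter hunk: the new getExpectedEntries() prefers the join column's NDV over the source row count when column stats are COMPLETE or PARTIAL and cover exactly one column, then caps the result at maxEntries. The self-contained sketch below mirrors that decision flow so it can be checked in isolation; BloomSizingSketch and its nested ColStats are hypothetical stand-ins for this note, not Hive classes (Hive's real accessor is ColStatistics.getCountDistint(), spelled as in the Hive API).

import java.util.Arrays;
import java.util.List;

public class BloomSizingSketch {
  // Hypothetical stand-in for org.apache.hadoop.hive.ql.plan.ColStatistics.
  static class ColStats {
    final long countDistinct;
    ColStats(long countDistinct) { this.countDistinct = countDistinct; }
  }

  // Mirrors the patched getExpectedEntries(): start from the row count,
  // prefer the NDV when column stats cover exactly one column, and never
  // exceed the hive.tez.max.bloom.filter.entries cap.
  static long expectedEntries(long numRows, boolean colStatsUsable,
      List<ColStats> colStats, long maxEntries) {
    long expected = numRows;
    if (colStatsUsable && colStats.size() == 1) {
      long ndv = colStats.get(0).countDistinct;
      if (ndv > 0) {
        expected = ndv;
      }
    }
    return Math.min(expected, maxEntries);
  }

  public static void main(String[] args) {
    // dsrv_small in the test has 57 rows and 47 distinct key_int values.
    List<ColStats> stats = Arrays.asList(new ColStats(47));
    System.out.println(expectedEntries(57, true, stats, 10000000L)); // prints 47
    // After "set hive.tez.max.bloom.filter.entries=10;" the cap wins:
    System.out.println(expectedEntries(57, true, stats, 10L));       // prints 10
  }
}

The expectedEntries=47 and expectedEntries=10 values in the two new EXPLAIN plans in the q.out above correspond to the two calls in main(): NDV-based sizing with full column stats, and cap-limited sizing once the configuration is lowered.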