Details
- Type: New Feature
- Status: Resolved
- Priority: Major
- Resolution: Fixed
Description
Joins are very common in Spark SQL. In this JIRA I take broadcast join as an example and describe how a Bloom filter in Kudu can help accelerate distributed computing.
Spark runs a broadcast join in the following steps:
1. A broadcast join involves a small table and a big table; Spark reads all data from the small table into one worker and builds a hash table.
2. The hash table generated in step 1 is broadcast to all the workers that will read the splits of the big table.
3. The workers fetch and iterate over all the splits of the big table and check whether each joining key exists in the hash table; only rows with matching joining keys are retained.
Of these, step 3 is the heaviest, especially when the worker and the split storage are not on the same host and bandwidth is limited. In fact, the cost of step 3 is not always necessary. Consider the following scenario:
Small table A

id | name |
---|---|
1 | Jin |
6 | Xing |

Big table B

id | age |
---|---|
1 | 10 |
2 | 21 |
3 | 33 |
4 | 65 |
5 | 32 |
6 | 23 |
7 | 18 |
8 | 20 |
9 | 22 |
Run the query with SQL: select * from A inner join B on A.id=B.id
Clearly we do not need to fetch all the data from table B, because the number of matched keys is very small.
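To make the cost concrete, here is a minimal single-process sketch of the three steps applied to tables A and B. It is not Spark code; `broadcast_hash_join` and `rows_read` are hypothetical names used only to count how many rows of the big table have to be fetched.

```python
# Tables A and B from the scenario above, as (id, value) tuples.
table_a = [(1, "Jin"), (6, "Xing")]
table_b = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32),
           (6, 23), (7, 18), (8, 20), (9, 22)]


def broadcast_hash_join(small, big):
    """Inner join on the first column.

    Returns (joined rows, number of rows read from the big table).
    """
    # Step 1: build a hash table from the small table.
    hash_table = {}
    for key, value in small:
        hash_table.setdefault(key, []).append(value)

    # Step 2 (the broadcast) is a no-op here: everything runs in one process.

    # Step 3: every row of the big table is fetched and probed.
    joined = []
    rows_read = 0
    for key, value in big:
        rows_read += 1
        for small_value in hash_table.get(key, []):
            joined.append((key, small_value, value))
    return joined, rows_read


joined, rows_read = broadcast_hash_join(table_a, table_b)
print(joined)     # [(1, 'Jin', 10), (6, 'Xing', 23)]
print(rows_read)  # 9
```

All 9 rows of B are read to produce 2 joined rows.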
I propose using the small table to build a Bloom filter (BF) and using the generated BF as a predicate/filter when fetching data from the big table, so that:
1. Much traffic/bandwidth is saved.
2. The workers have less data to process.
Broadcast join is just an example; other types of join will also benefit if we scan with a BF.
In a nutshell, I think Kudu can provide an interface through which users can scan data with Bloom filters.
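A rough sketch of the idea, in plain Python rather than against any Kudu API: `BloomFilter` and `scan_with_bloom_filter` are hypothetical stand-ins, the filter size and hash count are arbitrary, and the SHA-256 double-hashing scheme is only one possible choice. The point is that the filter is applied where the data lives, so only candidate rows are returned to the worker.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter (illustrative only, not Kudu's implementation)."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Double hashing: derive all bit positions from two 64-bit values
        # taken from a single SHA-256 digest of the key.
        digest = hashlib.sha256(str(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


def scan_with_bloom_filter(big, bloom):
    """Stand-in for a storage-side scan that applies the filter as a predicate."""
    return [row for row in big if bloom.might_contain(row[0])]


table_a = [(1, "Jin"), (6, "Xing")]
table_b = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32),
           (6, 23), (7, 18), (8, 20), (9, 22)]

# Build the filter from the joining keys of the small table.
bloom = BloomFilter(num_bits=1024, num_hashes=3)
for key, _ in table_a:
    bloom.add(key)

# Only rows that pass the filter leave the "storage" side.
candidates = scan_with_bloom_filter(table_b, bloom)

# A Bloom filter may return false positives but never false negatives,
# so the exact join still runs on the (much smaller) candidate set.
names = dict(table_a)
joined = [(key, names[key], age) for key, age in candidates if key in names]
print(joined)
```

Because false positives are possible, the filter only reduces what is transferred; the final result still comes from the exact key comparison on the worker.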
Here I want to add some statistics for the Spark-Kudu integration with and without BloomFilter.
In our production environment the bandwidth of each executor is 50M bps.
We do an inner join of two tables: one is large and the other is comparatively small.
In Spark, an inner join can be implemented as SortMergeJoin or BroadcastHashJoin; we implemented the corresponding operators with BloomFilter as SortMergeBloomFilterJoin and BroadcastBloomFilterJoin.
The hash table of the BloomFilter is configured as 32M.
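For a sense of scale, the textbook approximation for a Bloom filter's false positive rate can be evaluated for the small-table sizes used in the tests. This is only a sketch: the unit of "32M" is not stated, so the code assumes 32 MiB of bits, and the number of hash functions is an assumption too (the theoretical optimum is used here, which a real implementation may not use).

```python
import math


def bloom_false_positive_rate(num_bits, num_keys, num_hashes):
    """Standard approximation: p = (1 - e^(-k * n / m)) ** k."""
    return (1.0 - math.exp(-num_hashes * num_keys / num_bits)) ** num_hashes


def optimal_num_hashes(num_bits, num_keys):
    """Theoretical optimum k = (m / n) * ln 2, rounded to a whole number."""
    return max(1, round(num_bits / num_keys * math.log(2)))


# Assumption: "32M" is read as 32 MiB, i.e. 32 * 1024 * 1024 * 8 bits.
num_bits = 32 * 1024 * 1024 * 8

for num_keys in (400_000, 2_000_000):
    k = optimal_num_hashes(num_bits, num_keys)
    p = bloom_false_positive_rate(num_bits, num_keys, k)
    print(f"keys={num_keys} bits_per_key={num_bits / num_keys:.1f} k={k} p={p:.3g}")
```

Under these assumptions the filter has well over 100 bits per key, so nearly every non-matching row of the big table would be rejected by the filter.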
The statistics are shown below:
Records of Table A | Records of Table B | Join Operator | Executor Time |
---|---|---|---|
400 thousand | 14 billion | SortMergeJoin | 760s |
400 thousand | 14 billion | BroadcastHashJoin | 376s |
400 thousand | 14 billion | BroadcastBloomFilterJoin | 21s |
2 million | 14 billion | SortMergeJoin | 707s |
2 million | 14 billion | BroadcastHashJoin | 329s |
2 million | 14 billion | SortMergeBloomFilterJoin | 75s |
2 million | 14 billion | BroadcastBloomFilterJoin | 35s |
As we can see, the joins benefit a lot from BloomFilter pushdown.
I want to use this JIRA as an umbrella; my colleagues will submit the follow-up sub-tasks/PRs.
It would be great if someone could take a closer look at this and share some comments.
Issue Links
- is related to: IMPALA-3741 Push bloom filters to Kudu scanners (Resolved)
1. Java Implementation for BloomFilter | Resolved | Jin Xing |
2. Add bloomfilter predicate in server side. | Resolved | ZhangYao |