Uploaded image for project: 'Kudu'
  1. Kudu
  2. KUDU-2483

Scan tablets with bloom filter

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.12.0
    • client

    Description

      Join is really common/popular in Spark SQL, in this JIRA I take broadcast join as an example and describe how Kudu's bloom filter can help accelerate distributed computing.

      Spark runs broadcast join with below steps:
      1. When do broadcast join, we have a small table and a big table; Spark will read all data from small table to one worker and build a hash table;
      2. The generated hash table from step 1 is broadcasted to all the workers, which will read the splits from big table;
      3. Workers start fetching and iterating all the splits of big table and see if the joining keys exists in the hash table; Only matched joining keys is retained.

      From above, step 3 is the heaviest, especially when the worker and split storage is not on the same host and bandwith is limited. Actually the cost brought by step 3 is not always necessary. Think about below scenario:

      Small table A
      id      name
      1      Jin
      6      Xing
      
      Big table B
      id     age
      1      10
      2      21
      3      33
      4      65
      5      32
      6      23
      7      18
      8      20
      9      22
      

      Run query with SQL: select * from A inner join B on A.id=B.id

      It's pretty straight that we don't need to fetch all the data from Table B, because the number of matched keys is really small;

      I propose to use small table to build a bloom filter(BF) and use the generated BF as a predicate/filter to fetch data from big table, thus:
      1. Much traffic/bandwith is saved.
      2. Less data to processe by worker

      Broadcast join is just an example, other types of join will also benefit if we scan with a BF

      In a nutshell, I think Kudu can provide an iterface, by which user can scan data with bloom filters

       

      Here I want add some statistics for Spark-Kudu integration with/without BloomBloomFilter.

      In our product environment the bandwidth of each executor is 50M bps.

      We do inner join with two tables – – one is large and another one is comparatively small.

      In Spark, inner join can be implemented as SortMergeJoin or BroadcastHashJoin, we implemented the corresponding operators with BloomFilter as SortMergeBloomFilterJoin and BroadcastBloomFilterJoin.

      The hash table of BloomFilter is configured as 32M. 

      I show statistics as below:

      Records of Table A Records of Table B Join Operator Executor Time
      400 thousand 14 billion SortMergeJoin 760 seconds
      400 thousand 14 billion BroadcastHashJoin 376s
      400 thousand 14 billion BroadcastBloomFilterJoin 21s
      2 million 14 billion SortMergeJoin 707s
      2 million 14 billion BroadcastHashJoin 329s
      2 million 14 billion SortMergeBloomFilterJoin 75s
      2 million 14 billion BroadcastBloomFilterJoin 35s

      As we can see, it benefit a lot from BloomFilter-PushDown. 

      I want to take this jira  as a umbrella and my workmates will submit following sub-task/pr.

      It will be great if some can take more look at this and share some comments. 

       

      Attachments

        1. KUDU-2483
          21 kB
          Jin Xing
        2. image-2018-07-01-23-29-05-517.png
          121 kB
          Jin Xing
        3. BloomFilter+Design+Doc.pdf
          76 kB
          Jin Xing

        Issue Links

          There are no Sub-Tasks for this issue.

          Activity

            People

              bankim Bankim Bhavsar
              jinxing6042@126.com Jin Xing
              Votes:
              8 Vote for this issue
              Watchers:
              20 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: