HIVE-1721

Use bloom filters to improve the performance of joins

      Description

      In the case of map-joins, it is likely that the big table will not find many matching rows in the small table.
      Currently, we perform a hash-map lookup for every row in the big table, which can be pretty expensive.

      It might be useful to try a bloom filter containing all the elements of the small table.
      Each element from the big table is first checked against the bloom filter, and only on a positive match
      is the small-table hash table probed.
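      A minimal sketch of the idea, assuming a standard bloom filter implementation such as Hadoop's org.apache.hadoop.util.bloom.BloomFilter (the method and table types here are illustrative, not Hive's actual join code):

          import java.nio.ByteBuffer;
          import java.util.Map;
          import org.apache.hadoop.util.bloom.BloomFilter;
          import org.apache.hadoop.util.bloom.Key;

          // Probe the bloom filter before the expensive hash-table lookup.
          // A negative answer is definite; a positive answer may be a false positive.
          static Object probeSmallTable(byte[] joinKey, BloomFilter bloom,
                                        Map<ByteBuffer, Object> smallTable) {
              if (!bloom.membershipTest(new Key(joinKey))) {
                  return null;                                 // definitely no match: skip the lookup
              }
              return smallTable.get(ByteBuffer.wrap(joinKey)); // verify the possible match
          }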

      Attachments

      1. hive-1721.patch.txt (13 kB, Siddhartha Gunda)

        Activity

        Siddhartha Gunda added a comment -

        I created some UDF and UDAF functions that can be used to build bloom filters and to query them.

        Sample ways to use them:
        STEP 1 : CREATE TEMPORARY FUNCTION bloom AS 'org.apache.hadoop.hive.contrib.genericudaf.GenericUDAFBuildBloom';
        STEP 2 : CREATE TEMPORARY FUNCTION bloom_filter AS 'org.apache.hadoop.hive.contrib.genericudf.GenericUDFBloomFilter';
        STEP 3 : CREATE TABLE 'NameOfBloomFilterTable' AS SELECT bloom('HashType', 'NumElements', 'ProbabilityOfFalsePositives', column1, column2, ...) FROM 'TableName';
        'NameOfBloomFilterTable' - name of the table in which the bloom filter is stored.
        'HashType' - type of hash function used to build the bloom filter. It accepts two values: 'jenkins' or 'murmur'.
        'NumElements' - number of elements in the table on which the bloom filter is being built.
        'ProbabilityOfFalsePositives' - acceptable probability of false positives.

        Example : CREATE TABLE tblBloom as SELECT bloom('jenkins', '20', '0.1',id,str) FROM tblOne;

        STEP 4 : ADD FILE 'PathOfBloomFilterTable';
        Example : ADD FILE /user/hive/warehouse/tblbloom40/000000_0;

        STEP 5 : Sample Use cases
        SELECT *,bloom_filter('jenkins', '20', '0.1', '000000_0', id, str) FROM Table1;

        SELECT *
        FROM Table1
        INNER JOIN Table2
        ON Table1.id = Table2.id
        WHERE bloom_filter('jenkins', '20', '0.1', '000000_0', Table1.id, Table1.str)

        Aleksandra Wozniak added a comment -

        What is the status of this task? Is anyone actively working on it?

        Ashutosh Chauhan added a comment -

        @Alex,
        No, this jira was created after map-join. I think the goal of this jira is to improve the performance of joins when one of the tables is big enough that it can't be read into each mapper's memory, but may still be amenable to other performance tricks.

        Alan Gates added a comment -

        It is possible to do bloom filter generation in parallel, see Pig's BuildBloom UDF for an example. It does require a serial reduce phase, but it is quite small since it involves ORing the bitmaps from all of the Bloom filters built in each map phase.
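        A sketch of the merge step Alan describes, assuming Hadoop's org.apache.hadoop.util.bloom.BloomFilter (all partial filters must be built with the same vector size, hash count, and hash type for the OR to be meaningful):

            import java.util.List;
            import org.apache.hadoop.util.bloom.BloomFilter;

            // Combine per-mapper partial filters by ORing their bit vectors.
            // Note: or() mutates the receiving filter in place.
            static BloomFilter merge(List<BloomFilter> partials) {
                BloomFilter merged = partials.get(0);
                for (int i = 1; i < partials.size(); i++) {
                    merged.or(partials.get(i));
                }
                return merged;
            }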

        alex gemini added a comment -

        The description says "in case of map-joins", so was this jira created before the "map side join" implementation?

        Ashutosh Chauhan added a comment -

        Last line should read "then launch second MR job to do step 3 & 4"

        Ashutosh Chauhan added a comment -

        @Alex,
        Reading the previous comments on this jira, this is proposed to work as follows:

        • Create a local task and launch it on the client machine, building a bloom filter on the medium-sized table (~200MB).
        • Create a Common Join MR job and launch it on the cluster. Also, ship the bloom filter built in the previous step to all the mapper nodes (via the Distributed Cache).
        • In the mapper, look up the key of every row of the large table in the bloom filter. If it exists, send that row to the reducer; otherwise filter it out (see the sketch after this list).
        • In the reducer, do the cross-product of the rows of the different tables for a given key to get the joined output.
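        A minimal mapper-side sketch of steps 2 and 3, assuming the filter was serialized to a file shipped via the Distributed Cache (the class name, file path, and key extraction are assumptions, not the actual patch):

            import java.io.DataInputStream;
            import java.io.FileInputStream;
            import java.io.IOException;
            import org.apache.hadoop.util.bloom.BloomFilter;
            import org.apache.hadoop.util.bloom.Key;

            public class BloomRowFilter {
                private final BloomFilter bloom = new BloomFilter();

                public BloomRowFilter(String localCachePath) throws IOException {
                    // BloomFilter is Writable, so it can be deserialized from the cached file.
                    try (DataInputStream in = new DataInputStream(new FileInputStream(localCachePath))) {
                        bloom.readFields(in);
                    }
                }

                /** True if the row should be shuffled to the reducer (possible join match). */
                public boolean pass(byte[] joinKey) {
                    return bloom.membershipTest(new Key(joinKey));
                }
            }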

        As outlined above, this will be a win because you will shuffle much less data from the mappers to the reducers. The assumptions, though, are that the cost of building the bloom filter on the client machine is small, that there is a huge difference in the sizes of the two tables, and that the join key is highly selective. One or more of these assumptions may not hold, in which case there might be a performance loss. So, there is a trade-off in deciding when to use this.

        I don't know if there exists a way to compute a bloom filter in a distributed fashion. If there is such a way, then you can do step 1 through an MR job (instead of locally) and on a much larger table, and then launch second MR job to do step 2 & 3. Again, there will be trade-offs here.

        alex gemini added a comment -

        The original thought is to increase the map-side small-table size; this depends heavily on how we chunk the big table. If the big table is chunked into 16 buckets, the small table must automatically be changed to 16 buckets too (the same logic as bucket map join). If the big table is partitioned by (region string), the small table also needs to be partitioned by (region) first, and we must also make sure the smallest chunk size is not bigger than the current small-table size limit defined by Hive. The partition case is more common; we can avoid a common join by always chunking the small table to match the big table's format.

        alex gemini added a comment -

        I'm wondering how we apply the bloom filter to the big table. We use map-side join for small tables < 25M; if we build a bloom filter on the small table, we can maybe increase the small-table size to 200M. But in the big table's map stage, we need to read the bloom filter, write the intermediate result back to disk, and then read that intermediate result to check against the real small table. We still can't hold the actual small table in memory (correct the logic if I'm wrong), so we pay the cost of writing an intermediate result that is very close to the final result. In this case we can't increase the number of maps, because it will double the I/O penalty. I guess it will only be a benefit in a three-table join on the same join key, one small table with two big ones. In my opinion, other DB systems benefit from bloom filters because they can hold the intermediate result in memory for further processing (like Oracle) or emit it immediately (like HBase).

        Mikalai Parafeniuk added a comment -

        Hello.
        I am Mikalai Parafeniuk from Belarusian State University. I'm a third-year student, and I'm looking for a project to contribute to as part of GSoC 2012.
        I have already learned the basics of bloom filters from the internet and the useful links in this thread, but I need some further reading. Could you propose some? I also need to learn the Hive codebase; I want to do that by fixing small bugs. Could you give me an idea of where I can start?

        Srinivasan Sembakkam Rajivelu added a comment -

        @Namit,
        Could you please assign this ticket to me? Can I take it for GSoC 2012?

        Srinivasan Sembakkam Rajivelu added a comment -

        I am planning to take this ticket as my proposal for GSoC 2012. I am very interested in this bloom filter implementation. Is it possible to assign this ticket to me?

        Carl Steinbach added a comment -

        @Srinivasan: Doesn't look like it. Feel free to have a go at it.

        Srinivasan Sembakkam Rajivelu added a comment -

        Is anyone working on this ticket? Can I take it and work on it?

        Siying Dong added a comment -

        Andrew, what do you mean by "the filter could be built in parallel with an MR job"? Our initial plan was to only build filter based on smaller tables and apply the filter against the big table to reduce data to be shuffled.

        For the syntax, the plan is to use syntax like MAPJOIN. We can do something like SELECT /*+ BLOOMFILTER(t1) */ ... FROM t1 JOIN t2 ...

        John Sichi added a comment -

        Standard practice is to add new conf parameters, with sensible defaults, but including a way to disable the feature completely (and make that the default until we have sufficient confidence in it).

        J. Andrew Key added a comment -

        I may use org.apache.hadoop.util.bloom.BloomFilter (http://svn.apache.org/repos/asf/hadoop/common/trunk/common/src/java/org/apache/hadoop/util/bloom/BloomFilter.java).
        Should there be a query syntax for controlling the creation of the bloom filter with options such as:
        1. vector size, hash function type and number?
        2. expected table size and acceptable level of false positives?

        If anyone has any preference or ideas, please share them.
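        One possible shape for option 2, derived from the standard sizing formulas m = -n·ln(p)/(ln 2)^2 and k = (m/n)·ln 2 (the class and method names here are illustrative only, not a committed design):

            import org.apache.hadoop.util.bloom.BloomFilter;
            import org.apache.hadoop.util.bloom.Key;
            import org.apache.hadoop.util.hash.Hash;

            public class BloomSizing {
                // Build a filter sized for n expected elements and false-positive rate p.
                public static BloomFilter create(long n, double p) {
                    int m = (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
                    int k = Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
                    return new BloomFilter(m, k, Hash.MURMUR_HASH);
                }

                public static void main(String[] args) {
                    BloomFilter f = create(1_000_000, 0.01);   // ~9.6M bits, 7 hash functions
                    f.add(new Key("42".getBytes()));
                    System.out.println(f.membershipTest(new Key("42".getBytes())));  // true
                }
            }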

        J. Andrew Key added a comment -

        Thanks. For large-ish tables, maybe the filter could be built in parallel with an MR job. I will take a look.

        Namit Jain added a comment -

        No one is working on this.
        Feel free to take it over.

        J. Andrew Key added a comment -

        Is anyone actively working on this? I've worked with Bloom filters before and was wondering if this issue was perhaps abandoned. If anyone has any notes or code for me to review, I would love to take a crack at this one.

        Namit Jain added a comment -

        That depends on the size of the filtered big table:

        To start with, we can do a join of the small table with the filtered big table using the current infrastructure.
        We may need some special tricks for outer joins, but it should be possible.

        Siying Dong added a comment -

        So the idea is that the filtered rows of the big table fit in memory, so that we can sort them and pay sequential I/O to read the small table back? Or do we do an external sort of the filtered rows from the big table?

        Joydeep Sen Sarma added a comment -

        @Siying - that's a good question. I don't know statistically how common it is, but we have heard requests along these lines. For example, one use case is that a project wants to get some data for a reasonably large subset of the users. One use case we have seen was where 0.2% of users were interesting - but even 0.2% is very large for us. People also use semi-joins, and that pretty much says that people want to filter rows out.

        Namit Jain added a comment -

        Yes, even after all the optimizations, map-join is restricted to tables < ~25M.

        There are lots of scenarios where the small table is ~100M.

        Siying Dong added a comment -

        Is it a common use case? The small table is so big that it doesn't even fit in memory, yet most rows in the big table don't match any of its keys.

        Namit Jain added a comment -

        T2 does not fit in memory completely.
        We create a bloom filter for T2, which does fit in memory. The assumption here is that by filtering out a lot of rows from T1, we substantially reduce the number of rows that go to the reducer, which helps the join performance.

        Joydeep Sen Sarma added a comment -

        A bloom filter takes about 10 bits per entry (at a reasonable false-positive probability; I remember reading this value on Wikipedia).

        Our Java hash tables take ~2000 bytes per key-value pair (based on tests done by Liyin for reasonably sized keys/values).

        So the idea is that if the small table is too big to be loaded into memory, but small enough that its bloom filter can be stored in memory, then we can first filter the large table and then do the sort.
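        For reference, the "10 bits per entry" figure follows from the standard bloom filter sizing formulas (with the optimal number of hash functions k):

            m/n = -ln(p) / (ln 2)^2,    k = (m/n) * ln 2

        For p = 1%, this gives m/n ≈ 9.6 bits per entry and k ≈ 7 hash functions, which matches the figure quoted above.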

        Siying Dong added a comment -

        If T2 is always in a hash table in memory, then all we have to do to check the hash table is calculate one hash value and probe the table. That is less expensive than calculating k hash values. Only if checking the hash table is much more expensive than calculating k hash values (for example, if it is on disk) do we see an improvement. Am I wrong?

        Namit Jain added a comment -

        It works very well if one of the tables (T1) is much larger than the other table (T2).
        T2 is not small enough to be a candidate for map-join.

        Then, via the bloom filter, we can filter out most of the rows of T1.

        Siying Dong added a comment -

        Shouldn't a bloom filter be even more expensive than a normal hash table?
        Instead of calculating one hash, we'll have to calculate k of them, and calculating the hash is the most expensive part for us.
        It probably works if most of the keys are not in the small table and the keys and values of the small table are stored on disk. I'm not sure that is our use scenario.

        Namit Jain added a comment -

        Got it, a good article at http://antognini.ch/papers/BloomFilters20080620.pdf
        Joydeep Sen Sarma added a comment -

        I am not so sure about this.

        Consider a hash table which has a very large number of buckets (relative to the number of elements in the hash table). A lookup in the hash table stops as soon as we hit an empty bucket, which requires us only to compute the hashcode(). If #buckets >> #elements, then for a miss the likely average cost is just the cost of the hashcode routine.

        Now consider a bloom filter. Here we have to compute multiple hash codes (or at least one). On top of that, with bloom filters there's an added cost for each positive (many hashcode computations).

        From this reasoning, bloom filters would be more expensive, not less, for small-table joins. Note that Java's hash tables do allow specifying the number of buckets, so the strategy outlined here (deliberately constructing a sparse hash table) is feasible; a minimal sketch follows.
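        A minimal sketch of that sparse hash table, using java.util.HashMap's capacity/load-factor constructor (the sizing factor of 4 is an illustrative choice, not a measured one):

            import java.util.HashMap;
            import java.util.Map;

            int expectedElements = 1_000_000;
            // Roughly 4x more buckets than elements; the load factor of 1.0 keeps
            // the table from resizing early, so most misses terminate at an empty
            // bucket after a single hashCode() computation.
            Map<String, String> sparse = new HashMap<>(4 * expectedElements, 1.0f);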

        Stepping back, this makes sense, because bloom filters are designed for large data sets (or at least data sets that don't easily fit in memory), not small ones (that fit easily in memory).

        It would be more interesting to consider bloom filters for join scenarios that cannot be handled with map join. For example, if the small table had 1M keys and map-join is not able to handle that large a hash table, then one can use bloom filters:

        • Filter (probabilistically) the large table against the medium-sized table by looking up each key in the bloom filter of the medium-sized table (a map-side bloom filter). Note: this is not a join, just a filter.
        • Take the filtered output and do a sort-merge join against the medium-sized table (by now the data size should be greatly reduced, and the cost of sorting should go down tremendously).

        There's lots of literature around this; it's a pretty well-known technique. It's quite different from what's proposed in this jira.


  People

    Assignee: Unassigned
    Reporter: Namit Jain
    Votes: 5
    Watchers: 19