Hive / HIVE-819

Add lazy decompress ability to RCFile

    Details

    • Hadoop Flags:
      Reviewed

      Description

      This is especially useful for filter scanning.
      For example, for the query 'select a, b, c from table_rc_lazydecompress where a > 1;' we only need to decompress the block data of columns b and c when at least one row's column 'a' in that block satisfies the filter condition.

      1. hive-819-2009-9-21.patch
        17 kB
        He Yongqiang
      2. hive-819-2009-9-12.patch
        14 kB
        He Yongqiang
      3. hive-819-2009-11-1.patch
        20 kB
        He Yongqiang

        Activity

        He Yongqiang created issue -
        He Yongqiang added a comment -

        A draft version that adds a callback to do lazy decompression. Needs more profiling.
        In one experiment on a 115M compressed input file, uservisits,
        "SELECT sourceip, desturl, visitdate, useragent, countrycode, duration FROM uservisits_rc where duration > 9;" was reduced from 20+ seconds to 14 seconds.
        However, after changing the filter condition from 9 to 8, the execution time increased by 4 seconds. That's too bad; more profiling is needed to find out why.

        He Yongqiang made changes -
        Field Original Value New Value
        Attachment hive-819-2009-9-12.patch [ 12419398 ]
        He Yongqiang added a comment -

        It turns out that after changing the filter condition from 9 to 8, the number of decompressions is not reduced at all compared to eager decompression. That means it ends up decompressing all bytes, just as if lazy decompression were not applied: it has to decompress all block data of the other needed columns because there is always at least one row with duration > 9 in every block...

        Zheng Shao made changes -
        Assignee He Yongqiang [ he yongqiang ]
        Ning Zhang added a comment -

        A few general comments:

        1) Can you briefly summarize the current approach to how decompression is done and your proposal for lazy decompression? Also, more comments in the code would be very helpful.

        2) Is the performance regression of 4 seconds with the query predicate duration > 8 consistent or intermittent? If the former, are there any additional changes that cause this regression (I thought the worst case would be decompressing all columns, as you mentioned, which is equivalent to the previous behavior)? If the latter, what method of timing are you using? If you have YourKit, can you also do CPU profiling?

        He Yongqiang added a comment -

        >>Can you briefly summarize the current approach to how decompression is done and your proposal for lazy decompression? Also, more comments in the code would be very helpful.
        No problem. Currently decompression is eager: the needed-columns info is passed into the reader, and the reader skips unneeded columns, reads only the needed columns into memory, and decompresses them immediately as they are read.
        Lazy decompression works by not decompressing the needed columns up front: it just holds the compressed bytes in memory and passes a callback object to BytesRefWritable. The patch adds an interface, LazyDecompressionCallback, which RCFile's reader implements as LazyDecompressionCallbackImpl. The LazyDecompressionCallback is used to construct the BytesRefWritable, and when BytesRefWritable.getData() etc. is called (that's the entry point between ColumnSerde/ColumnStruct and BytesRefWritable, where the underlying bytes need to be converted to objects), the callback method is invoked and decompression happens.
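        The callback mechanism described above can be sketched roughly as follows. This is an illustrative stand-in, not the actual Hive code: the names LazyDecompressionCallback and BytesRefWritable come from the patch, but the bodies here are simplified, and the demo class LazyDemo is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the interface added by the patch: the reader supplies the
// callback; invoking it performs the real (expensive) decompression.
interface LazyDecompressionCallback {
    byte[] decompress();
}

// Simplified stand-in for the BytesRefWritable in the patch.
class BytesRefWritable {
    private byte[] bytes;                            // decompressed data, null until needed
    private final LazyDecompressionCallback callback;

    BytesRefWritable(LazyDecompressionCallback callback) {
        this.callback = callback;
    }

    // Entry point from the deserializer: decompression happens only here,
    // the first time the bytes are actually requested.
    byte[] getData() {
        if (bytes == null) {
            bytes = callback.decompress();
        }
        return bytes;
    }
}

public class LazyDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        BytesRefWritable ref = new BytesRefWritable(() -> {
            calls.incrementAndGet();                 // count real decompressions
            return "column-block".getBytes();        // pretend this was inflated
        });
        System.out.println("before getData: " + calls.get()); // 0: nothing decompressed yet
        ref.getData();
        ref.getData();                               // second call reuses cached bytes
        System.out.println("after getData:  " + calls.get()); // 1: decompressed exactly once
    }
}
```

        Rows whose filter column fails the predicate never have getData() called on their other columns, so those blocks are never decompressed.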

        >>Is the performance regression of 4 seconds with the query predicate duration > 8 consistent or intermittent?
        Intermittent. I tested it more times after addressing the comments.
        >>If the latter, what method of timing are you using?
        I just submit a simple Hive select query in local mode and use the query finish time.

        Ning Zhang added a comment -

        Yongqiang, thanks for the explanation! Below are some more detailed comments:

        1) In RCFile.c:307 it seems decompress() can be called multiple times, and the function doesn't check whether the data is already decompressed and, if so, return early. This may not cause a problem in this diff, since the callers check whether the data is decompressed before calling decompress(), but it is a public function and nothing prevents future callers from calling it twice. So it may be better to implement this check inside the decompress() function.
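        The guard being suggested can be sketched as follows, assuming a cached-result field. The class name GuardedLazyDecompression and its fields are hypothetical, chosen for illustration rather than taken from the RCFile code; the zlib calls are standard java.util.zip.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch: decompress() caches its result, so a repeated call
// becomes a cheap no-op instead of redoing (or corrupting) the work.
class GuardedLazyDecompression {
    private final byte[] compressed;   // raw bytes read from the column block
    private byte[] uncompressed;       // cache; null until first decompress()

    GuardedLazyDecompression(byte[] compressed) {
        this.compressed = compressed;
    }

    byte[] decompress() throws Exception {
        if (uncompressed != null) {
            return uncompressed;       // already decompressed: return cached bytes
        }
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!inflater.finished()) {
            out.write(buf, 0, inflater.inflate(buf));
        }
        inflater.end();
        uncompressed = out.toByteArray();
        return uncompressed;
    }
}

public class GuardDemo {
    public static void main(String[] args) throws Exception {
        byte[] original = "duration > 8, duration > 9".getBytes();
        Deflater deflater = new Deflater();
        deflater.setInput(original);
        deflater.finish();
        byte[] buf = new byte[256];
        byte[] compressed = Arrays.copyOf(buf, deflater.deflate(buf));

        GuardedLazyDecompression g = new GuardedLazyDecompression(compressed);
        byte[] first = g.decompress();
        byte[] second = g.decompress();                       // guard hits the cache
        System.out.println(Arrays.equals(first, original));   // true
        System.out.println(first == second);                  // true: same cached array
    }
}
```

        With the guard inside decompress() itself, callers such as BytesRefWritable no longer need their own already-decompressed checks.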

        2) Also, in the same decompress() function, it seems it doesn't work correctly when the column is not compressed. Can you double-check?

        3) Add unit tests or qfiles for the following cases:

        • storage dimension:
          (1) fields are compressed
          (2) fields are uncompressed
        • queries dimension:
          (a) 1 column in the where-clause
          (b) 2 references to the same column in the where-clause (e.g., a > 2 and a < 5)
          (c) 2 references to the same column in the where-clause and groupby-clause respectively (e.g., where a > 2 group by a).

        So there will be 6 test cases from the cross product of the 2 dimensions. For (b) and (c), please check that the actual column decompression is done only once.

        He Yongqiang added a comment -

        >>1) In RCFile.c:307 it seems decompress() can be called multiple times, and the function doesn't check whether the data is already decompressed and, if so, return early. This may not cause a problem in this diff, since the callers check whether the data is decompressed before calling decompress(), but it is a public function and nothing prevents future callers from calling it twice. So it may be better to implement this check inside the decompress() function.

        The only entry into RCFile's LazyDecompressionCallbackImpl.decompress() is from BytesRefWritable. If we check whether it is already decompressed inside BytesRefWritable, do we also need to add that check in LazyDecompressionCallbackImpl?

        >>2) Also, in the same decompress() function, it seems it doesn't work correctly when the column is not compressed. Can you double-check?
        From my tests, it works correctly for uncompressed data.

        >>3)
        added tests:

        DROP TABLE rcfileTableLazyDecompress;
        CREATE table rcfileTableLazyDecompress (key STRING, value STRING) STORED AS RCFile;
        
        FROM src
        INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10;
        
        SELECT key, value FROM rcfileTableLazyDecompress where key > 238;
        
        SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400;
        
        SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key;
        
        set mapred.output.compress=true;
        set hive.exec.compress.output=true;
        
        FROM src
        INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10;
        
        SELECT key, value FROM rcfileTableLazyDecompress where key > 238;
        
        SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400;
        
        SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key;
        
        set mapred.output.compress=false;
        set hive.exec.compress.output=false;
        
        DROP TABLE rcfileTableLazyDecompress;
        

        Ning, thanks for your suggestions! Did I miss tests for any of your comments?
        For the check to avoid calling decompress() multiple times, what do you think about moving the check from BytesRefWritable to LazyDecompressionCallbackImpl? There will still be some minor check duplication.

        Ning Zhang added a comment -

        Hi Yongqiang, the tests look good. They should cover most cases. Other queries such as map-reduce joins, map-side joins, UDFs, UDAFs, etc. may fall into the same code path. Namit and Zheng may correct me if I'm wrong.

        As for where the double-decompression check should go, I prefer putting it in LazyDecompressionCallbackImpl, since this integrity check is introduced by lazy decompression and is thus part of its responsibility. Also, there may be more callers of LazyDecompressionCallbackImpl besides BytesRefWritable; if we put the check in LazyDecompressionCallbackImpl, we don't need to implement it in each of the callers.

        He Yongqiang added a comment -

        hive-819-2009-9-21.patch integrates Ning's suggestions. Thanks Ning!

        He Yongqiang made changes -
        Attachment hive-819-2009-9-21.patch [ 12420197 ]
        Ning Zhang added a comment -

        Looks good.

        +1

        Namit Jain added a comment -

        Will merge if the tests pass.

        Namit Jain added a comment -

        ql/src/test/queries/clientpositive/rcfile_lazydecompress.q
        failed

        He Yongqiang added a comment -

        Updated the patch against the trunk code (overwrote the .q.out file).

        He Yongqiang made changes -
        Attachment hive-819-2009-11-1.patch [ 12423787 ]
        Namit Jain added a comment -

        Merged in trunk - Thanks Yongqiang

        Namit Jain made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Hadoop Flags [Reviewed]
        Resolution Fixed [ 1 ]
        Carl Steinbach made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Transition          Time In Source Status  Execution Times  Last Executer   Last Execution Date
        Open -> Resolved    54d 15h 27m            1                Namit Jain      02/Nov/09 19:09
        Resolved -> Closed  774d 4h 56m            1                Carl Steinbach  17/Dec/11 00:06

          People

          • Assignee: He Yongqiang
          • Reporter: He Yongqiang
          • Votes: 0
          • Watchers: 3
