Hive / HIVE-352

Make Hive support column based storage

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.4.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      HIVE-352. Column-based storage format RCFile. (Yongqiang He via zshao)

      Description

      Column-based storage has been proven to be a better storage layout for OLAP.
      Hive does a great job with raw row-oriented storage. In this issue, we will enhance Hive to support column-based storage.
      Actually, we have already done some work on column-based storage on top of HDFS; I think it will need some review and refactoring to port it to Hive.

      Any thoughts?

      1. Hive-352-draft-2009-03-30.patch
        60 kB
        He Yongqiang
      2. HIve-352-draft-2009-03-28.patch
        51 kB
        He Yongqiang
      3. hive-352-2009-5-1-3.patch
        158 kB
        He Yongqiang
      4. hive-352-2009-5-1.patch
        155 kB
        He Yongqiang
      5. hive-352-2009-4-30-4.patch
        156 kB
        He Yongqiang
      6. hive-352-2009-4-30-3.patch
        146 kB
        He Yongqiang
      7. hive-352-2009-4-30-2.patch
        146 kB
        He Yongqiang
      8. hive-352-2009-4-27.patch
        142 kB
        He Yongqiang
      9. hive-352-2009-4-23.patch
        139 kB
        He Yongqiang
      10. hive-352-2009-4-22-2.patch
        138 kB
        He Yongqiang
      11. hive-352-2009-4-22.patch
        138 kB
        He Yongqiang
      12. hive-352-2009-4-19.patch
        129 kB
        He Yongqiang
      13. hive-352-2009-4-17.patch
        109 kB
        He Yongqiang
      14. hive-352-2009-4-16.patch
        106 kB
        He Yongqiang
      15. hive-352-2009-4-15.patch
        126 kB
        He Yongqiang
      16. 4-22 progress.txt
        3 kB
        He Yongqiang
      17. 4-22 performance.txt
        6 kB
        He Yongqiang
      18. 4-22 performace2.txt
        8 kB
        He Yongqiang

          Activity

          Joydeep Sen Sarma added a comment -

          Thanks for taking this on. This could be pretty awesome.

          Traditionally the arguments for columnar storage have been limited 'scan bandwidth' and compression. In practice, we see that scan bandwidth has two components:
          1. disk/file-system bandwidth to read data
          2. compute cost to scan data

          Most columnar stores optimize for both (especially because in shared-disk architectures #1 is at a premium). However, our limited experience is that in Hadoop #1 is almost infinite. #2 can still be a bottleneck though. (It is possible that this observation holds because of high Hadoop/Java compute overheads; regardless, this seems to be the reality.)

          Given this, I like the idea of a scheme where columns are stored as independent streams inside a block-oriented file format (each file block contains a set of rows, but the organization inside blocks is by column). This does not optimize for #1, but it does optimize for #2 (potentially in conjunction with Hive's interfaces to get one column at a time from the IO libraries). It also gives us nearly equivalent compression.

          (The alternative scheme of having a different file (or files) per column is also complicated by the fact that locality is almost impossible to ensure, and there is no reasonable way of asking HDFS to colocate different file segments in the near future.)

          I would love to understand how you are planning to approach this. Will we still use SequenceFiles as a container, or should we ditch them? (It wasn't a great fit for Hive, given that we don't use the key field, but it was the best thing we could find.) We have seen that having a number of open codecs can hurt in memory usage. That's one open question for me: can we actually afford to open N concurrent compressed streams (assuming each column is stored compressed separately)?

          It also seems that one could define a ColumnarInputFormat/OutputFormat as a generic API, with different implementations and different pluggable containers underneath, and a scheme of either a file per column or a columnar-within-a-block approach. In that sense we could build something more generic for Hadoop (and then just make sure that Hive's lazy SerDe uses the columnar API for data access instead of the row-based API exposed by the current InputFormat).
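          As a rough illustration only (the interface and method names below are hypothetical, not existing Hadoop or Hive classes), such a generic columnar read API might look something like this:

            import java.io.Closeable;
            import java.io.IOException;
            import java.nio.ByteBuffer;

            // Hypothetical sketch of a generic columnar reader API; names are illustrative.
            public interface ColumnarReader extends Closeable {
              // Restrict reading to the given column ids; unselected columns may be skipped entirely.
              void selectColumns(int... columnIds);

              // Advance to the next row group (a block of rows laid out column by column).
              boolean nextRowGroup() throws IOException;

              // Raw (possibly still compressed) bytes of one selected column in the current row group;
              // decompression can be deferred until the bytes are actually used.
              ByteBuffer readColumn(int columnId) throws IOException;
            }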

          He Yongqiang added a comment -

          Thanks, Joydeep Sen Sarma. Your feedback is really important.

          1. Storage scheme: block-wise column store, or one file per column.
          Our current implementation stores each column in one file. And the most annoying part for us, just as you said, is that currently, and even in the near future, HDFS does not support colocating the different file segments for the columns of the same table. So some operations need to fetch data from a new file (like a map-side hash join, a join with CompositeInputFormat) or need an extra map-reduce job to merge data together. Some operations work pretty well with this layout.
          I think block-wise columnar storage is a good point. I will try to implement it in the near future. With different columns colocated in a single block, some operations do not need a reduce phase (which is really time-consuming).

          2. Compression
          With different columns in different files, some lightweight compression schemes, such as RLE, dictionary and bit vector encoding, can be used. One benefit of these lightweight compression algorithms is that some operations do not need to decompress the data at all.
          If we implement the block-wise column storage, do we also need to specify the lightweight compression algorithm for each column, or do we choose one (like RLE) internally if the data clusters well? Since dictionary and bit vector encoding should also be supported, should the columns with these compression algorithms also be placed in the block-wise columnar file? I think placing these columns in separate files can be handled more easily, but I do not know whether that fits into Hive. I am new to Hive.

          having a number of open codecs can hurt in memory usage

          Currently I cannot think of a way to avoid this for the column-per-file store.

          3. File format
          Yeah, I think we need to add new file formats and their corresponding InputFormats. Currently, we have implemented the VFile (Value File; we do not need to store a key part) and BitMapFile. We have not implemented a DictionaryFile; instead we use a header file for VFile to store dictionary entries. The header file for VFile is not needed for some columns, and sometimes it is a must.
          I think refactoring the file formats should be the starting point for this issue.

          Thanks again.

          Zheng Shao added a comment -

          Hi Yongqiang,

          Sorry for jumping on this issue late.

          Let me summarize the choices that we have to make:

          A. Put different columns in different files (we can still have column-set - a bunch of columns in the same file)

          B. Put different columns in the same file, but organize it in a block-based way. In a single block, the first column of all rows comes first, then the second column, etc.
          B1: Write a new FileFormat
          B2: Continue to use SequenceFileFormat
          B2.1: Store a block in multiple records, one record for each column. Use the key to label the beginning of a block (or column id).
          B2.2: Store a block in a single record

          Comparing A and B: 1. B is much easier to implement than A. Hadoop jobs take files as input; if the data is stored in a single file, it's much easier to either read or write the file. 2. B may have the advantage of locality. 3. B may require a little more memory buffer for writing. 4. B may not be as efficient as A for reading, since all data needs to be read (unless the FileFormat supports "skip", but that might create more random seeks depending on block size).

          Comparing B1 and B2: 1. B1 is much more flexible since we can do whatever we want (especially skip-reading etc.); 2. B2 is much easier to do and we naturally enjoy all the benefits of SequenceFile: splittable, customizable compression codec.

          Comparing B2.1 and B2.2: 1. B2.2 is easier to implement, because we don't have the problem of splitting different columns of the same block into multiple mappers. 2. B2.1 is potentially more efficient when we allow SequenceFile to skip records and ask Hive to tell us which of the columns can be skipped.

          As a result, I would suggest trying B2.2 as the first exercise, then B2.1, then B1, then A.

          The amount of work for each level (B2.2, B2.1, B1, A) will probably differ by a factor of 3-5. So it does not hurt much to start from B2.2, and the first steps will be good learning steps for the next ones.

          Thoughts?

          Joydeep Sen Sarma added a comment -

          >B2.2 is easier to implement, because we don't have the problem of splitting different columns of the same block into multiple mappers.

          For B2.1, we may be able to control when SequenceFile writes out sync markers (or at least we should investigate whether that's easy enough to do by extending SequenceFile). The advantage of being able to avoid reading specific columns seems pretty significant.

          OTOH, one can also easily imagine that SequenceFile does not copy data into a BytesWritable, but rather that we have a special Writable structure such that when a read on it is invoked, it just copies the reference to the underlying byte buffer. That way there are no copies of data between the SequenceFile reader and the application (in this case the columnar format reader), which is able to skip to the relevant sections of data without touching the irrelevant columns. If we do it this way, B2.2 has no performance downside.

          Regarding the compression-related questions raised by Yongqiang: it seems to me that trying out the most generic compression algorithm (gzip) first is better; trying to specify or infer the best compression technique per column is much harder and something that can be done later. One thing we could do to mitigate the number of open codecs is to simply accumulate all the data uncompressed in a buffer per column and then do the compression in one shot at the end (once we think enough data is accumulated) using just one codec object. This obviously seems non-optimal from the point of view of having to scan data multiple times; OTOH, there were known issues with older versions of Hadoop with lots of open codecs.
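          A minimal sketch of that per-column buffering idea, using a plain java.util.zip.Deflater to stand in for a Hadoop codec (the class and method names are made up for illustration, not part of any patch):

            import java.io.ByteArrayOutputStream;
            import java.io.IOException;
            import java.util.zip.Deflater;
            import java.util.zip.DeflaterOutputStream;

            // Illustrative only: accumulate each column uncompressed, then compress the
            // columns one by one with a single shared Deflater when the block is flushed,
            // instead of keeping N codecs (and their buffers) open at the same time.
            public class ColumnBlockBuffer {
              private final ByteArrayOutputStream[] columns;
              private final Deflater deflater = new Deflater();

              public ColumnBlockBuffer(int numColumns) {
                columns = new ByteArrayOutputStream[numColumns];
                for (int i = 0; i < numColumns; i++) {
                  columns[i] = new ByteArrayOutputStream();
                }
              }

              public void append(int column, byte[] value) {
                columns[column].write(value, 0, value.length);
              }

              public byte[][] flushCompressed() throws IOException {
                byte[][] compressed = new byte[columns.length][];
                for (int i = 0; i < columns.length; i++) {
                  ByteArrayOutputStream out = new ByteArrayOutputStream();
                  deflater.reset();
                  try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
                    columns[i].writeTo(dos);
                  }
                  compressed[i] = out.toByteArray();
                  columns[i].reset();
                }
                return compressed;
              }
            }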

          Zheng Shao added a comment -

          Let's do B2.2 first. I guess there will need to be some interface change to make it possible (SerDe now only deserializes one row out of one Writable, while we are looking for multiple rows per Writable). We can use the sequencefile compression support transparently.

          Once B2.2 is done, we can move to B2.1. As Joydeep said, we may need to extend SequenceFile to make splits work. At the same time we might want to use SequenceFile record compression (instead of SequenceFile block compression) if we can make relatively big records. That will save us the time of decompressing unnecessary columns. Or we can disable SequenceFile compression and compress record by record ourselves. As Joydeep said, we will have to decide whether we want to open a big number of codecs at the same time, or buffer all the uncompressed data and compress it column by column when writing out. BZip2Codec needs 100KB to 900KB per compression codec.

          He Yongqiang added a comment -

          Thanks, Joydeep and Zheng. The advice is really helpful.
          I have written a draft document according to suggestions from Zheng and Joydeep.
          Here is the link: http://docs.google.com/Doc?id=dc9jpfdr_3ft7w3hc4

          I agree with you guys: we can start from B2, then B1, and finally figure out whether we need to add VFile.
          BTW, yesterday I also took a look at MapFile, and I found that VFile is similar to MapFile in that VFile sometimes also needs an index file. The main difference is that VFile does not need a key part, and sometimes not even the value's length part. Because a VFile stores one column, each column has a type, and if the data type of that column is fixed-length, it only needs to store the raw value bytes.

          Joydeep Sen Sarma added a comment -

          If you are doing B2.2, I think it's still pretty easy to make sure that we don't decompress all columns when we only want a few. Using SequenceFile record compression, that is what will happen, and I think the performance gain might be much less (the benefit would be reduced primarily to better compression of the data due to the columnar format).

          In the past I have written a dummy Writable class that doesn't deserialize, but just passes the input stream handed in by Hadoop to the application. (The serialization framework does this in a less hacky way, and we could do that as well.) If you do it this way, the Hive SerDe can get a massive blob of binary data and then, based on header metadata, only decompress the relevant parts of it.

          I.e., I don't think we ever need to do B2.1 if we do B2.2 this way.

          Prasad Chakka added a comment -

          Joydeep, do you mean imposing our own record structure within a SequenceFile record? That is, a SequenceFile record value contains an array of compressed column groups for N rows (N mini-records), and the SequenceFile key contains an array of the lengths of the compressed column groups (another N mini-records).

          He Yongqiang added a comment -

          Thank you for the advice, Joydeep.
          Yeah, I am working on B2.2, and I am hoping I can finish a draft version in the coming few days.

          don't decompress all columns when we only want a few

          Using SequenceFile to implement B2.2 (columnar storage in one record), if we use RecordCompression, I think SequenceFile will decompress the whole value part and thus all the data. That is really tough, since decompression needs to touch all the data, and with no compression at all, one advantage of columnar storage is lost. Even without compression, SequenceFile still needs to read the whole value part into memory, and it disallows skipping data (HADOOP-5553).
          So i guess i may need to discard SequenceFile, which can support skipping data and compression.

          Any comments?

          He Yongqiang added a comment -

          By "So i guess i may need to discard SequenceFile, which can support skipping data and compression. " I mean:

          I may need to discard SequenceFile and implement a new one, which has many similarities with SequenceFile but can support skipping data and fine-grained compression (per-column compression within each record).

          Joydeep Sen Sarma added a comment -

          It's not clear to me that we need to ditch SequenceFile in the short term. Like Prasad said, we can impose our own structure on the SequenceFile record, which can allow skipping unnecessary data.

          We cannot use record compression, obviously. There are two approaches you can take:

          1. Keep using a BytesWritable (or Text) for the 'value' part and impose your own layout inside it, so that the ColumnarSerDe only needs to seek to and decompress the relevant column. This does require one copy of the entire data from the SequenceFile 'value' to the BytesWritable.
          2. Use the Hadoop serializer framework (see src/core/org/apache/hadoop/io/serializer) and get Hadoop to pass you the input stream directly (for reading the 'value' part). The custom deserializer can then be configured via Hive's plan to only copy out the bytes that are of interest to the Hive plan.

          #2 is obviously more complicated, and in practice straight-line copies of hot data are not that expensive (since Hadoop has already done a CRC check on all this data and it's typically already in processor caches and fast to scan again).

          So I would try out #1 to begin with.
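          For what it's worth, a bare-bones sketch of one possible layout for #1 is a small header of column lengths followed by the per-column (compressed) byte ranges, so a reader can jump straight to the columns it wants; the names and the exact header format below are made up, not the actual patch:

            import java.io.ByteArrayInputStream;
            import java.io.DataInputStream;
            import java.io.IOException;

            // Hypothetical layout inside the SequenceFile 'value':
            //   int numColumns, one int length per column, then the column byte ranges back to back.
            // A reader can compute any column's offset from the header and touch only what it needs.
            public final class ColumnarRecordLayout {

              public static byte[] sliceColumn(byte[] record, int wantedColumn) throws IOException {
                DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
                int numColumns = in.readInt();
                int[] lengths = new int[numColumns];
                for (int i = 0; i < numColumns; i++) {
                  lengths[i] = in.readInt();
                }
                int offset = 4 + 4 * numColumns;          // skip the header
                for (int i = 0; i < wantedColumn; i++) {  // skip over the earlier columns
                  offset += lengths[i];
                }
                byte[] column = new byte[lengths[wantedColumn]];
                System.arraycopy(record, offset, column, 0, column.length);
                return column;  // still compressed; decompress only if this column is needed
              }
            }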

          He Yongqiang added a comment -

          Thanks, Joydeep and Prasad.
          First, I would like to give an update on the recent work:
          I had implemented an initial RCFile which was just a wrapper around SequenceFile, and it relied on HADOOP-5553. Since it seems HADOOP-5553 will not be resolved, I have implemented another RCFile, which copies a lot of code from SequenceFile (especially the Writer code) and provides the same on-disk data layout as SequenceFile.

          Here is a draft description of the new RCFile:
          1) Only record compression or no compression at all.

          In B2.2 we store a bunch of raw rows into one record in a columnar way. So there is no need for block compression, because block compression will decompress all the data.

          2) In-record compression.
          If the writer is created with the compress flag, then the value part of one record is compressed, but in a per-column style. The layout is like this:

          Record length
          Key length

          {the below is the Key part}

          number_of_rows_in_this_record(vint)
          column_1_ondisk_length(vint),column_1_row_1_value_plain_length, column_1_row_2_value_plain_length,....
          column_2_ondisk_length(vint),column_2_row_1_value_plain_length, column_2_row_2_value_plain_length,....
          ..........

          {the end of the key part} {the beginning of the value part}

          Compressed data or plain data of [column_1_row_1_value, column_1_row_2_value,....]
          Compressed data or plain data of [column_2_row_1_value, column_2_row_2_value,....]

          {the end of the value part}

          The key part: KeyBuffer
          The value part : ValueBuffer

          3) the reader

          It now only provides 2 APIs:
          next(LongWritable rowID): returns the next row ID. I think it should be refined, because the row ID may not be a real row ID; it is only the count of rows already passed since the beginning of the reader.

          List<Bytes> getCurrentRow() returns the raw bytes of all columns of one row. Because the reader lets the user specify the column IDs that should be skipped, the returned List<Bytes> only contains the bytes of the unskipped columns. Maybe it is better to store a NullBytes in the returned list to represent a skipped column.
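          Roughly, a caller of this draft reader would look like the sketch below; the interface shown only mirrors the description above, and the real signatures in the patch may differ:

            import java.io.Closeable;
            import java.io.IOException;
            import java.util.List;
            import org.apache.hadoop.io.LongWritable;

            // Illustrative shape of the draft reader API described above, not the actual patch.
            interface DraftRCFileReader extends Closeable {
              void setSkippedColumnIDs(int... columnIDs);        // columns to skip entirely
              boolean next(LongWritable rowID) throws IOException;
              List<byte[]> getCurrentRow() throws IOException;   // bytes of the unskipped columns
            }

            class DraftReaderUsage {
              // Read only the columns we care about; skipped columns are never decompressed.
              static void scan(DraftRCFileReader reader) throws IOException {
                reader.setSkippedColumnIDs(1, 3, 4);
                LongWritable rowID = new LongWritable();
                while (reader.next(rowID)) {
                  List<byte[]> columns = reader.getCurrentRow();
                  // hand the raw column bytes to the SerDe / operators here
                }
                reader.close();
              }
            }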

          He Yongqiang added a comment -

          One problem with this RCFile is that it needs to know the needed columns in advance, so it can skip unneeded columns and avoid decompressing them.
          I took a look at Hive's operators and SerDes; it seems that they all take a whole row object as input and do not know which columns are needed before processing.
          For example, with LazyStruct and StructObjectInspector, they only know which column is needed when getField/getStructFieldData is invoked by the operators' evaluators (like ExprNodeColumnEvaluator).

          Zheng Shao added a comment -

          @Yongqiang: The reason that we do that "lazy" operation is that there will be "CASE", "IF" and short-circuiting boolean operations which will allow us to skip different columns for different rows.

          We need a new top-level StructObjectInspector which can deserialize the column only when getField/getStructFieldData is called on that column. For all levels below that, we can reuse the current ObjectInspector.

          He Yongqiang added a comment -

          impose our own structure on the SequenceFile record, which can allow skipping unnecessary data
          impose your own layout inside it, so that the ColumnarSerDe only needs to seek to and decompress the relevant column

          Can you give a detailed intro to "your own layout"? Thanks!

          Raghotham Murthy added a comment -

          Here are a few insightful articles about using a row-based query processor along with a column-store.

          http://www.databasecolumn.com/2008/12/debunking-yet-another-myth-col.html

          http://www.databasecolumn.com/2008/07/debunking-another-myth-columns.html

          Before we go ahead and implement everything, it would be a good idea to do some performance analysis on a small prototype.

          He Yongqiang added a comment -

          Thanks, Raghotham Murthy.
          Besides these two posts, there are also several useful papers, like:
          C-Store: A Column-oriented DBMS
          Column-Stores vs. Row-Stores: How Different Are They Really? (SIGMOD 2008)
          A Comparison of C-Store and Row-Store in a Common Framework
          Materialization Strategies in a Column-Oriented DBMS.
          Integrating compression and execution in column-oriented database systems

          In these papers, which are written mostly (all?) by people at Vertica, they place most emphasis on a column-oriented execution layer together with a column storage layer. I totally agree with these opinions. And actually we observed that operators implemented with a map-reduce approach differ in many ways from the ones implemented in systems like C-Store. We also found that bitmap compression can greatly reduce execution time.
          So I guess we can first try to support a column storage layer, and then we can add some column-oriented operators and column-specific compression algorithms.
          I will try to provide a small prototype of the storage layer as soon as possible.

          He Yongqiang added a comment -

          Also, the cost of tuple reconstruction accounts for a large proportion of the whole execution time. In our initial experiments, the reconstruction cost was much higher than the benefit of integrating the column execution with the underlying column storage. The reconstruction is a map-reduce join operation. The cost can be greatly reduced in some queries when we can reduce the number of tuples that need to be reconstructed. The key to this is late materialization.
          But the current B2.2, which localizes rows in a single file and adopts record-level columnar storage, does not have the tuple reconstruction cost. It does need more specific and more flexible compression algorithms, and I strongly recommend supporting a bitmap file in the future. To get the main benefit of a columnar strategy, we also need to add some columnar operators next.
          But now let us make the first step, and then add more optimizations.

          He Yongqiang added a comment -

          A draft version of B2.2.
          Thank you a lot for comments.

          Zheng Shao added a comment -

          Haven't looked through it completely yet.

          Some initial comments:

          BytesRefWritable.java: You might want to replace all BytesWritable with BytesRefWritable.
          86: I don't understand why equal() can be implemented like this.

          ColumnarSerDe.java: You might want to refactor LazySimpleSerDe to extract out the common functionality, (and reuse them in ColumnarSerDe), instead of inheriting from LazySimpleSerDe. This will give you much better control - the current implementation won't work unless you also override initialize(), serialize() - basically all methods from LazySimpleSerDe.

          If you would like to refactor LazySimpleSerDe to extract out the common functionality for ColumnarSerDe, make sure you follow HIVE-375.

          Since HIVE-375 is not committed yet, you might want to work on HIVE-360 first.

          Zheng Shao added a comment -

          @Raghu: Thanks for the references. For this issue, we are working in a fast-prototyping mode. We try the simplest approach (B2.2) first, gain some experiences, and then go for more time-consuming approaches. So right now we are just building out a prototype.

          He Yongqiang added a comment -

          An updated version according to Zheng's suggestions.
          1) remove all usage of BytesWritable, and replace them with BytesRefWritable
          2) change modifier of rowTypeInfo in LazySimpleSerDe to default
          3) change modifier of uncheckedGetField in LazyStruct from private to protected, which allows subclasses to override its behaviour
          4) several fixes of initial RCFile
          5) a runnable TestRCFile (the TestRCFile in the previously attached file could not even compile, my mistake)

          Thanks, Zheng.

          He Yongqiang added a comment -

          Need to generalize the FileFormat interface in Hive first. Then the RCFile format can be added more easily.

          He Yongqiang added a comment -

          This is the latest work on this issue.
          1) added new columnar serde
          2) new columnar struct
          3) RCFile now includes a partial read test
          4) RCFileOutputFormat now implements HiveOutputFormat

          I think as a next step, more tests are needed, especially on HiveOutputFormat.

          He Yongqiang added a comment -

          >>more tests are needed, especially on HiveOutputFormat.
          I mean RCFileOutputFormat.

          Zheng Shao added a comment -

          Please try a simple test for writing/reading using the new SerDe and file format:

          CREATE table columnTable (key STRING, value STRING)
          ROW FORMAT SERDE
            'org.apache.hadoop.hive.serde.columnar.ColumnarSerDe'
          STORED AS
            INPUTFORMAT 'org.apache.hadoop.hive.serde.columnar.RCFileInputFormat'
            OUTPUTFORMAT 'org.apache.hadoop.hive.serde.columnar.RCFileOutputFormat';
          
          INSERT OVERWRITE TABLE columnTable
          SELECT src.*
          FROM src;
          
          SELECT *
          FROM columnTable;
          
          He Yongqiang added a comment -

          against the latest trunk.

          1) added a simple rcfile_columar.q file for test.

          DROP TABLE columnTable;
          CREATE table columnTable (key STRING, value STRING)
          ROW FORMAT SERDE
            'org.apache.hadoop.hive.serde2.lazy.ColumnarSerDe'
          STORED AS
            INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
            OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat';
          
          FROM src
          INSERT OVERWRITE TABLE columnTable SELECT src.key, src.value LIMIT 10;
          describe columnTable;
          
          SELECT columnTable.* FROM columnTable;
          

          2) let ColumnarSerDe's serialize return BytesRefArrayWritable instead of Text

          BTW, it seems rcfile_columar.q.out does not contain the results of SELECT columnTable.* FROM columnTable;
          but after the test, I saw the file ql/test/data/warehouse/columntable/attempt_local_0001_r_000000_0, and it did contain the inserted data.
          Why did the select get nothing?

          He Yongqiang added a comment -

          Fixed the select problem.
          And refactored the TestRCFile class.

          Zheng Shao added a comment -

          hive-352-2009-4-17.patch:

          Very nice job!

          2 more tests to add:
          1. Big data test. Take a look at ql/src/test/queries/clientpositive/groupby_bigdata.q to see how we generate big data sets.
          2. Complex column types: Take a look at ./ql/src/test/queries/clientpositive/input_lazyserde.q

          Some other improvements:
          1. ObjectInspectorFactory.getColumnarStructObjectInspector: I think you don't need byte separator and boolean lastColumnTakesRest. Just remove them.
          2. ColumnarStruct.init: Can you cache/reuse the ByteArrayRef() instead of doing ByteArrayRef br = new ByteArrayRef() every time? The assumption in Hive is that data is already owned by creator, and whoever wants to keep the data for later use needs to get a deep copy of the Object by calling ObjectInspectorUtils.copyToStandardObject.
          3. ColumnarStruct: comments should mention the difference against LazyStruct is that it reads data through init(BytesRefArrayWritable cols).
          4. Can you put all changes to serde2.lazy package into a new package called serde2.columnar?
          5. It seems there is a lot of shared code between LazySimpleSerDe and ColumnarSerDe, e.g. a lot of the functionality in init and serialize. Can you refactor LazySimpleSerDe and put those common functionalities into public static methods, so that ColumnarSerDe can call them directly? You might also want to put the configuration of the LazySimpleSerDe (nullString, separators, etc.) into a public static class, so that the public static methods can return it; a rough sketch of this idea follows below.
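          A rough sketch of this refactoring idea (the class and method names, like SerDeParameters and initSerDeParameters, and the defaults shown are only illustrative, not the final API):

            import java.util.Properties;

            // Illustrative only: pull the shared parsing of table properties out of
            // LazySimpleSerDe into static helpers that ColumnarSerDe can call directly.
            public class LazySerDeUtils {

              // Configuration shared by LazySimpleSerDe and ColumnarSerDe.
              public static class SerDeParameters {
                public byte[] separators;
                public String nullString;
                public boolean lastColumnTakesRest;
              }

              public static SerDeParameters initSerDeParameters(Properties tbl) {
                SerDeParameters p = new SerDeParameters();
                p.separators = new byte[] { 1 };  // default Ctrl-A field separator
                p.nullString = tbl.getProperty("serialization.null.format", "\\N");
                p.lastColumnTakesRest =
                    "true".equalsIgnoreCase(tbl.getProperty("serialization.last.column.takes.rest"));
                return p;
              }
            }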

          Zheng Shao added a comment -

          hive-352-2009-4-17.patch:

          Talked with Yongqiang offline. Two more things:

          1. RCFile.readFields is not very efficient (see below). I think we should lazily decompress the stream instead of decompressing all of it and then returning the decompressor. The reason is that the decompressed data can be very big and can easily go out of memory (if we consider a 1:10 or higher compression ratio).

                     while (deflatFilter.available() > 0)
                        valBuf.write(valueIn, 1);
          

          2. Also, we need to think about how we can pass the information about which columns are needed to Hive. Yongqiang is working on designing that. If anybody has good ideas, please chime in.

          Zheng Shao added a comment -

          Two major approaches for making the RCFile format work are:
          1. Lazy deserialization (and decompression): the objects passed around in the Hive operators can be wrappers around handles to the underlying decompression streams, which will decompress the data on the fly.
          2. Column-hinting: let Hive tell the FileFormat which columns are needed and which are not.

          There is a major benefit of Option 1 in a common case like this:

          SELECT key, value1, value2, value3, value4 from columnarTable where key = 'xxyyzz';
          

          if the selectivity of "key = 'xxyyzz'" is really high, we will end up decompressing very few blocks of value1 to value4.
          This is not possible with Option 2.
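          A bare-bones illustration of option 1 (a hypothetical wrapper, not an actual Hive class, assuming Deflater-compressed column bytes): the compressed bytes are carried around cheaply and are inflated only on first access, so rows filtered out early never pay the decompression cost.

            import java.io.ByteArrayInputStream;
            import java.io.ByteArrayOutputStream;
            import java.io.IOException;
            import java.util.zip.InflaterInputStream;

            // Hypothetical lazily-decompressed column handle.
            public class LazyDecompressedColumn {
              private final byte[] compressed;
              private byte[] uncompressed;  // filled in on first access

              public LazyDecompressedColumn(byte[] compressed) {
                this.compressed = compressed;
              }

              public byte[] get() throws IOException {
                if (uncompressed == null) {
                  ByteArrayOutputStream out = new ByteArrayOutputStream();
                  try (InflaterInputStream in =
                          new InflaterInputStream(new ByteArrayInputStream(compressed))) {
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = in.read(buf)) != -1) {
                      out.write(buf, 0, n);
                    }
                  }
                  uncompressed = out.toByteArray();
                }
                return uncompressed;  // rows whose predicate fails never reach this point
              }
            }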

          He Yongqiang added a comment -

          Agreed.
          Can we have both?
          1 is absolutely better for high selectivity filter clauses. With 2, we can skip loading unnecessary (compressed) columns into memory.
          I have done a simple RCFile performance test on my local single machine. It seems RCFile performs much better in reading than a block-compressed sequence file. I think the performance improvement should be attributed to the skip strategy.
          Below are coarse results comparing RCFile with SequenceFile (local):

          Write RCFile with 10 random string columns and 100000 rows cost 9851 milliseconds. And the file's on disk size is 50527070
          Read only one column of a RCFile with 10 random string columns and 100000 rows cost 448 milliseconds.
          Write SequenceFile with 10  random string columns and 100000 rows cost 18405 milliseconds. And the file's on disk size is 52684063
          Read SequenceFile with 10  random string columns and 100000 rows cost 9418 milliseconds.
          Write RCFile with 25 random string columns and 100000 rows cost 15112 milliseconds. And the file's on disk size is 126262141
          Read only one column of a RCFile with 25 random string columns and 100000 rows cost 467 milliseconds.
          Write SequenceFile with 25  random string columns and 100000 rows cost 45586 milliseconds. And the file's on disk size is 131355387
          Read SequenceFile with 25  random string columns and 100000 rows cost 22013 milliseconds.
          

          I will post more detailed test results together with next patch.

          Hide
          Zheng Shao added a comment -

          I am surprised that RCFile is at least 2 times faster than SequenceFile in writing. What do you think is the reason?

          Hide
          He Yongqiang added a comment -

          I am not sure. However, I observed that SequenceFile does much better in writing and in compression ratio if all rows' data are the same.
          I will post a patch now, although it is not finished. I have only finished adding the big data test and the complex column data test. The big data test is added in 2 ways:
          1) added a rcfile_bigdata.q and
          2) added some test code in TestRCFile. It also contains comparison code for SequenceFile and RCFile; RCFile does not perform better in writing or compression ratio, but much better in reading.

          The test results in the previous post were generated by the class PerformTestRCFileAndSeqFile.

          Hide
          He Yongqiang added a comment - - edited

          While testing RCFile's read performance, I noticed severe read performance degradation as the column number gets bigger.
          I tentatively suspect it is caused by DFSClient's DFSInputStream skip code, shown below.

          int diff = (int)(targetPos - pos);
          if (diff <= TCP_WINDOW_SIZE) {
            try {
              pos += blockReader.skip(diff);
              if (pos == targetPos) {
                done = true;
              }
            } catch (IOException e) { //make following read to retry
              LOG.debug("Exception while seek to " + targetPos + " from "
                        + currentBlock +" of " + src + " from " + currentNode +
                        ": " + StringUtils.stringifyException(e));
            }
          }
          

          It seems that if I remove this piece of code, everything still works correctly (in my test code).
          I had an offline discussion with Zheng, and we made several draft improvements:
          1. Compress each column directly: keep one codec per column and write data directly to that column's compression stream. Currently RCFile buffers all the data first; when the buffered data grows beyond a configured size, it compresses each column separately and flushes them out. The direct compression strategy can increase the compression ratio. (This is not related to the severe read performance degradation problem.)

          To overcome the bad skip performance caused by TCP_WINDOW_SIZE (128K and not changeable at all):
          2. Merge continuous skips into a single skip; that would increase the number of bytes skipped at once and increase the probability of not executing the statements in the above if block (a minimal sketch is shown below).
          3. Enhance the RCFile writer code to be aware of TCP_WINDOW_SIZE, and let most columns be greater than TCP_WINDOW_SIZE as far as possible.
          To do this, we need to add a SLOP variable to allow the buffer size to be greater than the configured size (default 4M) and less than SIZE*SLOP. With this we can make most columns' buffered data greater than TCP_WINDOW_SIZE as far as possible.
          This method has many limitations as the number of columns grows (we guess >100).
          Another way to do this is to let the buffer size be TCP_WINDOW_SIZE (128K) * columnNumber.
          Anyway, all the solutions I can think of can only ease the situation.
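
          A minimal sketch of improvement 2 (all names here are illustrative, not the actual RCFile code): accumulate the compressed lengths of consecutive unneeded columns and issue one seek for the whole run, so each skip is more likely to exceed the TCP window threshold.

          import java.io.IOException;
          import org.apache.hadoop.fs.FSDataInputStream;

          public class SkipMerger {
            // Skip a run of unneeded columns with one seek instead of one small
            // skip per column; the read of a needed column is left as a placeholder.
            public static void readNeededColumns(FSDataInputStream in,
                long[] compressedColumnLength, boolean[] needed) throws IOException {
              long toSkip = 0;
              for (int c = 0; c < needed.length; c++) {
                if (!needed[c]) {
                  toSkip += compressedColumnLength[c]; // accumulate, do not seek yet
                } else {
                  if (toSkip > 0) {
                    in.seek(in.getPos() + toSkip); // one large skip for the whole run
                    toSkip = 0;
                  }
                  byte[] column = new byte[(int) compressedColumnLength[c]];
                  in.readFully(column); // placeholder for the real column read
                }
              }
              if (toSkip > 0) {
                in.seek(in.getPos() + toSkip); // trailing unneeded columns
              }
            }
          }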

          Any thoughts on this? Thanks!

          Hide
          He Yongqiang added a comment -

          More explanation of the sharp read performance decrease:
          In our test we use string columns, and the data is randomly produced.
          When there are only a few columns and the buffer size is the 4M default, every column's buffer is larger than TCP_WINDOW_SIZE, so the if block is not executed when skipping columns. But as the number of columns grows, the buffer size each column gets becomes smaller, and eventually most columns' buffers are smaller than TCP_WINDOW_SIZE. That is when the sharp decrease appears.

          Hide
          Zheng Shao added a comment -

          I agree we should do 1 and 2, but I don't feel 3 is worth doing.
          As you mentioned, 3 will have memory problems when the number of columns grows very big - in that case, performance degradation is better than an out-of-memory problem.

          For 3, the eventual solution would be to fix DFSClient to use a smaller number than TCP_WINDOW_SIZE. But if it is not easy to change DFSClient, and we continue to see this problem, we may want to switch to other solutions (B1 etc). Anyway, if TCP_WINDOW_SIZE is 128KB and we have a 20MB buffer (compressed size), we should be able to handle 160 columns, which is usually good enough for most applications.

          Hide
          Zheng Shao added a comment -

          Yongqiang talked with me offline since 1 conflicts with Joydeep's earlier comment: "We have seen that having a number of open codecs can hurt in memory usage - that's one open question for me - can we actually afford to open N concurrent compressed streams (assuming each column is stored compressed separately)."

          I think we should do an experiment to see how much memory each concurrent compression stream takes - in reality, most tables will have less than 100 columns, so I guess if each codec takes 100K-500K of memory it's affordable (total memory usage: 10MB-50MB); otherwise we need to rethink 1.

          Hide
          He Yongqiang added a comment -

          4-22 progress.txt describes the latest modifications made in hive-352-2009-4-22.patch.
          4-22 performance.txt gives simple test results, which can be reproduced by running org.apache.hadoop.hive.ql.io.PerformTestRCFileAndSeqFile (it uses LocalFileSystem). I am not quite confident about RCFile's read performance; the results show it does much better than SequenceFile. Are there bugs? I tested with TestRCFile and the three added .q files, and they all passed locally.

          Hide
          He Yongqiang added a comment -

          According to Zheng's suggestions, hive-352-2009-4-22-2.patch made several improvements against hive-352-2009-4-22.patch:
          1) let each randomly produced row in the test contain string bytes (the previous version produced binary bytes)
          2) added a correctness parameter to the performance test to verify that what we read is what we wrote (in PerformTestRCFileAndSeqFile).

          4-22 performace2.txt adds more detailed test results:
          1. local, using bulk decompression in RCFile->ValueBuffer->readFields(), like:

          bufferRef.write(valueIn, columnBlockPlainDataLength);
          

          2. local, not using bulk decompression in RCFile->ValueBuffer->readFields(), like:

          while (deflateFilter.available() > 0)
            bufferRef.write(valueIn, 1);
          

          3. using DistributedFileSystem and bulk decompression; the tests are still run on my local machine

          Here are the brief results (for more detail, please take a look at the attached 4-22 performace2.txt):

          1.
          (LocalFileSystem) Uses bulk decompression in RCFile->ValueBuffer->readFields, and adds some noise between the two RCFile reads and after writing, to avoid the disk cache.

          column number| RCFile size | RCFile read 1 column | RCFile read 2 column | RCFile read all columns |Sequence file size | sequence file read all

          10| 11501112| 259| 181| 498| 13046020| 7002
          25| 28725817| 233| 269| 1082| 32246409| 16539
          40| 45940679| 261| 301| 1698| 51436799| 25415

          2.
          (LocalFileSystem) No bulk decompression in RCFile->ValueBuffer->readFields; the test adds some noise between the two RCFile reads and after writing, to avoid the disk cache.

          column number| RCFile size | RCFile read 1 column | RCFile read 2 column | RCFile read all columns |Sequence file size | sequence file read all
          10| 11501112| 1804 | 3262 | 15956 | 13046020| 6927
          25| 28725817| 1761 | 3310 | 39492 | 32246409| 15983
          40| 45940679| 1843 | 3386 | 63759 | 51436799| 25256

          3.
          (DistributedFileSystem) Uses bulk decompression in RCFile->ValueBuffer->readFields, and adds some noise between the two RCFile reads and after writing, to avoid the disk cache.

          column number| RCFile size | RCFile read 1 column | RCFile read 2 column | RCFile read all columns |Sequence file size | sequence file read all

          10| 11501112| 2381| 3516| 9898| 13046020| 18053
          25| 28725817| 3754| 5254| 22521| 32246409| 43258
          40| 45940679| 5597| 8225| 40304| 51436799| 69278

          Hide
          Zheng Shao added a comment -

          The numbers look much more reasonable than before. 1.7s to read and decompress 46MB of data is plausible. But the sequence file's speed - 25s to read and decompress 51MB of data - looks a bit too low.

          0. Did you try that with hadoop 0.17.0? "ant -Dhadoop.version=0.17.0 test" etc.

          1. Can you add your tests to ant, or post the testing scripts so that everybody can easily reproduce the test results that you have got?

          2. For DistributedFileSystem, how big is the cluster? Is the file (the file size is small so it's clearly a single block) local?

          3. It seems SequenceFile's compression is not as good as RCFile's, although the data is the same and also random. What is the exact record format in the sequencefile? Did you put delimiters, or did you put the lengths of the Strings?

          4. 40MB to 50MB is too small for testing. Let's double it to ~100MB but less than 128MB to simulate a single file system block.

          I think we should compare the following 2 approaches:
          BULK: When creating the file, store uncompressed data in memory; when the limit is reached, compress and write out. When reading the file, do bulk decompression. This won't go out of memory because the decompressed size is bounded by the limit at file creation.
          NONBULK: When creating the file, store compressed data in memory; when the (compressed size) limit is reached, write out. When reading the file, do small-chunk decompression to make sure we don't go out of memory.

          The approach of storing compressed data at creation and doing bulk decompression at reading is not practical because it's very easy to go out of memory.

          We've done BULK, and it showed great performance (1.6s to read and decompress 40MB local file), but I suspect the compression ratio will be lower than NONBULK.
          Can you compare the compression ratio of BULK and NONBULK, given different buffer sizes and column numbers?
          Also, with NONBULK, we might be able to get bigger compressed blocks, so that each skip can skip more than with BULK, but I think this is just a minor issue.

          If the compression ratio doesn't turn out to be too different, we may just go with the BULK approach.
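
          For concreteness, here is a minimal sketch of the BULK write policy as described above (class and field names are made up for illustration, not the patch's actual writer): uncompressed column data is buffered, and each column buffer is compressed as a whole only when the in-memory limit is hit.

          import java.io.ByteArrayOutputStream;
          import java.io.IOException;
          import org.apache.hadoop.io.compress.CompressionCodec;
          import org.apache.hadoop.io.compress.CompressionOutputStream;

          // Hypothetical sketch of the BULK policy, not the actual RCFile writer.
          public class BulkColumnWriter {
            private final ByteArrayOutputStream[] columnBuffers;
            private final CompressionCodec codec;
            private final int bufferLimit; // e.g. 4 * 1024 * 1024
            private int bufferedBytes = 0;

            public BulkColumnWriter(int columns, CompressionCodec codec, int bufferLimit) {
              this.columnBuffers = new ByteArrayOutputStream[columns];
              for (int i = 0; i < columns; i++) {
                columnBuffers[i] = new ByteArrayOutputStream();
              }
              this.codec = codec;
              this.bufferLimit = bufferLimit;
            }

            public void append(byte[][] row) throws IOException {
              for (int c = 0; c < row.length; c++) {
                columnBuffers[c].write(row[c]); // buffered uncompressed, column by column
                bufferedBytes += row[c].length;
              }
              if (bufferedBytes >= bufferLimit) {
                flush();
              }
            }

            private void flush() throws IOException {
              for (ByteArrayOutputStream col : columnBuffers) {
                ByteArrayOutputStream compressed = new ByteArrayOutputStream();
                CompressionOutputStream out = codec.createOutputStream(compressed);
                out.write(col.toByteArray()); // compress the whole column buffer at once
                out.finish();
                out.close();
                // compressed.toByteArray() plus its length would go to the file here
                col.reset();
              }
              bufferedBytes = 0;
            }
          }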

          Hide
          He Yongqiang added a comment -

          Thanks, Zheng.
          >>0. Did you try that with hadoop 0.17.0? "ant -Dhadoop.version=0.17.0 test" etc.
          yes.
          >>1. Can you add your tests to ant, or post the testing scripts so that everybody can easily reproduce the test results that you have got?
          I will do that with next patch
          >>2. For DistributedFileSystem, how big is the cluster? Is the file (the file size is small so it's clearly a single block) local?
          The cluster has six nodes. The file is not local. The test was run on my local machine and uses HDFS.
          >>3. It seems SequenceFile's compression is not as good as RCFile's, although the data is the same and also random. What is the exact record format in the sequencefile? Did you put delimiters, or did you put the lengths of the Strings?
          Yes, it has the lengths of the Strings. However, RCFile also stores the lengths of the strings.
          >>The approach of storing compressed data at creation and doing bulk decompression at reading is not practical because it's very easy to go out of memory.
          Yes, I encountered an out-of-memory error, so I added a trick in RCFile.Writer's append, like:

          if ((columnBufferSize + (this.bufferedRecords * this.columnNumber * 2) > COLUMNS_BUFFER_SIZE)
              || (this.bufferedRecords >= this.RECORD_INTERVAL)) {
            flushRecords();
          }
          

          >>We've done BULK, and it showed great performance (1.6s to read and decompress 40MB local file), but I suspect the compression ratio will be lower than NONBULK.
          >>Can you compare the compression ratio of BULK and NONBULK, given different buffer sizes and column numbers?
          BULK and NONBULK (they refer to decompression) are only for reads; they have nothing to do with writes, so I guess they will not influence the compression ratio.

          Hide
          Zheng Shao added a comment -

          @Yongqiang: I found a place in the SequenceFile reader test that may improve the performance a lot - BytesRefWritable.readFields is creating a new array for each row!! This is bad and I would say this is not a fair comparison between RCFile and SequenceFile.

          There are 3 ways to fix BytesRefWritable:
          1. Add a boolean member "owned": set it to true every time we create an array in readFields, and don't create another array if owned is true and the current record is equal to or smaller than the currently owned array. Also, set it to false every time set(...) is called.
          2. Directly change the semantics of readFields - always reuse the bytes array if its length is equal to or greater than the current record, otherwise create a new one. This is OK because people who use set(...) probably won't use readFields at all. Of course, we need to put a comment at readFields and set() saying that readFields will corrupt the array, so callers of set(...) should not call readFields.
          3. Use a completely different class hierarchy.

          I would prefer to do 2 since it's the simplest way to go.
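
          A minimal sketch of what option 2 could look like (field names are illustrative; the real BytesRefWritable has more state than this): grow the array only when the incoming record is larger, otherwise overwrite it in place.

          import java.io.DataInput;
          import java.io.IOException;

          // Sketch only: reuse the byte array across readFields calls.
          public class ReusableBytesRef {
            private byte[] bytes = new byte[0];
            private int length = 0;

            public void readFields(DataInput in) throws IOException {
              length = in.readInt();
              if (bytes.length < length) {
                bytes = new byte[length]; // allocate only when the old array is too small
              }
              in.readFully(bytes, 0, length); // otherwise overwrite the old array
            }
          }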

          I hope this will improve the sequencefile read performance a lot, and give RCFile and SeqFile a fair comparison.

          Also, you might want to modify the write code to use the same logic - reuse the bytes array if possible. Then the writes will be much faster as well.

          Hide
          Zheng Shao added a comment -

          >> Yes, I encountered Out of memory error. So i added some trick in RCFile.Writer's append. Like
          Can you explain what the formula means?

          >> BULK and NONBULK( they mean decompress) are only for Read, they have nothing to do with Write, so I guess it will not influence compression ratio.
          I mean the BULK and NONBULK in my comments - they have different ways to determine when to flush the records at write as well, so the compression ratio will be different.

          Hide
          He Yongqiang added a comment -

          More explanation of the formula used:

          (columnBufferSize + (this.bufferedRecords * this.columnNumber * 2) > COLUMNS_BUFFER_SIZE)
          

          columnBufferSize (compressed size) refers to the number of bytes occupied by all columns' value data.
          (this.bufferedRecords * this.columnNumber * 2) is an approximation of the number of bytes in the key part; we record each value's length in the key part.
          So the number of records in a value buffer cannot be too large.
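
          To give a rough sense of scale (the numbers here are made up for illustration, not taken from the tests above): with 40 columns and 60000 buffered records, the key-part term alone is 60000 * 40 * 2 = 4,800,000 bytes, already above the 4MB (4,194,304 byte) default, so flushRecords() fires and keeps the number of records in a value buffer bounded even before the compressed column data is counted.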

          Hide
          Zheng Shao added a comment - - edited

          Running Yongqiang's tests with the hadoop native library. The file is on the local file system. Each column is a random string with length uniform from 0 to 30, containing random uppercase and lowercase letters.

          Using DefaultCodec

          Write RCFile with 80 random string columns and 100000 rows cost 25464 milliseconds. And the file's on disk size is 91874941
          Write SequenceFile with 80 random string columns and 100000 rows cost 35711 milliseconds. And the file's on disk size is 102521005
          Read only one column of a RCFile with 80 random string columns and 100000 rows cost 594 milliseconds.
          Read only first and last columns of a RCFile with 80 random string columns and 100000 rows cost 600 milliseconds.
          Read all columns of a RCFile with 80 random string columns and 100000 rows cost 2227 milliseconds.
          Read SequenceFile with 80  random string columns and 100000 rows cost 4343 milliseconds.
          

          Using GzipCodec. Not much difference.

          Write RCFile with 80 random string columns and 100000 rows cost 26358 milliseconds. And the file's on disk size is 91931563
          Write SequenceFile with 80 random string columns and 100000 rows cost 35802 milliseconds. And the file's on disk size is 102528154
          Read only one column of a RCFile with 80 random string columns and 100000 rows cost 593 milliseconds.
          Read only first and last columns of a RCFile with 80 random string columns and 100000 rows cost 626 milliseconds.
          Read all columns of a RCFile with 80 random string columns and 100000 rows cost 2401 milliseconds.
          Read SequenceFile with 80  random string columns and 100000 rows cost 4601 milliseconds.
          

          These results look reasonable. RCFile's read performance is around 2 times that of SequenceFile, probably because we do bulk decompression and one less copy of the data.

          Hide
          Ashish Thusoo added a comment -

          Can we also get some numbers on the amount of memory usage?

          Also, can you give more details about the experiment? Was this just an HDFS read or a measurement of a Hive query?

          Hide
          He Yongqiang added a comment -

          >>Can we also get some numbers on the amount of memory usage?
          I reran the test (the same test as Zheng's, but with no native codec) locally using the local fs and DefaultCodec; it reads all columns of an RCFile with 80 columns and 100000 rows (size: 91849881 bytes).
          The maximum memory usage is shown below (I ran 'ps -o vsz,rss,rsz,%mem -p 549' every minute):
          VSZ RSS RSZ %MEM
          766732 63472 63472 -3.0
          BTW, my physical memory is 3GB.

          >>Was this just an HDFS read or a measurement of a Hive query?
          The test was just a file read test.

          However, with no native codec, my results show a big difference from Zheng's, in that SequenceFile does much worse in my test.

          Write RCFile with 80 random string columns and 100000 rows cost 30643 milliseconds. And the file's on disk size is 91849881
          Write SequenceFile with 80 random string columns and 100000 rows cost 62034 milliseconds. And the file's on disk size is 102521005
          Read only one column of a RCFile with 80 random string columns and 100000 rows cost 703 milliseconds.
          Read only first and last columns of a RCFile with 80 random string columns and 100000 rows cost 526 milliseconds.
          Read all columns of a RCFile with 80 random string columns and 100000 rows cost 3131 milliseconds.
          Read SequenceFile with 80  random string columns and 100000 rows cost 47876 milliseconds.
          

          Why does the native codec matter so much for SequenceFile and not for RCFile? It should influence both RCFile and SequenceFile in the same way.

          Hide
          Zheng Shao added a comment -

          @Yongqiang,

          The reason that native codec matters more for SequenceFile is probably because seqfile is using compression differently from rcfile, for example, incremental compression/decompression.

          I ran a test on our data set, which mainly contains around 40 string columns; the string length is usually fixed for a given column, between 1 and 10. The result is that seqfile is much smaller than rcfile - seqfile is only around 55% of the size of rcfile. However, inside the rcfile I see a lot of repeated bytes - those are the field lengths for each row. Also, rcfile is slower, probably because it's writing out more data than seqfile.

          1. Can you also compress the field length columns? I tried to compress the rcfile again using the gzip command line, and it became 41% of the current size - a lot smaller than the seqfile. This means that, in general, RCFile can save a lot of space because it's easier for the compression algorithm to compress the lengths and the content of each column separately.

          2. Also, I remember you changed the compression to be incremental, so the current solution is a mix of BULK and NONBULK as I described above, which has memory problems. Since, as we discussed, we would like to leave the NONBULK mode for later because of the amount of additional work, can you change the code back to BULK compression? There is probably a performance loss due to incremental compression, which can be avoided by bulk compression.

          Hide
          He Yongqiang added a comment -

          hive-352-2009-4-27.patch changed back to bulk compression and now also compresses the key part.

          Here is a result on TPCH's lineitem:
          Direct (incremental) compression, key part not compressed:
          274982705 hdfs://10.61.0.160:9000/user/hdfs/tpch1G_rc
          Buffered first, then compressed (bulk compression), key part compressed:
          188401365 hdfs://10.61.0.160:9000/user/hdfs/tpch1G_newRC

          BTW, I also tried to implement direct (incremental) compression, and tried to decompress a value buffer's columns part by part. But at the last step (when implementing ValueBuffer's readFields), I noticed that it is not easy to implement. We only hold one InputStream to the underlying file, and we would need to seek back and forth to decompress part of each column, and we would also need to hold one decompression stream per column. If we seek the input stream, the decompression stream gets corrupted.
          To avoid all this, we would need to read all needed columns' compressed data into memory and do in-memory decompression. But we would still need one decompression stream per column. I stopped implementing this at the last step; if it is needed, I can finish it.

          Hide
          Zheng Shao added a comment -

          Nice work Yongqiang!

          I totally agree with your analysis of the NONBULK mode - I was thinking of caching all compressed data in memory.

          The good thing about NONBULK is that we might be able to bypass the column hinting - because we might be able to omit the disk read cost since the decompression cost usually takes much more time.

          Can you add a simple option to your code so that I can easily test whether that's true? Basically, I want to compare reading and decompressing 1 column (while skipping all other columns) against reading all columns (while decompressing only 1 column). If the difference is small, then we can implement NONBULK and skip the column hinting implementation for now.

          What do you think?

          Hide
          Zheng Shao added a comment -

          Good news: A test on some of our internal data shows that the column-based storage is saving 20%+ space for us. However it is about 1.5x slower than seqfile in writing. Not sure why yet. Will do more profiling tomorrow.

          Hide
          Ashish Thusoo added a comment -

          That is very encouraging from the storage perspective. However 1.5x is a bit concerning. Profile output would be awesome to understand this.

          Hide
          Joydeep Sen Sarma added a comment -

          What does read performance look like?

          Hide
          He Yongqiang added a comment -

          Zheng, can you post your profiling results?
          I did a test with PerformTestRCFileAndSeqFile, and it seems RCFile does better in both reading and writing. Can you also do a test with PerformTestRCFileAndSeqFile?
          I am refactoring BytesRefArrayWritable today. BytesRefArrayWritable is a public class, and currently, once the number of BytesRefWritable objects it holds changes, we have to recreate a new list. I am trying to avoid recreating a new list by being able to narrow or enlarge the underlying list. It needs more work than I initially thought.
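
          For reference, a minimal sketch of the resize behavior being described (illustrative only, not the patch's actual BytesRefArrayWritable): keep a backing array plus a count of valid entries, and reallocate only when the array is genuinely too small.

          // Illustrative sketch of an array-backed container that can be narrowed
          // or enlarged without recreating the backing array on every change.
          public class ResizableRefArray<T> {
            private Object[] elements;
            private int valid; // number of logically valid entries

            public ResizableRefArray(int capacity) {
              elements = new Object[capacity];
              valid = 0;
            }

            public void resize(int newValid) {
              if (newValid > elements.length) { // enlarge only when needed
                Object[] bigger = new Object[Math.max(newValid, elements.length * 2)];
                System.arraycopy(elements, 0, bigger, 0, elements.length);
                elements = bigger;
              }
              valid = newValid; // shrinking just narrows the valid window
            }

            @SuppressWarnings("unchecked")
            public T get(int index) {
              if (index >= valid) {
                throw new IndexOutOfBoundsException(index + " >= " + valid);
              }
              return (T) elements[index];
            }
          }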

          Hide
          Zheng Shao added a comment -

          The following numbers are all for a 128MB gzip-compressed block (for seqfile, and about 20% smaller for rcfile because of the difference in compression ratio).
          A. Read from seqfile + Write to seqfile: 2m 05s
          B. Read from seqfile + Write to rcfile: 2m 45s
          C. Read from rcfile + Write to seqfile: 2m 20s
          D. Read from rcfile + Write to rcfile: 3m 00s

          @Joydeep: The good compression ratio is mainly because we are compressing column length and column data (without delimiters) separately. In an earlier experiment I did, column-based compression only showed 7-8% improvements because I was compressing column data with delimiters.

          @Yongqiang: Did you turn on native compression when testing?

          Some performance improvement tips from the profiling:
          1. BytesRefArrayWritable to use Java Array (BytesRefWritable[]) instead of List<BytesRefWritable>
          2. RCFile$Writer.columnBuffers to use Java Array(ColumnBuffer[]) instead of List<ColumnBuffer>
          3. Add a method in BytesRefArrayWritable to return the BytesRefWritable[] so that RCFile$Writer.append can operate on it directly.
          1-3 will save us 10-15 seconds from B and D.
          4. RCFile$Writer$ColumnBuffer.append should directly call DataOutputStream.write and WritableUtils.writeVLong:

          public void append(BytesRefWritable data) throws IOException {
            data.writeDataTo(columnValBuffer);
            WritableUtils.writeVInt(valLenBuffer, data.getLength());
          }

          4 will save 5-10 seconds from B and D.

          Following the same route, for any Lists whose number of elements does not usually change, we should use a Java array ([]) instead of a List.

          Yongqiang, can you do steps 1-4 and try to replace List with Array?

          Hide
          Zheng Shao added a comment -

          BTW, for my previous comment, all times measured are for reading all columns (about 30 in total) out from the table. RCFile might have much better read performance for a single column, but we have not integrated RCFile column pruning with Hive yet.

          Hide
          He Yongqiang added a comment -

          hive-352-2009-4-30-2.patch changes all lists to java arrays;
          1), 2) and 3) are done. If we change 4), it will need an extra function call.
          The attached patch also changes some fields in the key part from vint to int (decoding a multi-byte vint needs extra work).

          Hide
          Zheng Shao added a comment -

          hive-352-2009-4-30-2.patch
          1. It seems you compiled against hadoop 0.19? Can you do that with hadoop 0.17? Otherwise it won't compile in my workspace.
          2. I think vint is actually pretty good for saving space. I would prefer to have vint for the key length instead of int, but if you can do an experiment to show that vint and int have similar sizes after compression, then I am also OK with int.

          Hide
          He Yongqiang added a comment -

          compiled with hadoop-0.17

          Hide
          He Yongqiang added a comment -

          Sorry for the wrong file attached earlier.
          hive-352-2009-4-30-3.patch can be compiled with hadoop-0.17.0. hive-352-2009-4-30-3.patch uses vint for some fields of the key part. I tested with vint and int, and could not see a difference in my test.

          Hide
          Zheng Shao added a comment -

          hive-352-2009-4-30-3.patch
          It seems there is a bug - only 10 columns are saved. Please have a try.

          Hide
          He Yongqiang added a comment -

          hive-352-2009-4-30-4.patch does some refactoring work on BytesRefArrayWritable. Maybe the problem was caused by it; I did not see the problem. Please make sure BytesRefArrayWritable works well, since it is a public class. Because many tests are integrated in TestRCFile and PerformTestRCFileAndSeqFile, I did not include separate tests for BytesRefArrayWritable and ColumnarSerDe. If they should be provided, I can add them.

          Also hive-352-2009-4-30-4.patch added a short cut for creating table with "STORED AS RCFILE" and includes a test .q file "columnarserde_create_shortcut.q".

          Zheng Shao added a comment - - edited

          hive-352-2009-4-30-4.patch:

          Thanks Yongqiang. I tried it and it works now.

          • exec/Utilities.java:createRCFileWriter:
            parameter fs never used.
          • RCFile.java:
            Can you add some comments in the header to explain how you determine "row split"?
            What does this mean? <li>A sync-marker every few <code>100</code> bytes or so.</li>
            KeyBuffer.write(DataInput in, int length): length never used. Is this method ever used? If not, remove it.
            ColumnBuffer.getLength(): Let's remove the synchronized keyword for the sake of performance.
            ColumnBuffer.ColumnBuffer(Class valClass): Let's remove valClass since it's never used.
            Writer: how do you pass the column number from Hive to the configuration and then to the RCFile.Writer?
            + this.columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
            Writer: Per Java convention, COLUMNS_BUFFER_SIZE should be columnsBufferSize.
            + this.COLUMNS_BUFFER_SIZE = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
            + 4 * 1024 * 1024);
            Reader: add javadoc for "public synchronized boolean nextColumnsBatch()"
          • RCFileRecordReader.java
            Remove "synchronized" in all methods for the sake of performance
            + protected synchronized boolean next(LongWritable key) throws IOException {
          • TestRCFile.java
            You might want to use Text.encode() instead of "String".getBytes(). At least we should use getBytes("UTF-8").
          • ql/src/test/queries/clientpositive/columnarserde_create_shortcut.q
            For all tests, please drop the created tables again at the end of the .q file. This helps to make "show tables" in other .q files return a deterministic result.
          • BytesRefArrayWritable.java
            What's the difference between get() and unCheckedGet()? The code is the same
            BytesRefArrayWritable.resetValid: Let's call it resize.
            set(..): should be valid <= index
            + if (valid < index)
            + valid = index + 1;
          • BytesRefWritable.java
            Shall we rename getBytes() to getBytesCopy()?
          • ColumnarStruct.java
            init(...): Cleaning out the object and recreating the LazyObject is not efficient.
            Current:
            +      if (field.length > 0) {
            +        if (fields[fieldIndex] == null)
            +          fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
            +              .get(fieldIndex));
            +        fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], field
            +            .getStart(), field.getLength());
            +      } else if (fields[fieldIndex] != null
            +          && fields[fieldIndex].getObject() != null) {
            +        fields[fieldIndex] = null;
            +      }
            Let's change it to (Please have a test)
            +      if (fields[fieldIndex] == null)
            +        fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
            +            .get(fieldIndex));
            +      fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], field
            +          .getStart(), field.getLength());
            
          • Can you add one more test for adding/deleting columns? The value of the newly added columns should be NULL. RCFile should also be able to ignore the extra columns from the data.
            CREATE TABLE xxx  (a string, b string) STORED AS RCFILE;
            INSERT OVERWRITE TABLE xxx ...;
            ALTER TABLE xxx ADD COLUMNS (c string);
            SELECT * FROM xxx;
            ALTER TABLE xxx REPLACE COLUMNS (a string);
            SELECT * FROM xxx;
            
          • General comments:
            1 We don't need to put "this." in the code unless there is a local var with the same name.
            2 Please add more javadoc - the general rule for Hive is that every public class/method should have javadoc, except getters and setters.
            • Very important: Eclipse automatically generates a lot of empty javadocs for variables etc. Please remove them if you don't have a comment for that variable, otherwise "ant -Dhadoop.version=0.17.0 javadoc" will have warnings. Please make sure to remove all warnings.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java:95: warning - @return tag has no arguments.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java:122: warning - Tag @see: missing '#': "set(byte[] newData, int offset, int length)"
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java:105: warning - Tag @see: missing '#': "readFields(DataInput in)"
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java:38: warning - Tag @link: missing '#': "init(BytesRefArrayWritable cols)"
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java:522: warning - @param argument "keyClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java:522: warning - @param argument "valClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:149: warning - Tag @see: malformed: "CompressionCodec, SequenceFile"
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:149: warning - Tag @see: reference not found: CompressionCodec, SequenceFile
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:1214: warning - @return tag has no arguments.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:1214: warning - Tag @link: missing '#': "nextColumnBatch()"
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:1214: warning - Tag @link: can't find nextColumnBatch() in org.apache.hadoop.hive.ql.io.RCFile.Reader
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:1288: warning - @return tag has no arguments.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:1288: warning - @return tag cannot be used in method with void return type.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:543: warning - @param argument "keyClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:543: warning - @param argument "valClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:564: warning - @param argument "keyClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:564: warning - @param argument "valClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:591: warning - @param argument "keyClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java:591: warning - @param argument "valClass" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java:105: warning - @return tag has no arguments.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java:105: warning - @param argument "tableInfo" is not a parameter name.
                [javadoc] /data/users/zshao/tools/352-trunk-apache-hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:745: warning - @param argument "dest" is not a parameter name.
              
          He Yongqiang added a comment -

          Thanks for the detailed review and the improvement instructions, Zheng.

          hive-352-2009-5-1.patch makes these improvements following Zheng's suggestions.

          >>Writer: how do you pass the column number from Hive to the configuration and then to the RCFile.Writer?
          The code is in RCFileOutputFormat's getHiveRecordWriter(). It tries to parse the columns from the passed-in Properties.
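          For readers following along, a simplified sketch of that wiring, assuming the table Properties carry a comma-separated "columns" list; the property name and configuration key below are illustrative, not necessarily the exact ones used in the patch:

          import java.util.Properties;
          import org.apache.hadoop.conf.Configuration;

          // Derive the column count from the table properties and hand it to the
          // writer through the job configuration, as described above.
          public class ColumnNumberWiring {
            public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number"; // illustrative key

            public static void setColumnNumber(Configuration conf, Properties tableProperties) {
              String columnNames = tableProperties.getProperty("columns", "");
              int columnNumber = columnNames.isEmpty() ? 0 : columnNames.split(",").length;
              conf.setInt(COLUMN_NUMBER_CONF_STR, columnNumber);
            }
          }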

          >>init(...): Cleaning out the object and recreating the LazyObject is not efficient.
          If we change it, it will not pass the TestRCFile test. The final extra else-if statements are rarely reached, and when they are, most of the time it only takes one instruction to determine whether fields[fieldIndex] is null.

          Other suggestions are all done. Thanks!!

          Zheng Shao added a comment -

          >>>>Writer: how do you pass the column number from Hive to the configuration and then to the RCFile.Writer?
          >>The code is in RCFileOutputFormat's getHiveRecordWriter(). It tries to parse the columns from passed in Properties.
          Thanks. I understand it now.

          >>>>init(...): Cleaning out the object and recreating the LazyObject is not efficient.
          >>If we change it, it will not pass the TestRCFile test. The final extra else-if statements are rarely reached, and when they are, most of the time it only takes one instruction to determine whether fields[fieldIndex] is null.

          Can you add a boolean[] fieldIsNull to mark whether a field is null, instead of throwing away and recreating the LazyObject?
          Then getField can check fieldIsNull to decide whether to return null or the LazyObject.
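          A rough sketch of that idea, with plain Object placeholders standing in for Hive's LazyObject and LazyFactory types (the real ColumnarStruct code differs):

          // Keep the lazily created field objects around and track nullness in a
          // parallel boolean array instead of throwing the objects away per row.
          public class ColumnarStructSketch {
            private final Object[] fields;
            private final boolean[] fieldIsNull;

            public ColumnarStructSketch(int numFields) {
              fields = new Object[numFields];
              fieldIsNull = new boolean[numFields];
            }

            // Called once per row: an empty byte range means NULL, but the cached
            // field object is reused rather than recreated.
            public void init(byte[][] fieldBytes) {
              for (int i = 0; i < fields.length; i++) {
                // columns missing from the data (or empty) become NULL
                if (i >= fieldBytes.length || fieldBytes[i] == null || fieldBytes[i].length == 0) {
                  fieldIsNull[i] = true;        // mark NULL, keep the cached object
                } else {
                  fieldIsNull[i] = false;
                  if (fields[i] == null) {
                    fields[i] = createField(i); // lazy creation, done at most once
                  }
                  // the real code would call LazyObject.init(bytesRef, start, length) here
                }
              }
            }

            public Object getField(int i) {
              return fieldIsNull[i] ? null : fields[i];
            }

            private Object createField(int i) {
              return new Object(); // placeholder for LazyFactory.createLazyObject(typeInfo)
            }
          }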

          He Yongqiang added a comment -

          Fixes a small mistake in the previous file and adds a boolean array fieldIsNull to ColumnarStruct.

          Zheng Shao added a comment -

          hive-352-2009-5-1-3.patch

          Can you remove the extra message "FileSplit's start is 0, its length is 299"?
          Or use LOG.info/LOG.debug.

          hive> select * from zshao_rc;
          OK
          FileSplit's start is 0, its length is 299
          123     456     NULL
          Time taken: 0.09 seconds
          

          Can you find the error message in the code, and fix it?
          You probably just need to add your ColumnarSerDe to the internal SerDe list.

          hive> alter table zshao_rc replace columns(a int);
          Replace columns is not supported for this table. SerDe may be incompatible.
          FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
          

          Can you allow extra columns in the metadata? Just assign NULLs to the columns in the metadata but NOT in the data.

          hive> alter table zshao_rc add columns(a int);
          Column 'a' exists
          FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
          hive> alter table zshao_rc add columns(d int);
          hive> select * from zshao_rc;
          FileSplit's start is 0, its length is 299
          Failed with exception This BytesRefArrayWritable only has 3 valid values.
          
          Zheng Shao added a comment -

          Corrected a few javadoc warnings and removed one debug message.
          Committed to trunk. Thanks for the hard work, Yongqiang He!

          Ashish Thusoo added a comment -

          Very cool contribution Yongqiang!!

          Zheng Shao added a comment - - edited

          Some more test results using some real log data. I also tried two levels of gzip by adding a new GzipCodec1 (level 1). GzipCodec uses the default level, which should be 6.
          Gzip compression levels have a big impact on both the running time and the compressed size:

          The time and output_file_size shown are the average map-task running time and output file size. I've changed mapred.min.split.size to make sure a single mapper always processes the same data in the different tests.

          InputFileFormat -> OutputFileFormat: time output_file_size
          Seqfile GZIP 6 -> Seqfile GZIP 1: 1'25'' 182MB
          Seqfile GZIP 1 -> Seqfile GZIP 6: 2'05'' 134MB
          
          set hive.io.rcfile.record.buffer.size=4194304;
          Rcfile GZIP 6 -> Rcfile GZIP 6: 2'50'' 104MB
          Rcfile GZIP 6 -> Rcfile GZIP 1: 1'55'' 130MB
          

          From this, Rcfile GZIP 1 beats Seqfile GZIP 6 in both time and space: 8% less time and 3% less space.
          I believe there is still time-performance potential in Rcfile (removing synchronized methods, etc.) that we can exploit.

          These results show that Rcfile can still beat Seqfile even if we select all fields.
          If we only select a subset of the fields, then Rcfile will beat Seqfile by a much larger margin after HIVE-460 and HIVE-461 are in.

          A local test with the same data (uncompressed text, about 910MB) shows GZIP 1 compression takes about half the time of GZIP 6 compression.
          The compression/decompression uses the command-line utilities gzip and gunzip.
          Note that I've warmed up the cache, and the disk reading time is less than 1'' (by doing cat file > /dev/null).
          All times reported are wall time, but user time is within 1''-2'' of it.
          CPU is dual AMD Opteron 270 (2 x 2 cores at 2GHz).

          GZIP 1 compression: 22'' decompression: 7.3''
          GZIP 6 compression: 49'' decompression: 6.4''
          time wc uncompressed: 9.3''
          time awk 'END {print NR;}' uncompressed: 2.8''
          

          These numbers are probably the lower bounds of the running time we can ever achieve.
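          A standalone sketch of the level-1 versus default-level gzip comparison using only the JDK; GzipLevel1OutputStream is a hypothetical helper, not the GzipCodec1 codec used in the tests above, and the generated log-like rows are made up:

          import java.io.ByteArrayOutputStream;
          import java.io.IOException;
          import java.io.OutputStream;
          import java.util.zip.Deflater;
          import java.util.zip.GZIPOutputStream;

          public class GzipLevelDemo {
            // DeflaterOutputStream exposes its Deflater as the protected field "def",
            // so a subclass can lower the compression level (the default corresponds to level 6).
            static class GzipLevel1OutputStream extends GZIPOutputStream {
              GzipLevel1OutputStream(OutputStream out) throws IOException {
                super(out);
                def.setLevel(Deflater.BEST_SPEED); // gzip level 1
              }
            }

            public static void main(String[] args) throws IOException {
              StringBuilder sb = new StringBuilder();
              for (int i = 0; i < 100000; i++) {
                sb.append("2009-04-30\tuser_").append(i % 500).append("\tclick\t/some/page\n");
              }
              byte[] data = sb.toString().getBytes("UTF-8");

              ByteArrayOutputStream level1 = new ByteArrayOutputStream();
              OutputStream o1 = new GzipLevel1OutputStream(level1);
              o1.write(data);
              o1.close();

              ByteArrayOutputStream level6 = new ByteArrayOutputStream();
              OutputStream o6 = new GZIPOutputStream(level6);
              o6.write(data);
              o6.close();

              System.out.println("level 1: " + level1.size()
                  + " bytes, default level: " + level6.size() + " bytes");
            }
          }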

          Uma Maheswara Rao G added a comment -

          >>trying to specify or infer best compression technique per column much harder and something that can be done later

          Here it was mentioned that this improvement would be implemented later.
          Could you please point me to the right JIRA where this point was implemented or discussed?

          Uma Maheswara Rao G added a comment -

          Looks like HIVE-2097 is talking about the same thing.
          Linked that JIRA here as related.


            People

            • Assignee:
              He Yongqiang
              Reporter:
              He Yongqiang
            • Votes:
              0
              Watchers:
              14
