Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.7.2
    • Component/s: java, spec
    • Labels: None

      Description

      Define a column-major file format. This would permit better compression and also permit efficient skipping of fields that are not of interest.

      1. avro-file-columnar.pdf
        61 kB
        Thiruvalluvan M. G.
      2. AVRO-806-v2.patch
        75 kB
        Thiruvalluvan M. G.
      3. AVRO-806.patch
        31 kB
        Doug Cutting
      4. AVRO-806.patch
        239 kB
        Doug Cutting


          Activity

          Doug Cutting added a comment -

          The writer would keep a buffer per field. As records are added, each field would be encoded to its respective buffer. When the total amount of buffered data reaches the desired block size, all column buffers would be flushed, preceded by an index listing the (compressed) sizes of each of column buffers.

          Each buffer would be compressed prior to writing, probably with Snappy.

          The reader would have a decoder for each field. To skip fields not of interest, the application would specify a subset of the schema written. The reader would only decompress and process fields present in the schema provided by the application.
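          For concreteness, here is a minimal sketch (not the attached patch; all class names are hypothetical) of the per-field buffering described above. Compression of each column buffer, e.g. with Snappy, would happen where the comment indicates.

            import java.io.ByteArrayOutputStream;
            import java.io.IOException;
            import java.io.OutputStream;
            import java.util.ArrayList;
            import java.util.List;

            import org.apache.avro.Schema;
            import org.apache.avro.generic.GenericDatumWriter;
            import org.apache.avro.generic.GenericRecord;
            import org.apache.avro.io.BinaryEncoder;
            import org.apache.avro.io.EncoderFactory;

            /** Illustrative only: one buffer per top-level field; a block is written as
             *  an index of per-column sizes followed by the column buffers. */
            class ColumnBlockWriterSketch {
              private final Schema schema;
              private final long blockSize;
              private final List<ByteArrayOutputStream> buffers = new ArrayList<ByteArrayOutputStream>();
              private final List<BinaryEncoder> encoders = new ArrayList<BinaryEncoder>();
              private final List<GenericDatumWriter<Object>> writers = new ArrayList<GenericDatumWriter<Object>>();

              ColumnBlockWriterSketch(Schema schema, long blockSize) {
                this.schema = schema;
                this.blockSize = blockSize;
                for (Schema.Field f : schema.getFields()) {
                  ByteArrayOutputStream buf = new ByteArrayOutputStream();
                  buffers.add(buf);
                  encoders.add(EncoderFactory.get().binaryEncoder(buf, null));
                  writers.add(new GenericDatumWriter<Object>(f.schema()));
                }
              }

              void append(GenericRecord record, OutputStream out) throws IOException {
                List<Schema.Field> fields = schema.getFields();
                long buffered = 0;
                for (int i = 0; i < fields.size(); i++) {      // encode each field to its own buffer
                  writers.get(i).write(record.get(fields.get(i).pos()), encoders.get(i));
                  encoders.get(i).flush();
                  buffered += buffers.get(i).size();
                }
                if (buffered >= blockSize) {                   // flush all columns as one block
                  BinaryEncoder idx = EncoderFactory.get().binaryEncoder(out, null);
                  idx.writeInt(buffers.size());                // index: column count, then per-column sizes
                  for (ByteArrayOutputStream b : buffers)
                    idx.writeInt(b.size());                    // compress each buffer (e.g. Snappy) before this
                  idx.flush();
                  for (ByteArrayOutputStream b : buffers) {
                    b.writeTo(out);
                    b.reset();
                  }
                }
              }
            }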

          Scott Carey added a comment -

          A useful reference is the Data model and Nested columnar storage section in Google's Dremel paper: http://www.google.com/research/pubs/pub36632.html

          With Avro, one challenge of columnar storage is going to be Unions.

          If a record is

          { int, [float, string], long }

          How do you deal with the unions? Do you put the whole union in the same columnar chunk? Do you split the union into chunks for each branch, and store the union index in its own chunk?

          How this plays with records nested within unions or unions inside of arrays can be complicated too.

          Douglas Creager added a comment -

          Avro's formalism is a bit different than protobuf's, so we wouldn't be able to use Dremel's data model as-is. But it's definitely a good start.

          My hunch is that we'd want each columnar chunk to only store primitive values, so we'd have to break down a complex type into its primitive pieces. For most schemas, there's a unique name that identifies each primitive piece. So if you have

          record Test {
            int  a;
            union {float, string}  b;
            long  c;
            array<double>  d;
            map<string>  e;
            record Subtest {
              boolean f;
              bytes g;
            } sub1;
            union {null, Subtest}  sub2;
          }
          

          then your primitive names would be

          a
          b.float
          b.string
          c
          d.[]
          e.{}
          sub1.f
          sub1.g
          sub2.null
          sub2.Subtest.f
          sub2.Subtest.g
          

          There are only four compound types in Avro: For records, there's a child for each field. For arrays, there's a single child, which I called []. For maps, it's {}. For unions, there's a child for each branch, with the same name as the branch's schema.

          The only issue is with recursive types, since for those, the nesting can be arbitrarily deep at runtime. We can probably add a “nesting level” to handle these, but we'll have to hammer out the details.
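          For concreteness, a minimal sketch (assumed, not from any patch) of deriving these names from an org.apache.avro.Schema. Per the caveat above, recursive types are not handled and would loop forever here.

            import java.util.ArrayList;
            import java.util.List;

            import org.apache.avro.Schema;

            /** Illustrative only: flatten a schema into primitive column names. */
            class ColumnNames {
              static List<String> columns(Schema s) {
                List<String> out = new ArrayList<String>();
                walk(s, "", out);
                return out;
              }

              private static void walk(Schema s, String prefix, List<String> out) {
                switch (s.getType()) {
                  case RECORD:                               // one child per field
                    for (Schema.Field f : s.getFields())
                      walk(f.schema(), join(prefix, f.name()), out);
                    break;
                  case ARRAY:                                // single child, named []
                    walk(s.getElementType(), join(prefix, "[]"), out);
                    break;
                  case MAP:                                  // single child, named {}
                    walk(s.getValueType(), join(prefix, "{}"), out);
                    break;
                  case UNION:                                // one child per branch, named by its schema
                    for (Schema branch : s.getTypes())
                      walk(branch, join(prefix, branch.getName()), out);
                    break;
                  default:                                   // a primitive: emit the accumulated name
                    out.add(prefix.isEmpty() ? s.getName() : prefix);
                }
              }

              private static String join(String prefix, String name) {
                return prefix.isEmpty() ? name : prefix + "." + name;
              }
            }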

          Douglas Creager added a comment -

          And we could handle the union index in the same way as we'd handle array and map counts. Each branch of the union can be seen as a container that can only have 0 or 1 elements. Since we'll already need some kind of repetition count to keep track of how many elements are in an array type, we can use the same mechanism to keep track of how many elements are in each union branch.
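          To make that concrete with invented values: for three records whose union {float, string} field holds 1.5, "x" and 2.0, the float branch would carry counts [1, 0, 1] and values [1.5, 2.0], while the string branch would carry counts [0, 1, 0] and values ["x"], the same shape as an array column's per-row element count plus its elements.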

          Doug Cutting added a comment -

          I was thinking of just creating columns for the fields of the top-level record. In this approach, a union would be written as a union, prefixed with a varint indicating the branch taken.

          If we stored union branches separately then we'd also need a column that has the varint. Iterators would then use this to decide when a column has a value. For nested unions I think the iterators would need to have a list of pointers to varints.

          The use case is to accelerate scans of a subset of fields. Further acceleration is possible if things are columnized more deeply, but we probably want to stop at some fixed depth in each block regardless. So I'm effectively proposing a depth of 1. Increasing the depth increases the number of buffer pointers and the complexity of row iteration. I don't have a clear sense of when that becomes significant. One way to limit the depth would be to specify a maximum number of columns, and use a breadth-first walk of the schema until that number of columns are encountered. However I wonder whether we're over-engineering this.
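          As a rough illustration (assumed, not from any patch) of such a breadth-first cap, one could split record-typed columns one level at a time until a column budget is exhausted:

            import java.util.ArrayDeque;
            import java.util.ArrayList;
            import java.util.Deque;
            import java.util.List;

            import org.apache.avro.Schema;

            /** Illustrative only: breadth-first column selection with a maximum count;
             *  anything not split stays row-major inside its parent's column. */
            class ColumnBudget {
              static class Column {
                final String name; final Schema schema;
                Column(String name, Schema schema) { this.name = name; this.schema = schema; }
              }

              static List<Column> select(Schema root, int maxColumns) {
                List<Column> columns = new ArrayList<Column>();
                Deque<Column> queue = new ArrayDeque<Column>();
                for (Schema.Field f : root.getFields())
                  queue.add(new Column(f.name(), f.schema()));
                while (!queue.isEmpty()) {
                  Column c = queue.poll();
                  boolean splittable = c.schema.getType() == Schema.Type.RECORD
                      && columns.size() + queue.size() + c.schema.getFields().size() <= maxColumns;
                  if (splittable) {
                    for (Schema.Field f : c.schema.getFields())    // split one level deeper
                      queue.add(new Column(c.name + "." + f.name(), f.schema()));
                  } else {
                    columns.add(c);                                // keep as a single column
                  }
                }
                return columns;
              }
            }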

          Alejandro Abdelnur added a comment -

          Could we go simple at first? I.e., a single column for unions, and columns only for the elements at depth 1 as a start.

          We could add a 'column encoding' version somewhere. This would allow us to change things and still be able to provide backwards compatibility.

          Dmitriy V. Ryaboy added a comment -

          Not having elements of depth > 1 would basically rule this out for all of our schemas.

          Are there enough people out there who would be able to weather this constraint for the simpler version to be effectively tested before complications are added in?

          Seems like since in this case we know what the expected complications will be, it's better to design with them in mind from the start and avoid surprises later.

          Doug Cutting added a comment -

          The question is not whether the elements of depth > 1 are included, but whether they're each stored in a distinct column. Regardless, one reads the data file in the same way, using a schema with a subset of the fields, even when not using the column-major codec at all. So if you have a query that scans only field x.y.z, then storing values for x.y in a column will still make things faster than row order, but perhaps not as fast as if x.y.z values were stored in their own column, especially if y has a lot of other fields. Note that Avro is already fast at skipping string and binary values that are not desired: it reads the length and increments the buffer pointer. So column-major storage will provide the biggest speedup for structures that have a lot of numeric fields that are often ignored by queries.
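          For reference, reading with a subset of the fields is just the usual projection pattern in the Java API today; a column codec would keep this read path and only decompress the projected columns. Schema, field and file names below are made up for illustration, and the projection's record name must match the writer's.

            import java.io.File;
            import java.io.IOException;

            import org.apache.avro.Schema;
            import org.apache.avro.file.DataFileReader;
            import org.apache.avro.generic.GenericDatumReader;
            import org.apache.avro.generic.GenericRecord;

            public class ProjectionScan {
              public static void main(String[] args) throws IOException {
                // Reader schema listing only the field of interest; the file's writer
                // schema may contain many more fields, which are skipped on read.
                Schema projection = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
                  + "{\"name\":\"x\",\"type\":\"long\"}]}");
                GenericDatumReader<GenericRecord> datumReader =
                    new GenericDatumReader<GenericRecord>(projection);
                DataFileReader<GenericRecord> fileReader =
                    new DataFileReader<GenericRecord>(new File(args[0]), datumReader);
                try {
                  for (GenericRecord r : fileReader)
                    System.out.println(r.get("x"));
                } finally {
                  fileReader.close();
                }
              }
            }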

          Thiruvalluvan M. G. added a comment -

          It would be nice if we could make the column datafile compatible with the current datafile standard (with the column's schema, of course). There would be a "meta" datafile serving as a catalog of its constituent column datafiles. A single datafile is, by definition, its own catalog. This approach, which I suppose is not incompatible with the original proposal, has some advantages:

          • We can reuse almost all the current code.
          • Whether to split columns into further columns is completely left to the writer, as both are valid.

          However, this approach will make splitting unions harder: we have to cater to "holes" in the branch columns. With a little bit of space overhead, we can address the problem: [A, B, C] will be split into [null, A], [null, B] and [null, C]. (If A, B, or C is null, there won't be a corresponding column datafile.) The "holes" in each column are then encoded as the null branch. We do not need a separate branch-index column.
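          To make the hole encoding concrete (values invented for illustration): if a union column holds the sequence A1, C1, B1, A2, the three branch datafiles would hold [A1, null, null, A2], [null, null, B1, null] and [null, C1, null, null]; the row position of each non-null entry recovers the original interleaving, so no separate branch-index column is needed.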

          An array of record

          { A, B, C }

          can be decomposed as three columns: an array of A, an array of B, and an array of C. The idea can be extended to maps.

          The good thing about this is that this defines a recursive column splitting algorithm. And it is flexible. For example, if for a record

          {A, B, C}

          B and C always come together, we can split it as A and

          {B, C}

          .

          On another point, while it may be a good strategy for the writer to write column blocks of data in lock-step fashion, the reader should not rely on it. This allows different columns to be written at different points in time, which will perform better because different columns have different sizes and compress differently. But with this, skipping to the next sync on one column could lead to skipping into the middle of another column. That could be challenging.

          Doug Cutting added a comment -

          Thiru, so you're proposing multiple, parallel files for columns? I'm proposing a single file whose format is as it is today except for the encoding of blocks of records, which would use a new codec: in addition to the current "null", "gzip" and "snappy" we'd add a "column" codec. As you note, existing implementations would not be able to read this until they've implemented this codec, while, with multiple files, they would. However I'm not sure that folks would appreciate the increase in the number of files that parallel files would create.

          Note that this codec could be implemented using the existing compression codec API: it could accept a buffer of serialized records, then parse the records using the file's schema, splitting their fields into separate buffers, and finally appending all of these buffers with an index at the front. This can be optimized to avoid the extra copy of data.

          Doug Cutting added a comment -

          This is a work in progress.

          I believe the output is correct and complete, but the input side is not right yet so I can't test it. To read the column format I need ResolvingDecoder to call three new Decoder methods on the nested Decoder:

          • startRecord() at the beginning of each record
          • startField() at the beginning of each field
          • endRecord() at the end of each field.

          I've made some changes to ResolvingDecoder attempting to do this, but they don't work and I don't understand it well enough to make this work. Thiru, can you please help me here?

          That would get the input side to work, but it wouldn't yet be much faster when columns are elided from the schema. To make it fast we also need to change ResolvingDecoder to take advantage of the new Decoder#skipField() method.

          Doug Cutting added a comment -

          A brief description of my patch:

          I've added methods to Codec to create the Encoder and Decoder used for the file. Then I've added methods to Encoder and Decoder called as we start to read each record, as we start to read each field, and as we finish reading each field. These are used by the ColumnEncoder and ColumnDecoder implementations to track the depth and column and then switch input or output to the correct buffer as objects are written and read.

          I hope this makes sense!

          Thiruvalluvan M. G. added a comment -

          Here is a patch which implements the same idea, more formally. Please see the attached file, written by Raymie Stata, describing the approach we are proposing. Some comments on the patch:

          • The implementation basically works. Further optimizations are possible. One pending piece of work is the optimization to avoid looking into columns that are not needed: at present the traditional schema resolution works, but if the reader's and writer's schemas are such that certain columns could be completely skipped, they are not currently skipped.
          • The data file writer uses a trick whereby, in case of exceptions, it flushes everything up to the previous record. That trick won't work with columnar storage because data can be flushed only once per block. (I've commented out the test case for that.)
          • Raymie and I have designed the solution in layers. One layer is capable of writing and reading a columnar store. The assignment of data items to columns is a separate layer; we have a default that assigns each item to a separate column up to a given depth, but users can supply their own custom column assignments. Yet another layer is the ability to put the columns in a block. If someone wants to use file-based columnar storage, they can do so easily on top of the first two layers.
          • For now, unions have a single column, irrespective of the number and type of branches.
          • I don't think extending the codec mechanism to support a columnar store will work. Columnar storage is orthogonal to codecs: codecs are about storing blocks with compression, while encoders/decoders decide how the contents should be interpreted. I think the way to support columnar storage is to replace the binary encoder/decoder with a columnar encoder/decoder, as I've demonstrated in the data file reader and writer. In order to support both, I pushed bytesBuffered() up to Encoder from BinaryEncoder and isEnd() up to Decoder from BinaryDecoder. I don't think these changes will break anything.
          • The implementation is not complete. We have to let the user of the data file writer choose columnar storage instead of binary storage; I've not implemented that yet.
          • One test in FileSpanStorage is failing, possibly because it assumes something about the way the data file stores data using the binary encoder. I'm not sure.
          • If we make the data file writer/reader handle different encoders/decoders, then the changes for columnar storage are reasonably well isolated.
          • There are a couple of unrelated changes (in tools) that were required to make the tests pass on my machine. Please ignore them.
          Doug Cutting added a comment -

          Thiru, this looks great. So some issues we need to resolve are:

          • How does one specify this format to DataFileWriter? Perhaps rather than extending the codec API we might add EncoderFactory.getEncoderNamed(String) and DecoderFactory.getDecoderNamed(String)? Then we can add a setEncoding(String) method to DataFileWriter?
          • How does this integrate with compression? I suspect we should compress each column separately, so the compression codec needs to be invoked on each buffer before it's written. This means that the Encoder must know about the compression codec.
          • How is the format indicated in the file itself? While it may not have made sense to implement this as a codec, it might make sense to use the "avro.codec" metadata field, as readers should already check this. We might use, e.g., values like "column+snappy".

          I think it would be perfectly acceptable if the initial version only supported a particular compression codec, Snappy, and that compression codec was always turned on. A big advantage of the column representation should be improved compression, and Snappy's fast enough that using it all of the time doesn't cost much.

          Scott Carey added a comment -

          For now, unions have a single column, irrespective of the number and type of branches.

          I think "For now" = Forever? This is a binary format; I think we should make unions columnar as well. Backwards compatibility will be hard and high-maintenance.

          How does this integrate with compression? I suspect we should compress each column separately, so the compression codec needs to be invoked on each buffer before it's written. This means that the Encoder must know about the compression codec.

          This depends on what the goal is. If we want to avoid decompressing columns that are not accessed, we will need to do that. Otherwise it is not necessary and compression ratios will be best if large blocks are compressed as a unit with all columns.

          and Snappy's fast enough that using it all of the time doesn't cost much.

          Yes, the CRC currently used costs more than Snappy.

          Doug Cutting added a comment -

          > I think we should make unions columnar as well.

          That would be nice, but I'd rather we have something useful sooner than something perfect later. We can extend it later in a backward-compatible manner. It would not be forward compatible, but that might be acceptable as long as there's only a single implementation (Java).

          > If we want to avoid decompressing columns that are not accessed [ ... ]

          I think the advantage of a columnar format is to avoid touching data that's not needed, and avoiding decompression is consistent with that.

          Jeff Hammerbacher added a comment -

          > I think the advantage of a columnar format is to avoid touching data that's not needed, and avoiding decompression is consistent with that.

          During the design of RCFile, the folks from Facebook found that packing a few columns together into a file made for better performance than putting a single column into the file. There's a trade-off between CPU consumed in deserialization and IO consumed in pulling the data off of disk. Avoiding decompressing columns that are not accessed seemed to be important for Hive performance.

          Jeff Hammerbacher added a comment -

          In addition to RCFile, it's also worth comparing this format to the CIF format proposed by IBM Research: http://pages.cs.wisc.edu/~jignesh/publ/colMR.pdf

          Doug Cutting added a comment -

          Yes, the CIF format looks promising. It's great to see all the benchmarks!

          I wonder if the advantages of CIF could be had without a custom HDFS block placement strategy? For example, one might pack the files of a split directory into a single file whose block size was set to the size of the file, forcing it into a single block. This would guarantee locality for the columns of a split.

          In other words, instead of groups of column-major records within a file ("block columnar" in Raymie's document) on one hand or a file-per-column on the other ("file columnar"), we have a single group per file. Since splits might often be bigger than RAM, creating these would probably require two steps: writing a set of temporary local files, one per column, then appending these into the final output. The file would have an index indicating where each column lies, and each column within the file would permit efficient skipping, in the style of CIF.
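          For what it's worth, a minimal sketch (assumed, not part of any patch) of the "one block per file" trick using the stock FileSystem API: write the packed file locally, then copy it into HDFS with the block size set at least to the file's length.

            import java.io.IOException;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.IOUtils;

            /** Illustrative only: upload a locally packed column file so that it
             *  occupies a single HDFS block, guaranteeing column locality. */
            class SingleBlockUpload {
              static void upload(Path local, Path dest, Configuration conf) throws IOException {
                FileSystem localFs = FileSystem.getLocal(conf);
                FileSystem fs = FileSystem.get(conf);
                long len = localFs.getFileStatus(local).getLen();
                // Round up to a 1 MB boundary so the block size is a valid multiple of the
                // checksum chunk size; the whole file still fits in one block.
                long blockSize = Math.max(((len + (1L << 20) - 1) >> 20) << 20, 1L << 20);
                IOUtils.copyBytes(
                    localFs.open(local),
                    fs.create(dest, true, 64 * 1024, fs.getDefaultReplication(), blockSize),
                    conf, true);
              }
            }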

          Doug Cutting added a comment -

          I've implemented a new column-file format at:

          https://github.com/cutting/trevni

          This supports writing Avro data.

          If folks find this useful then I intend to contribute it to Apache.

          Raymie Stata added a comment -

          This is the second attempt at a column-major codec. The whole goal of col-major formats is to optimize performance. Thus, to drive this exercise forward it seems necessary to have some kind of benchmark to do some testing. (I don't think a micro-benchmark is sufficient – rather the right benchmark is with a query planner (Hive?) that can take advantage of these formats.) With such a benchmark in place, we'd compare the performance of the existing row-major (as a baseline) Avro formats with the various, proposed col-major formats to make sure that we're getting the kind of performance improvements (2x, 4x or more) to justify the complexity of a col-major format.

          Some comments more specific to this proposal: First, I'd like to see the Type Mapping section for Avro filled in; this would give us a much better idea of what you're trying to do. Second, at first glance, it seems like your design replicates some of the features of RCFiles that the CIF paper claims cause performance problems (but, again, maybe this issue is better addressed via some benchmarking).

          Regarding your implementation of this proposal, it re-implements all the lower-levels of Avro. It seems like this double-implementation will be a maintenance problem.

          Doug Cutting added a comment -

          Raymie, thanks for your thoughts.

          I agree that benchmarks are needed. The best benchmarks are real applications. I provided code that folks can try now in their MapReduce applications. I have not yet had a chance to integrate this with Hive by writing a SerDe, but that is an obvious next step. (I've never written a SerDe. If someone else has perhaps they can help.)

          Do you have any datasets or queries that you'd like to propose as benchmarks?

          I'll work on better documenting the type mapping for Avro, since that's been implemented.

          I'll be offline next week and won't be able to work more on this (or respond) until the week after.

          Raymie Stata added a comment -

          In about a month we will have some Hive benchmarks, but the data won't be very wide, so they won't be good for testing column-major formats. However, maybe we should walk before we run: If someone puts Avro SerDe's in place against the regular Avro format, we could benchmark and maybe even help tune that configuration, which would provide a baseline for testing a column-major configuration. (Unfortunately, we can't do the SerDe work itself.)

          Jakob Homan added a comment -

          The Avro Serde already exists: https://github.com/jghoman/haivvreo and is being ported to Hive directly. I'd love to test this format with it, but I in all likelihood won't have time in the next month. If someone wants to do that, it'd be great. Shouldn't be too hard at all.

          alex gemini added a comment -

          1. The file header should have some extra space in case we add a column or append some value to the block end.
          2. Trevni sets a different codec for each column. I think there is a difference between compression/decompression and encoding/decoding: for example, run-length encoding gets a high compression ratio if the values are sorted first. We can change the order of data within a minor block (say 64 KB) while the whole block (say a 128 MB HDFS block) still looks the same as before, except for that reordering. So using a different codec for each column may not give better performance or compression than a single codec for all columns; I think different columns instead need different encodings (for example a run-length encoding, which we haven't implemented yet). A toy sketch follows.
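          A toy example (assumed, not Trevni's actual value encodings) of what a per-column run-length encoding could look like for a repetitive int column, using Avro's varint encoder:

            import java.io.ByteArrayOutputStream;
            import java.io.IOException;

            import org.apache.avro.io.BinaryEncoder;
            import org.apache.avro.io.EncoderFactory;

            /** Illustrative only: encode an int column as (value, runLength) pairs. */
            class RunLengthColumn {
              static byte[] encode(int[] values) throws IOException {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
                int i = 0;
                while (i < values.length) {
                  int run = 1;
                  while (i + run < values.length && values[i + run] == values[i])
                    run++;
                  enc.writeInt(values[i]);   // the value
                  enc.writeInt(run);         // how many consecutive rows share it
                  i += run;
                }
                enc.flush();
                return out.toByteArray();
              }
            }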

          alex gemini added a comment -

          Compression always treats everything as raw bytes; encoding and decoding only apply to data with a certain pattern. Encoding and decoding should also be separated: for example, with dictionary encoding (say an email or address column), we only need to decode when we need the exact value. If we just need to count the total number of rows, the application program will tell the file format how to treat that. For simplicity, we always decode it first.

          Doug Cutting added a comment -

          I'd like to bring the Trevni (https://github.com/cutting/trevni) code to Apache. How do folks think we ought to do this?

          Trevni's serialization, columns of scalar values, is different from Avro's. It doesn't require a schema, but it implements a mapping between Avro schemas and columns. I see three options:

          1. incorporate it into Avro's lang/* tree. Currently Trevni has only a Java implementation, so its code could be merged into lang/java as new modules. The trevni-core module is independent of Avro, but the trevni-avro module depends on other Avro modules.
          2. have an independent code tree for Trevni but still managed by the Avro PMC. If we expect that folks from Avro will also be the folks who work on Trevni then having the Avro project produce Trevni as a separately released product might be reasonable.
          3. submit an incubator proposal for Trevni, aiming for an independent TLP. I worry that Trevni's too small to sustain an independent community and that bundling it with Avro might be best.

          Which do folks think is best? I'm leaning towards (1). Any objections to that? If not, I'll prepare a patch.

          Scott Carey added a comment -

          I think (1) is the best way to start. We could easily transition to (2) if that made sense due to other language implementations, and (3) if it grows big enough.

          We may want to identify it separately as 'evolving' or similar so that API changes in the next couple of releases, if needed, can be managed more flexibly.

          Jakob Homan added a comment -

          (1) sounds good.

          I'm curious about the decision to go with 1 row group/block and 1 block/file. The recommendation is to increase the block size, up to 1 GB or more. Any background on this design decision? Any reason not to go with multiple, smaller row groups per file ala RCFile?

          Doug Cutting added a comment -

          Jakob, this is discussed in the spec and also in the CIF paper cited there.

          http://cutting.github.com/trevni/spec.html

          RCFile, with relatively small row groups, can save CPU time when skipping un-processed columns, but isn't able to save much i/o time. To save i/o time you need adjacent sequences of values that are larger than 10MB or so (so transfer time >> seek time). When values in some columns are considerably smaller than others (e.g., an int or long versus a blob) then it can take a lot of records to get 10MB or more of the smaller values. Put another way, i/o performance is optimal when there's one row group per unit of parallelism so there's just one seek per column. So a job with 100 map tasks should ideally have 100 row groups. And we want to localize these map tasks, so each group should ideally reside in a single HDFS datanode. The CIF folks achieve this by having a single-block file per column in the row group, then using a custom block placement strategy to place these on the same datanodes. Trevni instead works with stock HDFS by placing all the columns of each row group in a single file that fits in a single block. Does that make sense?
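          As a rough illustration of the 10MB figure (typical disk numbers, not from the spec): with a ~10 ms seek and ~100 MB/s sequential transfer, a 10 MB column run costs about 100 ms of transfer against 10 ms of seek, so seeking adds roughly 10% overhead; at 1 MB per run the seek costs as much as the transfer itself.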

          Jakob Homan added a comment -

          Yes, that's all reasonable. My concern is just enforcing a 1:1:1 relationship between row groups, blocks and files. RCFile's very tiny recommended row group size (4 MB, I believe) certainly doesn't make sense from an IO perspective. But if our only way to increase parallelism on trevni files is to decrease the size of row groups (and correspondingly increase the number of files), this may be a problem. It's not required to enforce a 1:1:1 relationship in the file; one could still have row groups large enough to make the IO worthwhile (and still split on block boundaries), but have multiple of them within a single trevni file. This could certainly be supported as an option.

          Either way, this is looking good.

          Tom White added a comment -

          +1 for option (1)

          Raymie Stata added a comment -

          I'm strongly against option (1). Trevni is clearly designed to be used with Avro (and Thrift and Protocol Buffers). Indeed, its code is in org.apache.trevni, not org.apache.avro.trevni (which is as it should be – I'm not suggesting otherwise). Further, I can imagine a large number of Avro usages – perhaps the majority – not wanting Trevni. So it seems like option (1) creates unnecessary coupling – and just sets us up for refactoring the code base in the future.

          Regarding option (2), I think that's bad for Trevni. If you want to recruit members of the Thrift and Protocol Buffers communities to the Trevni project, it seems like a mistake to put Trevni into the Avro project. You write "I worry that Trevni's too small to sustain an independent community," but this has the danger of becoming a self-fulfilling prophecy. For example, as far as I can tell, the Thrift and Protocol Buffer communities are larger than Avro's, so from a recruiting perspective, aren't you working to fulfill your "small community" prophecy by dropping Trevni into the smallest community of the three?

          However, if you believe that the Avro project is currently the best home for Trevni, then I believe a top-level trevni/ directory in the Avro project is the right place for it.

          Doug Cutting added a comment -

          Here's a patch that implements (1) above.

          Doug Cutting added a comment -

          Raymie, putting the code in Avro's lang/java directory does not in fact create any tight coupling. Trevni's core module still does not depend on anything in Avro. Avro releases will contain a trevni-core.jar file that depends on nothing else in Avro.

          That said, you earlier complained (above) that Trevni's core implementation doesn't share enough with Avro. Placing these in a common project would permit such sharing if it was desired. For example, as you indicated, they might share an optimized library for serialization and deserialization of scalar values.

          The patch I've attached makes no changes to Trevni code, only to build files so that Trevni's source code can be placed in Avro's lang/java tree. Committing this patch says just two things:

          1. The Avro PMC will maintain the Trevni code (for now); and
          2. Avro and Trevni will be released together (for now);

          It does not say anything about other couplings or dependencies between the Avro and Trevni implementations or specifications. That's all for future discussion.

          Doug Cutting added a comment -

          Raymie said: "... the Thrift and Protocol Buffer communities are larger than Avro's... ".

          That's certainly true for RPC, but I doubt it's true for persistent data storage. Neither Protocol Buffers nor Thrift even offer a standard file format. In that realm I think Avro leads both. Trevni is a file format. We can shred Protocol Buffer and Thrift data structures to it regardless of where it lives in the code tree.

          Doug Cutting added a comment -

          Jakob, I think the more common case will be that fields whose values are small will produce small columns where seek time becomes significant. When seek time is significant the returns of greater parallelism are diminished unless replication is also increased, which is unlikely.

          With multiple row groups per file you have to choose a size for the row groups. Would you ever choose a size smaller than 64MB, the typical HDFS block size? Column files are only an advantage when there are multiple columns, so the amount read will typically be a fraction of the row group size.

          What cases do you imagine where having a row group size less than a file is useful?

          Doug Cutting added a comment -

          Also, Jakob, I may have overstated things a bit above. With a Trevni file one can have a higher degree of parallelism than one processor per file. A Trevni file can be efficiently split into row ranges. So a file with 1M rows could be processed as 10 tasks, each processing 100k rows. Values are chunked into ~64k compressed blocks and only those overlapping with the specified range would need to be decompressed and processed by a task.

          Doug Cutting added a comment -

          Anyone object to me committing the attached patch, which keeps Trevni in its own packages, jar files and maven modules, but places its code in the lang/java tree so it's released along with Avro? If not, I'll commit it soon.

          Doug Cutting added a comment -

          I committed this.


            People

            • Assignee: Doug Cutting
            • Reporter: Doug Cutting
            • Votes: 6
            • Watchers: 33
