Avro / AVRO-160

file format should be friendly to streaming

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: spec
    • Labels: None
    • Hadoop Flags: Incompatible change

      Description

      It should be possible to stream through an Avro data file without seeking to the end.

      Currently the interpretation is that schemas written to the file apply to all entries before them. If this were changed so that they instead apply to all entries that follow, and the initial schema is written at the start of the file, then streaming could be supported.

      Note that the only change permitted to a schema while a file is written is, if the schema is a union, to add new branches at the end of that union. If it is not a union, no changes may be made. So it is still the case that the final schema in a file can read every entry in the file and thus may be used to randomly access the file.
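
      As an illustration of the permitted change, the sketch below builds an initial one-branch top-level union and an extended union with a new branch appended at the end. This is a hedged example using a later Avro Java API (Schema.Parser); the schemas and class names are made up for the example, not taken from this issue.

          import org.apache.avro.Schema;

          public class UnionGrowthExample {
            public static void main(String[] args) {
              // The schema the file starts with: a top-level union with one branch.
              Schema initial = new Schema.Parser().parse("[\"string\"]");

              // A later schema in the same file may only append branches to that union.
              Schema extended = new Schema.Parser().parse(
                  "[\"string\","
                  + " {\"type\": \"record\", \"name\": \"Event\","
                  + "  \"fields\": [{\"name\": \"ts\", \"type\": \"long\"}]}]");

              // Because earlier branches keep their positions, the final (extended)
              // schema can still decode every entry written with the initial schema.
              System.out.println(initial.getTypes().size());   // 1
              System.out.println(extended.getTypes().size());  // 2
            }
          }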

      Attachments

      1. AVRO-160-python.patch
        15 kB
        Jeff Hammerbacher
      2. AVRO-160.patch
        33 kB
        Doug Cutting
      3. AVRO-160.patch
        39 kB
        Doug Cutting
      4. AVRO-160.patch
        41 kB
        Doug Cutting
      5. AVRO-160.patch
        41 kB
        Doug Cutting

          Activity

          Doug Cutting added a comment -

          This was suggested by Thiru.

          Matt Massie added a comment -

          +1

          Philip Zeyliger added a comment -

          I think you'll also have to get rid of getCount(), no? I won't miss it.

          Doug Cutting added a comment -

          > I think you'll also have to get rid of getCount(), no? I won't miss it.

          It would certainly be unavailable to a streaming reader. But the existing reader is not a streaming reader; it is a random-access reader, in order to support splitting and seeking.

          However this raises the question of whether we should continue to also flush the metadata on close. If so, then we could still keep a count of entries. To support random access and schema changes, we need to be able to read the final schema in the file. To make this efficient, we must flush the metadata at close, otherwise we'll have to scan for the final metadata. So, entries that appear after the last metadata would not be included in the count, but such entries should only occur if a program crashes, and might reasonably be considered to be after the last "commit point" and hence ignored.

          So giving up on counts is also giving up on the ability to change the schema. The motivation for putting metadata at the end rather than in a header was precisely to permit appends that alter metadata.

          So my inclination is to continue to ignore entries after the final metadata, and to have the final metadata re-iterate the penultimate metadata. Each time the schema changes, the metadata must be re-written, to support schema changes for streaming readers. And when the file is closed, metadata is also dumped.

          To be consistent, we might prohibit appends that do not first truncate to the end of the last metadata. A streaming reader might still read these, however. Sigh. This is getting messy!

          Philip Zeyliger added a comment -

          > Note that the only change permitted to a schema while a file is written is, if the schema is a union, to add new branches at the end of that union. If it is not a union, no changes may be made. So it is still the case that the final schema in a file can read every entry in the file and thus may be used to randomly access the file.

          For some reason, I thought you could go from

          { "type": "record", "name": "foo", "namespace": "foo", "fields": [

          { "name": "x", "type": "string" }

          ] }

          to

          { "type": "record", "name": "foo", "namespace": "foo", "fields": [

          { "name": "x", "type": "string" }

          ,

          {"name": "y", "type": ["null", "string"] ] }

          (That is, adding an optional extra field within some record, not just modifying the top-level.)

          Is that not the case?

          Doug Cutting added a comment -

          > Is that not the case?

          No, that would not work. You can only add clauses to unions. So you can in theory go from:

          { "type": "record", "name": "Foo", "fields": [

          { "name": "x", "type": ["int", "string"]}

          ]}

          to

          { "type": "record", "name": "Foo", "fields": [

          { "name": "x", "type": ["int", "string", "float"]}

          ]}

          However the current Java API doesn't permit this: it only permits adding clauses to a top-level union. The Java implementation could be improved to do a smarter compatibility check when you attempt to augment a file's schema.

          This restriction is created by the binary format: a record is simply serialized as its fields, with no added per-field tags or other per-record data.

          In the strict streaming case you could reset the schema entirely each time metadata is dumped. However that would prohibit random access operations.

          Perhaps the schema should instead be dumped once per block? Random access already requires that you find a block start. Changing the schema would then force a block flush. If we go this way we might also switch to using a binary format for the schema, and/or increasing the block size. Note that the DatumReader has a setSchema() method, so each time one would seek to a new block, the container could inform the DatumReader of the new schema, so that it could appropriately handle, e.g., new or missing fields.
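
           A rough sketch of the reading loop that comment describes, assuming a hypothetical per-block container: the HypotheticalBlock interface and PerBlockReader class below are made up for illustration, while GenericDatumReader.setSchema() and setExpected() are the actual Avro Java methods referred to.

               import java.io.IOException;

               import org.apache.avro.Schema;
               import org.apache.avro.generic.GenericDatumReader;
               import org.apache.avro.generic.GenericRecord;
               import org.apache.avro.io.Decoder;

               class PerBlockReader {
                 private final GenericDatumReader<GenericRecord> datumReader;
                 private Schema currentWriterSchema;

                 PerBlockReader(Schema readerSchema) {
                   this.datumReader = new GenericDatumReader<GenericRecord>();
                   this.datumReader.setExpected(readerSchema);  // schema the application wants to see
                 }

                 void readBlock(HypotheticalBlock block) throws IOException {
                   // Each block would start with the schema its entries were written with.
                   Schema writerSchema = block.schema();
                   if (!writerSchema.equals(currentWriterSchema)) {
                     // Tell the DatumReader about the new writer schema so it can resolve
                     // new or missing fields against the expected (reader) schema.
                     datumReader.setSchema(writerSchema);
                     currentWriterSchema = writerSchema;
                   }
                   Decoder in = block.decoder();
                   for (long i = 0, n = block.count(); i < n; i++) {
                     GenericRecord record = datumReader.read(null, in);
                     // ... process record ...
                   }
                 }

                 /** Stand-in for the proposed container's per-block view; not a real Avro class. */
                 interface HypotheticalBlock {
                   Schema schema();
                   long count();
                   Decoder decoder() throws IOException;
                 }
               }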

          Philip Zeyliger added a comment -

          Ok, that makes sense.

          For some reason, I thought you could write AAAAAXBBBBBBY where records A are written with schema X, and then records B are written with schema Y, where X and Y are resolvable using schema resolution. But that doesn't work because though X and Y may be resolvable, they may not have the same serialization.

          So, it turns out there are two types of schema compatibility: writer-reader compatibility, which means that we can read when we have both schemas available, and writer-writer compatibility, which concerns whether we can read (or write) data with only one of the two schemas. I don't like those names, though.

          There's something appealing about writing the schema frequently. You could also store an offset pointer to the schema in every block header, instead of the entire thing.

          What use cases are you thinking about?

           • Map/reduce outputs tend to be uniform, since it's unlikely that a M/R program changes its output in medias res.
           • Map/reduce inputs might be heterogeneous because you're combining logs from last year with logs from this year, though it's likely that individual files are homogeneous. (And if you bother to combine files, you may as well do the schema resolution as part of the concatenation, and keep the new file homogeneous.)
          • HBase cells are not likely to use this format, but rather keep the schema per column.
          • An individual program's log files are likely to be homogeneous. There's no harm in starting a new log file when you upgrade, rather than appending to the old one.
          Scott Carey added a comment -

          Sounds to me like this file type is trying to be too many things.

          Perhaps this type should not be optimized for random access. Perhaps it is possible but slow, seeking forward or back for a metadata block to find the schema for that block?

          A file type optimized for random access would either need embedded indexes or external indexes anyway – at minimum indexes to the start of each block. And it has very different "schema visibility and compatibility" requirements.

           I believe that if this main file type is optimized for streaming writes and reads, and possibly appending writes and "seek and stream" reads, many challenges are simplified.
          It will be simpler, easier to test and implement, and meet the majority of use cases.

          A format designed with random access in mind can come later. I suspect that due to the "schema visibility" requirements of random access there will be significant differences not just in implementation, but API.
           Additionally, it may be possible for an index file to encapsulate all the random access concerns and use the above format for its data storage. For example, the index over the raw file can be built by one streaming read, and modified with each appending write.

          Doug Cutting added a comment -

          > Perhaps this type should not be optimized for random access.

          For mapreduce, we need to be able to seek to an arbitrary point in the file, then scan to the next sync point and start reading the file. That's mostly what I mean by random access.

          It should also be possible to layer indexes on top of this, to support random access by key. Indexes might be stored as side files, or perhaps in the file's metadata. To support these, it should be possible to ask, while writing, the position of the current block start, so that one may store that in an index and subsequently seek to it, then scan the block for the desired entry.

          Let me elaborate on my last proposal. We put a schema at the start of every block. Every entry in a block must use the same schema. If you change the schema while writing, then you must start writing a new block. In effect, the schema is a compression dictionary for the block. (Blocks are also the unit of compression.)

          Benefits:

          • supports streaming
          • supports random access
          • permits arbitrary schema changes

          Costs:

          • increases the file size, but this can be ameliorated by:
            • writing the schema in binary (using a schema for schemas) and/or
            • writing larger blocks

          I think it still may make sense to flush metadata at the end of the file. It may no longer contain the schema, but it can contain things like counts and indexes. Streaming applications would not be able to use this, but other applications might find it very useful. Side files in HDFS are expensive.

          Doug Cutting added a comment -

          > You could also store an offset pointer to the schema in every block header, instead of the entire thing.

          Hmm. Seeks are expensive. If the schema never changed, and folks have to read the file header anyway, then I guess a pointer back to the first schema shouldn't create another seek, if we implement it well. But once you do change the schema, then many seeks in the file would have to do an extra seek back to read the schema. Caching would help, so maybe it's not a big problem, but, if schemas are small and fast to read, then it shouldn't be bad to put one at the start of each block. So, maybe this works...

          > What use cases are you thinking about?

           Append is one case where schemas might change: the appending program might differ from the one that originally created the file. Other cases are where folks want to lazily add schemas to a file as they write things. Say someone had an event logging system, where different daemons might log different events. You could use a schema that includes all possible events, if you knew them, or you could, on the fly, add new events to the top-level union the first time they're written.
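
           A hedged sketch of that lazy approach (the growUnion helper and its names are illustrative; Schema.createUnion and Schema.getTypes are the real Avro Java calls):

               import java.util.ArrayList;
               import java.util.List;

               import org.apache.avro.Schema;

               final class LazyUnion {
                 /**
                  * Returns a union equal to 'current' with 'newBranch' appended at the end,
                  * which is the only kind of change the file format would permit.
                  * Assumes 'current' is already a union schema.
                  */
                 static Schema growUnion(Schema current, Schema newBranch) {
                   List<Schema> branches = new ArrayList<Schema>(current.getTypes());
                   if (branches.contains(newBranch)) {
                     return current;            // this event type is already in the union
                   }
                   branches.add(newBranch);     // append only; never reorder or remove branches
                   return Schema.createUnion(branches);
                 }
               }

           An appending writer could call something like this the first time it sees a new event record type, and then re-dump the metadata (or, under the per-block proposal, start a new block) so readers pick up the wider union.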

          Scott Carey added a comment -

          For mapreduce, we need to be able to seek to an arbitrary point in the file, then scan to the next sync point and start reading the file. That's mostly what I mean by random access.

          Ok, I misinterpreted. I'll call that "seek and scan" for the rest of this comment, as opposed to random access which I interpret as "go to tuple # 655321" or "read the first tuple following location X". It also is related to the limitation that all schemas in the file must be representable in one big union schema. If the requirement to read a tuple is only that the reader knows the schema in the prior metadata block, then what can be stored in one file is less restrictive.

          It should also be possible to layer indexes on top of this, to support random access by key. Indexes might be stored as side files, or perhaps in the file's metadata. To support these, it should be possible to ask, while writing, the position of the current block start, so that one may store that in an index and subsequently seek to it, then scan the block for the desired entry.

          I agree. It is useful to leave open the option for index type metadata in the metadata block. I'll add that the metadata block might also contain an index into that block to avoid scanning it (for large blocks). Unfortunately, to do this with streaming writes, the metadata block with the index must be after the block. So, perhaps the metadata block needs two types of metadata, that which describes a previous block(s) and that which describes the next one?

          This is where I start to wonder if serving too many needs in one file type is the right choice.

          Let me elaborate on my last proposal.

          I like it, but if we ever want true optimized random access (perhaps not) it would have to change or we would need side files.


          I think it still may make sense to flush metadata at the end of the file. It may no longer contain the schema, but it can contain things like counts and indexes. Streaming applications would not be able to use this, but other applications might find it very useful. Side files in HDFS are expensive.

          It definitely makes sense to flush some metadata at the end, but much of that might be optional.

          One useful thing would be the following.
          This allows MapReduce to not have to "seek and scan" but instead find the start of the metadata block nearest the HDFS block boundary. If counts are stored, it also allows basic random access by tuple number.

          When a file is closed, the last metadata block can contain the offset of each known metadata block. Perhaps this is optional, but if it exists then the input splitter can split on those boundaries and avoid seeking. When the file is appended, it can either copy-forward this crude index or keep a reference to the prior "finish" metadata block.

          Maybe, a straightforward thing to do is consider that each block in this file has a header, a data block, and a footer. The header has the schema of the tuples in the block and any other information required to read the block, like the compression codec, etc. The footer contains the tuple count and other optional info (like an index) and the length of the block. The sync marker is in every footer, and in the first block's header.

           Ok, I think I'm done with my speculation for now.

          Doug Cutting added a comment -

          > It is useful to leave open the option for index type metadata in the metadata block.

          I don't see the use case. Blocks should be small enough that seek time dominates, so scanning should not be a dominant cost. Note that scanning is required anyway when blocks are compressed. Avro scanning should be at least as fast as decompression.

          > if we ever want true optimized random access [ ... ]

          So long as we're supporting compression, we'll never support seeks directly to individual entries. But blocks are a relatively constant size, so we can support constant-time access to individual entries.

          > This allows MapReduce to not have to "seek and scan" but instead find the start of the metadata block nearest the HDFS block boundary.

          Yes, if we kept a global block index, we could avoid this scan. However since HDFS blocks are ~64MB, and Avro file blocks are ~64k, the scan is less than a tenth of a percent of the overall map cost, so this is perhaps not a worthwhile optimization.

          > Maybe, a straightforward thing to do is consider that each block in this file has a header, a data block, and a footer.

          That could work. We'd also need to terminate blocks with the length of their footer metadata, so that a reader can efficiently find the last footer on open, where, by convention, global data is written.

          Scott Carey added a comment -

          Yes, if we kept a global block index, we could avoid this scan. However since HDFS blocks are ~64MB, and Avro file blocks are ~64k, the scan is less than a tenth of a percent of the overall map cost, so this is perhaps not a worthwhile optimization.

          I had imagined that blocks were closer to 1MB for whatever reason. I had actually imagined that many writers would want to minimize the metadata blocks if they were not changing their schema. I had also imagined that there might be some use case where a single tuple was MBs in size (perhaps a large array, or a byte[] representing some sort of media, or the text content of a document). At that point, should there be a metadata block per tuple?

          There are a lot of trade-offs with respect to block sizes.

          At what size does the metadata block overhead represent larger overhead than storing extra information per tuple like Thrift/Protobuf? Put another way, if blocks are too small and schemas large, Avro's size advantage could be lost.

          At what size does compression become less effective? Are larger blocks better for streaming read/write performance?

          An index of all of a 1TB file's block locations is fairly small if the blocks are 4MB, but very large if they are 64KB. So the usefulness and costs of such an index varies.

          If the expectation is smaller blocks (around 64K) then seeking to the next will be cheap and my main concern is minimizing the metadata block size.

           If there is an expectation that large block sizes will be used, I'm concerned with the time it takes to find the start of a block and the efficiency of file splitting.

          Doug Cutting added a comment -

          > At what size does the metadata block overhead represent larger overhead than storing extra information per tuple like Thrift/Protobuf?

           That's hard to calculate, but I think we're a ways from that, especially if we write the schema in binary.

          > At what size does compression become less effective?

          I think we've found that, after ~64k, the compression ratio does not typically significantly improve.

          > Are larger blocks better for streaming read/write performance?

          In, e.g., mapreduce, we stream through a series of blocks, so we're still sequentially accessing ~64MB chunks, regardless of the compression-block size.

          Doug Cutting added a comment -

          I'm now having second thoughts about the current proposal to include the schema with each block. We're going through a lot of work in order to support changing the schema within a file, yet I don't actually believe that to be a common usage. I wonder if instead we should simply make the schema a part of the file header and not permit it to be modified while writing. This would support mapreduce well. If someone wishes to modify or intermix schemas, then they have to copy their data to a new file, using a new schema.

          So, my new, reductionist approach is that a data file has just:

          • a header with
            • a magic number identifying this file format (incremented from current data file)
            • a sync marker
            • a json-format schema
            • a compression codec name (default is null)
            • an avro encoding name (text/binary, default is binary)
            • optionally other, user-provided metadata
          • followed by a sequence of blocks, each with:
            • the sync marker from the header
            • the count of instances in this block
            • the length in bytes of this compressed block
            • the compressed block data
              • a sequence of 'count' entries corresponding to the header's schema

          That's it. Thoughts?
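
           To make that layout concrete, here is a rough sketch of a writer for it in plain java.io. It follows the list above literally and is not necessarily how the final Avro container format turned out; the magic bytes, length encodings, and field order are assumptions of the sketch, and optional user metadata is omitted.

               import java.io.DataOutputStream;
               import java.io.IOException;
               import java.nio.charset.StandardCharsets;
               import java.security.SecureRandom;

               final class ProposedFileWriter {
                 private final DataOutputStream out;
                 private final byte[] sync = new byte[16];

                 ProposedFileWriter(DataOutputStream out, String schemaJson,
                                    String codec, String encoding) throws IOException {
                   this.out = out;
                   new SecureRandom().nextBytes(sync);
                   out.write(new byte[] {'O', 'b', 'j', 2});  // magic number (assumed value) for the incremented format
                   out.write(sync);                           // sync marker
                   writeString(schemaJson);                   // json-format schema
                   writeString(codec);                        // compression codec name ("null" by default)
                   writeString(encoding);                     // avro encoding name ("binary" by default)
                 }

                 /** One block: sync marker, instance count, compressed length, compressed data. */
                 void writeBlock(long count, byte[] compressedData) throws IOException {
                   out.write(sync);
                   out.writeLong(count);
                   out.writeInt(compressedData.length);
                   out.write(compressedData);
                 }

                 private void writeString(String s) throws IOException {
                   byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                   out.writeInt(bytes.length);
                   out.write(bytes);
                 }
               }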

          Jeff Hammerbacher added a comment -

           Not crazy. Eric Anderson, who works on the DataSeries project at HP Labs, pointed out that he wished they had not encoded the schema with each block as well, as the use case was not common and the overhead was significant.

          Philip Zeyliger added a comment -

          I like the simpler model. I'm pretty confident that other formats will evolve (Hadoop sure has a handful: SeqFile, MapFile, TFile, RCFile), and it's good to start way simple.

           To be clear, you're preserving appendability and splittability, but ditching schema-evolution-for-appendability. That seems very sensible.

          How big are blocks (rule of thumb?)? DataSeries does parallel decompression, which is easily doable for this file format too, as long as blocks aren't too big.

          You could conceivably store compression codec per block. Probably not worth it.

          Should Avro allow a user-specified trailer block? It's simpler if it doesn't.

          – Philip

          Scott Carey added a comment -

           I agree, a simple format for 80%+ of the use cases is a good thing. It alleviates my prior feeling that this might be trying to do "too many things". In the future, another format (or a slight variation on this) might support appends with schema changes or compression codec changes in a much simpler way than the earlier design in this ticket (store only the schema when it changes, with an index in a footer?).

           The last thing I want to clarify is the sync marker behavior. Even with a 16-byte marker there needs to be well-defined behavior for collisions and disambiguation. Is there documentation on this, or only in the code?
          8 bytes or less might be plenty depending on how the sync marker behavior on writes and reads is defined.

          There are many approaches to this –

           The first increases the cost and complexity of writing but makes marker identification unambiguous.
           While writing, make sure no sequence of bytes matches, and if there is a match, follow the match with an "is_literal" marker byte that cannot be found in what usually follows the sync marker (if what follows is a count of block entries, then encode -1?).
           The marker could even be 4 bytes with this approach. Detecting a collision on output and inserting a literal marker byte may not be trivial, however, and surely will add overhead. But it will make seeking to block boundaries clear and error handling code on the reading side simple.

          Another way is to write blindly and then have a well defined behavior for detecting the various types of corruption possible when one assumes data after the marker is a valid header, and what to do when it happens. Although improbable, it is possible for random data to mimic a block header, and for errors to only be detected when attempting to deserialize entries. There are several cases to disambiguate a corrupted block from a normal block that happens to have the sync marker in its data. One doesn't want to accidentally skip a block without reporting corruption, or fail to read because of a collision.

          Additionally, if the marker (start of block) is aligned it would speed up the marker detection on both the writer and reader side and lessen the collision probability slightly (by a factor of the alignment width).

           There are a lot of options for dealing with the sync marker and its collision and disambiguation behavior; I feel that whatever it is needs to be well defined in a specification.

          Doug Cutting added a comment -

           Philip> To be clear, you're preserving appendability and splittability, but ditching schema-evolution-for-appendability.

          The other thing we're ditching is computed metadata. Previously you could store the total count of items or an index of items in the metadata at end of file. This is certainly useful, but complicates things. Side files are the alternative, and the only problem with them is that adding more files is HDFS-unfriendly.

          Philip> How big are blocks (rule of thumb?)?

          64kB. Just big enough for compression to work and for per-block overheads to be insignificant.

          Scott> The last thing I want to clarify is the sync marker behavior.

           The assumption is that with a 16-byte marker there will never be a collision. We've never seen a collision yet with this approach in the lifetime of Hadoop. The odds are in our favor: it would take a file with well over an exabyte (1000 petabytes, 1M terabytes) before this is anywhere near likely. I suspect the chances are significantly greater that all three replicas of a file in HDFS will become corrupt. The significant advantage of this approach is that no escape processing is required: bytes can travel to and from the file unexamined in bulk.

          The count of items and bytes per block provide a checksum of sorts: if the next sync marker is not found where expected then the file is either corrupt or a collision has occurred. Corruption should be noticed by the filesystem. A collision does not cause data loss, since the file can still be processed from its start. So I suppose one could specify that, if the next sync marker is not found where expected then one should scan back earlier or later in the file (as appropriate) to find a different sync marker and start processing there.
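
           As a back-of-the-envelope check of those odds (arithmetic added here, not from the comment): treating each byte offset of a file as an independent 1-in-2^128 chance of spuriously matching a uniformly random 16-byte marker, even an exabyte of data gives a vanishingly small expected number of false matches.

               public class SyncCollisionOdds {
                 public static void main(String[] args) {
                   double fileBytes = 1e18;                   // one exabyte, roughly one candidate window per byte offset
                   double perWindow = Math.pow(2, -128);      // chance a given 16-byte window equals the marker
                   System.out.println(fileBytes * perWindow); // ~3e-21 expected false matches
                 }
               }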

          Scott Carey added a comment -

          The other thing we're ditching is computed metadata. Previously you could store the total count of items or an index of items in the metadata at end of file. This is certainly useful, but complicates things. Side files are the alternative, and the only problem with them is that adding more files is HDFS-unfriendly.

          This problem is really an HDFS problem. A future feature for HDFS should probably make side files cheap rather than complicating issues for file type implementations (multiple data streams in one file, side files are just optional extra data streams – crc's, indexes, parity, etc). I should probably open a ticket for that.

          The assumption is that with a 16-byte marker there will never be a collision. We've never seen a collision yet with this approach in the lifetime of Hadoop. The odds are in our favor: it would take a file with well over an exabyte (1000 petabytes, 1M terabytes) before this is near likely. I suspect the chances are significantly greater that all three replicas of a file in HDFS will become corrupt. The significant advantage of this approach is that no escape processing is required: bytes can travel to and from the file unexamined in bulk.

           Sounds good, let's just make sure there is an expected standard thing to do when it does happen – even if it is just a very clear error message. It should be trivial to make test files that cause collisions and/or are corrupted.

          The count of items and bytes per block provide a checksum of sorts: if the next sync marker is not found where expected then the file is either corrupt or a collision has occurred. Corruption should be noticed by the filesystem.

          Background data corruption yes, but corruption caused by bugs in a writer implementation won't be.

          A collision does not cause data loss, since the file can still be processed from its start. So I suppose one could specify that, if the next sync marker is not found where expected then one should scan back earlier or later in the file (as appropriate) to find a different sync marker and start processing there.

           I'm concerned about silent data loss during processing mostly – a M/R job runs, a collision (or any form of corruption) occurs, a block gets skipped silently, and some calculation runs with missing tuples. I'd hate to hear that some LHC processing missed a new fundamental particle due to an unlucky sequence of bytes in their petabytes of data.
           So I'll keep my eye out for a set of guidelines for implementations to follow for collision/corruption detection, correction, and logging.

          Here is a quick stab at it:

           Reading sequentially: the start of the block is implicit and only fails due to corruption.
           1a. Validate that the sync marker matches. If it does not, log an error for a corrupt block at this location.
           1b. Validate the other block header values; log an error for a corrupt header if they are invalid (length or count out of bounds?).
           If either 1a or 1b fails:
           1c. Scan for the sync marker up to X bytes ahead, repeating 1b as needed until it does not fail, or X bytes have been read.
           2a. Upon finding a valid block header, log an error recording the end of the corrupted part of the file, but only if there was an error in step 1.
           2b. Read data from the block. If there is an error here, or the data or number of tuples do not match the header, log an error and scan ahead, as in 1c above.

           Reading from the middle of a file and seeking is slightly different. It essentially starts at 1c above. However, if there is an invalid header it could be a collision rather than corruption, so there is not an error unless it has reached 2b. Another way to put it is that sequential reading fails if the implicit next block has any error at all, while seeking from the middle can only error after it has either detected what appears to be a valid block or reached its maximum search window.

           For Hadoop input splits there is one other critical bit – at the end of a split, it needs to validate the next block header (if it exists) and log an error if it does not appear valid. Otherwise, the following split may have skipped over some data without error while seeking to find the start of a block. Another way to look at it is that part of validating a block is to make sure that the next block is indeed right after it, or that it is the end of the file (otherwise it is corrupt). In fact, conceptually it might be easier to think of the sync marker as being between blocks, not at the front or back.

          I think that covers the corner cases.
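
           A sketch of the resync scan in steps 1c/2a might look like the following. This is entirely hypothetical code over a plain InputStream, not an existing Avro API; the search window X and what counts as a valid header are placeholders for whatever the spec would define.

               import java.io.IOException;
               import java.io.InputStream;
               import java.util.Arrays;

               final class ResyncScanner {
                 /**
                  * Scans at most 'window' bytes of 'in' for the 16-byte sync marker.
                  * Returns the number of bytes consumed (the stream is then positioned just
                  * past the marker, ready for header validation), or -1 if no marker was
                  * found within the window or before end of file.
                  */
                 static long scanForSync(InputStream in, byte[] sync, long window) throws IOException {
                   byte[] last = new byte[sync.length];  // rolling buffer of the most recent bytes
                   long consumed = 0;
                   while (consumed < window) {
                     int b = in.read();
                     if (b < 0) {
                       return -1;                        // end of file before a marker was found
                     }
                     consumed++;
                     // Shift the rolling buffer left by one and append the new byte.
                     System.arraycopy(last, 1, last, 0, last.length - 1);
                     last[last.length - 1] = (byte) b;
                     if (consumed >= sync.length && Arrays.equals(last, sync)) {
                       return consumed;                  // caller now validates the block header (step 1b)
                     }
                   }
                   return -1;                            // exceeded the search window (step 1c failed)
                 }
               }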

          Philip Zeyliger added a comment -

          This problem is really an HDFS problem. A future feature for HDFS should probably make side files cheap rather than complicating issues for file type implementations (multiple data streams in one file, side files are just optional extra data streams - crc's, indexes, parity, etc). I should probably open a ticket for that.

          Several applications can work with multiple files. If you want per-block extra stuff (indexes, bloom-filters, whatever), side files work less well because they lose being co-located with the original data. And that's fine; applications that need that will develop more complicated file formats.

          Philip Zeyliger added a comment -

          Doug> The count of items and bytes per block provide a checksum of sorts

          Are we cool with not having checksums in these files? Some compressors have their own, and file systems (HDFS, certainly) are likely to do something too. Should there be another layer?

          Doug Cutting added a comment -

          Scott> I'm concerned about silent data loss during processing mostly - a M/R job runs, a collision (or any form of corruption) occurs, a block gets skipped silently, and some calculation runs with missing tuples.

          Who's silently skipping blocks? That sounds like the source of the problem. Normally, if corruption is detected an exception should be thrown and the task should fail. Hopefully the task will succeed on another non-corrupt replica. If you truly require that things run to completion on corrupt data, then you'll necessarily miss some tuples. It should be possible to configure things this way, but it should not be the default.

          Since I never expect to see a collision, I don't feel an urgent need to add code to recover from one. Detecting them and failing might be wise, just in case.

          Philip> Are we cool with not having checksums in these files?

          I am.

          Scott Carey added a comment -

          Who's silently skipping blocks?

          Code that jumps to the middle of the file and finds the start of the next block by finding the sync marker and either:
          1: there is a collision and it skips to the next sync marker rather than failing.
          2: the file was written corruptly (a sync marker is missing or corrupted on write; block size/count are wrong but consistent with each other), a reader or file split that assumes correctness can skip data silently on a seek and scan.

          Detecting and failing on these conditions is important, no matter how unlikely. Making test files that contain these flaws is also easy.
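
          For the seek-from-the-middle case, here is a minimal sketch of the scan step in plain JDK code (it is not Avro's reader, and the class name is made up). It is deliberately simple, doing one full comparison per byte offset, so it is slow but cannot miss a marker; a found position is still only a candidate until the block header that follows it validates.

            import java.io.IOException;
            import java.io.RandomAccessFile;
            import java.util.Arrays;

            public class SyncScanSketch {

              /** Return the offset just past the first sync marker at or after pos, or -1 at EOF. */
              public static long scanForSync(RandomAccessFile file, long pos, byte[] sync)
                  throws IOException {
                byte[] window = new byte[sync.length];
                for (long candidate = pos; ; candidate++) {
                  file.seek(candidate);
                  int read = 0;
                  while (read < window.length) {
                    int n = file.read(window, read, window.length - read);
                    if (n < 0) {
                      return -1;                      // hit EOF before a full marker could fit
                    }
                    read += n;
                  }
                  if (Arrays.equals(window, sync)) {
                    return candidate + sync.length;   // candidate block start; header still unvalidated
                  }
                }
              }
            }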

          Doug Cutting added a comment -

          > there is a collision
          > the file was written corruptly

          These should both cause failures. If you catch an exception and continue anyway, then you'll skip data, as expected. In neither case would we silently skip data.

          Doug Cutting added a comment -

          Here's a patch that implements this for Java and updates the spec.

          It's slightly different from what's proposed above. A file is a header followed by zero or more blocks. These each contain:

          • header
            • magic
            • metadata
            • sync
          • block
            • length
            • data
            • sync

          Thus every block is both preceded and followed by a sync marker.

          I also split the Java data file reading code into two classes: a base class that only permits sequential access, and a subclass that supports random access. This will permit us to, e.g., process standard input.
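
          To make that layout concrete, here is a minimal sequential scan over the structure described above. The framing is simplified (readInt/readLong placeholders instead of Avro's varint and map encodings) and the class name is hypothetical, so treat it as an illustration of the header/block/sync shape, not the real wire format.

            import java.io.DataInputStream;
            import java.io.EOFException;
            import java.io.IOException;
            import java.io.InputStream;
            import java.util.Arrays;

            public class ContainerScanSketch {
              static final int SYNC_SIZE = 16;

              public static void scan(InputStream raw) throws IOException {
                DataInputStream in = new DataInputStream(raw);
                byte[] magic = new byte[4];
                in.readFully(magic);                      // file magic
                byte[] meta = new byte[in.readInt()];     // simplified: length-prefixed metadata
                in.readFully(meta);
                byte[] sync = new byte[SYNC_SIZE];
                in.readFully(sync);                       // sync marker established by the header

                while (true) {
                  long count;
                  try {
                    count = in.readLong();                // simplified: objects in this block
                  } catch (EOFException e) {
                    break;                                // clean end of file
                  }
                  byte[] data = new byte[in.readInt()];   // simplified: block payload
                  in.readFully(data);
                  byte[] marker = new byte[SYNC_SIZE];
                  in.readFully(marker);                   // every block ends with the sync marker
                  if (!Arrays.equals(marker, sync)) {
                    throw new IOException("sync mismatch: corrupt block or marker collision");
                  }
                  System.out.println("block: " + count + " objects, " + data.length + " bytes");
                }
              }
            }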

          Doug Cutting added a comment -

          BTW, you need to 'svn cp src/java/org/apache/avro/file/DataFileReader.java src/java/org/apache/avro/file/DataFileStream.java' before applying this patch. I'm not totally happy with these class names yet.

          Philip Zeyliger added a comment -

          Took a look at the patch. I hadn't read the old code, so I made some comments on stuff that's probably pre-existing...

          I love the (relative) simplicity of the new file format.

          File metadata consists of:

          Would it be fair to add to spec.xml, that the file metadata
          can be read with the schema
          { type: array, items: record { fields: [string, bytes] } }
          Ah, no, looking at DataFileStream, it's really (more fake-syntax)
          { type: map, key-type: string, value-type: bytes }

          in.read(magic);

          Should this throw an exception if in.read(magic)
          didn't return magic.length?

          this.vin = new BinaryDecoder(in);

          I think we should use the specific API here
          if we can.

          public synchronized byte[] getMeta(String key)

          Why does this need to be synchronized? The meta
          map is created at construction, so could be marked
          final, no?

          public synchronized D next(D reuse) throws IOException {

          As you suggested in person, this API is a bit
          broken for iteration, since values may well be null.
          That's fair game for another schema, though.

          long blockCount; // # entries in block

          I was surprised that blockCount was decremented, instead of incremented,
          when I first ran into it. I don't care either way, but perhaps
          "# entries remaining in current block" would be more clear.

          /** Move to the specified synchronization point, as returned by {@link DataFileWriter#sync()}. */

          I'm a bit lost as to what that comment means. Ah, ok. I think what might
          be more helpful is: "Move to the specified synchronization point,
          as returned by {@link DataFileWriter#sync()}. If pre-established
          synchronization points are not available, use {@link sync(pos)} to move
          to the first synchronization point at or past pos." i.e.,
          specifically discuss the distinction between seek() and sync(), since
          they're similar, but quite different.

          if (j == sync.length) { /* position before sync */ sin.seek(sin.tell() - DataFileConstants.SYNC_SIZE);

          Does this work? In the old code, next() first skipped a synchronization marker,
          so this was right. But in the new code, if blockCount=0, it reads in the
          number of records. So I think this should have been changed too. Of course,
          I'm probably just missing something.

          ((SeekableBufferedInput)in).seek(position);

          These casts feel icky. You could just have an extra
          field in DataFileReader, no?

          DataFileWriter: appendTo, create

          Why are these synchronized? (There are lots of methods in this
          class that are synchronized... in the common case, there's
          only one thread writing, so seems to be easier to force
          the user to do his own.)

          Would be great to have tests for trying to setMeta() when appending,
          or after file has had records in it.

          I didn't see any tests for the random access stuff.

          Doug Cutting added a comment -

          > Would it be fair to add to spec.xml, that the file metadata
          > { type: map, key-type: string, value-type: bytes }

          Yes, I've added a schema to the spec. BTW, maps don't have key-types,
          so it's just {type: map, values: bytes}.
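
          For what it's worth, the two spellings describe the same map-of-bytes schema. Here is a quick check using the Java schema API (Schema.parse was later superseded by Schema.Parser; the class name below is just for illustration):

            import org.apache.avro.Schema;

            public class MetaSchemaSketch {
              public static void main(String[] args) {
                Schema byJson = Schema.parse("{\"type\": \"map\", \"values\": \"bytes\"}");
                Schema byApi = Schema.createMap(Schema.create(Schema.Type.BYTES));
                System.out.println(byJson.equals(byApi));  // expected: true
              }
            }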

          > in.read(magic);
          > Should this throw an exception if in.read(magic) didn't return magic.length?

          In this case it doesn't matter. Java initializes arrays with nulls,
          and the expected value has no nulls, so if it doesn't read the entire
          thing it will always fail. But I've changed it to
          vin.readFixed(magic), since that has readFully() semantics and is
          what we use to read sync markers.

          > this.vin = new BinaryDecoder(in);
          > I think we should use the specific API here if we can.

          I'm fine with adding the header schema to the spec, but I'm not eager
          to use specific to implement the header in this patch. For one thing,
          it makes bootstrapping harder. The build already has some wacky stuff
          so that the specific compiler is compiled before the IPC code which
          depends on specific compiler output. Perhaps we should really
          re-organize the code tree into stuff that depends on specific output
          and stuff that does not, but that would separate files that are
          otherwise closely related. Or we could add a special comment to files
          required to compile the specific compiler use an ant <contains> filter
          to compile those first. In any case, can we address this separately?

          > public synchronized byte[] getMeta(String key)
          > Why does this needs to be synchronized?

          It doesn't any longer. This is a relic from when metadata was
          read/write. Good catch. Fixed.

          > public synchronized D next(D reuse) throws IOException {
          > As you suggested in person, this API is a bit broken for iteration

          I've now provided an iterator API.

          > long blockCount; // # entries in block
          > I was surprised that blockCount was decremented

          I changed the name of the variable.

          > /** Move to the specified synchronization point, as returned by {@link DataFileWriter#sync()}. */
          > I'm a bit lost as to what that comment means.

          I updated that comment.

          > if (j == sync.length) { /* position before sync */ sin.seek(sin.tell() - DataFileConstants.SYNC_SIZE);
          > Does this work?

          Probably not. I forgot to update it, and it's never had tests. I've
          updated it now and added a test.

          > ((SeekableBufferedInput)in).seek(position);
          > These casts feel icky.

          I replaced this with a field.

          > DataFileWriter: appendTo, create
          > Why are these synchronized?

          Things used in Hadoop InputFormats should be thread safe to make them
          easy to use from multi-threaded mappers. SequenceFile is thread-safe
          for this reason, and we want this to be a drop-in replacement for
          SequenceFile.

          > Would be great to have tests for trying to setMeta() when appending, or after file has had records in it.

          Yes, lots more tests would be good, including that.

          > I didn't see any tests for the random access stuff.

          I've added one now.

          Philip Zeyliger added a comment -

          Thanks for addressing my comments. Some minor notes
          below, but I'm comfortable with this being committed. +1.

          org.apache.avro.file.Header (from the spec)

          Cool.

          Things used in Hadoop InputFormats should be thread safe to make them easy to use from multi-threaded mappers. SequenceFile is thread-safe for this reason, and we want this to be a drop-in replacement for SequenceFile.

          It might be handy to make a note in DataFileReader's javadoc
          to mention that it is thread-safe. AVRO could later add
          a non-thread-safe version, if it's deemed faster.

          To read up to that synchoronization point, call pastSync(long)

          pastSync doesn't seem to do any reading, so this might be out of date.
          Also, synchronization is misspelled.

          DataFileStream: vin.readFixed(magic);

          Hate to waffle on you here, but this throws EOFException on
          a two-byte file, whereas "Not a data file" would be clearer.

          DataFileStream: synchronization of hasNext(), next(D), close.

          Do these need to be synchronized for Hadoop compatibility, too?
          If so, I think it's appropriate to note in the javadoc
          for DataFileStream that multiple threads can use it concurrently,
          though they are not allowed to use the underlying inputstream.

          //System.out.println("sync = "+

          //System.out.println("start = "+start+" end = "+end);

          You may want to delete these two before checkin.

          TestDataFile: readFile()

          I think the reuse parameter is unused here now.

          Jeff Hammerbacher added a comment -

          Hey Doug,

          The most recent patch seems to write both the length of the block in number of entries as well as bytes. I seemed to recall you mentioning removing the former. Am I mistaken?

          Thanks,
          Jeff

          Jeff Hammerbacher added a comment -

          Here's a patch that implements the new format in Python. It's pretty minimal, and only works with my new Python implementation (see AVRO-219), but the tests pass on my machine. I don't know that I got things exactly right, so please take a look and let me know.

          Doug Cutting added a comment -

          Philip> pastSync doesn't seem to do any reading, so this might be out of date.

          Yes, that comment was confusing. I have updated it.

          Philip> this throws EOFException on a two-byte file, whereas "Not a data file" would be clearer.

          I fixed that.

          Philip> DataFileStream: synchronization of hasNext(), next(D), close.
          Philip> Do these need to be synchronized for Hadoop compatibility, too?

          It's not so much Hadoop compatibility as consistency: The API should either be thread-safe or not. If you feel that thread safety is not useful here and has a performance penalty then synchronization could be moved to the to-be-written InputFormat implementation that will use this. Would you prefer that?

          Philip> TestDataFile: readFile()
          Philip> I think the reuse parameter is unused here now.

          Removed.

          I also fixed a bug in the sync handling code.

          Doug Cutting added a comment -

          Jeff> The most recent patch seems to write both the length of the block in number of entries as well as bytes.

          Yes. I've vacillated on that. The existing code does not use the byte count, but I suspect when we add compression codecs the length will be useful. If we wish to support a pluggable codec API, we could either make it stream-based or buffer-based. If we have block lengths written, then the codec API can be a simple buffer-based API like 'byte[] compress(byte[]); byte[] decompress(byte[])'. But if we don't have block lengths written, then the contract for codec plugins is more complex, so I'm leaning towards them. This would make it really easy to add, e.g., a FastLZ codec (AVRO-135).
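
          A sketch of what such a buffer-based codec contract could look like, with a deflate implementation built only on java.util.zip; the interface and class names are hypothetical, not what Avro actually shipped. The per-block byte length is what makes this shape workable, since the framework can hand the codec a complete block as a byte array.

            import java.io.ByteArrayInputStream;
            import java.io.ByteArrayOutputStream;
            import java.io.IOException;
            import java.util.zip.Deflater;
            import java.util.zip.DeflaterOutputStream;
            import java.util.zip.InflaterInputStream;

            interface BlockCodec {
              byte[] compress(byte[] block) throws IOException;
              byte[] decompress(byte[] block) throws IOException;
            }

            class DeflateBlockCodec implements BlockCodec {
              public byte[] compress(byte[] block) throws IOException {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                DeflaterOutputStream def = new DeflaterOutputStream(out, new Deflater());
                def.write(block);                      // compress the whole block in one shot
                def.finish();
                return out.toByteArray();
              }

              public byte[] decompress(byte[] block) throws IOException {
                InflaterInputStream inf = new InflaterInputStream(new ByteArrayInputStream(block));
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = inf.read(buf)) != -1) {
                  out.write(buf, 0, n);                // inflate until the codec's own stream ends
                }
                return out.toByteArray();
              }
            }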

          Philip Zeyliger added a comment -

          Looked over the patch again. Looks good. Synchronization issue is still open.

          It's not so much Hadoop compatibility as consistency: The API should either be thread-safe or not. If you feel that thread safety is not useful here and has a performance penalty then synchronization could be moved to the to-be-written InputFormat implementation that will use this. Would you prefer that?

          My preference is against thread-safety in the basic container object. (Just imagine me with big signs in a protest march... "Say No to Thread Safety"... Oy.) I don't actually have any good numbers on how much synchronized blocks cost us. Java has certainly moved towards ArrayList (and away from Vector), and I think that's not a crazy parallel. Waxing more philosophical, half the time I find thread-safe containers don't buy you much: if you're using two of them and they need to be modified atomically, you still have to do your own synchronization work.

          hasNext() and next() are methods that make very little sense, btw, synchronized. You can only call next() when hasNext() is true, but who's to say someone hasn't gone in and advanced the pointer while you weren't looking...

          – Philip

          Doug Cutting added a comment -

          Here's what I hope will be the final version of this. I have:

          • Removed synchronization
          • Removed the block size. Codecs may store the block size themselves if they need it.
          Philip Zeyliger added a comment -

          +1!

          Andrew Purtell added a comment -

          Some quick comments from over on HBASE-2055:

          • I see that SYNC_INTERVAL is a constant. Should be configurable? We want 64k, others might want different?
          • Looking at the most recent patch (2009-12-30 10:35 PM), DataFileWriter will hold up to SYNC_INTERVAL bytes in a buffer before writing out the block, via writeBlock(). We want to hsync after a group of related commits in our write ahead log whether SYNC_INTERVAL is reached or not, but also have the stream marked with a sync marker at each SYNC_INTERVAL. Some kind of flush method that forces writeBlock() would work.
          • What happens if the first block is not available but others are? It makes sense to me not to support changing the schema mid-file, but does it make sense to put the schema in multiple places, like super blocks in ext3?
          Jeff Hammerbacher added a comment -

          Some kind of flush method that forces writeBlock() would work.

          Pretty trivial to add. Check out the Python implementation, which already has this method.

          Doug Cutting added a comment -

          I just committed this.

          Doug Cutting added a comment -

          Andrew: sorry, I committed this before I saw your comments.

          > I see that SYNC_INTERVAL is a constant. Should be configurable?

          Yes. I've filed AVRO-274 to address this.

          > Some kind of flush method that forces writeBlock() would work.

          DataFileWriter#flush() and #sync() both force a writeBlock(). The difference is that the #sync() method does not also force a flush of the file to disk and it returns the position of the sync point, for passing to DataFileReader#seek().
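
          To illustrate that relationship in writer/reader terms (constructor and method signatures here are recalled from memory and may differ between Avro versions, so treat this as a sketch rather than a reference):

            import java.io.File;
            import org.apache.avro.Schema;
            import org.apache.avro.file.DataFileReader;
            import org.apache.avro.file.DataFileWriter;
            import org.apache.avro.generic.GenericDatumReader;
            import org.apache.avro.generic.GenericDatumWriter;

            public class SyncSeekSketch {
              public static void main(String[] args) throws Exception {
                Schema schema = Schema.parse("\"string\"");
                File file = new File("events.avro");

                DataFileWriter<CharSequence> writer =
                    new DataFileWriter<CharSequence>(new GenericDatumWriter<CharSequence>(schema));
                writer.create(schema, file);
                writer.append("first");
                long mark = writer.sync();    // forces writeBlock(); returns a seekable position
                writer.append("second");
                writer.close();

                DataFileReader<CharSequence> reader =
                    new DataFileReader<CharSequence>(file, new GenericDatumReader<CharSequence>(schema));
                reader.seek(mark);            // jump straight to the remembered sync point
                while (reader.hasNext()) {
                  System.out.println(reader.next());
                }
                reader.close();
              }
            }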

          > does it make sense to put the schema in multiple places, like super blocks in ext3?

          This file format does not attempt to address data integrity issues, rather trusting that to the filesystem. Processing a file whose first block is corrupted would be difficult, not just because of the missing schema but also because of the missing sync marker. The sync marker may be recoverable from EOF if the file is not truncated, but that is difficult to detect with certainty.

          Jeff Hammerbacher added a comment -

          Hey Doug,

          To clarify, for the block format, you have removed the count of the number of objects, but you have kept the count of the number of bytes, correct? That's what the new spec says.

          Also, you use the term "split" in the Java code but do not use it in the spec. You have some logic to force splits. Do you mean for this to be a part of the spec, or is it just a Java-specific optimization?

          Thanks,
          Jeff

          Doug Cutting added a comment -

          > you have removed the count of the number of objects, but you have kept the count of the number of bytes, correct? That's what the new spec says

          No, the spec currently says each block is prefixed by "a long indicating the count of objects in this block". This is as it was before, without a byte count. Byte counts are left to codec implementations on an as-needed basis.

          > Also, you use the term "split" in the Java code but do not use it in the spec.

          I use that term in the unit test. The term is borrowed from Hadoop MapReduce, where it refers to dividing a file at arbitrary points among tasks. This is an important use case for the Java data file implementation. It requires nothing in the spec more than periodic sync markers. Probably only the Java implementation needs to implement methods like DataFileReader#sync() and DataFileReader#pastSync(), since Hadoop MapReduce is in Java.
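
          As a concrete illustration of that pattern (sync(long) and pastSync(long) are the methods named above; the helper class is made up):

            import java.io.IOException;
            import org.apache.avro.file.DataFileReader;

            public class SplitReadSketch {
              /** Count the records belonging to the split [start, end), per the pattern above. */
              public static <D> long countSplit(DataFileReader<D> reader, long start, long end)
                  throws IOException {
                long count = 0;
                reader.sync(start);                          // first sync point at or after start
                while (reader.hasNext() && !reader.pastSync(end)) {
                  reader.next();                             // records owned by this split
                  count++;
                }
                return count;
              }
            }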

          Scott Carey added a comment -

          No, the spec currently says each block is prefixed by "a long indicating the count of objects in this block". This is as it was before, without a byte count. Byte counts are left to codec implementations on an as-needed basis.

          While working through some use cases, I think it would make sense to have each block have both the record count, and the size in bytes (encoded) of the block.

          Use cases:

          • Concatenate two avro files with the same schema (and codec). To do this efficiently, one would want to simply copy the bytes in each block, and not decode any records at all.
          • Convert the codec in a file (read file A with codec X and output file B with codec Y – for example to compress a file) In this use case one wants access to the raw bytes in a block, but again decoding and re-encoding the records is a waste of time.

          Several other use cases can take advantage of knowing the block size and avoid decoding and encoding records.

          Without the size, one could scan for the sync marker to find the end of the block, but this is both much slower, and unsafe. A sync marker collision (as rare as that may be) can only be detected by validating the record count, which requires decoding the records. With the size of the block in the format, use cases where the raw binary block is copied around are simple and safer.

          Furthermore, having the length of the block will allow the Codec interface to perhaps just take the (byte[], offset, length) of the block rather than an Input/Output stream which would improve performance.

          A byte count of the uncompressed size should be left to the codec.

          Thoughts?
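
          A hypothetical sketch of the concatenation use case, assuming blocks carry both an object count and an encoded byte length as proposed; the readLong()/readInt() framing is a stand-in for Avro's varint encoding, and the class is made up.

            import java.io.DataInputStream;
            import java.io.DataOutputStream;
            import java.io.EOFException;
            import java.io.IOException;

            public class BlockCopySketch {
              static final int SYNC_SIZE = 16;

              /** Copy blocks from in to out verbatim, re-stamping them with out's sync marker. */
              public static void copyBlocks(DataInputStream in, DataOutputStream out,
                                            byte[] outSync) throws IOException {
                byte[] skip = new byte[SYNC_SIZE];
                while (true) {
                  long count;
                  try {
                    count = in.readLong();         // objects in this block
                  } catch (EOFException e) {
                    return;                        // no more blocks
                  }
                  byte[] data = new byte[in.readInt()];
                  in.readFully(data);              // copy the payload without decoding records
                  in.readFully(skip);              // discard the source file's sync marker

                  out.writeLong(count);
                  out.writeInt(data.length);
                  out.write(data);
                  out.write(outSync);              // blocks in the target use the target's marker
                }
              }
            }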

          Scott Carey added a comment -

          Any thoughts on a possible change to the file format to add the size of the block?

          I have a real world use case now for both concatenation and codec conversion. If the format has the size of the block in it, then implementing that in the file classes is relatively trivial. Should I open a different ticket?

          Thanks!

          -Scott

          Philip Zeyliger added a comment -

          Hi Scott,

          I don't have strong opinions either way on storing another length in there. To be clear, I think you mean "size in bytes (after codec compression) of the block". "encoded" might mean Avro-encoded, which isn't what you mean, I think.

          For the re-compression use case, codecs need to know when the stream ends anyway, so I'm not sure there's a big win of having the length. Though most codecs will be (byte[], offset, length), I would like to leave the door open for codecs operating on the encoder/decoder level (instead of the byte[] level), because they might be able to do more clever things (like columnar storage).

          Another use case for having the block length is being able to do parallel de-compression at the framework, rather than codec, level. You can read several blocks into memory, and then start threads to decompress or what have you. Hard to do that if you rely on the codec to tell you where the boundaries are.

          – Philip

          Scott Carey added a comment -

          To be clear, I think you mean "size in bytes (after codec compression) of the block".

          Yes.

          I would like to leave the door open for codecs operating on the encoder/decoder level (instead of the byte[] level), because they might be able to do more clever things (like columnar storage).

          Isn't that more of a different encoder/decoder implementation than a codec? Where do we draw that line? It seems like a fundamentally different layer. if you wanted to do a columnar storage optimization, would you want to have:
          codec: gzip
          codec: fastlz
          codec: columnar
          codec: columnar-gzip
          codec: columnar-fastlz?

          I feel that the layer that does blind lossless compression or other work (crc's, etc) on the binary data should have one API, and anything that is some sort of schema-aware transform of the data should have another.

          Not all codecs are stream-based or naturally define where their stream ends the way gzip does. If we depend on the codec defining where the boundary of the block is, we are forcing all codecs to implement that feature. The file format already defines the block boundary markers, so why not also define their boundaries more explicitly? The drawback is a couple of extra bytes per block (usually 2 or 3), plus the requirement of knowing the size of the block before writing, which is similar to the existing requirement of knowing the record count before writing.

          Philip Zeyliger added a comment -

          > Isn't that more of a different encoder/decoder implementation than a codec?

          Yep, you're right, it would be a different encoder implementation.

          It's true that codecs already are having to implement boundary knowledge, so we'd get more re-use if we put the length in. I find that reasonably compelling... we've certainly oscillated back and forth here quite a bit.

          – Philip

          Doug Cutting added a comment -

          I am okay adding length to blocks, but it should be done in a new issue.


            People

            • Assignee:
              Doug Cutting
              Reporter:
              Doug Cutting
            • Votes:
              1
              Watchers:
              3
