Hadoop Common

HADOOP-331: map outputs should be written to a single output file with an index

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.3.2
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels: None

      Description

      The current strategy of writing a file per target map is consuming a lot of unused buffer space (causing out-of-memory crashes) and puts a lot of burden on the FS (many opens, inodes used, etc.).

      I propose that we write a single file containing all output and also write an index file IDing which byte range in the file goes to each reduce. This will remove the issue of buffer waste, address scaling issues with number of open files and generally set us up better for scaling. It will also have advantages with very small inputs, since the buffer cache will reduce the number of seeks needed and the data serving node can open a single file and just keep it open rather than needing to do directory and open ops on every request.

      The only issue I see is that in cases where the task output is substantially larger than its input, we may need to spill multiple times. In this case, we can do a merge after all spills are complete (or during the final spill).
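
      For illustration, a minimal sketch of what one entry of the proposed index might carry (the class and field names here are hypothetical, not taken from the eventual patch): each reduce looks up its partition and fetches only that byte range from the single map output file.

      // Hypothetical index entry, one per reduce partition. Illustrative only.
      class IndexRecord {
        int partition;      // reduce number this range belongs to
        long startOffset;   // byte offset of the partition's data in the map output file
        long length;        // number of bytes belonging to this partition
      }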

      1. 331.txt
        4 kB
        Devaraj Das
      2. 331-design.txt
        4 kB
        Devaraj Das
      3. 331-initial3.patch
        67 kB
        Devaraj Das
      4. 331.patch
        60 kB
        Devaraj Das

          Activity

          p sutter added a comment -

          Sounds like a great direction! Could you clarify a little bit?

          Would you have one map output file per mapper? Or one map output file per mapping-node (wherein the many mappers that run on an individual node will coalesce their outputs into a single file)?

          How many ranges will each reducer have to read back from the file? one? one per 100 megabytes of map output? one per combiner flush?

          What's a spill?

          How does this fit with the compression proposal? Will there be a separate compression context per reducer?

          p sutter added a comment -

          ... and further, when considering the plumbing for transferring this data to the reducer, it would be great if the mechanism allowed the reducer to receive this data as it is generated by the mapper. In this way, the sorter could begin sorting immediately, and the map output data would only come back off disk in the case of a second execution of a failed reducer (since the first transfer would happen "for free" out of the file cache).

          This wouldn't be necessary in the first version, but it would be good if this were considered in the design.

          eric baldeschwieler added a comment -

          I think we should do this in an incremental fashion, so...

          Each map task would continue to output its own state only. Although centralizing this per node is an interesting idea. I can see how this might be a real benefit in some cases. Perhaps you should file this enhancement idea.

          Ditto with the transmission to other nodes. Interesting, but complicated idea. Maybe you should file it. I think that can be taken up at a later date, although feedback on how you would enhance this change to support such later work is welcome.

          A spill is dumping the buffered output to disk when we accumulate enough info. Yes, something like a 100 MB buffer seems right (possibly configurable).

          I think the goal should be that each reduce only reads a single range. That will keep the client code simple and will keep us from thrashing as we scale. This may require some thought, since if you have a small number of reduce tasks, reading from multiple ranges may prove more seek-efficient than doing the merge.

          If we do block compression for intermediates, you would need to align those to reduce targets, but I don't think we should try to do that in the first version of this. Especially given that this data will not be sorted, the return on block compression may not be that great. (Yes, I can construct counter examples, but let's deal with this as a separate project.) Checksums also need to be target aligned.

          p sutter added a comment -

          sounds great. the simpler the better.

          +1

          p sutter added a comment -

          one more suggestion (plea): could the map-output-spill-extraction take place on the mapper side (perhaps in the http server), such that the reducer would be able to retrieve fully-formed map outputs as it does currently?

          incidentally, we have individual map inputs that are 5-10GB in size, which are compressed so they can't be split, which would yield potentially 50-100 spills per mapper. it would be nice if the reducer (or whatever process is fetching the files) didn't have to do that reassembly, but rather that it happened on the mapper side.

          Kimoon Kim added a comment -

          Instead of one output, what about n outputs, where 1 <= n < # reducers? n can be configurable and set to a number that avoids FS thrashing. Each output file contains a disjoint set of partitions, so a reducer in need of a partition will later access only one file. Increased locality.

          Given we are thinking of sorting map outputs, this also allows us to avoid global sorting, i.e. sorting happens on one output file at a time, each of which holds only a small set of data.

          eric baldeschwieler added a comment -

          Don't know that this would have a positive impact, but if this eased implementation in some way I don't yet understand, it would meet my requirements.

          Devaraj Das added a comment -

          Looking at generating a single map output file per map for now.
          The plan is to do the following:
          Define a class called PartKey that will contain two fields - partition number (int) and the actual key (WritableComparable). As we are mapping, the PartKeys and the associated values are written to a buffer. The buffer has a fixed size (configurable via map.output.buffer.size) of 128M and this buffer, when full, is sorted and spilled to disk. We may end up having a couple of these spilled buffers. We do a merge at the end. The sorting takes into account the partition number. Also, the merge emits information about the offsets where a particular partition resides in the merged file. The copying phase of reduce strips the partition information contained in the PartKey and feeds the actual map-generated key to the reducer.
          Makes sense?
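
          A minimal sketch of what such a PartKey might look like (illustrative only; the class in the attached patch may differ, and readFields would need to know the wrapped key's class, e.g. from the JobConf):

          import java.io.DataInput;
          import java.io.DataOutput;
          import java.io.IOException;
          import org.apache.hadoop.io.WritableComparable;

          // Illustrative composite key: sorts first by partition number, then by
          // the actual map-generated key, as described above.
          class PartKey implements WritableComparable {
            int partition;            // reduce partition this record belongs to
            WritableComparable key;   // the actual map output key

            public int compareTo(Object other) {
              PartKey that = (PartKey) other;
              if (partition != that.partition) {
                return partition < that.partition ? -1 : 1;
              }
              return key.compareTo(that.key);
            }

            public void write(DataOutput out) throws IOException {
              out.writeInt(partition);
              key.write(out);
            }

            public void readFields(DataInput in) throws IOException {
              partition = in.readInt();
              key.readFields(in);   // assumes 'key' was instantiated from the configured key class
            }
          }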

          Bryan Pendleton added a comment -

          Just have to weigh in, as I always do, with the full-disk and multi-disk viewpoint. It would be nice if either there were awareness of the disk filling up (set the spill size down dynamically if free disk space is less than the default spill size), or if, instead of a single output, one output were written per output directory. Of course, separate spills should be distributed across different output dirs in some way, as they are now.

          For huge jobs, every bit of space counts, and avoiding failure through simple means can have a big impact on throughput.

          eric baldeschwieler added a comment -

          re: devaraj

          I like the approach. One refinement suggested below:

          I don't think you want to store the partkeys inline. That requires more code change, an on-disk format change, and wasted bytes on disk and over the wire. I think you spill serialized key/values with a side file that maps each partition to a start offset.

          In RAM you spill serialized key/value pairs to your buffer and also keep an array/vector (apply the appropriate java class here) of (partition, offset to key). You can then quicksort the array and spill. You want to be sure to be able to apply a block compressor to each partition as spilled. This will be very efficient and simple. So record the compressed lengths (Kimoon suggested this on another thread).

          Merging would go as you outline. You could read one line of each sidefile and then merge the next partition from each, so the merge would only consider the keys, since it would be per partition.

          You need the sidefile to support efficient access for the reduce readers anyway.


          re: Bryan's comments

          I think we should keep maps simple and focus this effort on reduces, which deal with much larger sizes.

          That said, a corner case with HUGE maps should have a reasonable outcome. I think we need a striped file abstraction to deal with these cases, where outputs are placed in medium, HDFS-sized blocks on whichever disk makes the most sense. This same approach would probably see more use on the reduce side.

          But I think this should come as a second project, rather than burdening this work with it.
          Anyone want to file a bug on it?

          Jim Kellerman added a comment -

          See also HADOOP-603

          Sameer Paranjpye added a comment -

          It might be useful to have the 'buffer' of key/value pairs partitioned into R buckets, one for each reduce. Then when spilling, one could sort each bucket and reduce the number of comparisons and swaps done considerably. Alternatively, when spilling you could do a couple of linear passes over the buffer, bucket keys by reduce, and then sort each bucket.

          eric baldeschwieler added a comment -

          Yeah. Just keeping an array of offset vectors per partition would be simpler.

          Jim: Don't see a problem with your suggestion (HADOOP-603), but I don't think the two projects relate. These files will be broken into partitions. Fitting that into a MapFile might be a bit of a stretch.

          Doug Cutting added a comment -

          > sort each bucket and reduce the number of comparisons and swaps done considerably

          In memory, we only need to sort pointers to buffered items, so there only needs to be a single buffer with the serialized items. Still, I agree that we can minimize comparisons & swaps by bucketing these pointers, then sorting each buffer prior to dumping the data it points to. Is that what you meant, Sameer?

          Each spill can result in output of the same format as the final output: a <key,value>* file sorted by <part,key> and a <part,offset>* file that indicates the start of each part. So if there's only one spill then no merging is required. Unfortunately, since 'part' is implicit, we cannot use the existing SequenceFile merge code.

          Devaraj Das added a comment -

          I would ideally like to use as much of the existing code/tools as possible. So, here is another proposal:
          1) Generate a master index as the buffers are filled with key/value pairs. The master index is an array indexed by partition number, and each entry in the array contains values of the form <filename, keyoffset, valoffset>*. The filename refers to the file to which the containing buffer was spilled. Think of each element of the array as an array by itself.
          2) Filled buffers are spilled and NO sorting is done when a spill happens.
          3) At the end, we have a master index and we go through it index by index (partition by partition). Since we know the filename and the key/value offsets, we can access the key/value data with ease. We create a temporary fixed-sized in-memory buffer for each array element, and if we can accommodate all the key/value pairs for the particular array element in that buffer, well and good; we sort that in-memory buffer and append the sorted data to a final output file. If we cannot fit everything into the buffer, we spill, and sort the spills after all the key/value pairs have been spilled.
          4) While we do (3), we can trivially generate the final index file.
          5) At the end of the step (3), we have a big file with sorted data (by partition) and a single index file.
          The problems of implicit partition numbers are not there with this approach. Also, we can do parallel sorts (like two at a time or something) of the array elements and generate multiple files and just concatenate the sorted data at the end of the sorts to that single output file. The negative with this approach is that we need another temp buffer (but it may be required in the other approaches also). Think the master index file won't be very big (it will be roughly of the order of number of records processed by a single map).
          Thoughts?

          Devaraj Das added a comment -

          > Think the master index file won't be very big (it will be roughly of the order of
          > number of records processed by a single map).
          Actually meant to say "master index array/buffer".

          Doug Cutting added a comment -

          Unless I misunderstand, your step (3) involves a lot of random disk accesses, something we must avoid. Seek time dominates for accesses of less than 100k or so. Optimal performance will only be achieved if all disk operations are reads and writes of larger chunks.

          Sorting buffers before they are written is also important, since it eliminates a sort pass. Most map outputs can be sorted in just a few passes, often only one, so each pass is significant.

          Owen O'Malley added a comment -

          I think that the buffer of the map output should be stored as:

          class KeyByteOffset {
            WritableComparable key;
            int offset;   // offset of value in a buffer of bytes
            int length;   // length of value in buffer of bytes
          }

          List<KeyByteOffset>[NumReduces]

          So you have a List of records for each reduce.

          When you fill up on #records or #value bytes, sort and spill. (Combine first, if there is a combiner.)

          The spilled keys should be PartKey's from above so that the standard merge tools on sequence files work. (If the PartKey uses the vint representation, each key will only expand by 1-2 bytes on disk.)

          I think we should have a new variant of the sequence file merge that returns an iterator over key/value pairs, which will be useful in reduce also.

          With that, we can easily merge and produce the final partition index & sequence file to local disk.

          Devaraj Das added a comment -

          Yes, the disk accesses bother me too. So do you think it makes sense to sort before the spill (step (2)) and then update the master index array to contain <filename, start-key-val-offset, end-key-val-offset>? If we do this, then we will have bigger chunks for the sort to operate on at the end (basically we will merge the chunks).

          Owen O'Malley added a comment -

          And clearly the servlet opens the file, then transmits the sequence file header and the range of bytes from the index.
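
          Roughly, the serving side could look something like the sketch below (illustrative only, not the servlet in the patch; it assumes a fixed-size <start, length> index entry per partition and omits the sequence file header handling mentioned above):

          import java.io.IOException;
          import java.io.OutputStream;
          import org.apache.hadoop.fs.FSDataInputStream;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          // Sketch: look up the reduce's byte range in the index file, then stream
          // just that range of the single map output file to the requester.
          class MapOutputServerSketch {
            static void serveMapOutput(FileSystem localFs, Path outFile, Path indexFile,
                                       int reduce, OutputStream out) throws IOException {
              long start, length;
              FSDataInputStream index = localFs.open(indexFile);
              try {
                index.seek(reduce * 16L);   // assumed: two longs (start, length) per partition
                start = index.readLong();
                length = index.readLong();
              } finally {
                index.close();
              }
              FSDataInputStream data = localFs.open(outFile);
              try {
                data.seek(start);
                byte[] buf = new byte[64 * 1024];
                long remaining = length;
                while (remaining > 0) {
                  int read = data.read(buf, 0, (int) Math.min(buf.length, remaining));
                  if (read < 0) break;
                  out.write(buf, 0, read);
                  remaining -= read;
                }
              } finally {
                data.close();
              }
            }
          }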

          Doug Cutting added a comment -

          > we should have a new variant of the sequence file merge that returns an iterator over key/value pairs

          +1

          Owen, shouldn't KeyByteOffset also include the partition #?

          Under this scheme, each spill will result in a separate <PartKey,value> sequence file, right? And probably each such sequence file should be accompanied by a <part,position> index file. That way, if there's only one spill, it can be used directly. Note that block compression complicates random access. Perhaps we should add a SequenceFile#Writer.flush() method that, when block-compression is used, resets the compressor and emits a sync, creating a seekable offset.

          Owen O'Malley added a comment -

          Before the spill, the partition is implicit in which list the KeyByteOffset is on, so I don't think you need to include it explicitly.

          We may want to handle the (standard) 1 spill case separately, so that we can make the final output a sequence file over Key,Value instead of PartKey,Value. Either way will work...

          Sameer Paranjpye added a comment -

          Doug:

          > Still, I agree that we can minimize comparisons & swaps by bucketing these pointers, then sorting each
          > buffer prior to dumping the data it points to.

          Yes, this is what I meant.
          ---------------------

          Don't think KeyByteOffset needs a partition# since partition# is the same as the index of the list in List<KeyByteOffset>[NumReduces] in which the key is stored.

          > Under this scheme, each spill will result in a separate <PartKey,value> sequence file, right? And probably
          > each such sequence file should be accompanied by a <part,position> index file. That way, if there's only
          > one spill, it can be used directly. Note that block compression complicates random access. Perhaps we
          > should add a SequenceFile#Writer.flush() method that, when block-compression is used, resets the
          > compressor and emits a sync, creating a seekable offset.

          +1

          Another minor optimization could be to only include the partition# in the first key in each partition, populating all succeeding keys with 0.

          Doug Cutting added a comment -

          Owen:
          > the partition is implicit in which list the KeyByteOffset is on

          Ah, I somehow missed the lists. Sorry! You're right, no part is needed in KeyByteOffset.

          Sameer:
          > only include the partition# in the first key in each partition

          But then we cannot use SequenceFile's merger, since keys wouldn't be independently comparable, right?

          Devaraj Das added a comment -

          A modified proposal (after Doug's comments)

          • Keep an array (length = # of partitions) whose elements will be lists containing elements of the form <spill#, start-key-val-offset, end-key-val-offset>*. This is obtained after the sort and before the spill of the buffer. The spill# helps in locating the section of the sorted partition data within a spill.
          • Merge goes over this array element by element and, for each element, it looks at the contained list elements and creates a file by appending the sections one by one. Then it does a mergesort with just that one file as input, with the output set to the final output file (opened in append mode). An index file is updated to contain <part#, start-offset, compressedlength>.
          • At the end of the above, we have the final output file and the index file.
            The negative with this approach is that it requires trips to disk for reading/writing the sections before the merge. So if we had 10 sections for a particular partition, we would have to read 10 chunks from the sorted spills and write them to disk as a single file. The plus is that the key format doesn't need to change at all (so minimum code change) and the number of comparisons during the merge is reduced, because we merge chunks of a single partition at a time.

          We can have the iterator over keys/values as suggested by Owen and the block compression optimization suggested by Doug. Although the iterator won't be required for the map outputs if we adopt the approach outlined above, it will be required for the reduces, right Owen?

          Sameer Paranjpye added a comment -

          Doug:
          > But then we cannot use SequenceFile's merger, since keys wouldn't be independently comparable, right?

          Yes, you're right, they wouldn't. This optimization is probably not worth the trouble.


          To summarize, this is what we appear to have ended up with.

          • A couple of key variants
            class KeyByteOffset { WritableComparable key; int offset; int length; }
            class PartKey { int partition; WritableComparable key; }
          • In-RAM data structures
            List<KeyByteOffset>[NumReduces] keyLists; // one list of keys per reduce
            byte[] valueBuffer; // serialized values are appended to this buffer
          • For each <key, value> pair:
            Determine the partition for the key. Append the key to keyLists[partition], append the value to valueBuffer.
            If #records is greater than #maxRecords or #valueBytes is greater than #maxValueBytes,
            OR we have no more records to process,
            SPILL - the spill is a block-compressed sequence file of <PartKey, value> pairs, with a <part, offset> index.
            Compressed blocks in this file must not span partitions.
          • At the end, if #numSpills > 1, merge the spills (a sketch of this flow follows below).
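
          A compressed skeleton of that collect/spill flow, for illustration only (the class and field names below are placeholders, not what the eventual MapOutputBuffer in the patch uses; the spill body is only outlined in comments):

          import java.io.ByteArrayOutputStream;
          import java.io.DataOutputStream;
          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.Collections;
          import java.util.Comparator;
          import java.util.List;
          import org.apache.hadoop.io.Writable;
          import org.apache.hadoop.io.WritableComparable;

          class CollectSpillSketch {
            static class KeyByteOffset {
              WritableComparable key; int offset; int length;
              KeyByteOffset(WritableComparable k, int o, int l) { key = k; offset = o; length = l; }
            }
            static final Comparator<KeyByteOffset> KEY_ORDER = new Comparator<KeyByteOffset>() {
              public int compare(KeyByteOffset a, KeyByteOffset b) { return a.key.compareTo(b.key); }
            };

            final int numReduces, maxRecords, maxValueBytes;
            int numRecords, numSpills;
            final List<KeyByteOffset>[] keyLists;                  // one list per reduce
            final ByteArrayOutputStream valueBuffer = new ByteArrayOutputStream();
            final DataOutputStream valueOut = new DataOutputStream(valueBuffer);

            CollectSpillSketch(int numReduces, int maxRecords, int maxValueBytes) {
              this.numReduces = numReduces;
              this.maxRecords = maxRecords;
              this.maxValueBytes = maxValueBytes;
              keyLists = new List[numReduces];                     // unchecked, fine for a sketch
              for (int i = 0; i < numReduces; i++) keyLists[i] = new ArrayList<KeyByteOffset>();
            }

            void collect(WritableComparable key, Writable value, int part) throws IOException {
              int offset = valueBuffer.size();
              value.write(valueOut);                               // append the serialized value
              keyLists[part].add(new KeyByteOffset(key, offset, valueBuffer.size() - offset));
              if (++numRecords > maxRecords || valueBuffer.size() > maxValueBytes) {
                spill();
              }
            }

            void spill() {
              for (int part = 0; part < numReduces; part++) {
                Collections.sort(keyLists[part], KEY_ORDER);       // sort one partition at a time
                // run the combiner here if one is configured, then write this partition's
                // <PartKey, value> records to a block-compressed SequenceFile, ending the
                // compression block at the partition boundary and recording <part, offset>
                keyLists[part].clear();
              }
              valueBuffer.reset();
              numRecords = 0;
              numSpills++;
            }
          }
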
          Doug Cutting added a comment -

          +1 to Sameer's summary.

          Also note that two changes to SequenceFile are required:
          1. Expose an iterator-like API to merge, where successive key/value pairs can be extracted from the merge (one possible shape is sketched below). This will permit writing an index when merging, and will help optimize reduce.
          2. Add a Writer.sync() method that ends a compression block and creates a sync point.
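
          For illustration, such an iterator-style merge API might look roughly like this (a hypothetical interface; the one the patch eventually adds, RawKeyValueIterator, may differ in names and signatures):

          import java.io.IOException;
          import org.apache.hadoop.io.DataOutputBuffer;

          // Hypothetical shape of an iterator over merged, still-serialized key/value pairs.
          interface MergedKeyValueIterator {
            /** Advance to the next key/value pair; returns false when the merge is exhausted. */
            boolean next() throws IOException;
            /** The current key, in serialized (raw) form. */
            DataOutputBuffer getKey() throws IOException;
            /** The current value, in serialized (raw) form. */
            DataOutputBuffer getValue() throws IOException;
            void close() throws IOException;
          }

          Driving the merge through such an iterator is what lets the caller notice partition changes and emit a <part, offset> index entry (and a compression-block sync) as the final file is written.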

          Sameer Paranjpye added a comment -

          A couple of other points/questions. In the summary above,

          SPILL means:

          • Sort
          • Combine if a Combiner is specified
          • Write to disk

          What sorting algorithm should we use? Is Collections.sort good enough? It claims to be comparable to a highly optimized quicksort. Or should we invest in a quicksort implementation?

          Is the generic SequenceFile heap merge going to do the right thing when merging spills? Even in the final merge we don't want compressed blocks to span partition boundaries. Or are we going to have to special case it for PartKeys?

          eric baldeschwieler added a comment -

          Sounds good. I think we are converging on something.

          A couple of points:

          1) When we spill, we don't need to keep any record of the spilled data. Devaraj's proposal maintained and kept operating on these arrays after spills. We should not do that. Everything should be cleared out (except possibly the index of partitions to file offsets).

          2) I like the idea of extending the block compressed sequence file to directly support flushes at partition boundaries and merges of ranges. This will be reused in reduce, as Doug observed.

          3) We should not spill based on a number of records. Don't see any value in that. We should just spill based on RAM used.

          4) Related, we need to track total RAM used, not just for values, but also for keys and arrays. We don't want the system to blow up in the degenerate cases of huge keys or null values and many, many keys.

          eric baldeschwieler added a comment -

          A related feature: compressing values individually as we serialize (before sort/spill) should be an option, even if we are using block compression on output. It sounds inefficient, but if it reduces the number of spills 3x, this saves total disk passes and compression/decompression passes. It is also clearly good if we choose to compress just values, not blocks, in the sequence files, since we could then just pass the compressed values around.

          Doug Cutting added a comment -

          > we need to track total RAM used, not just for values, but also for keys and arrays

          Then we should change KeyByteOffset to something like:

          class BufferedEntry {
            int start;
            int keyLength;
            int valueLength;
          }

          Then the sort code can use WritableComparator.compare(byte[] k1, int s1, int l1, byte[] k2, int s2, int l2). Most WritableComparables have an optimized implementation of this method.

          To account for arrays we could just add 20 bytes per entry (BufferedEntry+pointer to it), which is probably close enough.

          Doug Cutting added a comment -

          > Compressing values individually as we serialize (before sort/spill) should be an option.

          Sounds like an enhancement to be added later. We should first use whatever compression is specified for map output.

          Owen O'Malley added a comment -

          Number of records does impact memory usage precisely because of the KeyByteOffset. I've seen cases where there are very small records (int,int) that use the majority of memory in the objects and their overhead. Since it can become a dominant factor, it needs to be monitored.

          I want to keep the Key objects so that performance doesn't suck if the user doesn't define raw comparators. Key objects are typically small and doing the base sort on them will be much faster if we keep them as objects.

          Doug Cutting added a comment -

          > keep the Key objects so that performance doesn't suck if the user doesn't define raw comparators

          Note that merge performance will also suffer when the user doesn't define raw comparators, but not as badly as sort performance.

          One other advantage to using the raw, binary comparator is that we could then share the sort logic with SequenceFile. It currently has a sort algorithm that, without too much work, could be exposed as something like:

          public static void sort(int[] pointers, int[] starts, int[] lengths, byte[] data, WritableComparator order);

          The pointers array indicates which values in starts and lengths should be used. It is permuted, so that, after sorting, the key at data[starts[pointers[i]]] is always less than data[starts[pointers[i+1]]]. To use this we'd dispense with the KeyByteOffset class altogether and simply keep something like:

          byte[] data; // all buffered keys and values
          int[] starts; // the start of each pair
          int[] keyLengths; // the length of each key
          // values run from starts[i]+keyLengths[i] through starts[i+1].
          List<int[]> partPointers;

          Then we could sort each part pointer array and then write it. Each buffered entry would have a fixed 12-byte overhead, so memory accounting could be exact.
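
          For concreteness, a minimal sketch of such a sort routine (illustrative only; it uses an insertion sort to stay short, whereas the SequenceFile sorter would use something faster, and only the pointer permutation and the raw comparator call matter here):

          import org.apache.hadoop.io.WritableComparator;

          // Permutes 'pointers' so the keys they reference come out in 'order'.
          class PointerSortSketch {
            public static void sort(int[] pointers, int[] starts, int[] keyLengths,
                                    byte[] data, WritableComparator order) {
              for (int i = 1; i < pointers.length; i++) {
                int p = pointers[i];
                int j = i - 1;
                // shift larger keys right until p's key fits
                while (j >= 0
                       && order.compare(data, starts[pointers[j]], keyLengths[pointers[j]],
                                        data, starts[p], keyLengths[p]) > 0) {
                  pointers[j + 1] = pointers[j];
                  j--;
                }
                pointers[j + 1] = p;
              }
            }
          }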

          Owen O'Malley added a comment -

          Ok, I've been convinced that we should start with BufferedEntry as a baseline. I think the overhead/record should be 40 rather than 20 for the size metric, but of course we should use a constant for it anyways:

          final int BUFFERED_KEY_VALUE_OVERHEAD = 40;

          And I think BufferedKeyValue is a little more informative than BufferedEntry.

          // condition for a spill would be
          buffer.size() + BUFFERED_KEY_VALUE_OVERHEAD * numKeyValues > conf.getMapOutputBufferSize()

          The records would be stored in an array of ArrayLists:

          List<BufferedEntry>[numReduces]

          Spills would be written as SequenceFile<PartKey<Key>, Value>

          The spills would be merged (using the iterator output form of merge) to write:

          SequenceFile<Key,Value> and partition index

          If there have been no spills, you just write the SequenceFile<Key,Value> and partition index from memory.

          It will give a fixed usage of memory, a single dump to disk in the common case, and a reasonable behavior for large cases.

          Devaraj Das added a comment -

          >public static sort(int[] pointers, int[] starts, int[] lengths, byte[] data,
          >WritableComparator order);

          +1

          >The pointers array indicates which values in starts and lengths should be used. It
          >is permuted, so that, after sorting, the key at data[starts[pointers[i]]] is always less
          >than data[starts[pointers[i+1]]]. To use this we'd dispense with the KeyByteOffset
          >class altogether and simply keep something like:

          +1

          Devaraj Das added a comment -

          Attached is a txt doc that summarizes the discussions so far as a design doc.

          Devaraj Das added a comment -

          Attaching a revised design spec.

          Devaraj Das added a comment -

          This is the first version of the patch for review. While testing is in progress, I thought the review could go on in parallel. The main changes are:
          1) The reduces don't do the sort anymore. One change in ReduceTask.java relates to that: the "sort" string is replaced by "merge" in a couple of places.
          2) The other change in ReduceTask.java is that the ValuesIterator class has been upgraded to a package-private static class. This is because I am using the functionality of this class in the Combiner also. Thus, I have two new classes extending from ValuesIterator: ReduceValuesIterator and CombineValuesIterator, defined in ReduceTask.java and MapTask.java respectively.
          3) In MapTask.java, I have a couple of other new classes:
          3.1 MapOutputBuffer: this class does all the work of maintaining the buffer, sorting, combining and spilling. It implements the OutputCollector interface and a new interface called OutputWriter. The intent of having the second interface is for other classes to invoke the methods for writing the partition boundaries (syncs) and doing the final merge of the spills.
          4) The sort has been refactored as an interface SorterBase.java (that extends from RawKeyValueIterator interface). It defines a couple of methods that the MapOutputBuffer invokes during the map phase. The reason for extending from RawKeyValueIterator is to allow easy iteration over the sorted data (during combining/spilling). Also, during iteration the OutputWriter in MapTask.java is notified of partition changes so that it can do things like ending a block boundary by writing a sync (for block compression), etc.
          5) Implemented the SorterBase interface as a class BasicTypeSorterBase that implements the methods and the relevant data structures as arrays of primitive int (as opposed to object arrays, which could be used by another class implementing the SorterBase interface). The intent of the BasicTypeSorterBase class is to serve all implementations of sort algos that rely on primitive arrays (of offsets/lengths/etc. into a read-only buffer). The sort() method itself is empty here.
          6) MergeSorter.java extends from BasicTypeSorterBase.java and implements the sort method. Basically, it accesses the data structures that BasicTypeSorterBase created and sets up the input arguments for hadoop.util.MergeSort (that actually implements the core of the MergeSort algo). One could have a QuickSorter.java and a corresponding hadoop.util.QuickSort.java and so on. The bridge between the framework and the sort algo is the Comparator that is passed by the framework to the hadoop.util.MergeSort's constructor.
          7) Which sort algo the framework uses can be set via map.sort.class, which defaults to MergeSorter.class (for now). (See the configuration sketch after this list.)
          8) TaskTracker.java has the necessary jsp changes to serve the map output data to the reduces.
          9) SequenceFile.java has two new public APIs: createWriter and sync (this createWriter has slightly different args). Also refactored sort to use the new mergesort from hadoop.util.
          10) A new class hadoop.util.ListOfArrays has been introduced to maintain lists of primitive arrays and handle things like growing of arrays internally.
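
          For illustration, setting these knobs from a job might look like the snippet below (the property names come from the comments above - map.sort.class here, map.output.buffer.size from the earlier proposal - but the exact keys, defaults and package names in the committed patch may differ):

          import org.apache.hadoop.mapred.JobConf;

          class SortConfigSketch {
            static JobConf configure(JobConf conf) {
              conf.set("map.sort.class", "org.apache.hadoop.mapred.MergeSorter");    // package assumed
              conf.set("map.output.buffer.size", String.valueOf(128 * 1024 * 1024)); // 128M, as proposed
              return conf;
            }
          }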

          Show
          Devaraj Das added a comment - This is the first version of the patch for review. While testing is in progress, I thought the review can go on in parallel. The main changes are: 1) The reduces don't do sort anymore. One change in ReduceTask.java is to do with that - "sort" string is replaced by "merge" in a couple of places. 2) The other change in ReduceTask.java is that the class ValuesIterator class has been upgraded to package private static class. This is because, I am using the functionality of this class in Combiner also. Thus, I have two new classes extending from ValuesIterator: ReduceValuesIterator and CombineValuesIterator defined in ReduceTask.java and MapTask.java respectively. 3) In MapTask.java, I have a couple of other new classes: 3.1 MapOutputBuffer: this class does all the work to do with maintaining the buffer, sort, combine and spill. It implements the OutputCollector interface and a new interface called OutputWriter. The intent of having the second interface is for other classes to invoke the methods for writing the partition boundaries (syncs) and doing the final merge of the spills. 4) The sort has been refactored as an interface SorterBase.java (that extends from RawKeyValueIterator interface). It defines a couple of methods that the MapOutputBuffer invokes during the map phase. The reason for extending from RawKeyValueIterator is to allow easy iteration over the sorted data (during combining/spilling). Also, during iteration the OutputWriter in MapTask.java is notified of partition changes so that it can do things like ending a block boundary by writing a sync (for block compression), etc. 5) Implemented the SorterBase interface as a class BasicTypeSorterBase that implements the methods and the relevant datastructures as arrays of primitive int (as opposed to object arrays which could be implemented by another class implementing the SorterBase interface). The intent of BasicTypeSorterBase class is to serve all implementations of sort algos that rely on primitive arrays (of offsets/lengths/etc. to a read-only buffer). The sort() method itself is empty here. 6) MergeSorter.java extends from BasicTypeSorterBase.java and implements the sort method. Basically, it accesses the data structures that BasicTypeSorterBase created and sets up the input arguments for hadoop.util.MergeSort (that actually implements the core of the MergeSort algo). One could have a QuickSorter.java and a corresponding hadoop.util.QuickSort.java and so on. The bridge between the framework and the sort algo is the Comparator that is passed by the framework to the hadoop.util.MergeSort's constructor. 7) What sort algo to use in the framework can be set by map.sort.class which defaults to MergeSorter.class (for now). 8) TaskTracker.java has the necessary jsp changes to serve the map output data to the reduces. 9) SequenceFile.java has two new public APIs: createWriter and sync (this createWriter has slightly different args). Also refactored sort to use the new mergesort from hadoop.util. 10) A new class hadoop.util.ListOfArrays has been introduced to maintain lists of primitive arrays and handle things like growing of arrays internally.
          Owen O'Malley added a comment -

          Change the name of the class SorterBase to BufferSorter

          Instead of "initialize(JobConf)" in the file BasicTypeSorterBase.java, it should implement JobConfigurable.

          BasicTypeSorterBase should be an abstract class, since it does not itself
          implement the sort algorithm.
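          Roughly, these two points amount to the following sketch (assuming the old org.apache.hadoop.mapred.JobConfigurable interface, whose single method is configure(JobConf); the field name is illustrative, and the concrete sort algorithm would live in a subclass such as MergeSorter):

            import org.apache.hadoop.mapred.JobConf;
            import org.apache.hadoop.mapred.JobConfigurable;

            // Abstract base: holds shared buffer/offset bookkeeping, defers the sort itself.
            public abstract class BasicTypeSorterBase implements JobConfigurable {
              protected JobConf job;                           // illustrative field name

              public void configure(JobConf job) {             // replaces a custom initialize(JobConf)
                this.job = job;
              }
            }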

          The sorter should not be concerned with partitions (it makes the code
          hard to read). So have one sorter per partition instead of one sorter
          for all partitions. With one sorter per partition, we don't need the
          callback mechanism (the OutputWriter interface in MapTask.java), and
          the ListOfArrays class is not required either.
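          A hedged sketch of the "one sorter per partition" shape being suggested; the BufferSorter interface shown here is only a stand-in for whatever the renamed interface ends up looking like, not the code in the patch:

            // Stand-in interface: one instance buffers and sorts a single partition's records.
            interface BufferSorter {
              void addKeyValue(byte[] key, byte[] value);
            }

            class PartitionedCollector {
              private final BufferSorter[] sorters;            // indexed by reduce partition

              PartitionedCollector(BufferSorter[] sorters) {
                this.sorters = sorters;
              }

              void collect(byte[] key, byte[] value, int partition) {
                // Each partition's records go to its own sorter, so no cross-partition
                // callback (OutputWriter) or ListOfArrays bookkeeping is needed.
                sorters[partition].addKeyValue(key, value);
              }
            }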

          The sort method in the SorterBase interface should return a
          RawKeyValueIterator object rather than deriving from it.

          The import of DataInputBuffer in BasicTypeSorterBase.java is not used.

          SequenceFile.java has some whitespace-only changes

          Use readLong/writeLong instead of LongWritables for reading/writing longs
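          For example, index entries can be written as plain longs with DataOutputStream rather than as LongWritable objects; the two-longs-per-partition layout below is only an assumption for illustration, not the actual index format in the patch.

            import java.io.DataOutputStream;
            import java.io.FileOutputStream;
            import java.io.IOException;

            public class IndexWriteSketch {
              public static void main(String[] args) throws IOException {
                DataOutputStream out =
                    new DataOutputStream(new FileOutputStream("file.out.index"));
                try {
                  out.writeLong(0L);                           // partition 0: start offset
                  out.writeLong(1024L);                        // partition 0: length in bytes
                  out.writeLong(1024L);                        // partition 1: start offset
                  out.writeLong(512L);                         // partition 1: length in bytes
                } finally {
                  out.close();
                }
              }
            }

          With such a fixed-width layout, a reader could seek straight to partition p's entry at byte offset 16 * p.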

          Remove the empty finally block from MapTask.run

          Synchronize the collect methods in MapTask.java (move the 'synchronized'
          from the method declaration to inside the method as 'synchronized(this)' )
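          That is, something along these lines (sketch only; the buffer state is a stand-in):

            import java.io.IOException;

            // Keep the declaration plain and take the lock inside the body
            // instead of declaring the method 'synchronized'.
            public class CollectorSketch {
              private int numRecords;                          // stand-in for the real buffer state

              public void collect(Object key, Object value) throws IOException {
                synchronized (this) {
                  numRecords++;                                // ... buffer key/value, maybe sort/spill ...
                }
              }
            }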

          Close the combiner after iterating a sorted partition while spilling to disk.
          This is to ensure that in case of streaming, where the combiner is a separate
          process, it processes the input key/vals and writes the output key/vals before
          the spill file is updated with the partition information.
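          The ordering being asked for is roughly the following; Combiner and SpillIndex here are hypothetical stand-ins, not the real classes in the patch:

            import java.io.IOException;

            interface Combiner {
              void close() throws IOException;                 // for streaming, flushes the external combiner process
            }

            interface SpillIndex {
              void recordPartition(int partition, long offset, long length) throws IOException;
            }

            class SpillSketch {
              static void spillPartition(int partition, Combiner combiner, SpillIndex index,
                                         long offset, long length) throws IOException {
                // ... run the partition's sorted key/value groups through the combiner
                //     and write its output into the spill file ...
                combiner.close();                              // make sure all combined output is on disk first
                index.recordPartition(partition, offset, length);  // only then publish the byte range
              }
            }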

          The occurrences of the "sort" string in ReduceTask.java may remain; they need
          not be replaced with "merge" (although "merge" is the more accurate word, we
          are effectively still sorting, and there may be code somewhere that relies on
          the status string being "sort").

          In TestMapRed.java, the check of whether the map output is a compressed file
          is commented out. We need to do the compression check.
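          A possible shape for such a check, using SequenceFile.Reader accessors as they appear in later Hadoop releases; whether TestMapRed should use exactly this API at the time of this issue is an assumption.

            import java.io.IOException;
            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.SequenceFile;

            public class CompressionCheckSketch {
              public static boolean isCompressed(Path mapOutput) throws IOException {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(conf);
                SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapOutput, conf);
                try {
                  return reader.isCompressed();                // true for record- or block-compressed output
                } finally {
                  reader.close();
                }
              }
            }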

          Devaraj Das added a comment -

          Thanks Owen for the detailed review. Attached is the new patch.

          Hadoop QA added a comment -

          +1, http://issues.apache.org/jira/secure/attachment/12346642/331.patch applied and successfully tested against trunk revision r483294

          Owen O'Malley added a comment -

          +1

          Doug Cutting added a comment -

          I just committed this, although I had to increase the heap size used for unit tests or else TestDFSIO would get an out-of-memory error. It's now 128MB.

          Thanks, Devaraj!

          Hadoop QA added a comment -

          Integrated in Hadoop-Nightly #91 (See http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Nightly/91/ )

            People

            • Assignee: Devaraj Das
            • Reporter: eric baldeschwieler
            • Votes: 2
            • Watchers: 2
