Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.7 beta 1
    • Component/s: Core
    • Labels: None
    • Environment: All

      Description

      The basic idea is to allow rows to get large enough that they don't have to fit in memory entirely, but can easily fit on a disk. The compaction algorithm today de-serializes the entire row in memory before writing out the compacted SSTable (see ColumnFamilyStore.doCompaction() and associated methods).

      The requirement is to have a compaction method with a lower memory requirement so we can support rows larger than available main memory. To re-use the old FB example, if we stored a user's inbox in a row, we'd want the inbox to grow bigger than memory so long as it fit on disk.

        Issue Links

          Activity

          Jonathan Ellis added a comment -

          High level, you want to make a CF deserializer that implements Iterable<IColumn> (with buffering of course). Then have merge operate on those iterables instead of full CFs.

          It should be fairly self-contained, really. I think you only need to worry about the code in this small part of doCompaction:

              if (columnFamilies.size() > 1)
              {
                  merge(columnFamilies);
              }
              // deserialize into column families
              columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
          

          and then the sub-methods of merge of course.
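
          For illustration, a minimal sketch of the kind of streaming deserializer meant here, assuming a simple length-prefixed column encoding; Column stands in for IColumn, and the columnCount header is a placeholder rather than the actual Cassandra serialization:

              import java.io.DataInputStream;
              import java.io.IOException;
              import java.util.Iterator;
              import java.util.NoSuchElementException;

              // Placeholder for IColumn; the real interface lives in the Cassandra tree.
              class Column
              {
                  final String name;
                  final byte[] value;
                  Column(String name, byte[] value) { this.name = name; this.value = value; }
              }

              // Yields columns one at a time instead of materializing the whole row,
              // so a merge can consume rows larger than available memory.
              class LazyColumnIterable implements Iterable<Column>
              {
                  private final DataInputStream in;
                  private final int columnCount; // assumed to come from the row header

                  LazyColumnIterable(DataInputStream in, int columnCount)
                  {
                      this.in = in;
                      this.columnCount = columnCount;
                  }

                  public Iterator<Column> iterator()
                  {
                      return new Iterator<Column>()
                      {
                          private int read = 0;

                          public boolean hasNext() { return read < columnCount; }

                          public Column next()
                          {
                              if (!hasNext())
                                  throw new NoSuchElementException();
                              try
                              {
                                  String name = in.readUTF();            // column name
                                  byte[] value = new byte[in.readInt()]; // length-prefixed value
                                  in.readFully(value);
                                  read++;
                                  return new Column(name, value);
                              }
                              catch (IOException e)
                              {
                                  throw new RuntimeException(e);
                              }
                          }

                          public void remove() { throw new UnsupportedOperationException(); }
                      };
                  }
              }

          merge could then advance one such iterator per input sstable in lockstep, writing merged columns out as it goes, instead of holding full ColumnFamily objects in memory.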

          Jun Rao added a comment -

          If a CF is indexed by name, an IColumn iterator is not that hard to add on top of an sstable. It becomes harder if a CF is not indexed by name.

          Jonathan Ellis added a comment -

          I think I remember seeing something about by-time sorting that made it more complicated but I don't remember what. Can you refresh my memory?

          Jun Rao added a comment -

          A CF can be defined to be indexed either by name or by timestamp. When storing columns in sstables, the columns are sorted according to the index attribute, i.e., either name or timestamp.

          Jonathan Ellis added a comment -

          Oh, right, when they are stored by timestamp then you're not guaranteed that the columns you need to merge will come out in anything near the right order.

          Probably need to make two passes, or fall back to old in-memory for time-based. (Does anyone use those?)

          Jonathan Ellis added a comment -

          Sandeep, are you or Jun going to be able to work on this?

          Jun Rao added a comment -

          Prashant, Avinash,

          Are you guys working on this issue already? Thanks,

          Sandeep Tata added a comment -

          Prashant, Avinash – are you guys working on this already?

          Jonathan Ellis added a comment -

          see CASSANDRA-226 for an idea on how to address time-sorted CFs here

          Jonathan Ellis added a comment -

          Indexes are the real problem we're going to have to deal with here.

          We can't write the indexes first, if we can't merge the columns we're indexing in memory. (Not without making two passes: one to scan all the column names while writing the indexes, and another to do the full merge. Two passes is too high a cost to pay.)

          But we can't merge the columns in a streaming fashion while keeping the index data in memory to spit out at the end, either. We just fixed a bug from taking exactly this approach in CASSANDRA-208: this would limit the number of columns we support to a relatively small number; probably low millions, depending on your column name size and how much memory you can throw at the jvm.

          I think a hybrid approach is called for. If there are less than some threshold of columns (1000? 100000?) we merge in memory and put the index first, as we do now. Otherwise, we do a streaming merge and write the index to a separate file, similar to how we write the key index now. (In fact we could probably encapsulate this code as SSTableIndexWriter and use it in both places.)

          We don't want to always index in a separate file because (a) filesystems have limits too (we don't want one index file per row per columnfamily) and (b) we want to do streaming writes wherever possible, which means staying in the same file.

          This approach will result in a little more seeking (between column index and sstable) than the two-pass inline approach, but merging in a single pass is worth the trade. (Remember that for large rows, reading the multiple input sstables will not be seek-free either once buffers max out. So we want to keep to a single pass for performance as well as simplicity.)

          Jun Rao added a comment -

          A couple of comments.

          1. While the row index has one index entry per row, the column index has one index entry per group of columns. So, the chance of the column index not fitting in memory is low. Plus, one can always increase the column group size to reduce the index footprint.

          2. As a general solution, maybe we can put the column index after the column data in the same file. During compaction, we try to keep the column index in memory. If that's not possible, we append the column index to a temp file first. After we have written all the columns, we copy the column index from the temp file to the end of the data file. So, in the worst case, we make two passes over the column index, but not over the column data.
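
          A minimal sketch of option 2's temp-file fallback, assuming plain java.io streams; the file names and the treatment of the index as raw bytes are placeholders for illustration:

              import java.io.File;
              import java.io.FileInputStream;
              import java.io.FileOutputStream;
              import java.io.IOException;

              // Spill the column index to a temp file while the column data streams out,
              // then append the index bytes to the end of the data file.
              public class IndexSpillSketch
              {
                  public static void appendIndexToDataFile(File indexTempFile, File dataFile) throws IOException
                  {
                      FileInputStream in = new FileInputStream(indexTempFile);
                      FileOutputStream out = new FileOutputStream(dataFile, true); // append mode
                      try
                      {
                          byte[] buffer = new byte[64 * 1024];
                          int n;
                          while ((n = in.read(buffer)) != -1)
                              out.write(buffer, 0, n); // re-reads the index only, never the column data
                      }
                      finally
                      {
                          in.close();
                          out.close();
                      }
                      indexTempFile.delete();
                  }
              }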

          Jonathan Ellis added a comment -

          You're right, since not every column name is indexed I think we can get by with the column index in memory. This will allow 100s of millions of columns (maybe only 10s of millions if you need to hold multiple large indexes in memory at once), but that is still adequate for any use case I can think of. So I don't think we need to worry about writing indexes to a separate file for that reason.

          There are two other downsides, though, to index-at-the-end: one is having to do an extra seek (we seek first to the end of the row to read the index size, then have to seek back from there to read the actual index), and the other is that index-at-the-end code is inherently more complex than index-in-separate-file.

          But index-in-separate-file has its own problems; an extra fopen on the performance side, and since we'd want to keep small indexes inline, the complexity of handling both inline indexes and separate-file ones.

          On balance I think I lean towards index-at-the-end and hope we have enough ram that the OS cache can make the extra seek go away.
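
          To make the extra seek concrete, here is a hedged sketch of the read path index-at-the-end would imply, assuming for illustration a layout of [column data][column index][4-byte index length] (not the actual on-disk format):

              import java.io.IOException;
              import java.io.RandomAccessFile;

              // Assumed layout for illustration: ...[column data][column index][int indexLength]
              public class IndexAtEndReadSketch
              {
                  public static byte[] readColumnIndex(RandomAccessFile file, long rowEndOffset) throws IOException
                  {
                      // seek 1: jump to the trailing length field at the end of the row
                      file.seek(rowEndOffset - 4);
                      int indexLength = file.readInt();

                      // seek 2: jump back to the start of the index and read it
                      file.seek(rowEndOffset - 4 - indexLength);
                      byte[] index = new byte[indexLength];
                      file.readFully(index);
                      return index;
                  }
              }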

          Jonathan Ellis added a comment -

          We have a bigger problem.

          We rely on knowing the total size of the serialized columns to be able to seek around the sstable. But we can't write that data at the start without making two passes (the first to compute the size). Obviously writing it at the end is a nonstarter since we'd have no way to know where the end is, absent the size information.

          Bigtable doesn't seem to have found a way out of this either, limiting the data associated with a key to 64KB (see section 4 of the Bigtable paper).

          I'd rather limit the size (2GB is the current limit, which is more reasonable than 64KB, I think) than make two passes in compaction. Huge rows seem almost like a misfeature given the key-oriented partitioner design.

          Jonathan Ellis added a comment -

          I suppose we could get the size information from the index, though. But that introduces a fair amount of complexity to what used to be simple operations.

          Jun Rao added a comment -

          Can we leave a place-holder for the total size at the start and go back and fill the hole at the end of compaction?
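
          A minimal sketch of the placeholder idea, assuming a RandomAccessFile-style writer; the 8-byte size slot and the seek-back are the whole trick:

              import java.io.IOException;
              import java.io.RandomAccessFile;

              // Illustration of placeholder-and-backfill for the serialized row size.
              public class SizePlaceholderSketch
              {
                  public static void writeRow(RandomAccessFile out, byte[][] mergedColumns) throws IOException
                  {
                      long sizePosition = out.getFilePointer();
                      out.writeLong(0L);                  // placeholder for the row size

                      long dataStart = out.getFilePointer();
                      for (byte[] column : mergedColumns) // stream the merged columns out
                          out.write(column);
                      long dataEnd = out.getFilePointer();

                      out.seek(sizePosition);             // seek back to the hole...
                      out.writeLong(dataEnd - dataStart); // ...and fill in the real size
                      out.seek(dataEnd);                  // reposition for the next row
                  }
              }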

          Jonathan Ellis added a comment -

          sure, if you want to add two seeks per row (first back to the hole, second to reposition for the next row).

          I'd rather maintain our No Seeking For Writes design than have huge rows.

          Jonathan Ellis added a comment -

          Or, how about this compromise:

          We know each row size at the start. If the sum of these (which will always be equal to or greater than the actual merged size) is greater than some user-defined number of MB, we do a two-pass merge: first to compute the bloom filter, column index, and total row size, and second to actually write out the merged columns.

          Otherwise we do an in-memory merge the way we do now so that narrow rows are not penalized.

          This has the added benefit of not requiring a disk format change.
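
          Roughly, and only as a sketch, the dispatch this compromise implies; the threshold constant and merge methods below are placeholders (the actual knob is the in-memory compaction limit mentioned later in this thread):

              import java.util.Arrays;
              import java.util.List;

              // Sketch of the size-based dispatch between the two merge strategies.
              public class CompactionDispatchSketch
              {
                  static final long IN_MEMORY_LIMIT_BYTES = 64L * 1024 * 1024; // user-defined threshold

                  // The sum of the input row sizes is always >= the merged size,
                  // so it is a safe (if pessimistic) trigger for the two-pass path.
                  public static void compactRow(List<Long> inputRowSizes)
                  {
                      long upperBound = 0;
                      for (long size : inputRowSizes)
                          upperBound += size;

                      if (upperBound > IN_MEMORY_LIMIT_BYTES)
                          twoPassMerge();   // pass 1: bloom filter, column index, row size; pass 2: columns
                      else
                          inMemoryMerge();  // narrow rows keep the existing single-pass in-memory merge
                  }

                  static void twoPassMerge()  { System.out.println("two-pass (lazy) merge"); }
                  static void inMemoryMerge() { System.out.println("in-memory merge"); }

                  public static void main(String[] args)
                  {
                      compactRow(Arrays.asList(10000L, 20000L));            // in-memory path
                      compactRow(Arrays.asList(200000000L, 150000000L));    // two-pass path
                  }
              }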

          Jonathan Ellis added a comment -

          (This would be appropriate for workloads where you have a few outlier rows that incur the two-pass penalty, but most of the time you do not so it is less painful to do a few slower merges than redo the datamodel to something that maps less well to the domain.)

          Jonathan Ellis added a comment -

          Re-scheduling for 0.9. Maybe this time for sure.

          Jeremy added a comment -

          When this is finally resolved, do you see this as a fix that mostly protects against OOM cases, but where a user still shouldn't intentionally design a scenario that creates a very large (ever-growing) CF because of the performance penalties... OR is this a fix that (as long as you have sufficient disk and a modest amount of memory) handles the case just fine, so a user shouldn't worry about it anymore?

          Jonathan Ellis added a comment - edited

          06
          make row size at which to drop to incremental compaction configurable

          05
          make row size 64 bits

          04
          add LazilyCompactedRow

          03
          r/m 'object count' abomination. fix BF serialization to only use top-level columns (since those are the only ones it's ever checked for)

          02
          make single pass over columns for indexing

          01
          introduce AbstractCompactedRow, PrecompactedRow

          Jonathan Ellis added a comment -

          patchset applies on top of the one for CASSANDRA-1127

          Jonathan Ellis added a comment -

          rebased.

          note that besides LazilyCompactedRowTest (which checks that in-memory and on-disk compactions produce the same result), you can change

          if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())

          to

          if (true)

          to force the rest of the test suite to use LCR (on-disk compaction).

          Stu Hood added a comment -

          Is there a reason to keep non-lazily compacted rows around? Are they faster for rows smaller than 256 MB?

          Jonathan Ellis added a comment -

          Yes; lazy makes 2 identical column merging passes, so it's going to be roughly half as fast.

          Jonathan Ellis added a comment -

          rebased

          Stu Hood added a comment - edited
          • Some comments in ColumnIndexer.serializeInternal are outdated
          • Methods in SSTableIdentityIterator are really tightly coupled
          • SSTableIdentityIterator.rowSize reads an integer for the row size
          • Could you document LazilyCompactedRow a little bit to explain the algorithm? It's unclear that the first pass happens within the constructor
          Jonathan Ellis added a comment -

          1. updated comments in new 03
          2. added patch 07 to address the low hanging fruit here (see below)
          3. fixed by removing rowSize in favor of dataSize in new 04
          4. added class javadoc to ACR, PCR, and LCR in new 01 and 04

          I experimented with making more invasive changes to SSTII (making SSTS implement iterable instead of iterator, and making it iterate over SSTableRow objects that could build a SSTII or just read an entire CF, so we could move getColumnFamilyWithColumns out of SSTII) but this made things worse overall; the requirements for row scans vs compaction are just different enough to make things ugly.

          Stu Hood added a comment -

          +1

          Jonathan Ellis added a comment -

          committed

          Hudson added a comment -

          Integrated in Cassandra #469 (See http://hudson.zones.apache.org/hudson/job/Cassandra/469/ )

            People

            • Assignee: Jonathan Ellis
            • Reporter: Sandeep Tata
            • Votes: 4
            • Watchers: 13
