Cassandra / CASSANDRA-579

Stream SSTables without Anti-compaction

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Fix Version/s: 0.7 beta 1
    • Component/s: None
    • Labels: None

      Description

      The io.Streaming API currently requires a file on disk to stream, which means that bootstrap and repairs need to perform an anti-compaction that writes a bunch of data to disk, only to have it be deleted after the streaming has finished.

      EDIT: Deleted reference to using streaming as a client API: it wouldn't provide enough benefit over using the BMT interface, due to fragility.


          Activity

          Hudson added a comment -

          Integrated in Cassandra #469 (See http://hudson.zones.apache.org/hudson/job/Cassandra/469/)
          Stream sstables without anticompaction
          patch by Stu Hood; reviewed by jbellis for CASSANDRA-579
          Extract index/filter writing into IndexWriter; add recovery for non-essential sstable components; only send the datafile when streaming
          patch by Stu Hood; reviewed by jbellis for CASSANDRA-579

          Jonathan Ellis added a comment -

          done. (squashed 01-03 on purpose; 04 and 05 apparently got squashed somehow by git-svn getting confused during repeated borked dcommits.)

          Jonathan Ellis added a comment -

          patches 01-03 applied, but I keep getting timeouts trying to commit the rest. (probably the super flaky wireless here.) will try again in the morning.

          Stu Hood added a comment -
          1. Squashed your patch into 0002
          2. Estimated keys in 0002 based on the first 100 keys or 100 megabytes of data, whichever comes first
          3. Fixed the test in 0005 to catch the long/int discrepancy: StreamingTest only tested recovering a single row
          4. Removed getNearestPosition in 0005
          Jonathan Ellis added a comment -

          patch to fix row size post-CASSANDRA-16.

          probably worth exploring why StreamingTest doesn't catch this.

          Jonathan Ellis added a comment -

          // FIXME: extremely rough estimate of keys in the sstable

          I think we do need a better key count estimate. What about reading the first 1% (by size) of the rows, and estimating from that?

          Alternatively, streaming could actually count the rows as they arrive.
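          For illustration only, a minimal sketch of the "sample then extrapolate" idea (whether the sample is the first 100 keys / 100 megabytes as in the patch above, or the first 1% of rows by size). The row layout assumed here (UTF key, int row length, row body) and the class/method names are hypothetical, not the code in the attached patches:

          import java.io.DataInputStream;
          import java.io.File;
          import java.io.FileInputStream;
          import java.io.IOException;

          public class KeyCountEstimator
          {
              // Scan rows until sampleBytes (or sampleKeys) have been read, then
              // extrapolate the key count linearly over the whole file length.
              public static long estimateKeys(File dataFile, long sampleBytes, int sampleKeys) throws IOException
              {
                  long totalLength = dataFile.length();
                  FileInputStream fis = new FileInputStream(dataFile);
                  DataInputStream in = new DataInputStream(fis);
                  try
                  {
                      int keysRead = 0;
                      long position = 0;
                      while (position < totalLength && position < sampleBytes && keysRead < sampleKeys)
                      {
                          in.readUTF();                 // key, read only to advance the stream
                          int rowLength = in.readInt(); // assumed on-disk row length
                          in.skipBytes(rowLength);      // skip the row body without deserializing it
                          position = fis.getChannel().position();
                          keysRead++;
                      }
                      if (position == 0)
                          return 0;
                      // keys per byte in the sample, scaled to the full file
                      return keysRead * totalLength / position;
                  }
                  finally
                  {
                      in.close();
                  }
              }
          }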

          Stu Hood added a comment -

          Rebased for trunk.

          Stu Hood added a comment -

          Rebased for trunk: should be ready for review now.

          Stu Hood added a comment -

          0005 Needs more work.

          Stu Hood added a comment -

          Here is a patchset for trunk implementing the "stream matching ranges from existing sstables, and then rebuild indexes" strategy. It applies on top of 1117 (for no good reason, in retrospect). It passes existing tests, but I haven't tried it out on a cluster: I'll give that a shot later today.

          Jonathan Ellis added a comment -

          Given how much CPU the compact part of anticompaction chews up, Stu is right that we shouldn't do that on the source node. But we shouldn't do it on the target node either; more precisely, we shouldn't do it as part of the stream operation, and should let the operator do it manually if desired.

          We should use the row index to figure out what parts of the source data files to stream over, and send those chunks w/o deserializing anything. On the target side, we should read the keys but not deserialize anything else, and build the index + bloom filter from that.
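          A rough sketch of that target-side rebuild, assuming a simplified row layout (UTF key, int row length, row body) and a toy bloom filter; the real sstable format, IndexWriter, and BloomFilter differ:

          import java.io.DataInputStream;
          import java.io.DataOutputStream;
          import java.io.File;
          import java.io.FileInputStream;
          import java.io.FileOutputStream;
          import java.io.IOException;
          import java.util.BitSet;

          public class IndexRebuilder
          {
              // Toy stand-in for the real bloom filter, sized arbitrarily for illustration.
              static class ToyBloomFilter
              {
                  private static final int SIZE = 1 << 20;
                  private final BitSet bits = new BitSet(SIZE);
                  void add(String key) { bits.set((key.hashCode() & 0x7fffffff) % SIZE); }
              }

              // Scan the streamed data file, reading only keys and row lengths, and write
              // an index entry (key, row offset) per row while populating the bloom filter.
              public static void rebuild(File dataFile, File indexFile) throws IOException
              {
                  ToyBloomFilter bf = new ToyBloomFilter();
                  FileInputStream fis = new FileInputStream(dataFile);
                  DataInputStream data = new DataInputStream(fis);
                  DataOutputStream index = new DataOutputStream(new FileOutputStream(indexFile));
                  try
                  {
                      long length = dataFile.length();
                      while (fis.getChannel().position() < length)
                      {
                          long rowStart = fis.getChannel().position();
                          String key = data.readUTF();  // the key is the only thing deserialized
                          int rowLength = data.readInt();
                          index.writeUTF(key);          // index entry: key -> offset of the row
                          index.writeLong(rowStart);
                          bf.add(key);
                          data.skipBytes(rowLength);    // the row body itself is never touched
                      }
                  }
                  finally
                  {
                      data.close();
                      index.close();
                  }
              }
          }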

          Stu Hood added a comment -

          Tons of people get bitten by this, and everyone else is lagged out by slow transfers.

          Stu Hood added a comment -

          We can probably resume progress here now that 389 is in, although I hope to get 777 in first.

          Stu Hood added a comment -

          I think we should delay making too many changes to streaming until we've finalized the SSTable versioning/interface changes proposed on #674. All of the possible approaches to optimizing this depend on the file format. For instance, sending only portions of the file depends on a splittable format, and performing the compaction on the sending side and then writing a new SSTable on the receiving side depends on the format of the serialized CompactedRows that would be sent across.

          Jonathan Ellis added a comment -

          Note that if we're streaming w/o first writing a temporary file locally, there is less reason to keep all outbound streaming single-threaded, but Streaming contexts currently assume there can only be one outstanding stream per source node. So keep that in mind if we are tempted to multithread streaming.

          Jonathan Ellis added a comment -

          one side has to compact eventually, so it should be the sending side that compacts, since that gets you reduced bandwidth; there's no benefit to having the receiver compact. it would be quite easy to compact to a socket stream instead of a disk one. (you lose transferTo, but the receiver can still do transferFrom. and we could special case only-a-single-source-sstable with transferTo if that is worth it...)
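          A minimal sketch of the receiving half of that idea: pulling a known number of bytes off the socket straight into a file with FileChannel.transferFrom. The port, path, and byte count are illustrative parameters, not the project's actual streaming protocol:

          import java.io.FileOutputStream;
          import java.io.IOException;
          import java.net.InetSocketAddress;
          import java.nio.channels.FileChannel;
          import java.nio.channels.ServerSocketChannel;
          import java.nio.channels.SocketChannel;

          public class StreamReceiver
          {
              // Receive expectedBytes from a peer and write them to the target file,
              // letting transferFrom avoid user-space copies where the platform allows.
              public static void receive(int port, String targetPath, long expectedBytes) throws IOException
              {
                  ServerSocketChannel server = ServerSocketChannel.open();
                  server.socket().bind(new InetSocketAddress(port));
                  SocketChannel peer = server.accept();
                  FileChannel out = new FileOutputStream(targetPath).getChannel();
                  try
                  {
                      long written = 0;
                      while (written < expectedBytes)
                      {
                          long n = out.transferFrom(peer, written, expectedBytes - written);
                          if (n <= 0)
                              break;                    // peer closed before sending everything
                          written += n;
                      }
                  }
                  finally
                  {
                      out.close();
                      peer.close();
                      server.close();
                  }
              }
          }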

          Stu Hood added a comment -

          > What if we just stream from individual SSTables directly?
          This idea is growing on me... it causes minimal load on the machine that you are trying to move data off of, in terms of CPU and disk (compaction is done on the destination side). Network bandwidth should be relatively plentiful.

          Jaakko Laine added a comment -

          What if we just stream from individual SSTables directly? That would of course send some stuff twice (or more) if done without compaction, but that seems like small overhead. If we streamed directly from SSTables, that would allow us to bootstrap from a node that is short of disk space.

          Stu Hood added a comment -

          > ...since you just did a major compact...
          You didn't necessarily just do a major compaction: currently, when someone triggers a repair manually, it performs a readonly major compaction, which doesn't write to disk.

          Jonathan Ellis added a comment -

          > Unless you are suggesting streaming each SSTable individually?

          streaming each range individually (so, one source sstable since you just did a major compact, and each range becomes an sstable on the target)

          Stu Hood added a comment - edited

          > 1. use SSTR.getPosition to find start and end ranges to transfer...
          I don't think this works: you still need to perform the compaction first, which means you still need to dump to disk.
          EDIT: Unless you are suggesting streaming each SSTable individually?

          > 2. from the data file, compute index + BF files on the destination node
          This seems like the best approach.

          Jonathan Ellis added a comment -

          What you want to be careful about here is not screwing up the FileChannel.transferTo optimization, which is very valuable.

          So IMO what you want to do is:

          1. use SSTR.getPosition to find start and end ranges to transfer, then use the existing streaming API – which already supports streaming only parts of files via transferTo – to send that over as the data file in question (see the sketch below).

          2. from the data file, compute index + BF files on the destination node, instead of wasting IO streaming those from the source.

          • streaming the index from source is possible, but since you need to scan the data file anyway to build BF (since there is no way to extract a subset of a BF) I think it's going to be simpler to just rebuild both. And anyway goffinet has wanted a "rebuild index from data file" for a while now
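          A minimal sketch of step 1, sending only a section of the data file with FileChannel.transferTo; the offsets would come from the row index, but here they are plain parameters, and the class name is hypothetical:

          import java.io.FileInputStream;
          import java.io.IOException;
          import java.net.InetSocketAddress;
          import java.nio.channels.FileChannel;
          import java.nio.channels.SocketChannel;

          public class SectionSender
          {
              // Send the byte range [start, end) of dataFile to host:port via
              // transferTo (sendfile on Linux), looping because transferTo may
              // hand off fewer bytes than requested.
              public static void sendSection(String dataFile, long start, long end, String host, int port) throws IOException
              {
                  SocketChannel socket = SocketChannel.open(new InetSocketAddress(host, port));
                  FileChannel file = new FileInputStream(dataFile).getChannel();
                  try
                  {
                      long position = start;
                      while (position < end)
                      {
                          long sent = file.transferTo(position, end - position, socket);
                          if (sent == 0)
                              break;                    // socket closed underneath us
                          position += sent;
                      }
                  }
                  finally
                  {
                      file.close();
                      socket.close();
                  }
              }
          }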
          Chris Goffinet added a comment -

          We see about 16MB/s for compaction on a 4-disk RAID0 configuration. Moving this directly to the network, so we can be network bound, would be much better. But the thing is, compaction is running slower than the max network throughput, which is roughly 80MB/s on our systems.

          Chris Goffinet added a comment -

          Also note, this was on 20 CFs

          Stu Hood added a comment -

          Bumping priority on this one, because goffinet experienced a repair taking > 24 hours for ~100GB of data.

          Hudson added a comment -

          Integrated in Cassandra #278 (See http://hudson.zones.apache.org/hudson/job/Cassandra/278/)
          Jonathan Ellis added a comment -

          expanding Streaming is fine, but it's not as straightforward as you might think because it is completely based around FileChannel.transferTo, which on Linux at least is a thin wrapper around the sendfile system call
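          For contrast, streaming anything that is not already a file on disk means falling back to an ordinary copy loop through user-space buffers, roughly like the following sketch (illustrative only, not the project's Streaming code):

          import java.io.IOException;
          import java.nio.ByteBuffer;
          import java.nio.channels.ReadableByteChannel;
          import java.nio.channels.WritableByteChannel;

          public class CopyLoop
          {
              // Every byte is pulled into a user-space buffer and pushed back out,
              // instead of being handed to the kernel once via transferTo/sendfile.
              public static long copy(ReadableByteChannel src, WritableByteChannel dst) throws IOException
              {
                  ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024);
                  long total = 0;
                  while (src.read(buffer) != -1)
                  {
                      buffer.flip();
                      while (buffer.hasRemaining())
                          total += dst.write(buffer);
                      buffer.clear();
                  }
                  return total;
              }
          }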


            People

            • Assignee: Stu Hood
            • Reporter: Stu Hood
            • Votes: 0
            • Watchers: 11

