CASSANDRA-3003

Trunk single-pass streaming doesn't handle large row correctly

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Fix Version/s: 1.0.0
    • Component/s: Core
    • Labels:

      Description

      For normal column families, trunk streaming always buffers the whole row into memory. It uses

        ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
      

      on the input bytes.
      We must avoid this for rows that don't fit in the inMemoryLimit.

      Note that for regular column families, for a given row, there is actually no need to even recreate the bloom filter or the column index, nor to deserialize the columns. It is enough to read the key and row size to feed the index writer, and then simply dump the rest on disk directly. This would make streaming more efficient, avoid a lot of object creation and avoid the pitfall of big rows.

      Counter column families are unfortunately trickier, because each column needs to be deserialized (to mark it as 'fromRemote'). However, we don't need to do the double pass of LazilyCompactedRow for that. We can simply use an SSTableIdentityIterator and deserialize/reserialize input as it comes.
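
      As a concrete illustration of the pass-through idea for regular column families, here is a minimal, self-contained sketch. It uses only java.io and a hypothetical stream layout (key length, key bytes, row data size, then the raw row bytes) together with a hypothetical IndexCallback; it is not the actual Cassandra streaming code, just the shape of "read the key and row size, feed the index writer, then copy the rest to disk without deserializing".

        import java.io.*;

        public class PassThroughRowStreamer
        {
            // Copies one serialized row from the incoming stream straight to the sstable
            // file. Only the key and the row data size are read and handed to the index
            // writer; the column bytes are never deserialized or buffered as a whole row.
            public static void streamRow(DataInputStream in, DataOutputStream out, IndexCallback index) throws IOException
            {
                int keyLength = in.readInt();          // assumed header: key length
                byte[] key = new byte[keyLength];
                in.readFully(key);
                long dataSize = in.readLong();         // assumed header: serialized row size

                index.append(key, out.size());         // feed the index writer: key + position in file

                out.writeInt(keyLength);               // echo the row header unchanged
                out.write(key);
                out.writeLong(dataSize);

                byte[] buffer = new byte[64 * 1024];   // copy the row body in bounded chunks so
                long remaining = dataSize;             // memory use is independent of the row size
                while (remaining > 0)
                {
                    int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                    if (read < 0)
                        throw new EOFException("stream ended with " + remaining + " bytes of the row missing");
                    out.write(buffer, 0, read);
                    remaining -= read;
                }
            }

            public interface IndexCallback
            {
                void append(byte[] key, long positionInFile);
            }
        }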

      1. 3003-v3.txt
        11 kB
        Yuki Morishita
      2. 3003-v5.txt
        11 kB
        Yuki Morishita
      3. ASF.LICENSE.NOT.GRANTED--3003-v1.txt
        9 kB
        Yuki Morishita
      4. ASF.LICENSE.NOT.GRANTED--3003-v2.txt
        10 kB
        Yuki Morishita
      5. v3003-v4.txt
        11 kB
        Yuki Morishita

        Activity

        Gavin made changes -
        Workflow: patch-available, re-open possible [ 12752939 ] → reopen-resolved, no closed status, patch-avail, testing [ 12758544 ]
        Gavin made changes -
        Workflow: no-reopen-closed, patch-avail [ 12626538 ] → patch-available, re-open possible [ 12752939 ]
        Hudson added a comment -

        Integrated in Cassandra #1074 (See https://builds.apache.org/job/Cassandra/1074/)
        Handle large rows with single-pass streaming
        patch by yukim; reviewed by slebresne for CASSANDRA-3003

        slebresne : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1165306
        Files :

        • /cassandra/trunk/CHANGES.txt
        • /cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
        • /cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
        • /cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
        • /cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
        Sylvain Lebresne made changes -
        Status: Open [ 1 ] → Resolved [ 5 ]
        Resolution: Fixed [ 1 ]
        Sylvain Lebresne added a comment -

        lgtm, +1.

        Committed with a tiny change to use a cheaper array-backed column family in appendFromStream, since we deserialize in order (and in a single thread).

        Yuki Morishita added a comment -

        Sylvain,

        Thank you for the review.
        For now, I am leaving the max timestamp calculation part as it is, i.e. done during streaming.

        we need to use Integer.MIN_VALUE as the value for expireBefore when deserializing the columns, otherwise the expired columns will be converted to DeletedColumns, which will change their serialized size (and thus screw up the data size and column index)

        Fixed.

        for markDeltaAsDeleted, we must check if the length is already negative and leave it so if it is, otherwise if a streamed sstable gets re-streamed to another node before it is compacted, we could end up not cleaning the delta correctly.

        it would be nice in SSTW.appendFromStream() to assert the sanity of our little deserialize-reserialize dance and assert that we actually wrote the number of bytes announced in the header.

        Nice point. I added the same assertion as the other append() does (sketched below).

        the patch changes a clearAllDelta to a markDeltaAsDeleted in CounterColumnTest, which is bogus (and the test does fail with that change).

        I forgot to revert this one. I should have run the tests before submitting...

        I would rename markDeltaAsDeleted to markForClearingDelta, as this describes what the function does better.

        Fixed.

        nitpick: there are a few spaces at the end of lines in some comments (I know, I know, I'm picky).

        Fixed this one too, I guess.
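
        For reference, a hypothetical sketch of the byte-count assertion discussed above (the class and RowWriter interface names are made up for illustration, not the real SSTableWriter code): while re-serializing a streamed row, count the bytes actually written and assert they match the data size announced in the stream header, so any size drift from the deserialize-reserialize dance fails fast instead of silently corrupting the row size and column index.

          import java.io.*;

          public class AppendFromStreamAssertionSketch
          {
              // Writes one row through the supplied writer and checks that the number of
              // bytes it produced matches the size announced in the stream header.
              public static void appendRow(long announcedDataSize, RowWriter writer, DataOutputStream out) throws IOException
              {
                  long before = out.size();
                  writer.write(out);                 // deserialize/mark/reserialize the columns here
                  long written = out.size() - before;
                  assert written == announcedDataSize
                      : "wrote " + written + " bytes but the header announced " + announcedDataSize;
              }

              public interface RowWriter
              {
                  void write(DataOutputStream out) throws IOException;
              }
          }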

        Yuki Morishita made changes -
        Attachment 3003-v5.txt [ 12492974 ]
        Sylvain Lebresne added a comment -

        I think it is a little bit sad to deserialize all the columns in the non-counter case. We do need to do it right now because of the computation of the max timestamp, but maybe we could have the other side send us the max timestamp as part of the stream header (but I agree, it's a bit more complicated).

        For the record, the handling of counter columns amounts to the initial proposition of Stu of moving the cleanup to the reads (though the solution is slightly different). So the "we'll cleanup on each read before the sstable is compacted" remark does hold here, but I don't see a better solution right now, and the "those sstables will likely be compacted quickly" argument probably makes this ok anyway.

        Other comments:

        • we need to use Integer.MIN_VALUE as the value for expireBefore when deserializing the columns, otherwise the expired columns will be converted to DeletedColumns, which will change their serialized size (and thus screw up the data size and column index) (see the toy sketch after this list)
        • for markDeltaAsDeleted, we must check if the length is already negative and leave it so if it is, otherwise if a streamed sstable gets re-streamed to another node before it is compacted, we could end up not cleaning the delta correctly.
        • it would be nice in SSTW.appendFromStream() to assert the sanity of our little deserialize-reserialize dance and assert that we actually wrote the number of bytes announced in the header.
        • the patch changes a clearAllDelta to a markDeltaAsDeleted in CounterColumnTest, which is bogus (and the test does fail with that change).
        • I would rename markDeltaAsDeleted to markForClearingDelta, as this describes what the function does better.
        • nitpick: there are a few spaces at the end of lines in some comments (I know, I know, I'm picky).
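
        To make the first bullet concrete, here is a toy model (hypothetical sizes and class names, not the real Cassandra ones): converting an expired column into a tombstone while deserializing shrinks its serialized size, which would invalidate the data size and column index already written; passing Integer.MIN_VALUE as expireBefore means no localExpirationTime can ever be below it, so nothing is converted and the bytes re-serialize to exactly the same length.

          public class ExpireBeforeSketch
          {
              // Rough per-column byte counts, purely illustrative.
              static final int NAME = 8, TIMESTAMP = 8, TTL = 4, EXPIRATION = 4, VALUE = 16, LOCAL_DELETION = 4;

              static int deserializedSize(int localExpirationTime, int expireBefore)
              {
                  if (localExpirationTime < expireBefore)
                      return NAME + TIMESTAMP + LOCAL_DELETION;          // became a DeletedColumn-like tombstone: smaller
                  return NAME + TIMESTAMP + TTL + EXPIRATION + VALUE;    // stays an ExpiringColumn-like cell: size preserved
              }

              public static void main(String[] args)
              {
                  int alreadyExpired = 1000;   // expired long before "now"
                  int now = 2000;

                  System.out.println(deserializedSize(alreadyExpired, now));               // shrinks: breaks the declared row size
                  System.out.println(deserializedSize(alreadyExpired, Integer.MIN_VALUE)); // unchanged: safe for streaming
              }
          }
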
        Yuki Morishita made changes -
        Attachment v3003-v4.txt [ 12492476 ]
        Yuki Morishita added a comment -

        Added handling of counters inside SuperColumn.

        Yuki Morishita added a comment -

        In v3, I forgot to handle the case where counter columns are inside a super column. I'll update soon.

        Jonathan Ellis made changes -
        Reviewer slebresne
        Yuki Morishita made changes -
        Attachment 3003-v3.txt [ 12492255 ]
        Yuki Morishita added a comment -

        v3 attached. It marks the counter column for delta deletion after deserializing it from the stream, without clearing all the deltas. This way, the marking does not affect regular counter updates.

        Yuki Morishita added a comment -

        Looks like we need distinct paths. Counters read from remote in the read path also get marked (have a negative #elt) and may cause problems. I'll take a look.

        Jonathan Ellis added a comment -

        Does CounterColumn.create work for both "normal", non-streamed counter updates and streaming? Or do we need two distinct paths there?

        Yuki Morishita made changes -
        Attachment mylyn-context.zip [ 12491799 ]
        Yuki Morishita made changes -
        Attachment mylyn-context.zip [ 12491799 ]
        Yuki Morishita added a comment -

        V2 attached and ready for review.
        For counter columns, instead of padding in place of the removed delta, v2 just "marks" the counter column to have its delta cleared later, by multiplying #elt by -1 in order to keep the header size for later removal. Marking only occurs when deserializing "fromRemote", and the actual removal of the delta is done when reading back from disk after the streaming.
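
        A simplified stand-in for the marking trick (the layout below is assumed for illustration and is not the real CounterContext serialization): the header starts with the number of delta entries (#elt); negating that count flags the context as "clear the delta on the next read" while leaving every byte of the length in place, and checking the sign first keeps the operation idempotent if the sstable is streamed again before compaction.

          import java.nio.ByteBuffer;

          public class CounterContextMarkSketch
          {
              // Assumed layout: short headerCount (#elt), then headerCount shorts naming the
              // delta shards, then the shard data itself.
              public static void markForClearingDelta(ByteBuffer context)
              {
                  short count = context.getShort(context.position());
                  if (count > 0)                                              // already negative: leave it alone
                      context.putShort(context.position(), (short) -count);   // mark without changing the size
              }

              public static boolean isMarkedForClearing(ByteBuffer context)
              {
                  return context.getShort(context.position()) < 0;
              }

              public static void main(String[] args)
              {
                  ByteBuffer ctx = ByteBuffer.allocate(2 + 2 + 12);
                  ctx.putShort(0, (short) 1);                   // one delta shard recorded in the header

                  int before = ctx.remaining();
                  markForClearingDelta(ctx);
                  markForClearingDelta(ctx);                    // second call is a no-op
                  assert ctx.remaining() == before;             // serialized size is untouched
                  System.out.println(isMarkedForClearing(ctx)); // true
              }
          }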

        Yuki Morishita made changes -
        Attachment 3003-v2.txt [ 12491798 ]
        Yuki Morishita made changes -
        Status: In Progress [ 3 ] → Open [ 1 ]
        Yuki Morishita made changes -
        Attachment mylyn-context.zip [ 12491092 ]
        Yuki Morishita made changes -
        Attachment mylyn-context.zip [ 12491092 ]
        Yuki Morishita added a comment -

        Instead of creating CounterCleanedRow, I added an appendFromStream method to SSTW, which handles both normal and counter columns.

        I still need to work on SSTII, because the attached patch causes problems when iterating over cleaned-up CounterColumns with the 0-padding added during streaming.
        That also causes StreamingTransferTest to fail.

        Will post an updated version soon.

        Yuki Morishita made changes -
        Attachment 3003-v1.txt [ 12491091 ]
        Jonathan Ellis made changes -
        Fix Version/s 1.0 [ 12316349 ]
        Affects Version/s 1.0 [ 12316349 ]
        Jonathan Ellis added a comment -

        How is this looking, Yuki?

        Sylvain Lebresne added a comment -

        Can we pad it somehow?

        It's doable. Basically a context is an array of shards, with a header that is a (variable-length) list of which of those shards are deltas. When we clean up the deltas, we basically remove the header. We could have a specific cleanup for streaming that just sets all the header entries to -1. But we probably want to do that only for the cleanup during streaming, and have compaction clean those up afterwards, otherwise it is ugly. I don't know how much easier it is than cleaning during reads, though it avoids having to add new info to the sstable metadata.
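
        A rough model of the layout and the padding idea floated here (assumed layout and made-up names, not the real CounterContext code): the header is a count followed by that many shard indices, and instead of removing the header (which shrinks the buffer), each index is overwritten with -1 in place, leaving compaction to do the real cleanup later.

          import java.nio.ByteBuffer;

          public class CounterHeaderPaddingSketch
          {
              // Assumed layout: short headerCount, then headerCount shorts holding the
              // indices of the delta shards, then the shard data.
              public static void padDeltaHeader(ByteBuffer context)
              {
                  int base = context.position();
                  short headerCount = context.getShort(base);
                  for (int i = 0; i < headerCount; i++)
                      context.putShort(base + 2 + i * 2, (short) -1);   // neutralize each delta marker in place
              }
          }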

        Jonathan Ellis added a comment -

        it would have been nicer to have the cleaning of a counter context not change its size

        Can we pad it somehow?

        Sylvain Lebresne added a comment -

        I'm probably missing something, but isn't the problem that this can't be done without two passes for rows that are too large to fit in memory?

        Hum, true. What we need to do is deserialize each row with the 'fromRemote' flag on so that the deltas are cleaned up, and then reserialize the result. But that will potentially reduce the columns' serialized size (and thus modify the row total size and the column index). Now, we could imagine remembering the offset of the beginning of the row, loading the column index in memory and updating it during the first pass (it would likely be ok to simply update the index offsets without changing the index structure itself), and seeking back at the end to write the updated data size and column index. However, this unfortunately won't be doable with the current SequentialWriter (and CompressedSequentialWriter), since we cannot seek back (without truncating). Retrospectively, it would have been nicer to have the cleaning of a counter context not change its size.

        So yeah, it sucks. I'm still only mildly a fan of moving the cleanup, because it "feels wrong" somehow. It feels like it would be better to have that delta cleaning done sooner rather than later. But this may end up being the simplest/most efficient solution.

        Yuki Morishita added a comment -

        Stu, Sylvain,

        Let me try to fix this by using EchoedRow to serialize directly to disk, and creating the new "CounterCleanedRow" suggested by Sylvain above.

        Stu Hood added a comment -

        I really think it is not very hard to do 'inline'. We really just want to deserialize, cleanup, reserialize. It should be super easy to add some "CounterCleanedRow" that does that.

        I'm probably missing something, but isn't the problem that this can't be done without two passes for rows that are too large to fit in memory? And you can't perform two passes without buffering the data somewhere? I suggested moving the cleanup step out of streaming because then the row could be echoed to disk without modification.

        It would also be less efficient, because until we have compacted the streamed sstable, each read will have to call the cleanup over and over

        This is true, but compaction is fairly likely to trigger soon after a big batch of streamed files arrives, since they will trigger compaction thresholds.

        Yuki Morishita made changes -
        Status: Open [ 1 ] → In Progress [ 3 ]
        Sylvain Lebresne added a comment -

        In order to use EchoedRow, we'd have to move where we do CounterContext cleanup

        I really think it is not very hard to do 'inline'. We really just want to deserialize, cleanup, reserialize. It should be super easy to add some "CounterCleanedRow" that does that.

        that it could be done at read time if we added "fromRemote" as a field in the metadata of an SSTable

        Yes, but it does sound a bit complicated to me compared to doing the cleanup right away during streaming. It would also be less efficient, because until we have compacted the streamed sstable, each read will have to call the cleanup over and over, while we really only care to have it done twice (unless we completely change where we do cleanup).

        Perhaps we can use this as an opportunity to switch to using only PrecompactedRow (for narrow rows which might go to cache) or EchoedRow (for wide rows, which go directly to disk)?

        I agree that there is no point in doing manual deserialization there. About using PrecompactedRow for narrow rows which might go to cache, I'll just point out that it is worth using PrecompactedRow only if 1) we are doing AES streaming and 2) the row is in the cache in the first place (which we can know since we always at least deserialize the row key).

        Stu Hood added a comment - edited

        Oof... I don't know how I missed this one in review: very, very sorry Yuki/Sylvain.

        Perhaps we can use this as an opportunity to switch to using only PrecompactedRow (for narrow rows which might go to cache) or EchoedRow (for wide rows, which go directly to disk)?

        In order to use EchoedRow, we'd have to move where we do CounterContext cleanup: I've suggested in the past that it could be done at read time if we added "fromRemote" as a field in the metadata of an SSTable. Every SSTable*Iterator would be affected, because they'd need to respect the fromRemote field.

        Alternatively, we could revert 2920 and 2677 (which I would hate: this has been a huge cleanup).

        EDIT: Oops, apparently I didn't review this one. Anyway!

        Sylvain Lebresne made changes -
        Priority: Major [ 3 ] → Critical [ 2 ]
        Sylvain Lebresne added a comment -

        Marking critical, because at least for counter column families, when the row is larger than the inMemoryLimit the code will actually crash, because it will use LazilyCompactedRow, which will try to do its 2 passes.

        Sylvain Lebresne created issue -

          People

          • Assignee:
            Yuki Morishita
            Reporter:
            Sylvain Lebresne
            Reviewer:
            Sylvain Lebresne
          • Votes:
            0
            Watchers:
            1

            Dates

            • Created:
              Updated:
              Resolved:
