Cassandra / CASSANDRA-4297

Use java NIO as much as possible when streaming compressed SSTables

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Fix Version/s: 1.2.0 beta 1
    • Component/s: None
    • Labels:

      Description

      Back in 0.8, streaming used Java NIO (FileChannel#transferTo/transferFrom) to perform zero-copy file transfer between nodes. Since 1.0, in order to add new features like SSTable compression and internode encryption, we had to switch to Java IO Input/OutputStreams. To transfer a compressed SSTable, the source node currently 1) decompresses each chunk of the SSTable and 2) compresses it with LZF for the network; the destination node then 3) decompresses the LZF data as it reads from the socket and 4) compresses it again for the SSTable on disk.

      Now that 1.1 ships with SSTable compression turned on by default, it is reasonable to transfer the compressed file as is using NIO, instead of decompressing and recompressing it on the source node.
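The zero-copy transfer mentioned in the description can be sketched roughly as follows. This is a minimal illustration of FileChannel#transferTo, not the code from the patch; the class name ZeroCopySketch and the method transferSection are hypothetical, and the target is any WritableByteChannel (in streaming it would be the socket's channel).

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, for illustration only.
final class ZeroCopySketch {
    /** Sends bytes [offset, offset + length) of the file to target and returns the number of bytes sent. */
    static long transferSection(Path file, long offset, long length, WritableByteChannel target)
            throws IOException {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long sent = 0;
            while (sent < length) {
                // transferTo may move fewer bytes than requested, so keep looping.
                long n = source.transferTo(offset + sent, length - sent, target);
                if (n <= 0)
                    break; // end of file reached, or the target accepted nothing
                sent += n;
            }
            return sent;
        }
    }
}
```

Because the bytes are handed from the file channel to the target channel, the compressed chunks never need to be decompressed on the source node in this path.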

      1. 0001-Use-SortedSet-instead-of-Set-and-Arrays.sort.txt (2 kB, Yuki Morishita)
      2. 4297-v2.txt (44 kB, Yuki Morishita)
      3. 4297.txt (38 kB, Yuki Morishita)

        Activity

        Yuki Morishita added a comment -

        Committed. Thanks!

        Jonathan Ellis added a comment -

        +1

        Yuki Morishita added a comment -

        I attached the part I modified since the v2 patch. Basically, I just switched to TreeSet instead of Set and Arrays.sort.
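A rough sketch of why a TreeSet fits here: it removes duplicate chunks and keeps them ordered by offset in one step, even when the requested sections arrive unsorted. Everything below is an assumption made for illustration (the Chunk type, the chunksFor method, and the chunkOffsets layout with one trailing end-of-file entry); it is not the patch's code.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical types, for illustration only.
final class ChunkSelectionSketch {
    /** A compressed chunk, identified by its offset in the compressed file. */
    static final class Chunk implements Comparable<Chunk> {
        final long offset;
        final int length;

        Chunk(long offset, int length) {
            this.offset = offset;
            this.length = length;
        }

        @Override public int compareTo(Chunk o) { return Long.compare(offset, o.offset); }
        @Override public boolean equals(Object o) { return o instanceof Chunk && ((Chunk) o).offset == offset; }
        @Override public int hashCode() { return Long.hashCode(offset); }
    }

    /**
     * sections: {start, end} pairs of uncompressed offsets (end exclusive), in any order.
     * chunkOffsets[i]: compressed-file offset of chunk i, plus one trailing entry for end of file.
     */
    static SortedSet<Chunk> chunksFor(long[][] sections, long[] chunkOffsets, int chunkLength) {
        SortedSet<Chunk> chunks = new TreeSet<>();
        for (long[] s : sections) {
            int first = (int) (s[0] / chunkLength);
            int last = (int) ((s[1] - 1) / chunkLength);
            for (int i = first; i <= last; i++) {
                // Two sections falling in the same chunk add it only once.
                chunks.add(new Chunk(chunkOffsets[i], (int) (chunkOffsets[i + 1] - chunkOffsets[i])));
            }
        }
        return chunks;
    }
}
```

Iterating the returned set then visits each needed chunk exactly once, in file order.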

        Yuki Morishita added a comment -

        Why didn't I use SortedSet/TreeSet to eliminate dups and sort?
        I will update the patch with a more detailed comment.

        Jonathan Ellis added a comment -

        LGTM, +1.

        > The reason why I use Set here is to eliminate duplicate chunks. Given two different file section can be mapped to just one chunk

        Can you expand the "since sections are not guaranteed to be sorted" comment to elaborate on that? (Still might be a bit cleaner to just use new ArrayList(set) instead of manually copying to an array; the performance difference would be negligible.)

        Yuki Morishita added a comment -

        V2 attached, based on the review plus some test-related changes.

        > Would prefer to have CDIS just implement IS, and let callers wrap in DIS when desired, similar to how we use SnappyInputStream in IncomingTcpConnection, or LZFInputStream in ISR

        CDIS now implements InputStream only and has been renamed to CompressedInputStream.

        > Why the changes to OutboundTcpConnection?

        To obtain an nio.SocketChannel, the socket has to be created using SocketChannel.open.

        > Re MS changes: when would header.file be null?

        When a node requests a range but the target node doesn't have the corresponding data. I reverted the change in MS so that it sends at least the streaming header when header.file is null. It seems redundant, but for now it's necessary to terminate the stream session on the requesting node.

        > Chunk[] sort can use Guava Longs.compare

        Done.

        > I suggest adding a comment to explain why sort is necessary (b/c ranges are from replication strategy, so may not be sorted?) Instead of using Set + copy into array, why not use an ArrayList + trimToSize()

        I use Set here to eliminate duplicate chunks, since two different file sections can map to just one chunk.

        > is the FST comment // TODO just use a raw RandomAccessFile since we're managing our own buffer here obsolete? is the CompressedRandomAccessReader path used at all in FST anymore?

        I removed CRAR from FST in v2. Even if nio is not available (in the case of inter-node SSL), streaming uses CompressedFileStreamTask with the socket's InputStream to transfer the file directly.

        > Nit: avoid double negation in if statements with else clauses
        > Nit: suggest moving serialization code for Chunk and CompressionParameters into ChunkSerializer and ChunkParametersSerializer classes, respectively, just to make the code discoverable for re-use later

        Done.

        > Should we make nio transfer the default for uncompressed sstables as well, and add an option to enable compression? Alternatively, now that compression is the default for new sstables, I'd be okay with removing LZF stream compression entirely

        I haven't run any benchmarks, but I think always using LZF compression is fine when transferring uncompressed data.

        > Does this over-transfer data on chunk boundaries? Put another way, do we stream data that doesn't actually belong on the target node? (I'm okay with this, just want to be clear about what's happening.)

        The source node can send an unrelated range of data inside a chunk, but the receiving node ignores (or skips) that part when reading from the socket, so the answer is no.
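The skipping described in the last answer can be pictured with a small sketch: a whole chunk arrives, and the receiver keeps only the bytes that overlap the requested section. The class ChunkTrimSketch and its method are hypothetical, and the offsets are assumed to be positions in the uncompressed data; the real receiver reads from a stream rather than slicing arrays.

```java
import java.util.Arrays;

// Hypothetical helper, for illustration only.
final class ChunkTrimSketch {
    /**
     * Returns the part of one decompressed chunk that overlaps [sectionStart, sectionEnd).
     * chunkStart is the uncompressed offset of the chunk's first byte.
     */
    static byte[] relevantBytes(byte[] chunk, long chunkStart, long sectionStart, long sectionEnd) {
        long from = Math.max(chunkStart, sectionStart);
        long to = Math.min(chunkStart + chunk.length, sectionEnd);
        if (from >= to)
            return new byte[0]; // the chunk holds nothing from this section
        return Arrays.copyOfRange(chunk, (int) (from - chunkStart), (int) (to - chunkStart));
    }
}
```

Bytes outside the overlap still cross the network as part of the chunk, but they are dropped before anything is written on the receiving side.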

        Jonathan Ellis added a comment -

        Comments:

        • Would prefer to have CDIS just implement IS, and let callers wrap in DIS when desired, similar to how we use SnappyInputStream in IncomingTcpConnection, or LZFInputStream in ISR
        • Why the changes to OutboundTcpConnection?
        • Re MS changes: when would header.file be null?
        • Chunk[] sort can use Guava Longs.compare
        • I suggest adding a comment to explain why sort is necessary (b/c ranges are from replication strategy, so may not be sorted?)
        • Instead of using Set + copy into array, why not use an ArrayList + trimToSize()
        • is the FST comment // TODO just use a raw RandomAccessFile since we're managing our own buffer here obsolete?
        • is the CompressedRandomAccessReader path used at all in FST anymore?
        • Nit: avoid double negation in if statements with else clauses, e.g. instead of

              if (remoteFile.compressionInfo != null)
                  dis = new CompressedDataInputStream(socket.getInputStream(), remoteFile.compressionInfo);
              else
                  dis = new DataInputStream(new LZFInputStream(socket.getInputStream()));

          prefer

              if (remoteFile.compressionInfo == null)
                  dis = new DataInputStream(new LZFInputStream(socket.getInputStream()));
              else
                  dis = new CompressedDataInputStream(socket.getInputStream(), remoteFile.compressionInfo);

        • Nit: suggest moving serialization code for Chunk and CompressionParameters into ChunkSerializer and ChunkParametersSerializer classes, respectively, just to make the code discoverable for re-use later

        At a higher level,

        • Should we make nio transfer the default for uncompressed sstables as well, and add an option to enable compression? Alternatively, now that compression is the default for new sstables, I'd be okay with removing LZF stream compression entirely
        • Does this over-transfer data on chunk boundaries? Put another way, do we stream data that doesn't actually belong on the target node? (I'm okay with this, just want to be clear about what's happening.)
        Yuki Morishita added a comment -

        Attaching patch for review (also updated https://github.com/yukim/cassandra/tree/4297).

        • removed unnecessary changes from trunk
        • corrected progress reporting on dest node
        • added some comments
        Yuki Morishita added a comment -

        I have to brush up the progress handling in my patch so that it updates correctly. Will post an updated version soon.

        Yuki Morishita added a comment -

        CompressedDIS is the DataInputStream version of CompressedRandomAccessReader. It reads compressed chunks directly from the stream and provides decompressed data as it reads. As in CRAR, a CRC check is also performed after decompressing each chunk, based on the crc_chance setting in the compression options, which defaults to 1.0 (100%).
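The probabilistic CRC check can be sketched as below. This is only an illustration under stated assumptions: the class ChunkChecksumSketch and the method verify are hypothetical, decompression is left out, and the checksum is assumed to be a CRC32 of the uncompressed chunk stored as an int.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.CRC32;

// Hypothetical helper, for illustration only.
final class ChunkChecksumSketch {
    /**
     * Verifies a decompressed chunk against its expected CRC32 with probability crcChance.
     * With crcChance = 1.0 every chunk is checked; with 0.0 none is.
     */
    static void verify(byte[] decompressed, int expectedCrc, double crcChance) {
        // nextDouble() is in [0, 1), so a chance of 1.0 never skips the check.
        if (ThreadLocalRandom.current().nextDouble() >= crcChance)
            return;
        CRC32 crc = new CRC32();
        crc.update(decompressed, 0, decompressed.length);
        if ((int) crc.getValue() != expectedCrc)
            throw new IllegalStateException("chunk checksum mismatch");
    }
}
```

Checking the uncompressed bytes means the chunk must be decompressed before it can be validated.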

        Jonathan Ellis added a comment -

        Can you break this down a bit for me? CompressedFST looks straightforward, but what is CompressedDIS doing?

        Sylvain Lebresne added a comment -

        > It looks like the only reason to decompress is to compare crc32... is that right?

        No. We decompress because we need to build secondary indexes, compute the sstable stats, clean counters delta, etc...

        Jonathan Ellis added a comment -

        > Most of the time was spent on source node waiting for dest node to decompress and write to disk

        It looks like the only reason to decompress is to compare crc32... is that right? Why did we crc uncompressed data instead of compressed? Should we introduce a new version of snappy compression that CRCs the compressed data instead?

        Yuki Morishita added a comment -

        I've pushed a working commit to https://github.com/yukim/cassandra/tree/4297.

        When streaming compressed files, the source node appends compression info to the stream header, and the dest node uses that info to decompress data from the stream.
        If inter-node encryption is turned on, zero-copy transfer cannot be performed, so in that case we fall back to the current way of streaming.
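One way to picture the fallback decision is below. It relies on a documented property of java.net.Socket: getChannel() returns a channel only when the socket was created through SocketChannel.open or ServerSocketChannel.accept. The class TransferPathSketch, its methods, and the encrypted flag are hypothetical names for illustration.

```java
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;

// Hypothetical helper, for illustration only.
final class TransferPathSketch {
    /** Creates a socket backed by an NIO channel, so that getChannel() is non-null. */
    static Socket openNioSocket() throws IOException {
        return SocketChannel.open().socket();
    }

    /** True when FileChannel#transferTo can write straight to the socket's channel. */
    static boolean canZeroCopy(Socket socket, boolean encrypted) {
        // With inter-node encryption the bytes must pass through the encrypting stream,
        // so the plain stream-based path is used instead.
        return !encrypted && socket.getChannel() != null;
    }
}
```

A caller would use the channel path when canZeroCopy returns true and fall back to the socket's streams otherwise.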

        I ran a simple bulk loading test which transfers several compressed SSTables between nodes. Although overall throughput and the time taken to complete streaming are about the same, the patched version reduced CPU usage on the source node (20% -> 2%). Most of the time was spent on the source node waiting for the dest node to decompress and write to disk.

        I still don't know if this is useful in production, so if someone can perform more realistic tests, I'd greatly appreciate it.


          People

          • Assignee: Yuki Morishita
          • Reporter: Yuki Morishita
          • Reviewer: Jonathan Ellis
          • Votes: 0
          • Watchers: 2
