CASSANDRA-3045

Update ColumnFamilyOutputFormat to use new bulkload API

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Fix Version/s: 1.1.0
    • Component/s: Hadoop
    • Labels: None

      Description

      The bulk loading interface added in CASSANDRA-1278 is a great fit for Hadoop jobs.

        Activity

        Hudson added a comment -

        Integrated in Cassandra #1228 (See https://builds.apache.org/job/Cassandra/1228/)
        Bulk loader is no longer a fat client, hadoop bulk loader output format.
        Patch by brandonwilliams, reviewed by Yuki Morishita for CASSANDRA-3045

        brandonwilliams : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1208499
        Files :

        • /cassandra/trunk/CHANGES.txt
        • /cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
        • /cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
        • /cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
        • /cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
        • /cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
        • /cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
        • /cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
        • /cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
        • /cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
        • /cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
        • /cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
        • /cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
        • /cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java
        Brandon Williams added a comment -

        Good catch, Yuki. Fixed and committed.

        Yuki Morishita added a comment -

        Brandon, patches did apply. I was on wrong branch.

        So, the streaming part is good (I know the sender hangs when an I/O error occurs, as Brandon mentioned above).
        For BulkRecordWriter, I don't think it works for SuperColumns.

        BulkRecordWriter#77:

        this.isSuper = Boolean.valueOf(IS_SUPERCF);
        

        should be

        this.isSuper = Boolean.valueOf(conf.get(IS_SUPERCF));
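The difference matters because IS_SUPERCF is the name of the configuration key, not its value, so parsing the constant itself always yields false. A standalone sketch of the bug (plain JDK; the key string here is a placeholder, not the real property name):

```java
// Sketch of the BulkRecordWriter bug in isolation. IS_SUPERCF holds the
// config *key*; the property value string is what must be parsed.
class SuperCfFlag {
    // Placeholder key name, for illustration only:
    public static final String IS_SUPERCF = "cassandra.output.issuper";

    public static void main(String[] args) {
        String configuredValue = "true"; // simulating conf.get(IS_SUPERCF)
        boolean wrong = Boolean.valueOf(IS_SUPERCF);      // parses the key name: always false
        boolean right = Boolean.valueOf(configuredValue); // parses the value: true
        System.out.println(wrong + " " + right); // prints "false true"
    }
}
```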
        
        Brandon Williams added a comment -

        Which part doesn't apply? There is nothing new to rebase against.

        Yuki Morishita added a comment -

        Brandon, could you rebase (once again) for trunk?
        It does not apply any more.

        Jonathan Ellis added a comment -

        Yuki, could you do a full review now that it's ready?

        Brandon Williams added a comment - edited

        are there any benchmarks or is there anything anecdotal about performance?

        Using the simplest job possible (copying a CF, map-only) I see a 20-25% gain. I suspect this is read-limited, though; if you're generating the output on a Hadoop cluster and loading it into a separate Cassandra cluster (i.e., not colocated), this will be even faster, but creating such a workload is a bit too much work for me to test. If anyone has an existing case like this, I'd love for them to test and chime in.

        One other thing, though: there are far fewer failures using BOF on a workload that generates a lot of GC (inserting a couple hundred columns creates quite a bit, and causes failures due to UE while the nodes are CMSing). So BOF is much 'nicer' to the cluster.

        Brandon Williams added a comment -

        How do you configure BOF vs CFOF?

        By calling setOutputFormatClass on the job.

        Why do we need to keep CFOF around?

        I can think of two reasons. First, removing it would break every existing job, though that's easy for users to fix, as indicated above. Second, someone might want access to each individual record as soon as possible, rather than waiting for the entire job to finish and stream a bunch of sstables. It's a latency vs. throughput tradeoff.
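        For reference, switching an existing job from CFOF to BOF is a one-line change in job setup. This is a sketch under assumptions, not the patch's exact code: it assumes the new org.apache.cassandra.hadoop.BulkOutputFormat class from this ticket and the 2011-era Hadoop Job constructor.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.cassandra.hadoop.BulkOutputFormat;

// Configuration sketch: the only change versus a ColumnFamilyOutputFormat
// job is the class passed to setOutputFormatClass.
class BulkLoadJobSetup {
    static Job configure(Configuration conf) throws Exception {
        Job job = new Job(conf, "bulk-load-example");
        job.setOutputFormatClass(BulkOutputFormat.class); // was ColumnFamilyOutputFormat.class
        return job;
    }
}
```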

        Jeremy Hanna added a comment -

        People may have written things on top of CFOF, so it might be nice to keep it around but deprecate it.

        are there any benchmarks or is there anything anecdotal about performance?

        Jonathan Ellis added a comment -

        How do you configure BOF vs CFOF? Why do we need to keep CFOF around?

        Brandon Williams added a comment -

        Figured out what I was doing wrong in the rebase. Updated patches apply cleanly and pass the test that was previously failing. Also added supercolumn support, and renamed the class from 'SSTable' to 'Bulk' since this seems more appropriate (I'd hoped SSTRW could be reused if you actually wanted to write sstables out to HDFS for some reason, but it turns out that putting the streaming logic in the OutputFormat makes things hairy.)

        Brandon Williams added a comment -

        Odd, I rebased at r1206100, and everything auto-merged.

        Jonathan Ellis added a comment -

        (The actual rebase looks trivial but I wonder if that has something to do with the test passing for Yuki but not for Brandon.)

        Jonathan Ellis added a comment -

        I'm at svn tip on trunk (r1206100) and still getting errors applying 0001 to ISR.

        Brandon Williams added a comment -

        Rebased patches again.

        Jonathan Ellis added a comment -

        I was going to test on Windows but it looks like 01 needs rebase for IncomingStreamReader.

        Brandon Williams added a comment -

        Updated/rebased patches include the dangling semicolon fix.

        Brandon Williams added a comment -

        Specifically, on line 97, FileStreamTask has a dangling semicolon that causes the 'if' to always fire. However, removing it still causes a secondary index check to fail in StreamingTransferTest. Here is a debug log of a working test:

        INFO [Thread-3] 2011-11-18 17:36:04,039 SecondaryIndexManager.java (line 115) Submitting index build of 626972746864617465, for data in SSTableReader(path='build/test/cassandra/data/Keyspace1/Indexed1-hb-4-Data.db')
        DEBUG [CompactionExecutor:1] 2011-11-18 17:36:04,041 Table.java (line 515) Indexing row key1
        DEBUG [CompactionExecutor:1] 2011-11-18 17:36:04,042 CollationController.java (line 76) collectTimeOrderedData
        DEBUG [CompactionExecutor:1] 2011-11-18 17:36:04,042 KeysIndex.java (line 100) applying index row 3288498 in ColumnFamily(Indexed1.626972746864617465 [6b657931:false:0@1234,])
        DEBUG [CompactionExecutor:1] 2011-11-18 17:36:04,043 SlabAllocator.java (line 105) 1 regions now allocated in org.apache.cassandra.utils.SlabAllocator@1716fa0
        DEBUG [CompactionExecutor:1] 2011-11-18 17:36:04,043 Table.java (line 515) Indexing row key3
        DEBUG [CompactionExecutor:1] 2011-11-18 17:36:04,043 CollationController.java (line 76) collectTimeOrderedData
        DEBUG [CompactionExecutor:1] 2011-11-18 17:36:04,044 KeysIndex.java (line 100) applying index row 3288500 in ColumnFamily(Indexed1.626972746864617465 [6b657933:false:0@1234,])
        DEBUG [Thread-3] 2011-11-18 17:36:04,044 ColumnFamilyStore.java (line 671) flush position is ReplayPosition(segmentId=1321659363183, position=354)
         INFO [Thread-3] 2011-11-18 17:36:04,045 ColumnFamilyStore.java (line 685) Enqueuing flush of Memtable-Indexed1.626972746864617465@5734522(38/47 serialized/live bytes, 2 ops)
         INFO [FlushWriter:1] 2011-11-18 17:36:04,045 Memtable.java (line 239) Writing Memtable-Indexed1.626972746864617465@5734522(38/47 serialized/live bytes, 2 ops)
        DEBUG [FlushWriter:1] 2011-11-18 17:36:04,047 DatabaseDescriptor.java (line 783) expected data files size is 84; largest free partition has 19441123328 bytes free
         INFO [FlushWriter:1] 2011-11-18 17:36:04,062 Memtable.java (line 275) Completed flushing build/test/cassandra/data/Keyspace1/Indexed1.626972746864617465-hb-2-Data.db (154 bytes)
        DEBUG [FlushWriter:1] 2011-11-18 17:36:04,063 IntervalNode.java (line 45) Creating IntervalNode from [Interval(DecoratedKey(3288498, 0000000000322db2), DecoratedKey(3288500, 0000000000322db4))]
        DEBUG [FlushWriter:1] 2011-11-18 17:36:04,063 DataTracker.java (line 331) adding build/test/cassandra/data/Keyspace1/Indexed1.626972746864617465-hb-2 to list of files tracked for Keyspace1.Indexed1.626972746864617465
        DEBUG [COMMIT-LOG-WRITER] 2011-11-18 17:36:04,064 CommitLog.java (line 459) discard completed log segments for ReplayPosition(segmentId=1321659363183, position=354), column family 1047.
        DEBUG [COMMIT-LOG-WRITER] 2011-11-18 17:36:04,065 CommitLog.java (line 498) Not safe to delete commit log CommitLogSegment(/srv/encrypted/project/cassandra/build/test/cassandra/commitlog/CommitLog-1321659363183.log); dirty is ; hasNext: false
         INFO [Thread-3] 2011-11-18 17:36:04,065 SecondaryIndexManager.java (line 134) Index build of 626972746864617465, complete
         INFO [Thread-3] 2011-11-18 17:36:04,066 StreamInSession.java (line 162) Finished streaming session 778312411854932 from /127.0.0.1
        DEBUG [MiscStage:1] 2011-11-18 17:36:04,066 StreamReplyVerbHandler.java (line 47) Received StreamReply StreamReply(sessionId=778312411854932, file='', action=SESSION_FINISHED)
        

        and here is a failing one (with 3045 applied):

        INFO [Thread-3] 2011-11-18 17:20:02,669 SecondaryIndexManager.java (line 117) Submitting index build of 626972746864617465, for data in SSTableReader(path='build/test/cassandra/data/Keyspace1/Indexed1-h-4-Data.db')
        DEBUG [Streaming:1] 2011-11-18 17:20:02,669 MmappedSegmentedFile.java (line 139) All segments have been unmapped successfully
        DEBUG [NonPeriodicTasks:1] 2011-11-18 17:20:02,671 FileUtils.java (line 51) Deleting Indexed1-h-2-Statistics.db
        DEBUG [NonPeriodicTasks:1] 2011-11-18 17:20:02,671 FileUtils.java (line 51) Deleting Indexed1-h-2-Filter.db
        DEBUG [NonPeriodicTasks:1] 2011-11-18 17:20:02,671 FileUtils.java (line 51) Deleting Indexed1-h-2-Index.db
        DEBUG [NonPeriodicTasks:1] 2011-11-18 17:20:02,672 SSTable.java (line 143) Deleted build/test/cassandra/data/Keyspace1/Indexed1-h-2
        DEBUG [CompactionExecutor:1] 2011-11-18 17:20:02,674 Table.java (line 516) Indexing row key1
        DEBUG [CompactionExecutor:1] 2011-11-18 17:20:02,674 CollationController.java (line 74) collectTimeOrderedData
        DEBUG [CompactionExecutor:1] 2011-11-18 17:20:02,675 KeysIndex.java (line 100) applying index row 3288498 in ColumnFamily(Indexed1.626972746864617465 [6b657931:false:0@1234,])
        DEBUG [CompactionExecutor:1] 2011-11-18 17:20:02,676 SlabAllocator.java (line 105) 1 regions now allocated in org.apache.cassandra.utils.SlabAllocator@1148603
        DEBUG [CompactionExecutor:1] 2011-11-18 17:20:02,676 Table.java (line 516) Indexing row key3
        DEBUG [CompactionExecutor:1] 2011-11-18 17:20:02,676 CollationController.java (line 74) collectTimeOrderedData
        DEBUG [CompactionExecutor:1] 2011-11-18 17:20:02,677 KeysIndex.java (line 100) applying index row 3288500 in ColumnFamily(Indexed1.626972746864617465 [6b657933:false:0@1234,])
        DEBUG [Thread-3] 2011-11-18 17:20:02,677 ColumnFamilyStore.java (line 668) flush position is ReplayPosition(segmentId=1321658401840, position=354)
         INFO [Thread-3] 2011-11-18 17:20:02,678 ColumnFamilyStore.java (line 682) Enqueuing flush of Memtable-Indexed1.626972746864617465@6972371(38/47 serialized/live bytes, 2 ops)
         INFO [FlushWriter:1] 2011-11-18 17:20:02,679 Memtable.java (line 237) Writing Memtable-Indexed1.626972746864617465@6972371(38/47 serialized/live bytes, 2 ops)
        DEBUG [FlushWriter:1] 2011-11-18 17:20:02,679 DatabaseDescriptor.java (line 791) expected data files size is 84; largest free partition has 19418710016 bytes free
         INFO [FlushWriter:1] 2011-11-18 17:20:02,690 Memtable.java (line 273) Completed flushing build/test/cassandra/data/Keyspace1/Indexed1.626972746864617465-h-2-Data.db (154 bytes)
        DEBUG [FlushWriter:1] 2011-11-18 17:20:02,690 IntervalNode.java (line 45) Creating IntervalNode from [Interval(DecoratedKey(3288498, 0000000000322db2), DecoratedKey(3288500, 0000000000322db4))]
        DEBUG [FlushWriter:1] 2011-11-18 17:20:02,691 IntervalNode.java (line 45) Creating IntervalNode from []
        DEBUG [COMMIT-LOG-WRITER] 2011-11-18 17:20:02,691 CommitLog.java (line 458) discard completed log segments for ReplayPosition(segmentId=1321658401840, position=354), column family 1046.
        DEBUG [COMMIT-LOG-WRITER] 2011-11-18 17:20:02,691 CommitLog.java (line 497) Not safe to delete commit log CommitLogSegment(/srv/encrypted/project/cassandra/build/test/cassandra/commitlog/CommitLog-1321658401840.log); dirty is ; hasNext: false
         INFO [Thread-3] 2011-11-18 17:20:02,691 SecondaryIndexManager.java (line 136) Index build of 626972746864617465, complete
         INFO [Thread-3] 2011-11-18 17:20:02,692 StreamInSession.java (line 179) Finished streaming session 777351072779992 from /127.0.0.1
        DEBUG [Streaming:1] 2011-11-18 17:20:02,692 StreamReplyVerbHandler.java (line 47) Received StreamReply StreamReply(sessionId=777351072779992, file='', action=SESSION_FINISHED)
        

        It looks like something is up with the "Creating IntervalNode from []" line, but I don't see how streaming could break that, especially since the other debug lines indicate the correct data was sent.
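        The dangling-semicolon bug mentioned above is easy to reproduce in isolation: the stray semicolon is an empty statement that terminates the 'if', so the braces that follow become an unconditional block (plain JDK sketch, names illustrative):

```java
// Demonstrates why "if (cond);" makes the following block always run.
class DanglingSemicolon {
    static boolean brokenCheck(int x) {
        boolean fired = false;
        if (x > 100); // BUG: empty statement; the braces below are no longer the if-body
        {
            fired = true;
        }
        return fired; // true for every x
    }

    public static void main(String[] args) {
        System.out.println(brokenCheck(1));   // prints true, even though 1 <= 100
        System.out.println(brokenCheck(200)); // prints true
    }
}
```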

        Brandon Williams added a comment -

        Final version is forthcoming, just struggling to pass some unit tests.

        Brandon Williams added a comment -

        Can we update the bulkloader itself to also not use gossip?

        Have you looked at the first patch?

        I specifically did that first because I knew streaming was the hard part, and changing the loader would be way easier than dealing with hadoop and troubleshooting streaming at the same time.

        Jonathan Ellis added a comment -

        Can we update the bulkloader itself to also not use gossip?

        Brandon Williams added a comment -

        Updated patches are fairly complete, with the catch that streaming never retries, and in the case of failure the task will hang forever. We can fix this with CASSANDRA-3112. This should be enough for people to test/benchmark with.

        Brandon Williams added a comment -

        We could use the failure callback to know the session completed.

        Honestly that's a bit of a hack to get the CountDownLatch completed, but anything else will require much more invasive changes to streaming itself.

        Brandon Williams added a comment -

        From StreamOutSession.close:

        // Instead of just not calling the callback on failure, we could have
        // allow to register a specific callback for failures, but we leave
        // that to a future ticket (likely CASSANDRA-3112)
        if (callback != null && success)
            callback.run();
        

        We could use the failure callback to know the session completed.
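        The workaround being discussed can be sketched with plain java.util.concurrent; the class and callback names here are illustrative, not the actual StreamOutSession API. If the latch is only counted down on success, a failed session leaves the waiting thread blocked forever; registering the same countdown as a failure callback unblocks it.

```java
import java.util.concurrent.CountDownLatch;

// Sketch: count down the completion latch from both the success and the
// failure path, so a waiting bulk-load thread never hangs on a failed session.
class SessionLatch {
    final CountDownLatch latch = new CountDownLatch(1);
    final Runnable successCallback = latch::countDown;
    final Runnable failureCallback = latch::countDown; // the "hack": reuse countdown on failure

    void close(boolean success) {
        (success ? successCallback : failureCallback).run();
    }

    boolean completed() {
        return latch.getCount() == 0; // a thread blocked in await() would now return
    }
}
```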

        Jonathan Ellis added a comment -

        How would CASSANDRA-3112 help?

        Brandon Williams added a comment -

        In the latter approach of using CASSANDRA-3112, the bulk loader would just lose the ability to know when indexes are done building, which doesn't seem too bad for the 30s you gain skipping RING_DELAY.

        Brandon Williams added a comment -

        Posting what I have with this approach, though a bit rough. No output format yet, just converts the loader to not require gossip. There's a catch, however. Since FileStreamTask uses a socket for every file, when all the files are done we don't have a socket left to acknowledge the session with, so the loading hangs at 'Waiting for targets to rebuild indexes' forever. We can do major streaming surgery on FST, or possibly wait for CASSANDRA-3112 to get past this.

        Brandon Williams added a comment -

        I think that's worth a shot, it would break compatibility but you already can't stream between majors so that's fine.

        Jonathan Ellis added a comment -

        Damn it.

        Should we change streaming to use a bidirectional socket? It already gets dedicated sockets, so it's not a huge change, although I like how clean single-direction sockets are.

        Brandon Williams added a comment -

        Another idea: we could give the reducer a description of the ring, which it could plug into the partitioner/strategy to know what nodes to stream to. Gossip participation isn't technically required by the streaming protocol IIANM.

        This is very close to working: the sstables can be sent without gossip or MS (MessagingService), but the problem lies in the StreamReply, which unfortunately goes over OTC (OutboundTcpConnection) and thus gets sent to the server, not the bulk loader. I'm not sure there's a simple way around this.

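        The "description of the ring" idea boils down to handing the reducer a token→endpoint map and replaying replica placement client-side, with no gossip participation. A self-contained sketch of that lookup (plain TreeMap, with a simplified SimpleStrategy-style walk-the-ring-clockwise rule; real Cassandra delegates this to the configured partitioner and replication strategy):

```java
import java.math.BigInteger;
import java.util.*;

// Sketch: given a static ring description, compute which nodes own a
// token's replicas so the client knows where to stream sstables.
public class RingLookupSketch {
    private final TreeMap<BigInteger, String> ring = new TreeMap<>();
    private final int replicationFactor;

    public RingLookupSketch(Map<BigInteger, String> tokenToEndpoint, int rf) {
        ring.putAll(tokenToEndpoint);
        replicationFactor = rf;
    }

    // Endpoints for a token: the owner of the first ring token >= t
    // (wrapping around), plus the next rf-1 distinct nodes clockwise.
    public List<String> endpointsFor(BigInteger t) {
        List<String> result = new ArrayList<>();
        List<String> walk = new ArrayList<>(ring.tailMap(t).values());
        walk.addAll(ring.values());   // wrap-around: continue from ring start
        for (String ep : walk) {
            if (result.size() >= replicationFactor) break;
            if (!result.contains(ep)) result.add(ep);
        }
        return result;
    }
}
```

        With tokens 0/100/200 owned by nodes a/b/c and RF=2, token 150 maps to [c, a]: c owns the range (100, 200], and a is the next distinct node clockwise.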
        Jonathan Ellis added a comment -

        Another idea: we could give the reducer a description of the ring, which it could plug into the partitioner/strategy to know what nodes to stream to. Gossip participation isn't technically required by the streaming protocol IIANM.

        Jonathan Ellis added a comment -

        That sounds reasonable.

        T Jake Luciani added a comment -

        We could write an alternate CFOF, like BulkColumnFamilyOutputFormat, that can be used when the TT (TaskTracker) is running on the same node as Cassandra.
        The reducer would write files to hadoop.local.dir; then, when the reducer is closed, it would contact the local Cassandra instance via JMX with the output dir to be loaded via streaming.

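        The lifecycle Jake describes is: buffer rows into local data files during the reduce, then hand the finished directory to the colocated node in close(). A dependency-free sketch of that shape (the `LoadTrigger` callback stands in for the JMX call, and the flat files stand in for real SSTable writing; none of these names are actual Cassandra or Hadoop APIs):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the write-locally-then-stream lifecycle behind the proposed
// BulkColumnFamilyOutputFormat.
public class BulkWriterSketch implements AutoCloseable {
    // Stand-in for the JMX call to the colocated Cassandra instance.
    public interface LoadTrigger { void load(Path outputDir); }

    private final Path outputDir;   // e.g. a directory under hadoop.local.dir
    private final LoadTrigger trigger;
    private int fileCount = 0;

    public BulkWriterSketch(Path outputDir, LoadTrigger trigger) throws IOException {
        this.outputDir = Files.createDirectories(outputDir);
        this.trigger = trigger;
    }

    // Each row lands in a local file; a real implementation would append
    // to an sstable writer instead.
    public void write(String key, String value) {
        try {
            Path f = outputDir.resolve("row-" + (fileCount++) + ".dat");
            Files.write(f, (key + "=" + value).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // On close, point the local node at the finished directory so it can
    // stream the data into the cluster.
    @Override
    public void close() {
        trigger.load(outputDir);
    }
}
```

        The appeal of this shape is that the task JVM never speaks the storage protocol at all; the colocated node does the streaming.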
        Brandon Williams added a comment -

        How are the task JVMs in other processes going to use an existing MS?

        Jonathan Ellis added a comment -

        What if we refactored the bulk load code to use the existing MS instead of creating a new one? The whole point of the new bulk loader was to be "normal" streaming, so the recipients shouldn't care whether the source is a member of the ring or not.

        Brandon Williams added a comment -

        This isn't as easy as it seems. Bulk loading this way requires becoming a fat client. Since Hadoop is colocated with Cassandra, this means we would have to divorce the "ip == node" marriage. That means rewriting most of how gossip works, adding a port for the storage protocol (and thus allowing port divergence, an idea we have not been fond of in the past), and modifying MessagingService, Incoming/OutgoingTcpConnection, and probably other classes that are notoriously hairy.

        That is a lot of work, very difficult to make backwards-compatible, and we really don't know what gains, if any, we'll see from this method afterwards. I'm personally very strongly -1 on making these changes to gossip, since I feel like it is finally fairly stable.

        Even in a non-colocated setup, the task JVMs would still need to respect RING_DELAY, which might be enough to erode any gains this could provide in many scenarios.

        One option might be to speak the storage protocol directly to the local C* instance, but add some kind of logic that says 'this is not a node nor a fat client; just accept writes/reads from it and nothing else.'


          People

          • Assignee:
            Brandon Williams
          • Reporter:
            Jonathan Ellis
          • Reviewer:
            Yuki Morishita
          • Votes:
            1
          • Watchers:
            7
