CASSANDRA-1278: Make bulk loading into Cassandra less crappy, more pluggable

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.8.1
    • Component/s: Tools
    • Labels: None

      Description

      Currently, bulk loading into Cassandra is a black art. People are either directed to just do it responsibly with thrift or a higher-level client, or they have to explore the contrib/bmt example (http://wiki.apache.org/cassandra/BinaryMemtable). That contrib module requires delving into the code to find out how it works and then applying it to the given problem. Using either method, the user also needs to keep in mind that overloading the cluster is possible, which will hopefully be addressed in CASSANDRA-685.

      This improvement would be to create a contrib module or set of documents dealing with bulk loading. Perhaps it could include code in the Core to make it more pluggable for external clients of different types.

      This is simply something that many people new to Cassandra need to do: bulk load their data into Cassandra.

      1. 0001-Add-bulk-loader-utility.patch
        33 kB
        Sylvain Lebresne
      2. 0001-Add-bulk-loader-utility-v2.patch
        36 kB
        Sylvain Lebresne
      3. 1278-cassandra-0.7.1.txt
        159 kB
        Matthew F. Dennis
      4. 1278-cassandra-0.7.txt
        159 kB
        Matthew F. Dennis
      5. 1278-cassandra-0.7-v2.txt
        338 kB
        Matthew F. Dennis

        Issue Links

          Activity

          jbellis Jonathan Ellis added a comment -

          Let's see how the streaming refactor shakes out. Replacing BMT w/ streaming makes a lot of sense to me.

          jbellis Jonathan Ellis added a comment -

          All a "bulk load" API needs to do is pretend it's a streaming source, and send data rows (in sorted order) to the target. Since Hadoop sorts as part of the reduce stage, we should be able do this directly in CFOF/CFRW.

          The tricky part is that StreamOutSession.begin assumes that it has a list of physical files to stream from (via addFilesToStream).

          mdennis Matthew F. Dennis added a comment -

          Attached patch implements a Cassandra Pipelined Thrift (CPT) loader.

          Thrift objects are serialized over a socket to C* in "segments". Once the segment is loaded, C* responds with a CPTResponse object detailing the various counts about what was loaded and the connection is closed. Rows and/or columns need not be sorted. No throttling need occur - the client can safely write data to the socket as fast as it will accept it. CPT correctly handles secondary indexes.

          The format of sending a segment is:

          PROTOCOL_MAGIC (i.e. MessagingService.PROTOCOL_MAGIC)
          PROTOCOL_HEADER (i.e. IncomingCassandraPipelinedThriftReader.PROTOCOL_HEADER)
          CPTHeader
          CPTRowHeader (for each row sent)
          Column|SuperColumn (for each Column|SuperColumn in the row)

          Each row is terminated by an empty Column or SuperColumn (e.g. IncomingCassandraPipelinedThriftReader.END_OF_COLUMNS). Each segment is terminated by an empty CPTRowHeader (e.g. IncomingCassandraPipelinedThriftReader.END_OF_ROWS).

          A CPTHeader consists of several fields:

          struct CPTHeader {
             1: required string keyspace,
             2: required string column_family,
             3: required i32 table_flush_size,
             4: required i32 forward_frame_size,
             5: required i32 so_rcvbuf_size,
             6: required i32 so_sndbuf_size,
             7: required bool forward,
          }
          

          table_flush_size controls the amount of data that is buffered before applying the mutation. Several KB is a good starting value for this.

          forward_frame_size only applies with forward=true and controls the thrift frame size used when forwarding data to other nodes. 256K is a good value here, but it should be at most half the size of so_rcvbuf_size and so_sndbuf_size.

          so_rcvbuf_size/so_sndbuf_size are the sizes of the socket buffers used by C* when accepting/forwarding CPT data respectively. Each should be at least twice as big as forward_frame_size and/or the frame size used when sending CPT data. Values > 128K usually require changing net.core.rmem_max/net.core.wmem_max.

          forward controls whether C* should forward data to other nodes.
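
          To make the wire layout above concrete, here is a rough sketch of a sender's loop in Java. It is illustrative only: CPTHeader, CPTRowHeader, Column and the END_OF_COLUMNS/END_OF_ROWS sentinels are the Thrift-generated classes from the attached patch as described above, RowData/rowHeaderFor()/columnsFor() are hypothetical helpers, and the use of Thrift's framed transport is an assumption, so treat this as pseudocode against the patch rather than a public API.

           // Sketch only; assumes the Thrift-generated CPTHeader/CPTRowHeader/Column classes
           // from the attached patch and the sentinels described above.
           TSocket socket = new TSocket(host, port);
           socket.open();
           DataOutputStream raw = new DataOutputStream(socket.getSocket().getOutputStream());
           raw.writeInt(MessagingService.PROTOCOL_MAGIC);                        // PROTOCOL_MAGIC
           raw.writeInt(IncomingCassandraPipelinedThriftReader.PROTOCOL_HEADER); // PROTOCOL_HEADER
           raw.flush();

           TProtocol proto = new TBinaryProtocol(new TFramedTransport(socket));
           cptHeader.write(proto);                    // keyspace, column_family, sizes, forward flag
           for (RowData row : rows)                   // RowData is a hypothetical client-side holder
           {
               rowHeaderFor(row).write(proto);        // CPTRowHeader for this row
               for (Column c : columnsFor(row))
                   c.write(proto);                    // one Thrift Column/SuperColumn at a time
               END_OF_COLUMNS.write(proto);           // empty Column terminates the row
           }
           END_OF_ROWS.write(proto);                  // empty CPTRowHeader terminates the segment
           proto.getTransport().flush();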

          bin/generatecptdata will produce test data with the parameters given in bin/generatecptdata.config

          bin/listcptdetails will list information about file(s)/dir(s) given as arguments, including number of rows and size of the useful raw data (versus overhead/packaging).

          bin/loadcptdata will load CPT files to a cluster and serves as a great starting point for doing this from other applications.

          Thrift generates a lot of garbage, and as such it is trivial to max out JVM GC with the current implementation. Tuning GC for your loads is a good idea for best performance (smallish memtables and a larger newgen are a good place to start).

          After a CPT load it is important to flush, as CPT loading skips the commitlog (which also implies that the "retry" division is on segments, not rows and/or individual mutations).

          Best performance, especially on larger clusters, will be achieved by partitioning your data into segments such that a given segment corresponds to precisely one range in the cluster (see loadcptdata for an example of how to do this, and the sketch after the list below). Then send each segment to each of the replicas for that range in a separate connection/thread with forward=false (note that loadcptdata does not do this; it assumes segments can contain data for any node). This provides two important benefits:

          1) it allows all nodes at all times to make as much progress as possible independent of other nodes/replicas slowing down (e.g. compaction, GC).

          2) it allows you to retry precisely the failed segments on precisely the failed nodes if a node fails during a CPT load (in the more general sense it lets you reason about what data was loaded where instead of depending on anti-entropy repair, read repair or hinted handoff).
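
          For reference, a minimal sketch of that kind of range partitioning (this is not loadcptdata's code; it assumes RandomPartitioner, whose tokens are the absolute value of the MD5 of the key, and uses the Thrift describe_ring call to learn the ranges):

           // Group keys by the token range that owns them, so each group becomes one segment.
           static Map<TokenRange, List<ByteBuffer>> partition(Cassandra.Client client, String keyspace,
                                                              List<ByteBuffer> keys) throws Exception
           {
               List<TokenRange> ring = client.describe_ring(keyspace);   // one entry per replica range
               Map<TokenRange, List<ByteBuffer>> segments = new HashMap<TokenRange, List<ByteBuffer>>();
               for (ByteBuffer key : keys)
               {
                   BigInteger token = md5Token(key);
                   for (TokenRange range : ring)
                   {
                       if (contains(new BigInteger(range.start_token), new BigInteger(range.end_token), token))
                       {
                           if (!segments.containsKey(range))
                               segments.put(range, new ArrayList<ByteBuffer>());
                           segments.get(range).add(key);
                           break;
                       }
                   }
               }
               // each segments.get(range) can then be sent to every host in range.endpoints with forward=false
               return segments;
           }

           static BigInteger md5Token(ByteBuffer key) throws Exception
           {
               MessageDigest md = MessageDigest.getInstance("MD5");
               md.update(key.duplicate());
               return new BigInteger(md.digest()).abs();
           }

           // (start, end] membership with ring wrap-around
           static boolean contains(BigInteger start, BigInteger end, BigInteger t)
           {
               if (start.compareTo(end) < 0)
                   return t.compareTo(start) > 0 && t.compareTo(end) <= 0;
               return t.compareTo(start) > 0 || t.compareTo(end) <= 0;
           }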

          tjake T Jake Luciani added a comment -

          I haven't had much time to dig into this yet but here is what I observed so far:

          1. took 12+ minutes to bulk load 1.8G of data locally. I have no sense of whether that is fast or slow, but it felt slow; what should I expect?
          2. compaction ran all during the bulk load.
          3. listcptdata needs a usage message
          4. needs a readme file explaining this, since it's not obvious what's going on here

          mdennis Matthew F. Dennis added a comment -

          1. took 12+ minutes to bulk load 1.8G of data locally. I have no sense of whether that is fast or slow, but it felt slow; what should I expect?

          That is very slow. Likely your memtable is too large and the JVM is getting stuck in GC. On a single EC2 XL instance loading from that instance it takes about 2.5 minutes to load that 1.8 GB of data consisting of ~5.2M rows with compaction turned on, 64 MB memtables and loading from the same node (it's faster using a second node to load the data).

          2. compaction ran all during the bulk load.

          The CPT loader itself doesn't do anything to turn compaction on or off. Better performance is often achieved by turning it off though.

          3. listcptdata needs a usage message

          done

          4. needs a readme file explaining this, since it's not obvious what's going on here

          Done. generatecptdata.config has also been commented.

          mdennis Matthew F. Dennis added a comment -

          On a related note, I think AuthFlushingThriftTransport should likely be committed to thrift. If everyone agrees, I'll split that out into a thrift patch...

          tjake T Jake Luciani added a comment - edited

          I understand why you did what you did here, but the concept of taking thrift-encoded data over the streaming port and then creating another set of thrift objects to create row mutations feels, well, bulky.

          It seems like there would be a way to refine what you've done to go from client -> memtable more quickly.

          If you took the column and supercolumn serializers and streamed the delimited byte arrays, you would build up a CSLM<ByteBuffer,ByteBuffer> and call SSTableWriter.append once it's "full".

          You could then kick off secondary index rebuilding in the background.
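
          A minimal sketch of that idea (the flush threshold and the writer's append(key, serializedRow) call are placeholders here, not the actual SSTableWriter signature, and the keys would need to be in decorated/token order rather than raw byte order):

           // Buffer serialized rows in sorted order, then append them to an sstable in one pass.
           ConcurrentSkipListMap<ByteBuffer, ByteBuffer> rows = new ConcurrentSkipListMap<ByteBuffer, ByteBuffer>();
           AtomicLong bufferedBytes = new AtomicLong();

           void add(ByteBuffer key, ByteBuffer serializedRow) throws IOException
           {
               rows.put(key, serializedRow);
               if (bufferedBytes.addAndGet(serializedRow.remaining()) >= FLUSH_THRESHOLD_BYTES)
                   flush();
           }

           synchronized void flush() throws IOException
           {
               for (Map.Entry<ByteBuffer, ByteBuffer> entry : rows.entrySet())  // CSLM iterates in key order
                   writer.append(entry.getKey(), entry.getValue());             // placeholder append call
               rows.clear();
               bufferedBytes.set(0);
               // secondary index rebuilding could then be kicked off in the background
           }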

          jbellis Jonathan Ellis added a comment -

          I don't think we're involving gossip here?

          tjake T Jake Luciani added a comment -

          Oops, meant streaming. Fixed above.

          jbellis Jonathan Ellis added a comment -

          For what it's worth, Matt and I discussed 3 options for the serialization format:

          1. a custom format
          2. thrift
          3. native ColumnSerializer format

          1. has little to recommend it. We have enough serialization formats already.

          2. has the benefit of being cross-platform out of the box.

          3. is obviously going to be fastest, especially if you require rows to be in sorted order so you can basically stream directly to an sstable data file.

          Neither 2. nor 3. is inherently unreasonable; the question is whether we can get to "fast enough" with 2. That's a question we couldn't answer without building it.

          tjake T Jake Luciani added a comment - edited

          I don't think we'd need to change the client-facing API; I was thinking of just moving more of the work to the client.
          We could make a Java-based import tool that a user can pipe CPT-format serialized data into, which in turn would encode it as BMT or serialized rows to be streamed to the nodes.

          That way most of the work of encoding / decoding happens locally and users can write their loaders in any language.

          jbellis Jonathan Ellis added a comment -

          That's more complex, but it does have a lot of "best of both worlds" to it.

          If we do that, I'd like to make it gossip-aware so it can take care of proxying/replicating too instead of doing that server-side.

          jbellis Jonathan Ellis added a comment - edited

          Thinking about this some more, I think we can really simplify it from a client perspective.

          We could implement the Thrift Cassandra interface (the interface implemented by CassandraServer) in a bulk loader server. ("Server" in that thrift clients connect to it, but it would run on client machines, not Cassandra nodes.)

          Writes would be turned into streaming, serialized-byte-streams by using Memtable + sort. We would keep Memtable-per-replica-range, so the actual Cassandra node doesn't need to deserialize to potentially forward. (Obviously we would not support any read operations.)

          This approach would yield zero need for new work on the client side – you can use Hector, Pycassa, Aquiles, whatever, and normal batch_mutate could be turned into bulk load streams.

          The one change we'd need on the client side would be a batch_complete call to say "we're done, now build 2ary indexes." (per-sstable bloom + primary index can be built in parallel w/ the load, the way StreamIn currently does.)

          Again, we could probably update the StreamIn/StreamOut interface to handle the bulkload daemon -> Cassandra traffic. It may be simpler to create a new API, but my guess is not.
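
          A rough sketch of what the write path of such a loader "server" might look like (the batch_mutate signature is the Thrift-generated one; RangeBuffer, rangeFor and replicasFor are hypothetical stand-ins for the per-replica-range memtables described above):

           // Inside a class implementing the Thrift-generated Cassandra.Iface on the client machine.
           public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap,
                                    ConsistencyLevel consistencyLevel)
           {
               for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> entry : mutationMap.entrySet())
               {
                   ByteBuffer key = entry.getKey();
                   RangeBuffer buffer = rangeFor(key);          // one in-memory buffer per replica range
                   buffer.add(key, entry.getValue());           // rows are sorted when the buffer is flushed
                   if (buffer.sizeInBytes() >= flushThreshold)
                       buffer.streamTo(replicasFor(buffer));    // serialized byte stream to each replica
               }
           }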

          tjake T Jake Luciani added a comment -

          This is pretty ideal. It puts most of the load on the client, and offers little change for the user...

          appodictic Edward Capriolo added a comment -

          I definitely agree that having a 100% pure Cassandra way to do this is good. I would personally like to see a JMX function like 'nodetool addsstable mykeyspace mycf mysstable-file'. Most people can generate and move an SSTable on their own (SSTableWriter + scp). Those people only need a way to alert Cassandra about new SSTables in the data directory. Doing that is a subset of the work needed to find a way to stream it.

          mdennis Matthew F. Dennis added a comment -

          The attached v2 patch implements the requested proxy server. bin/proxyloader starts the proxy server, while proxy.conf, proxy-env.sh and log4j-proxy.conf control the configuration of the proxy server. Defaults are provided and described.

          As requested, there is no CPT implementation in this patch; existing code using CPT should be able to use BareMemtableManager directly without too much trouble.

          The proxy server accepts calls via Thrift RPC and then eventually streams the results as an SSTable to C* which will schedule a build of the indexes, bloom filters and secondary indexes.

          In addition to the extensive suite of functional tests, testing was done across 5 C* nodes at RF=3 and 3 proxy nodes all running on EC2 XL instances. stress.py was run on the same nodes as the proxy and at 4 threads was easily able to saturate the CPU. As expected the performance on each proxy is marginally higher than native C*. The important differences are:

          • a (much) lower number of threads is required to fully utilize all the available CPU (though adding more threads did not diminish the throughput).
          • the proxy does not return timeouts, but instead just slows input if it's completely overloaded.
           • the load moved from C* to the proxies as intended, essentially requiring only one core to build the indexes/filters once the data is streamed.
          mdennis Matthew F. Dennis added a comment -

          opened CASSANDRA-2438 to track exposing the table building via JMX

          jbellis Jonathan Ellis added a comment -

          Go ahead and use git to split this up into logical pieces (e.g., internals changes, proxy server, tests).

          Where does throughput max out in rows/s as you add clients (proxy nodes), versus stress against plain Cassandra?

          jbellis Jonathan Ellis added a comment -

          Matt responded on IM,

          RF=3, 20 proxies, everything on XL nodes seems to be the magic number.  C* bumps up and off max CPU while the proxies are running.  each C* node sustains ~150Mb/s incoming.  All the proxies finished at roughly the same time (between 810 and 825 seconds).  There were 200M keys inserted by the stress into the proxies averaging a bit over 3000 inserts/sec/core on the proxies with an "effective" insert rate on the cluster of 12K+ /sec/core.

          mdennis Matthew F. Dennis added a comment -

          rebased v2 attached

          mdennis Matthew F. Dennis added a comment -

          The above numbers are correct, but at RF=1 (I mistyped it in IM).

          At both RF=1 and RF=3 there were 5 M1.XL C* nodes and 20 M1.XL proxy nodes, each doing 10M inserts.

          At RF=1 C* nodes bump up against max CPU while the proxies are running from building indexes/filters and compacting. The nodes sustain ~150Mb/s incoming traffic each. All the proxies finished between 810 and 825 seconds. With 20 proxies * 10M inserts/proxy * RF=1 that is 200M inserts across 4 * 20 cores on the proxies or 4 * 5 cores when measured by cluster cores resulting in a bit over 3K inserts/sec/core on the proxies and a bit over 12K "effective inserts"/sec/core on the cluster.

          At RF=3 the results are as expected, taking about 2560 seconds to finish (about 100 seconds longer than expected when increasing from RF=1). This is just shy of 3K inserts/sec/core on the proxies and a little under 12K "effective inserts"/sec/core on the cluster. As 20 proxies appeared to max out 5 nodes at RF=1, one would expect RF=3 to take roughly 3 times as long. Network traffic was more variable at RF=3, though, as it bounced between 80-200 Mb/s.

          There were no timeouts in either case.

          stuhood Stu Hood added a comment -

          Very interesting work here!

          • Did you accidentally remove the buffering for non-streaming connections in IncomingTCPConnection?
          • What kind of failures are supported during a load? What's the proper behavior for flush_proxy when some hosts fail?
          • Could we avoid coding in knowledge of the file format in the inner loop of IncomingLoaderStreamReader? I would much, much prefer that non-file-format-specific framing be added, and it would have the added benefit of not requiring as many system calls (4 per row vs 2 per frame)
          • What is the benefit of using an independent protocol for "Loader" streams?

          Again, awesome.

          mdennis Matthew F. Dennis added a comment -

          Did you accidentally remove the buffering for non-streaming connections in IncomingTCPConnection?

          It was intentional as previously only the streaming was buffered (at 4k) but the bulk of the work uses the socket channel; only the size and header are read from input and the header uses readFully. It adds an extra call when constructing the stream because of the size but avoids copying the data into the buffer (in the BufferedStream) and then into the byte array. We could lower some of those calls by reading both the magic and the header int at the same time into a ByteBuffer and then viewing it as an IntBuffer but I don't think that buys you anything as it only happens on a new connection. It also avoids bugs where something has been read from the socket into the buffer and then the socket channel is used later even though the buffer may not have been fully drained.
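
          For illustration, the "read both ints into a ByteBuffer and view it as an IntBuffer" idea would look roughly like this (channel here is the connection's SocketChannel; this is a sketch, not the patch's code):

           // Read the 4-byte magic and 4-byte header with a single buffer instead of two reads.
           ByteBuffer headerBytes = ByteBuffer.allocate(8);
           while (headerBytes.hasRemaining())
               if (channel.read(headerBytes) < 0)
                   throw new EOFException();
           headerBytes.flip();
           IntBuffer ints = headerBytes.asIntBuffer();
           int magic = ints.get(0);
           int header = ints.get(1);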

          What kind of failures are supported during a load?

          On the server side all failures result in the same behaviour: close socket, delete temp files.
          On the client side if flushing of a BareMemtable to the server fails the proxy will log it and continue running.

          In both cases any data that was being loaded via the proxy needs to be reloaded.

          What's the proper behaviour for flush_proxy when some hosts fail?

          log failed flushes and continue running; any data that was being loaded via the proxy needs to be reloaded.

          Could we avoid coding in knowledge of the file format in the inner loop of IncomingLoaderStreamReader? I would much, much prefer that non-file-format-specific framing be added, and it would have the added benefit of not requiring as many system calls (4 per row vs 2 per frame)

          We could construct something that buffers up X amount of data and then frames the data being sent, and change the inner loop to decompose that, but it's extra complexity, code and overhead. If we buffer it on the other side we consume more memory for a longer period of time (thus giving it a better chance that it needs to be promoted and/or compacted), adding to the already problematic GC pressure. If we don't buffer the rows we end up framing every row, which is additional data, and still doing 2 out of the 4 transfers we do now on data of the same size (since the frames wouldn't be any bigger). BTW, 2 or 4 transfers in this situation doesn't affect the performance; the latency on the network and the CPU cost of compaction and index building dwarf any gains to be made here. The current approach has the added benefit that debugging is easy because it's clear where the key and row boundaries are.

          What is the benefit of using an independent protocol for "Loader" streams?

          If you're comparing to the streams we use for repair and similar, they require table names and byte ranges to be known up front. While a proxy could just generate a random name, it doesn't know the sizes because it doesn't have an SSTable on disk (or buffered in memory). There is also no way for a node to request a retry from a proxy if the stream fails, because the proxy won't have the data and C*-to-proxy connections are probably firewalled off in general. And even if we did, we'd still have a bunch of small sessions because the proxy doesn't know when a client is going to stop sending data to it. In the most general sense it could be a constant thing; a client may just continually pump an RSS feed or stock ticks or something into it. tl;dr: simplicity and code reduction.

          Again, awesome.

          thanks

          stuhood Stu Hood added a comment -

          > It was intentional as previously only the streaming was buffered (at 4k)
          It was the other way around IIRC: (non-encrypted) streaming used channel.transferTo, which bypassed the buffering entirely. The buffering was for internode messaging: see CASSANDRA-1943.

          > We could construct something that buffers up X amount of data and then frames the data being
          > sent and change the inner loop to decompose that but it's extra complexity, code and overhead.
          You're already buffering rows in StreamingProxyFlusher.bufferRow: the change would simply be to continue to buffer rows until a threshold was reached. The benefit here is that the code on the receiving side doesn't need to change when the proxy starts sending it a different SSTable version/format. I've never heard of somebody regretting having framing in a protocol: it's always the other way around.

          Also, an SSTable version (as usually held by Descriptor) should be added to the header of your protocol so that clients don't break by sending unversioned blobs: not having versioning is my primary complaint vis-a-vis BinaryMemtables.
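
          For illustration, the kind of framed, versioned stream being suggested could be as simple as the following (this is not the patch's actual wire format; LOADER_VERSION is a hypothetical constant and sstableVersion is whatever version string the Descriptor currently reports):

           // Header: magic, loader protocol version, sstable format version. Body: length-prefixed frames.
           DataOutputStream out = new DataOutputStream(socketOutputStream);
           out.writeInt(MessagingService.PROTOCOL_MAGIC);
           out.writeShort(LOADER_VERSION);               // bump when the loader protocol changes
           out.writeUTF(sstableVersion);                 // the sstable format version the frames contain
           for (byte[] frame : frames)                   // e.g. one serialized row (or batch) per frame
           {
               out.writeInt(frame.length);               // length prefix; receiver never parses row internals
               out.write(frame);
           }
           out.writeInt(-1);                             // end-of-stream marker
           out.flush();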

          > If we buffer it on the other side we consume more memory for a longer period of time
          I was talking about buffering on the client side: the server side can do one system call to flush to disk, such that it never enters userspace.

          Thanks!

          jbellis Jonathan Ellis added a comment -

          If you're comparing to the streams we use for repair and similar, they require table names and byte ranges be known up front

          We've had enough trouble debugging streaming when people use it all the time for repair. I shudder to think of the bugs we'll introduce to a second-class protocol that gets used slightly more often than BMT.

          Maybe we've been too clever here: why not just write out the full sstable on the client, and stream it over (indexes and all) so that

          • we move the [primary] index build off the server, which should give a nice performance boost
          • we have filenames and sizes ready to go so streaming will be happy

          We're still talking about a minor change to streaming of recognizing that we're getting all the components and not just data, but that's something we can deal with at the StreamInSession level, I don't think we'll need to change the protocol itself.

          stuhood Stu Hood added a comment -

          Maybe we've been too clever here: why not just write out the full sstable on the client, and stream it over (indexes and all) so that

          As much as I want to merge the protocols, I'm not sure I like the limitations this puts on clients: being able to send a stream without needing local tempspace is very, very beneficial, IMO (for example, needing tempspace was by far the most annoying limitation of a Hadoop LuceneOutputFormat I worked on).

          If you're comparing to the streams we use for repair and similar, they require table names and byte ranges be known up front

          something we can deal with at the StreamInSession level, I don't think we'll need to change the protocol itself

          With versioned messaging, changing the protocol is at least possible, if painful... my dream would be:

          1. Deprecate the file ranges in Streaming session objects, to be replaced with framing in the stream
          2. Move the Streaming session object to a header of the streaming connection (almost identical to LoaderStream)
          3. Deprecate the Messaging based setup and teardown for streaming sessions: a sender initiates a stream by opening a streaming connection, and tears it down with success codes after each file (again, like this protocol)

          tl;dr: I'd prefer some slight adjustments to Matt's protocol (mentioned above) over requiring tempspace on the client.

          jbellis Jonathan Ellis added a comment -

          OTOH, we already have a no-tempspace-required API for the client. It's totally reasonable to require tempspace for bulkload in exchange for an extra 2x? performance win.

          stuhood Stu Hood added a comment -

          It's totally reasonable to require tempspace for bulkload in exchange for an extra 2x? performance win.

          There are definitely ways we can get this performance back on the server side (in the future) without affecting clients. In particular, we could build the index behind the data as it arrives: the only blocker for doing this currently is that we need an estimated size to start building the bloom filter, but I see multiple ways around that (including partitioning the filter, which has other benefits: see CASSANDRA-2466).

          Additionally, our existing streaming protocol requires that a client be able to communicate out of band in our Messaging layer, where there be dragons. Honestly, I'd like to call Matt's protocol (plus framing and a version) "streaming v2".

          But if you feel strongly about it, then by all means... I'm not trying to block progress here.

          cburroughs Chris Burroughs added a comment -

          > the only blocker for doing this currently is that we need an estimated size to start building the bloom filter, but I see multiple ways around that (including partitioning the filter, which has other benefits: see CASSANDRA-2466).

          There is literature on scalable or dynamic BloomFilters to do this in a mathematically sound way.

          jbellis Jonathan Ellis added a comment - edited

          There is literature on scalable or dynamic BloomFilters to do this in a mathematically sound way.

          For example?

          jbellis Jonathan Ellis added a comment - edited

          I'd like to call Matt's protocol (plus framing and a version) "streaming v2".

          My main concern is that we have a single protocol for all streaming. Less concerned about which protocol that is, although my impression is that it's easier to do bulk load w/ existing (tested and debugged!) streaming, than to make "streaming v2" backwards compatible w/ the existing one.

          On the other hand, I don't think it's strictly necessary to require streaming compatibility since a cluster can run fine without it for a few days while doing a rolling upgrade. (Although it does make the "upgrade a single node, wait for a week to see how it does" approach less viable.)

          cburroughs Chris Burroughs added a comment -

          > There is literature on scalable or dynamic BloomFilters to do this in a mathematically sound way.

          For example?

          References
          [1] P. Almeida, C. Baquero, N. Preguica, and D. Hutchison. Scalable bloom
              filters. Information Processing Letters, 101(6):255–261, March 2007.
          
          [2] Deke Guo, Jie Wu, Honghui Chen, Ye Yuan, and Xueshan Luo. The
              dynamic bloom filters. IEEE Transactions on Knowledge and Data
              Engineering, 22(1):120–133, January 2010.
          

          And also "Dynamic Bloom Filters: Analysis and usability" which is not appear to be in a journal but does cast some doubt on the practicality. Google Scholar can find PDFs for all of these.

          jbellis Jonathan Ellis added a comment -

          There is literature on scalable or dynamic BloomFilters to do this in a mathematically sound way.

          Thanks for the links. Chained BF looks like a lot of complexity to add.

          Do we really need to go there? We have the ability to calculate approximate row counts before we start streaming: in the bulk load and bootstrap case we obviously know how many rows we have; in the repair case, we can estimate from the in-memory row index sample.
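
          For what it's worth, that estimate is just the index sample size scaled by the index interval; a trivial sketch (parameter names are illustrative, not Cassandra's):

           // The in-memory row index keeps roughly one sampled key per indexInterval rows,
           // so the expected key count for the bloom filter is approximately:
           static long estimatedKeys(long indexSampleSize, int indexInterval)
           {
               return indexSampleSize * indexInterval;   // e.g. 40,000 samples * 128 ≈ 5.1M rows
           }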

          stuhood Stu Hood added a comment -

          > We have the ability to calculate approximate row counts before we start streaming
          That works too. I'm not sure it is necessary for this particular ticket, but "expected keys" could certainly be an optional field (via messaging versioning) in the header for newer streaming clients.

          mdennis Matthew F. Dennis added a comment -

          It was intentional as previously only the streaming was buffered (at 4k)

          It was the other way around IIRC: (non-encrypted) streaming used channel.transferTo, which bypassed the buffering entirely. The buffering was for internode messaging: see CASSANDRA-1943.

          Yes, I see now; that was unintentional and has been corrected.

          We could construct something that buffers up X amount of data and then frames the data being sent and change the inner loop to decompose that but it's extra complexity, code and overhead.

          You're already buffering rows in StreamingProxyFlusher.bufferRow: the change would simply be to continue to buffer rows until a threshold was reached. The benefit here is that the code on the receiving side doesn't need to change when the proxy starts sending it a different SSTable version/format. I've never heard of somebody regretting having framing in a protocol: it's always the other way around.

          Yes, I understood what you were suggesting; that was precisely the extra buffering I was talking about. Buffering more than one row on the client side means we keep larger buffers around and increase the GC pressure, which is already a problem on the proxy because of thrift. That being said, I've changed the protocol to be framed, but the proxy still just sends one row at a time (each row in a frame) to avoid the problems mentioned. If we later want to change the proxy to buffer more, or implement a different client, the server won't care.

          Also, an SSTable version (as usually held by Descriptor) should be added to the header of your protocol so that clients don't break by sending unversioned blobs: not having versioning is my primary complaint vis-a-vis BinaryMemtables.

          Added, along with a protocol version, to the header.

          If we buffer it on the other side we consume more memory for a longer period of time

          I was talking about buffering on the client side: the server side can do one system call to flush to disk, such that it never enters userspace.

          I was too; it's primarily the buffering on the proxy side that is the problem. The goal is to get the data off the proxy as quickly as possible, and as quickly as possible means one row at a time because of the serialization format (the size must be known before the entire row can be written).

          If you're comparing to the streams we use for repair and similar, they require table names and byte ranges be known up front

          We've had enough trouble debugging streaming when people use it all the time for repair. I shudder to think of the bugs we'll introduce to a second-class protocol that gets used slightly more often than BMT.

          that's because the streaming used for repair is complex and fragile; independent streams are tightly coupled in a session, sizes must be known up front, retries are complex and require out-of-band messaging between nodes, everything is "buffered" on disk before building of any indexes/filters starts, et cetera. In comparison the protocol used for loading is extremely simple; if it makes you feel better we could add a CRC/MD5 to the stream.
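
          (A per-frame CRC along those lines is cheap; a minimal sketch, assuming java.util.zip.CRC32 and a hypothetical frame payload, neither of which is part of the committed protocol.)

          import java.util.zip.CRC32;

          // Sketch only: a checksum computed over a frame's payload, to be appended to the frame.
          final class FrameChecksum
          {
              static long of(byte[] framePayload)
              {
                  CRC32 crc = new CRC32();
                  crc.update(framePayload, 0, framePayload.length);
                  return crc.getValue();
              }
          }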

          Maybe we've been too clever here: why not just write out the full sstable on the client, and stream it over (indexes and all) so that

          • we move the [primary] index build off the server, which should give a nice performance boost
          • we have filenames and sizes ready to go so streaming will be happy

          We're still talking about a minor change to streaming of recognizing that we're getting all the components and not just data, but that's something we can deal with at the StreamInSession level, I don't think we'll need to change the protocol itself.

          One of the main goals of the bulk loading was that no local/temp storage was required on the client; that has been the plan from the beginning. If you have something that generates full tables, indexes and filters it makes more sense to generate them locally by using the SSTableWriter directly, push them to the box and then use CASSANDRA-2438 to "add" them to the node. Maybe we could add this as an option to the proxy to make it just a bit easier to do but it certainly isn't suitable as the only option. If we want this, it should be a separate ticket as it's separate functionality. Overall though, I'm not really a fan of requiring temp space on the proxy.

          The problem I can think of at the moment is that for large clusters this means a lot of seeking on the proxy, since you need to generate one table for every replica set, or a lot of repeated passes over the same data. Even if you do this, or make it "very fast" (tm), it doesn't much matter: as you transfer small tables to nodes they will almost immediately be compacted, meaning the work saved by generating the indexes and filters was wasted, and it was only a small percentage of the overall work moved off of the cluster anyway. Compacting the tables on the clients before sending them would just make a questionable idea worse...

          Maybe we've been too clever here: why not just write out the full sstable on the client, and stream it over (indexes and all) so that

          As much as I want to merge the protocols, I'm not sure I like the limitations this puts on clients: being able to send a stream without needing local tempspace is very, very beneficial, IMO (for example, needing tempspace was by far the most annoying limitation of a Hadoop LuceneOutputFormat I worked on).

          Exactly; requiring temp space seems like an anti-feature to me.

          If you're comparing to the streams we use for repair and similar, they require table names and byte ranges be known up front

          something we can deal with at the StreamInSession level, I don't think we'll need to change the protocol itself

          With versioned messaging, changing the protocol is at least possible, if painful... my dream would be:

          1. Deprecate the file ranges in Streaming session objects, to be replaced with framing in the stream
          2. Move the Streaming session object to a header of the streaming connection (almost identical to LoaderStream)
          3. Deprecate the Messaging based setup and teardown for streaming sessions: a sender initiates a stream by opening a streaming connection, and tears it down with success codes after each file (again, like this protocol)

          The protocol is now versioned (as well as the table format) so this is possible (though certainly on a different ticket). If we change the existing streaming to use this protocol I think we end up with something a lot less fragile and a lot less complex.

          Essentially the sender is in control and keeps retrying until the receiver has the data; deprecate sessions altogether. When node A wants to send things to node B, it records that fact in the system table. For each entry it sends the file using the bulk loading protocol and continues retrying until the file is accepted. For each range it wants to send, it frames the entire range. The only complex part is preventing removal of the SSTable on the source (node A) until it has been successfully streamed to the destination (node B).
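
          (A minimal sketch of that retry-until-acknowledged loop, with hypothetical names; the real streaming code keeps this bookkeeping in its session objects.)

          import java.io.IOException;
          import java.util.Queue;

          // Sketch only: "the sender is in control and keeps retrying until the receiver
          // has the data". Transfer and its methods are hypothetical stand-ins.
          final class RetryingSender
          {
              interface Transfer
              {
                  void send() throws IOException;      // stream one framed range
                  void awaitAck() throws IOException;  // receiver confirms it stored the data
              }

              static void drain(Queue<Transfer> pending) throws InterruptedException
              {
                  while (!pending.isEmpty())
                  {
                      Transfer t = pending.peek();
                      try
                      {
                          t.send();
                          t.awaitAck();
                          pending.remove();            // forget the entry (and release the sstable) only now
                      }
                      catch (IOException e)
                      {
                          Thread.sleep(1000);          // simple backoff, then retry the same transfer
                      }
                  }
              }
          }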

          tl;dr: I'd prefer some slight adjustments to Matt's protocol (mentioned above) over requiring tempspace on the client.

          ditto

          jbellis Jonathan Ellis added a comment -

          One of the main goals of the bulk loading was that no local/temp storage was required on the client; that has been the plan from the beginning

          No, it hasn't.

          But we can leave that aside for now; we already have "build everything else from the sstable bits" code, so we can add "take advantage of local storage to offload that from the server" later as an optimization.

          deprecate sessions altogether

          You're going to need some kind of "when all of this is done, run this callback" construct for bootstrap/node movement. Currently we call that a Session.
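
          (Roughly, a session in this sense amounts to a completion callback over a set of files; a minimal, hypothetical sketch follows. The real construct is the streaming session, not this class.)

          import java.util.concurrent.atomic.AtomicInteger;

          // Sketch only: the "when all of this is done, run this callback" construct.
          final class CompletionTracker
          {
              private final AtomicInteger remaining;
              private final Runnable onComplete;

              CompletionTracker(int expectedFiles, Runnable onComplete)
              {
                  this.remaining = new AtomicInteger(expectedFiles);
                  this.onComplete = onComplete;
              }

              void fileFinished()
              {
                  if (remaining.decrementAndGet() == 0)
                      onComplete.run();                // e.g. finish bootstrap or node movement
              }
          }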

          When node A wants to send things to node B, it records that fact in the system table. For each entry it sends the file using the bulk loading protocol and continues retrying until the file is excepted.

          Sounds exactly like what existing streaming does.

          The only complex part is preventing removal of the SSTable on the source

          Currently we do this by simply maintaining a reference to the SSTR object so the GC doesn't delete it. There's no need to make it more complicated than that.

          I took a look at the patch. Just superficially, there's a lot of gratuitous change in there, e.g., refactoring test_thrift_server.py. Those changes also need to be moved to a separate patch (again, I suggest git) so reviewers can easily distinguish refactoring from ticket-specific changes.

          jbellis Jonathan Ellis added a comment -

          I think we've been over-engineering the problem. Ed was on the right track:

          I would personally like to see a JMX function like 'nodetool addsstable mykeyspace mycf mysstable-file'. Most people can generate and move an SSTable on their own (sstableWriter + scp)

          (This is, btw, the HBase bulk load approach, which despite some clunkiness does seem to solve the problem for those users.)

          The main drawback is that because of Cassandra's replication strategies, data from a naively-written sstable could span many nodes – even the entire cluster.

          So we can improve the experience a lot with a simple tool that just streams ranges from a local table to the right nodes. Since it's doing the exact thing that existing node movement needs – sending ranges from an existing sstable – it should not require any new code from Streaming.
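
          (A minimal, hypothetical sketch of the routing idea: pick the node owning each key's token range and send only those rows there. Names and types are illustrative; the real logic lives in the replication strategy and TokenMetadata.)

          import java.util.Map;
          import java.util.TreeMap;

          // Sketch only: routing a key to its owner by token range.
          final class RangeRouter
          {
              private final TreeMap<Long, String> ring = new TreeMap<Long, String>(); // token -> endpoint

              void addEndpoint(long token, String endpoint)
              {
                  ring.put(token, endpoint);
              }

              String primaryFor(long keyToken)
              {
                  // the first endpoint whose token is >= the key's token, wrapping around the ring
                  Map.Entry<Long, String> e = ring.ceilingEntry(keyToken);
                  return e != null ? e.getValue() : ring.firstEntry().getValue();
              }
          }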

          Sylvain volunteered to take a stab at this.

          slebresne Sylvain Lebresne added a comment -

          Attaching a patch that implements the "simpler" idea. It provides a new utility, 'sstableloader' (basically a fat client), that, given one or more sstables, will stream the relevant parts of those sstables to the relevant nodes.

          The tool tries to be self-documenting, but basically you must have an sstable with -Data and -Index components (we really need the -Index component to be able to do anything) in a directory dir whose name is the keyspace, and then call 'sstableloader dir'.

          Alternatively, if dir sits on one of the machines of the cluster, you can simply use a JMX call with the path to dir as its argument.
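
          (A hypothetical invocation, just to make the expected layout concrete; the file names below are illustrative, not taken from a real run.)

          $ ls Keyspace1/                      # directory name doubles as the keyspace name
          Standard1-g-1-Data.db  Standard1-g-1-Index.db
          $ bin/sstableloader Keyspace1        # streams the relevant ranges to the right nodes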

          jbellis Jonathan Ellis added a comment -

          Can we give it an optional keyspace name argument to override the directory name?

          slebresne Sylvain Lebresne added a comment -

          I'd love to, but as it turns out it is fairly heavily hardwired in Descriptor that the keyspace name is the directory where the file sits. And by hardwired I mean that even if you add a constructor to Descriptor to decorrelate the ksname field from the directory argument, this doesn't work, because streaming only transmits the name of the file (including the directory), not the ksname field, and would thus get the wrong name.

          That is, I don't think we can do that without adding a new argument to the stream header, which felt a bit like overkill at first (it's probably doable though).

          jbellis Jonathan Ellis added a comment -

          can we move script + source to tools/ like sstabledebug?

          jbellis Jonathan Ellis added a comment - edited

          The server performed HH to the loader:

           INFO 11:17:23,184 Node /127.0.0.2 is now part of the cluster
           INFO 11:17:23,189 InetAddress /127.0.0.2 is now UP
           INFO 11:17:29,222 Started hinted handoff for endpoint /127.0.0.2
           INFO 11:17:29,353 Finished hinted handoff of 0 rows to endpoint /127.0.0.2
          

          Shouldn't we start in client-only mode?

          slebresne Sylvain Lebresne added a comment -

          I didn't do it because, if I'm correct, the tool stuff doesn't go into releases (which I believe is the reason why we don't have cli, sstable2json, ... in tools). I figured that's not necessarily something we want users to have to grab the source to get. But I suppose we can if we want (at least the script + BulkLoader.java; I'd be in favor of leaving SSTableLoader where it is).

          slebresne Sylvain Lebresne added a comment -
          +            outputHandler.output("Starting client and waiting 15 seconds for gossip ...");
          +            try
          +            {
          +                // Init gossip
          +                StorageService.instance.initClient();
          

          It is in client-only mode as far as I can tell. Maybe client-only mode is screwed up though, I don't know.

          jbellis Jonathan Ellis added a comment -

          progress is still at 0% when it's 20% of the way done (1115 sstables to stream, server has logged creating over 200).

          also, suggest giving a little more visibility into the streaming process:

          streamed N sstables of M (P%)
          [then later]
          waiting for targets to rebuild indexes...

          jbellis Jonathan Ellis added a comment -
           INFO 11:38:45,826 InetAddress /127.0.0.2 is now dead.
           INFO 11:38:56,844 FatClient /127.0.0.2 has been silent for 30000ms, removing from gossip
          

          looks like it does recognize the client-only flag, there's just a bug there w/ HH.

          jbellis Jonathan Ellis added a comment -

          there's just a bug there w/ HH

          attached fix to CASSANDRA-2668

          brandon.williams Brandon Williams added a comment -

          +1 on more visibility. It'd be nice if it printed the filename and the time it took for each file, since just having the percentages reset is a bit confusing. Also, this should respect SS.RING_DELAY instead of arbitrarily choosing an amount of time to wait for gossip.

          I loaded 10M rows with stress.java defaults, then bulkloaded them from one machine to another in 75s (accounting for gossip delay). This totaled 3.1G of data, so about 44MB/s. Conversely, it took about 15 minutes to load with stress.

          jbellis Jonathan Ellis added a comment -

          if I'm correct, the tool stuff doesn't go into releases

          we should fix that then.

          (which I believe is the reason why we don't have cli, sstable2json, ... in tools)

          cli is more "core" than a tool. sstable2json hasn't been moved yet because my ant-fu wasn't up to the challenge (CASSANDRA-1805)

          jbellis Jonathan Ellis added a comment -

          It'd be nice if it printed the filename and the time it took for each file, since just having the percentages reset is a bit confusing

          Part of me likes that idea, but it might be dangerous to print a filename and have the user assume it's "done," when in reality it only has the data component streamed (with the rest pending rebuild on the target), so if the target is killed and reset the tmp data file will just be deleted.

          brandon.williams Brandon Williams added a comment -

          Repeated the bulkload test, this time loading into a 3 node cluster (rf=1), and it completed in about 41 seconds (gossip delay adjusted). Approximately 77MB/s.

          slebresne Sylvain Lebresne added a comment -

          It'd be nice if it printed the filename and the time it took for each file, since just having the percentages reset is a bit confusing.

          The fact that the percentages reset is really just a bug (I tested at first with only one sstable, my bad). Anyway, that's fixed. I also agree with Jonathan's objection about printing the filename. And in general I'm not sure that much extra information is really necessary.

          Also, this should respect SS.RING_DELAY

          Yes, I think it was the fat client that wasn't respecting it; it was waiting for a hardcoded 5 seconds, which is almost never enough. I've updated SS.initClient() to use RING_DELAY instead.

          Attaching v2 that:

          • uses RING_DELAY
          • updates the progress indication so that the percentage works. It also adds, for each host, the number of files that should be transferred to it and how many already have been. Lastly, it adds a total percentage as well as approximate transfer rate info.
          jbellis Jonathan Ellis added a comment -

          +1

          created CASSANDRA-2677 to optimize streaming further. (Probably conflicts with CASSANDRA-2280 so let's get that reviewed first.)

          hudson Hudson added a comment -

          Integrated in Cassandra-0.8 #124 (See https://builds.apache.org/hudson/job/Cassandra-0.8/124/)
          Add sstable bulk loading utility
          patch by slebresne; reviewed by jbellis for CASSANDRA-1278

          slebresne : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1126477
          Files :

          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
          • /cassandra/branches/cassandra-0.8/CHANGES.txt
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
          • /cassandra/branches/cassandra-0.8/bin/sstableloader
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
          • /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
          slebresne Sylvain Lebresne added a comment -

          Committed

          slebresne Sylvain Lebresne added a comment -

          Note that I'm marking this resolved since it has been committed. However, as it stands sstableloader doesn't handle failure very well (because streaming doesn't). Once CASSANDRA-2433 is committed, this can easily be improved.


            People

            • Assignee: slebresne Sylvain Lebresne
            • Reporter: jeromatron Jeremy Hanna
            • Reviewer: Jonathan Ellis
            • Votes: 2
            • Watchers: 16


                Time Tracking

                • Original Estimate: 40h
                • Remaining Estimate: 0h
                • Time Spent: 40h 40m
