Direct Streaming Scanner Proof of Concept

Intro

This code is a proof of concept for a streaming scanner API. It should not be used as-is in production, since:

- It hasn't been tested.
- It isn't integrated with security.
- Scans aren't fully parameterized.
- Scans only work against single-cell records (e.g. if you've encoded your data into a single cell using protobuf or some other serialization library, you're okay; otherwise, you're not).

This code is oriented towards testing the hypothesis that HBase scans perform about 5x slower than HDFS scans due to inefficiencies in the HBase scan API, particularly in that the HBase scan API uses RPC and typically does not come close to saturating the IO system, both between disk and region server, and between region server and client. This code addresses these concerns by refactoring the scan API into a streaming/event-driven style, and by aggressively optimizing the code path for certain types of scans.

Code

Here's how to read and test the code.

StreamHRegionServer.java is a subclass of HRegionServer. It looks up and instantiates a servlet to expose the new scanning API, and provides that servlet with a reference to itself. The StreamHRegionServer can be swapped into existing installations by setting the following properties in hbase-site.xml:

  hregionservlet.class = org.apache.hadoop.hbase.regionserver.StreamServletDirect
  hbase.regionserver.impl = org.apache.hadoop.hbase.regionserver.StreamHRegionServer

HRegionServlet.java is an interface that allows a servlet subclass to hold a reference to HRegionServer.

StreamServletDirect.java handles calls to the streaming scan API. The general gist is to decode Scan parameters from the HTTP request, open a scanner via the HRegionServer interface, then shove results down the response stream as quickly as possible.
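For reference, the two overrides above would be expressed in the usual Hadoop property format in hbase-site.xml; a sketch (only the property names and values come from the text):

```xml
<property>
  <name>hregionservlet.class</name>
  <value>org.apache.hadoop.hbase.regionserver.StreamServletDirect</value>
</property>
<property>
  <name>hbase.regionserver.impl</name>
  <value>org.apache.hadoop.hbase.regionserver.StreamHRegionServer</value>
</property>
```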
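The wire format the servlet emits is minimal: each record's raw bytes, prefixed with a varint length. Here is a self-contained sketch of both ends of that framing using plain java.io. The class and method names are illustrative, not the PoC's actual classes, and the PoC itself writes through protobuf's CodedOutputStream rather than these hand-rolled loops:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch of the length-prefixed record framing described in the text.
public class StreamFraming {

    // Write one record: a varint length (7 bits per byte, high bit = "more"),
    // followed by the raw record bytes.
    public static void writeRecord(OutputStream out, byte[] record) throws IOException {
        int len = record.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(record);
    }

    // Read one record from the stream; returns null at a clean end of stream
    // (which is how the receiver would detect the end of a region's output).
    public static byte[] readRecord(InputStream in) throws IOException {
        int b = in.read();
        if (b < 0) return null;
        int len = 0, shift = 0;
        while (true) {
            len |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
            b = in.read();
            if (b < 0) throw new IOException("truncated varint length");
        }
        byte[] record = new byte[len];
        int off = 0;
        while (off < len) {
            int n = in.read(record, off, len - off);
            if (n < 0) throw new IOException("truncated record");
            off += n;
        }
        return record;
    }
}
```

The read side of this same framing is what StreamReceiverDirect (described below) would loop over, handing each decoded byte array to the caller's protobuf parser.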
To avoid any unnecessary processing in the read loop, a CodedOutputStream is used: the bytes of the HBase cell are pushed directly into the response stream, prefixed with the length of the record. Note that the current implementation works only for a single cell per row, and the family and qualifier are hard-coded. It's easy to imagine the API being extended to specify columns in the parameters and encode multiple columns into the output stream (although the receiving end would need to know how to parse out the columns in the correct order). I tried encoding the result in its own Protocol Buffer message, but found that the overhead of processing the protobufs was too high, which resulted in the servlet not being able to saturate the IO system.

StreamReceiverDirect.java reads the output stream from the servlet. It also knows (at a basic level) how to walk the chain of regions in the table and call the streaming scanner API on each region server in turn. Note that in the current implementation, the receiver needs to know how to build your protobuf record.

ScannerTest.java is a driver that runs several iterations of scans using two different implementations for the sake of comparison.

Results

I tested the code against a two-node cluster running on virtual hardware. The records are single-cell atoms (essentially log records with about a dozen fields, encoded with protobuf) in roughly the 200-2000 byte range. HBase is tuned in a slightly non-standard way, with 1 GB HDFS blocks, 1 MB HFile blocks, and a block cache size of 0.1. Block cache is disabled for the tables I tested against. Row keys are UUIDs in one table, and UUIDs prefixed with a time string in another.
Here are results from some of my sample runs:

  13/06/04 22:49:27 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.ClientScanner
  13/06/04 22:51:42 INFO mortbay.log: Scanned 7166450 records in 135.326000 sec, 52956.933627 records/sec
  13/06/04 22:51:42 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.PipelineScanner
  13/06/04 22:53:41 INFO mortbay.log: Scanned 7166450 records in 118.830000 sec, 60308.423799 records/sec
  13/06/04 22:53:41 INFO mortbay.log: Scanning with direct stream receiver
  13/06/04 22:54:15 INFO client.StreamReceiverDirect: End of scan reached; currRow: f0, stopRow: f
  13/06/04 22:54:15 INFO mortbay.log: Scanned 7166450 records in 34.193000 sec, 209588.219811 records/sec
  13/06/04 22:54:15 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.ClientScanner
  13/06/04 22:56:25 INFO mortbay.log: Scanned 7166450 records in 129.743000 sec, 55235.735261 records/sec
  13/06/04 22:56:25 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.PipelineScanner
  13/06/04 22:58:24 INFO mortbay.log: Scanned 7166450 records in 118.902000 sec, 60271.904594 records/sec
  13/06/04 22:58:24 INFO mortbay.log: Scanning with direct stream receiver
  13/06/04 22:58:58 INFO client.StreamReceiverDirect: End of scan reached; currRow: f0, stopRow: f
  13/06/04 22:58:58 INFO mortbay.log: Scanned 7166450 records in 33.952000 sec, 211075.930726 records/sec
  13/06/04 22:58:58 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.ClientScanner
  13/06/04 23:01:09 INFO mortbay.log: Scanned 7166450 records in 130.902000 sec, 54746.680723 records/sec
  13/06/04 23:01:09 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.PipelineScanner
  13/06/04 23:03:07 INFO mortbay.log: Scanned 7166450 records in 117.956000 sec, 60755.281630 records/sec
  13/06/04 23:03:07 INFO mortbay.log: Scanning with direct stream receiver
  13/06/04 23:03:40 INFO client.StreamReceiverDirect: End of scan reached; currRow: f0, stopRow: f
  13/06/04 23:03:40 INFO mortbay.log: Scanned 7166450 records in 33.321000 sec, 215073.077039 records/sec

  ...

  13/06/04 22:43:33 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.ClientScanner
  13/06/04 22:44:24 INFO mortbay.log: Scanned 2897152 records in 51.861000 sec, 55863.789746 records/sec
  13/06/04 22:44:24 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.PipelineScanner
  13/06/04 22:45:11 INFO mortbay.log: Scanned 2897152 records in 47.273000 sec, 61285.554122 records/sec
  13/06/04 22:45:11 INFO mortbay.log: Scanning with direct stream receiver
  13/06/04 22:45:24 INFO client.StreamReceiverDirect: End of scan reached; currRow: 2013-02-24-00, stopRow: 2013-02
  13/06/04 22:45:24 INFO mortbay.log: Scanned 2897152 records in 12.989000 sec, 223046.577874 records/sec
  13/06/04 22:45:24 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.ClientScanner
  13/06/04 22:46:12 INFO mortbay.log: Scanned 2897152 records in 47.934000 sec, 60440.438937 records/sec
  13/06/04 22:46:12 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.PipelineScanner
  13/06/04 22:46:59 INFO mortbay.log: Scanned 2897152 records in 46.928000 sec, 61736.106376 records/sec
  13/06/04 22:46:59 INFO mortbay.log: Scanning with direct stream receiver
  13/06/04 22:47:11 INFO client.StreamReceiverDirect: End of scan reached; currRow: 2013-02-24-00, stopRow: 2013-02
  13/06/04 22:47:11 INFO mortbay.log: Scanned 2897152 records in 12.626000 sec, 229459.211152 records/sec
  13/06/04 22:47:11 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.ClientScanner
  13/06/04 22:48:00 INFO mortbay.log: Scanned 2897152 records in 48.769000 sec, 59405.606020 records/sec
  13/06/04 22:48:00 INFO mortbay.log: Scanning with class org.apache.hadoop.hbase.client.PipelineScanner
  13/06/04 22:48:47 INFO mortbay.log: Scanned 2897152 records in 47.277000 sec, 61280.368890 records/sec
  13/06/04 22:48:47 INFO mortbay.log: Scanning with direct stream receiver
  13/06/04 22:49:00 INFO client.StreamReceiverDirect: End of scan reached; currRow: 2013-02-24-00, stopRow: 2013-02
  13/06/04 22:49:00 INFO mortbay.log: Scanned 2897152 records in 12.804000 sec, 226269.290847 records/sec

Conclusion

The initial results show an encouraging speed-up for simple scans that meet the entrance criteria. More testing is obviously needed, and I'd appreciate it if others would take the time to test and replicate.

You can contact me with any questions or comments:

Sandy Pratt
prattrs at adobe