Direct Streaming Scanner Proof of Concept
Intro
This code is a proof of concept for a streaming scanner API. It
should not be used as-is in production, since:
- It hasn't been tested.
- It isn't integrated with security.
- Scans aren't fully parameterized.
- Scans only work against single-cell records (e.g. if you've
encoded your data into a single cell using protobuf or some
other serialization library, you're okay; otherwise, you're
not).
This code tests the hypothesis that HBase scans perform about 5x
slower than HDFS scans due to inefficiencies in the HBase scan
API: in particular, the scan API uses RPC and typically comes
nowhere close to saturating the IO system, either between disk
and region server or between region server and client. This
code addresses these concerns by refactoring the scan API into a
streaming/event-driven style, and by aggressively optimizing the
code path for certain types of scans.
Code
Here's how to read and test the code.
StreamHRegionServer.java is a subclass of HRegionServer. It
looks up and instantiates a servlet to expose the new scanning
API. It provides the API servlet with a reference to itself.
The StreamHRegionServer can be swapped into existing
installations by setting the following properties in
hbase-site.xml:

  <property>
    <name>hregionservlet.class</name>
    <value>org.apache.hadoop.hbase.regionserver.StreamServletDirect</value>
  </property>
  <property>
    <name>hbase.regionserver.impl</name>
    <value>org.apache.hadoop.hbase.regionserver.StreamHRegionServer</value>
  </property>
HRegionServlet.java is an interface that allows a servlet
subclass to hold a reference to HRegionServer.
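To make the wiring concrete, here's a minimal self-contained sketch of the lookup-and-instantiate pattern described above. The interface shape and method names are my guesses, not the PoC's actual identifiers, and plain Object stands in for HRegionServer so the example runs with only the JDK.

```java
// Sketch of the servlet wiring: the region server loads the class named by
// hregionservlet.class, instantiates it reflectively (the usual Hadoop/HBase
// pattern for configured classes), and hands it a back-reference to itself.
// All names here are illustrative stand-ins.
public class ServletWiring {

    interface HRegionServlet {
        void setRegionServer(Object regionServer);
    }

    // Stand-in for StreamServletDirect.
    public static class StubServlet implements HRegionServlet {
        Object regionServer;

        @Override
        public void setRegionServer(Object regionServer) {
            this.regionServer = regionServer;
        }
    }

    public static HRegionServlet instantiate(String className, Object regionServer)
            throws ReflectiveOperationException {
        HRegionServlet servlet = (HRegionServlet)
                Class.forName(className).getDeclaredConstructor().newInstance();
        servlet.setRegionServer(regionServer); // give the servlet its back-reference
        return servlet;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Object fakeServer = new Object();
        HRegionServlet servlet =
                instantiate(StubServlet.class.getName(), fakeServer);
        System.out.println(((StubServlet) servlet).regionServer == fakeServer); // true
    }
}
```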
StreamServletDirect.java handles calls to the streaming scan
API. The general gist is to decode Scan parameters from the
HTTP request, open a scanner via the HRegionServer interface,
then shove results down the response stream as quickly as
possible. To avoid unnecessary processing in the read loop, a
CodedOutputStream is used to push the bytes of the HBase cell
directly into the response stream, prefixed with the length of
the record. Note that the current implementation works only for
a single cell per row, and the family and qualifier are
hard-coded. It's easy to imagine the API being extended to
specify columns in the parameters and encode multiple columns
into the output stream (although the receiving end would need to
know how to parse out the columns in the correct order). I
tried encoding the result in its own Protocol Buffer message,
but found that the overhead of processing the protobufs was too
high, which left the servlet unable to saturate the IO system.
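As a rough illustration of the wire format, here's a self-contained sketch of the write loop. It mimics what CodedOutputStream's raw varint and raw byte writes produce, but uses only the JDK; the class and method names are mine, not the PoC's.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative stand-in for the servlet's write loop: each cell's raw bytes
// go straight into the response stream, prefixed with a protobuf-style varint
// length. No per-record object construction or protobuf message wrapping
// happens inside the loop.
public class LengthPrefixedWriter {
    private final OutputStream out;

    public LengthPrefixedWriter(OutputStream out) {
        this.out = out;
    }

    // Base-128 varint: low 7 bits per byte, high bit set while more follow.
    private void writeVarint32(int value) throws IOException {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    public void writeRecord(byte[] cellValue) throws IOException {
        writeVarint32(cellValue.length); // length prefix
        out.write(cellValue);            // raw cell bytes, untouched
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        LengthPrefixedWriter writer = new LengthPrefixedWriter(buf);
        writer.writeRecord("hello".getBytes(StandardCharsets.UTF_8));
        // 5-byte payload: one varint byte (0x05) plus 5 payload bytes
        System.out.println(buf.size()); // prints 6
    }
}
```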
StreamReceiverDirect.java reads the output stream from the
servlet. It also knows (at a basic level) how to walk the chain
of regions in the table and call the streaming scanner API on
each region server in turn. Note that in the current
implementation, the receiver needs to know how to build your
protobuf record.
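A matching read loop might look like the following self-contained sketch, which decodes varint-length-prefixed records until end of stream. The names are illustrative; in the actual PoC each record's bytes would be handed to the caller's protobuf parser rather than printed.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Illustrative stand-in for the receiver's read loop: pull one
// varint-length-prefixed record at a time off the stream until EOF.
public class LengthPrefixedReader {
    private final InputStream in;

    public LengthPrefixedReader(InputStream in) {
        this.in = in;
    }

    // Returns the next record's bytes, or null at a clean end of stream.
    public byte[] readRecord() throws IOException {
        int first = in.read();
        if (first < 0) {
            return null; // EOF between records: scan is complete
        }
        int len = readVarint32(first);
        byte[] record = new byte[len];
        int off = 0;
        while (off < len) {
            int n = in.read(record, off, len - off);
            if (n < 0) {
                throw new EOFException("truncated record");
            }
            off += n;
        }
        return record;
    }

    // Base-128 varint decode, continuing from an already-read first byte.
    private int readVarint32(int firstByte) throws IOException {
        int result = firstByte & 0x7F;
        int shift = 7;
        int b = firstByte;
        while ((b & 0x80) != 0) {
            b = in.read();
            if (b < 0) {
                throw new EOFException("truncated varint");
            }
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = {0x03, 'f', 'o', 'o', 0x02, 'h', 'i'};
        LengthPrefixedReader reader =
                new LengthPrefixedReader(new ByteArrayInputStream(framed));
        for (byte[] rec; (rec = reader.readRecord()) != null; ) {
            System.out.println(
                    new String(rec, java.nio.charset.StandardCharsets.UTF_8));
        } // prints "foo" then "hi"
    }
}
```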
ScannerTest.java is a driver that runs several iterations of
scans using two different implementations for the sake of
comparison.
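The driver's core presumably resembles this sketch: time a full pass over each scanner implementation and report throughput in the same shape as the log lines below. The iterator here is a stand-in for the scanner implementations being compared, and the names are mine.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Sketch of a comparison driver's timing loop: exhaust a scanner, then report
// count, elapsed seconds, and records/sec (the format seen in the results).
public class ScannerBench {

    // Drains the scanner and prints throughput; returns the record count.
    static long timeScan(String label, Iterator<byte[]> scanner) {
        long start = System.nanoTime();
        long count = 0;
        while (scanner.hasNext()) {
            scanner.next();
            count++;
        }
        double sec = (System.nanoTime() - start) / 1e9;
        System.out.printf("%s: Scanned %d records in %f sec, %f records/sec%n",
                label, count, sec, count / sec);
        return count;
    }

    public static void main(String[] args) {
        // Stand-in "scans" over in-memory records instead of real HBase tables.
        List<byte[]> records = Collections.nCopies(100_000, new byte[64]);
        timeScan("baseline", records.iterator());
        timeScan("streaming", records.iterator());
    }
}
```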
Results
I tested the code against a two-node cluster running on virtual
hardware. The records are single-cell atoms (essentially log
records with about a dozen fields, encoded with protobuf) in
roughly the 200-2000 byte range. HBase is tuned in a slightly
non-standard way, with 1 GB HDFS blocks, 1 MB HFile blocks, and
a block cache size of 0.1; the block cache is disabled for the
tables I tested against. Row keys are UUIDs in one table, and UUIDs
prefixed with a time string in another. Here are results from
some of my sample runs:
13/06/04 22:49:27 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.ClientScanner
13/06/04 22:51:42 INFO mortbay.log: Scanned 7166450 records in
135.326000 sec, 52956.933627 records/sec
13/06/04 22:51:42 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.PipelineScanner
13/06/04 22:53:41 INFO mortbay.log: Scanned 7166450 records in
118.830000 sec, 60308.423799 records/sec
13/06/04 22:53:41 INFO mortbay.log: Scanning with direct stream
receiver
13/06/04 22:54:15 INFO client.StreamReceiverDirect: End of scan
reached; currRow: f0, stopRow: f
13/06/04 22:54:15 INFO mortbay.log: Scanned 7166450 records in
34.193000 sec, 209588.219811 records/sec
13/06/04 22:54:15 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.ClientScanner
13/06/04 22:56:25 INFO mortbay.log: Scanned 7166450 records in
129.743000 sec, 55235.735261 records/sec
13/06/04 22:56:25 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.PipelineScanner
13/06/04 22:58:24 INFO mortbay.log: Scanned 7166450 records in
118.902000 sec, 60271.904594 records/sec
13/06/04 22:58:24 INFO mortbay.log: Scanning with direct stream
receiver
13/06/04 22:58:58 INFO client.StreamReceiverDirect: End of scan
reached; currRow: f0, stopRow: f
13/06/04 22:58:58 INFO mortbay.log: Scanned 7166450 records in
33.952000 sec, 211075.930726 records/sec
13/06/04 22:58:58 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.ClientScanner
13/06/04 23:01:09 INFO mortbay.log: Scanned 7166450 records in
130.902000 sec, 54746.680723 records/sec
13/06/04 23:01:09 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.PipelineScanner
13/06/04 23:03:07 INFO mortbay.log: Scanned 7166450 records in
117.956000 sec, 60755.281630 records/sec
13/06/04 23:03:07 INFO mortbay.log: Scanning with direct stream
receiver
13/06/04 23:03:40 INFO client.StreamReceiverDirect: End of scan
reached; currRow: f0, stopRow: f
13/06/04 23:03:40 INFO mortbay.log: Scanned 7166450 records in
33.321000 sec, 215073.077039 records/sec
...
13/06/04 22:43:33 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.ClientScanner
13/06/04 22:44:24 INFO mortbay.log: Scanned 2897152 records in
51.861000 sec, 55863.789746 records/sec
13/06/04 22:44:24 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.PipelineScanner
13/06/04 22:45:11 INFO mortbay.log: Scanned 2897152 records in
47.273000 sec, 61285.554122 records/sec
13/06/04 22:45:11 INFO mortbay.log: Scanning with direct stream
receiver
13/06/04 22:45:24 INFO client.StreamReceiverDirect: End of scan
reached; currRow: 2013-02-24-00, stopRow: 2013-02
13/06/04 22:45:24 INFO mortbay.log: Scanned 2897152 records in
12.989000 sec, 223046.577874 records/sec
13/06/04 22:45:24 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.ClientScanner
13/06/04 22:46:12 INFO mortbay.log: Scanned 2897152 records in
47.934000 sec, 60440.438937 records/sec
13/06/04 22:46:12 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.PipelineScanner
13/06/04 22:46:59 INFO mortbay.log: Scanned 2897152 records in
46.928000 sec, 61736.106376 records/sec
13/06/04 22:46:59 INFO mortbay.log: Scanning with direct stream
receiver
13/06/04 22:47:11 INFO client.StreamReceiverDirect: End of scan
reached; currRow: 2013-02-24-00, stopRow: 2013-02
13/06/04 22:47:11 INFO mortbay.log: Scanned 2897152 records in
12.626000 sec, 229459.211152 records/sec
13/06/04 22:47:11 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.ClientScanner
13/06/04 22:48:00 INFO mortbay.log: Scanned 2897152 records in
48.769000 sec, 59405.606020 records/sec
13/06/04 22:48:00 INFO mortbay.log: Scanning with class
org.apache.hadoop.hbase.client.PipelineScanner
13/06/04 22:48:47 INFO mortbay.log: Scanned 2897152 records in
47.277000 sec, 61280.368890 records/sec
13/06/04 22:48:47 INFO mortbay.log: Scanning with direct stream
receiver
13/06/04 22:49:00 INFO client.StreamReceiverDirect: End of scan
reached; currRow: 2013-02-24-00, stopRow: 2013-02
13/06/04 22:49:00 INFO mortbay.log: Scanned 2897152 records in
12.804000 sec, 226269.290847 records/sec
Conclusion
The initial results show an encouraging speed-up for simple
scans that meet the entry criteria. More testing is obviously
needed, and I'd appreciate it if others could take the time to
test and replicate these results. You can contact me with any
questions or comments:
Sandy Pratt
prattrs at adobe