HBase / HBASE-8691

High-Throughput Streaming Scan API

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.95.0
    • Fix Version/s: None
    • Component/s: Scanners
    • Labels:

      Description

      I've done some work testing various ways to refactor and optimize Scans in HBase, and have found that performance can be dramatically increased by the addition of a streaming scan API. The attached code constitutes a proof of concept that shows performance increases of almost 4x in some workloads.

      I'd appreciate testing, replication, and comments. If the approach seems viable, I think such an API should be built into some future version of HBase.

      Attachments

      1. HRegionServlet.java
        0.3 kB
        Sandy Pratt
      2. README.txt
        8 kB
        Sandy Pratt
      3. RecordReceiver.java
        0.2 kB
        Sandy Pratt
      4. ScannerTest.java
        5 kB
        Sandy Pratt
      5. StreamHRegionServer.java
        1 kB
        Sandy Pratt
      6. StreamReceiverDirect.java
        4 kB
        Sandy Pratt
      7. StreamServletDirect.java
        3 kB
        Sandy Pratt

        Issue Links

          Activity

          Ted Yu added a comment -

          @Sandy:
          Were there any issues uncovered from the deployment with the streaming API?

          Are you able to produce a patch for trunk?

          Cheers

          Sandy Pratt added a comment -

          Since my last comment, I've worked out some issues around integrating the streaming scan API with Hive, and I've pushed it out on an experimental basis to a production cluster for testing. End-to-end, in a table scan situation, the streaming scan API turns out to be about 45% faster than the RPC scan API (a full table scan of my dataset took about 31 minutes with the streaming API versus about 45 minutes with the RPC API).

          Some of the tweaking I had to do to get to that point:

          • Refactored the streaming client scanner to conform to the standard AbstractClientScanner API (I had used an event-driven approach previously, where clients registered an "onMessage" interface and hit go)
          • Created a TableInputFormat/RecordReader/etc. that leverages the new API
          • Profiled my custom SerDe to iron out some surprising hotspots

          As noted earlier, it looks like performance when streaming from the RS is highly dependent on keeping the pipe saturated. Too much latency in any particular spot will cause bubbles, which kill performance. As written, my SerDe was wasting too many cycles doing date formatting and initializing HashMaps (they seem to make a system call to srand).

          Once I ironed those issues out, I did a comparison between the streaming scan API and the RPC scan API, both using the newly optimized SerDe, which is where I found the 45% performance improvement. If it's true that latency is key to performance here, that delta might go up with more modern CPUs (I have Xeon 5450s currently) as the overhead of Hive and the SerDe decrease relative to network speed.
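          For illustration, a streaming scanner of the rough shape described above, one that conforms to the 0.95-era AbstractClientScanner contract by implementing next() and close() while a background thread drains batches off the stream, might look like the sketch below. The class name and the readNextBatch() transport hook are hypothetical stand-ins for the attached StreamReceiverDirect-style plumbing, not the actual patch:

          import java.io.IOException;
          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          import org.apache.hadoop.hbase.client.AbstractClientScanner;
          import org.apache.hadoop.hbase.client.Result;

          public class StreamingScannerSketch extends AbstractClientScanner {

            // Sentinel batch, compared by reference, marking end of scan.
            private static final Result[] END_OF_SCAN = new Result[0];
            private final BlockingQueue<Result[]> buffer = new ArrayBlockingQueue<Result[]>(8);
            private Result[] current;
            private int pos;
            private boolean done;
            private volatile boolean closed;

            public StreamingScannerSketch() {
              Thread reader = new Thread() {
                @Override
                public void run() {
                  try {
                    Result[] batch;
                    // readNextBatch() stands in for the length-delimited socket read;
                    // it returns null once the server signals end of scan.
                    while (!closed && (batch = readNextBatch()) != null) {
                      buffer.put(batch);
                    }
                  } catch (Exception e) {
                    // A real implementation would propagate this to the consumer.
                  } finally {
                    try {
                      buffer.put(END_OF_SCAN);
                    } catch (InterruptedException ignored) {
                    }
                  }
                }
              };
              reader.setDaemon(true);
              reader.start();
            }

            // Hypothetical transport hook; not part of HBase.
            protected Result[] readNextBatch() throws IOException {
              return null;
            }

            @Override
            public Result next() throws IOException {
              if (done) {
                return null;
              }
              try {
                while (current == null || pos >= current.length) {
                  Result[] batch = buffer.take();
                  if (batch == END_OF_SCAN) {
                    done = true;
                    return null;
                  }
                  current = batch;
                  pos = 0;
                }
                return current[pos++];
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
              }
            }

            @Override
            public void close() {
              closed = true;
            }
          }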

          Sandy Pratt added a comment -

          I noticed when doing some more testing that I'm returning only a partial Result in the stream case, but the full Result in the control cases. That means the stream result doesn't have to transfer the rowkey (x3) and a couple of timestamps. When I corrected the test to return the full Result, using DataOutputBuffer/DataInputBuffer to serialize and deserialize, the stream case is about 2x faster than the RPC cases (rather than 4x faster). I think it's still worth looking at for a 2x speedup.

          Unfortunately, I can't figure out how to make Hive work with an event driven interface. Since my customers use Hive primarily, I might have to put this on the back burner for now. I hope somebody finds it useful.
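          The DataOutputBuffer/DataInputBuffer round trip mentioned above is, in outline, the following. It is shown here with a generic Hadoop Writable rather than Result, and is only a sketch of the extra serialization work the corrected comparison pays for:

          import java.io.IOException;

          import org.apache.hadoop.io.DataInputBuffer;
          import org.apache.hadoop.io.DataOutputBuffer;
          import org.apache.hadoop.io.Writable;

          public class WritableRoundTrip {

            // Serialize src into an in-memory buffer, then read the same bytes back into dst.
            public static void roundTrip(Writable src, Writable dst) throws IOException {
              DataOutputBuffer out = new DataOutputBuffer();
              src.write(out);
              DataInputBuffer in = new DataInputBuffer();
              in.reset(out.getData(), out.getLength());
              dst.readFields(in);
            }
          }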

          Lars Hofhansl added a comment -

          Sandy Pratt Need to see if that patch still applies. It wasn't doing next calls as frequently as possible, but rather interleaving client and server work (it worked together with scanner caching).

          Sandy Pratt added a comment -

          > I was wondering how you got a performance loss, context switching?

          I think it was probably memory and cache synchronization, but that's just a guess. It could have been due to wrapping results in protobuf (but note there was a separate thread calling next concurrently). This was in the context of the streaming servlet BTW, not a normal RPC.

          Rather than try to explain in English, the code looked like this:

          final long scannerId = hRegionServer.openScanner(regionName, scan);

          final ArrayBlockingQueue<Result[]> cache = new ArrayBlockingQueue<Result[]>(5);

          final Thread producer = new Thread() {
            @Override
            public void run() {
              try {
                while (true) {
                  Result[] results = hRegionServer.next(scannerId, BATCH_SIZE);
                  cache.put(results);
                  if (results == null || results.length == 0) {
                    break;
                  }
                }
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            }
          };

          producer.start();

          long numRecords = 0;

          try {
            while (true) {
              Result[] res = cache.take();
              if (res == null) {
                EventResult.Builder eos = EventResult.newBuilder();
                eos.setEndOfScan(true);
                eos.setNumRecords(numRecords);
                eos.build().writeDelimitedTo(resp.getOutputStream());
                break;
              } else if (res.length == 0) {
                EventResult.Builder eor = EventResult.newBuilder();
                eor.setEndOfRegion(true);
                eor.setNumRecords(numRecords);
                eor.build().writeDelimitedTo(resp.getOutputStream());
                break;
              } else {
                for (Result r : res) {
                  byte[] b = r.getValue(..., ...);
                  MyPB.Builder builder = MyPB.newBuilder();
                  builder.mergeFrom(b);
                  MyPB pb = builder.build();
                  EventResult.Builder er = EventResult.newBuilder();
                  er.setPbEvent(pb);
                  er.build().writeDelimitedTo(resp.getOutputStream());
                  numRecords++;
                }
              }
            }
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          } finally {
            resp.getOutputStream().close();
          }

          Jimmy Xiang added a comment -

          > and a performance loss using it on the server side.

          We have considered prefetching on both the client side and the server side, and decided to start with the server side. HBASE-8420 handles server-side prefetching (ported from 0.89, but with some differences). Based on my testing with YCSB, it shows about a 6.5% performance gain. If the client side spends more time processing the results, the gain should be larger due to the parallelism. I was wondering how you got a performance loss, context switching?

          Sandy Pratt added a comment -

          Lars,

          One of the first things I tried was a producer/consumer queue with the idea of having a thread call next as often as possible. I found a small speedup from using that approach on the client side, and a performance loss using it on the server side. What results did you find with your patch?

          Sandy

          Sandy Pratt added a comment -

          Enis,

          One of the things I tested before I arrived at the streaming approach is a producer-consumer queue on the client side, and/or on the server side. On the client side, using a thread to call next as often as possible showed some modest speedup (about 10-15% depending on scanner caching). When used on the server side, a P/C queue was detrimental to performance, which surprised me. My guess is that the overhead of synchronization is too much.

          Regarding the block cache, IIRC I set it to off in the Scan object in my code. It doesn't look like the internal scanner has any trouble keeping up, regardless. The main problem seemed to be the cost of my loop on the server side.

          Sandy

          Lars Hofhansl added a comment -

          I hacked up a patch for the first approach some time ago (still using HBase RPC, but keeping it going via a 2nd thread in the ClientScanner).
          Let me see whether I can find it, but the principle is simple.

          Enis Soztutar added a comment -

          This looks very promising from the POC.
          It seems that we can achieve this with purely client-side changes. We can keep a buffer of scan results that, once the scanner is opened, a thread continuously tries to fill while the application processes the results. However, this will probably still not be able to achieve the same performance as pure streaming from the RS.
          Alternatively, for each scanner, we can open a streaming thread in the RS (like the DataStreamer in hdfs) to pump data to the socket until the socket buffer is full, as in this patch. Yet another thing we can try is to keep the current scan semantics, but have each next() trigger a prefetch into the block cache.
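          A rough sketch of the purely client-side variant described here, in which a background thread keeps calling next() on an ordinary scanner so that the RPCs overlap with application work. The class and method names are hypothetical, and this is not the HBASE-8420 patch:

          import java.io.IOException;
          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          import org.apache.hadoop.hbase.client.Result;
          import org.apache.hadoop.hbase.client.ResultScanner;

          public class PrefetchingScanner {

            private static final Result[] EOS = new Result[0];   // end-of-scan sentinel
            private final BlockingQueue<Result[]> batches = new ArrayBlockingQueue<Result[]>(4);

            public PrefetchingScanner(final ResultScanner delegate, final int batchSize) {
              Thread prefetcher = new Thread() {
                @Override
                public void run() {
                  try {
                    while (true) {
                      // Ordinary RPC-based scan underneath; only the overlap is new.
                      Result[] batch = delegate.next(batchSize);
                      if (batch == null || batch.length == 0) {
                        batches.put(EOS);
                        return;
                      }
                      batches.put(batch);
                    }
                  } catch (Exception e) {
                    // A real implementation would surface this to the caller.
                  }
                }
              };
              prefetcher.setDaemon(true);
              prefetcher.start();
            }

            // Returns the next prefetched batch, or null at end of scan.
            public Result[] nextBatch() throws IOException {
              try {
                Result[] batch = batches.take();
                if (batch == EOS) {
                  batches.offer(EOS);   // keep the sentinel for any further callers
                  return null;
                }
                return batch;
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
              }
            }
          }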

          Lars Hofhansl added a comment -

          Cool! This is solely an improvement of the client/server communication, not of the internal scanner framework (which I had spent a bit of time on).

          stack added a comment -

          Correct.

          Oh, and your kvs were 200-2000 bytes.....then that would seem to indicate that I should be able to do some kv block building before putting it on the wire – as long as it didn't take too long doing prefix encoding or compressing. I will experiment. Good on you Sandy.

          Sandy Pratt added a comment -

          > Were you batching up 5k kvs before writing them out on the wire?

          Correct. I haven't experimented with other scanner caching settings yet (pertaining to the InternalScanner).

          stack added a comment -

          ...but I don't know how to do that yet, and it didn't seem critical to validating my performance hypothesis.

          Makes sense. Nice hack starting Server and getting it going as a Servlet.

          ...but I did find that it's critical to do as little encoding of the stream as possible.

          This is interesting. Compressing or doing prefix encoding, we will want to put KVs together in blocks of 32k or so. Your findings that....

          ...but I was surprised at just how few there actually are. I would have thought there was time to muck around with protobuf, but no.

          ... would seem to indicate that composing the blocks of prefix-encoded or compressed kvs would put us back to the send-pause-send-pause step function.

          But what do you mean when you say this:

          I tested with scan caching 5000 and scan batch 5000

          Were you batching up 5k kvs before writing them out on the wire?

          As is, our rpc is not amenable at all to streaming. There is one call and then it has a single result (or error). Both call and result have their total size as effectively the first thing we transmit. Introducing a protocol where size is not known and the results come in until an End-of-Stream marker is sent will be interesting to interweave into what we currently have; maybe it would be better to do as you do and just do new protocol over another port. Let me take a looksee.

          Good on you Sandy.

          James Taylor added a comment -

          Interesting ideas, Sandy. We'd love to leverage this down the road in Phoenix (https://github.com/forcedotcom/phoenix). As a simple, first use case, we'd love to get streaming support for BLOB, CLOB types. If you want a place for your alternate client API to live, take a look at Phoenix.

          Ted Yu added a comment -

          I see.

          Thanks Sandy.

          Sandy Pratt added a comment -

          Ted,

          It's a placeholder for a client-specified decode of the cell. I arguably should have made it a byte array, but the result I reported used protobuf serialization, so I changed the name to a placeholder and left it at that.

          If you change RecordReceiver to this:

          package org.apache.hadoop.hbase.client;

          public interface RecordReceiver {
            public int getNumScanned();
            public void receive(byte[] msg);
          }

          then the rest should work itself out.

          Does that answer your question?

          Sandy
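          For illustration, a hypothetical client-side read loop that drives such a RecordReceiver over a length-prefixed byte stream might look like the following. The 4-byte length prefix and zero-length end marker are assumptions for the sketch, not necessarily the exact wire format in the attachments:

          import java.io.DataInputStream;
          import java.io.IOException;
          import java.io.InputStream;

          import org.apache.hadoop.hbase.client.RecordReceiver;

          public class StreamReadLoop {

            // Read length-prefixed records until a zero-length marker, handing the raw
            // bytes to the receiver for a client-specified decode.
            public static int drain(InputStream in, RecordReceiver receiver) throws IOException {
              DataInputStream din = new DataInputStream(in);
              int count = 0;
              while (true) {
                int len = din.readInt();        // 4-byte length prefix (assumed framing)
                if (len <= 0) {                 // end-of-stream marker (assumed)
                  break;
                }
                byte[] msg = new byte[len];
                din.readFully(msg);
                receiver.receive(msg);
                count++;
              }
              return count;
            }
          }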

          Sandy Pratt added a comment -

          Stack,

          Perfectly normal questions that I should have addressed in the initial post.

          I used the servlet as an expedient way of adding an API to HBase without taking the time to fully understand how HRegionServer uses its associated RPC server. I do think that a streaming scan API should be added to the normal HRegionServer interface, but I don't know how to do that yet, and it didn't seem critical to validating my performance hypothesis. I also wanted to make sure that there's no point where we wait for the full result before starting to return to the client.

          I'm not familiar with the work you're referring to about framing of results, but I did find that it's critical to do as little encoding of the stream as possible. For example, I tried one approach where I deserialized the cell on the server, then re-encapsulated it and sent it down to the client. That was apparently too much work in a tight loop, and my performance wasn't much better than with a normal scan. Using the length-encoded byte stream had a huge impact on performance for me. Obviously there are only so many cycles to spend between getting the result from the InternalScanner and putting it on the wire before you start starving the pipe to the client, but I was surprised at just how few there actually are. I would have thought there was time to muck around with protobuf, but no.

          One thing I left on the table here is pushing the output stream down to InternalScanner so that it can stream results directly to the client. As is, it marshals a batch and then puts it on the wire (I tested with scan caching 5000 and scan batch 5000). That's potentially inefficient, I think.

          Sandy
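          As a rough server-side counterpart, the kind of minimal length-encoded write loop being described, sitting between the InternalScanner batch and the servlet's output stream, would be something like the sketch below. The framing convention is assumed and the class is hypothetical, not the attached StreamServletDirect code:

          import java.io.DataOutputStream;
          import java.io.IOException;
          import java.io.OutputStream;
          import java.util.List;

          public class StreamWriteLoop {

            // Write each cell value as a 4-byte length followed by the raw bytes, then a
            // zero-length end-of-stream marker. Anything heavier in this loop (re-encoding,
            // protobuf wrapping) risks starving the pipe to the client.
            public static void writeBatch(List<byte[]> cellValues, OutputStream out, boolean endOfScan)
                throws IOException {
              DataOutputStream dout = new DataOutputStream(out);
              for (byte[] value : cellValues) {
                dout.writeInt(value.length);
                dout.write(value);
              }
              if (endOfScan) {
                dout.writeInt(0);   // end-of-stream marker (assumed convention)
              }
              dout.flush();
            }
          }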

          Ted Yu added a comment -

          YourProtobuf pb = YourProtobuf.newBuilder().mergeFrom(b).build();

          Mind showing us the YourProtobuf class?

          Thanks

          stack added a comment -

          Thanks for looking into this Sandy.

          Now come the dumb questions.

          You did it as a servlet just because this was the easiest way of putting up a new socket on a regionserver over which you could do this new streaming protocol?

          Similarly, the regionserver already has an Http Server instance on which we could mount the new servlet, but it was just expediency that had you create the Server in StreamHRegionServer?

          (Pardon the dumb assertions above – just trying to make sure I can grok better what is going on)

          I need to measure what happens when we put our new framing of results – where we send a pb of metadata followed by blocks of KVs – over your stream. My guess is we should see same speedup (only using blocks of kvs, we can have compressed/prefix-encoded blocks of kvs on the wire) even though there will be some "stutter" while we compose the cellblocks server-side. Hopefully the stutter won't be noticed – as long as we keep the stream filled w/ data.

          This is great.

          Sandy Pratt added a comment -

          See README for details on how to apply this code to an existing environment.


            People

            • Assignee:
              Sandy Pratt
              Reporter:
              Sandy Pratt
            • Votes:
              0
              Watchers:
              26

              Dates

              • Created:
                Updated:
