HBASE-5979

Non-pread DFSInputStreams should be associated with scanners, not HFile.Readers

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Performance, regionserver
    • Labels:
      None

      Description

      Currently, every HFile.Reader has a single DFSInputStream, which it uses to service all gets and scans. For gets, we use the positional read API (aka "pread") and for scans we use a synchronized block to seek, then read. The advantage of pread is that it doesn't hold any locks, so multiple gets can proceed at the same time. The advantage of seek+read for scans is that the datanode starts to send the entire rest of the HDFS block, rather than just the single hfile block necessary. So, in a single thread, pread is faster for gets, and seek+read is faster for scans since you get a strong pipelining effect.
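      The two access styles described above can be illustrated with a small stand-in. This is a sketch using java.nio's FileChannel as an analogy for DFSInputStream (the class name and structure are illustrative, not HBase code): a positional read takes an explicit offset and leaves no shared cursor behind, so it needs no lock, while seek+read moves a shared cursor and must be serialized.

      ```java
      import java.io.IOException;
      import java.nio.ByteBuffer;
      import java.nio.channels.FileChannel;
      import java.nio.file.Path;
      import java.nio.file.StandardOpenOption;

      // Stand-in for the two read styles on a DFSInputStream-like object.
      public class ReadStyles {
          private final FileChannel channel;

          ReadStyles(Path file) throws IOException {
              this.channel = FileChannel.open(file, StandardOpenOption.READ);
          }

          // "pread" style: positional read with no shared cursor, so no lock
          // is needed and concurrent gets do not serialize on the stream.
          int pread(long position, byte[] buf) throws IOException {
              return channel.read(ByteBuffer.wrap(buf), position);
          }

          // "seek+read" style: moves the shared cursor, so callers must hold
          // a lock; only one scanner can be reading at a time.
          synchronized int seekAndRead(long position, byte[] buf) throws IOException {
              channel.position(position);
              return channel.read(ByteBuffer.wrap(buf));
          }
      }
      ```

      The trade-off in the description follows directly: the synchronized method serializes all scanners on one object, while the positional variant lets any number of gets proceed concurrently.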

      However, in a multi-threaded case where there are multiple scans (including scans which are actually part of compactions), the seek+read strategy falls apart, since only one scanner may be reading at a time. Additionally, a large amount of wasted IO is generated on the datanode side, and we get none of the earlier-mentioned advantages.

      In one test, I switched scans to always use pread, and saw a 5x improvement in throughput of the YCSB scan-only workload, since it previously was completely blocked by contention on the DFSIS lock.

        Issue Links

          Activity

          Todd Lipcon added a comment -

          My thinking is that the solution is something like this:

          When any scanner starts, it begins by using the "pread" API for the first N hfile blocks it reads. This allows short scans, which can often fall entirely within one or two HFile blocks, to avoid the read amplification of doing a DFSInputStream seek.

          After a scanner has read several blocks from an HFile, it switches over to the seek+read mode. However, it does this with its own input stream. This way, all of the pre-buffering that happens through the HDFS layer will benefit it, and it doesn't have to contend with other scans. This should improve performance of long scans in the presence of contention (e.g. scans + compactions, or multiple longer scans within the same region). The actual input streams would thus become owned by the individual HFileScanners.

          Not sure if I'll have time to prototype a patch for this any time soon, but happy to help review ideas.
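          The switch-over proposed above could be sketched roughly as follows. All names here (BlockSource, SwitchingScanner, PREAD_BLOCK_LIMIT) are hypothetical illustrations, not HBase APIs, and the value of N is arbitrary:

          ```java
          // Hypothetical per-scanner switch-over: pread the first N blocks,
          // then fall back to a private sequential stream.
          interface BlockSource {
              byte[] positionalRead(long offset, int len);  // lock-free pread
              byte[] streamRead(long offset, int len);      // seek+read on a private stream
          }

          class SwitchingScanner {
              private static final int PREAD_BLOCK_LIMIT = 4;  // the "N" above; arbitrary
              private final BlockSource source;
              private int blocksRead = 0;

              SwitchingScanner(BlockSource source) { this.source = source; }

              byte[] readBlock(long offset, int len) {
                  // Short scans stay in pread mode and never pay the cost of
                  // seeking a dedicated input stream.
                  if (blocksRead++ < PREAD_BLOCK_LIMIT) {
                      return source.positionalRead(offset, len);
                  }
                  // Long scans switch to a stream owned by this scanner, so
                  // HDFS read-ahead helps and there is no contention with
                  // other scanners on a shared stream.
                  return source.streamRead(offset, len);
              }
          }
          ```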

          Kannan Muthukkaruppan added a comment -

          Todd: Nice catch! Your suggestion makes sense.

          Todd Lipcon added a comment -

          Kannan: I tried to work on this a bit in my spare time, but didn't get very far. So if FB folks have cycles to work on it, that would be awesome!

          I think one route is to do like I suggested above and have the StoreFileScanners hold a DFSInputStream. Another option would be to make a wrapper FileSystem (or FSReader) which pools a few streams. Then change the scanners to always issue positional reads, and have the wrapper code look for any stream which is already seeked to the right position (or just before the right position). The advantage of this technique is that we'd end up getting the same sequential read benefit, even if the user was issuing normal get() calls in ascending row order.

          Kannan Muthukkaruppan added a comment -

          Todd: If we always use positional reads, we don't get the benefit of HDFS sending the rest of the HDFS block, correct? So I didn't quite catch your recent suggestion. Did you mean: issue positional reads, but explicitly read a much larger chunk (in the Scan case) than just the current block?

          Todd Lipcon added a comment -

          Hey Kannan,

          Sorry, let me elaborate on that suggestion:

          The idea is to make a new FSReader implementation, which only has one API. That API would look like the current positional read call (i.e. take a position and a length).

          Internally, it would have a pool of cached DFSInputStreams, and remember the position for each of them. Each of the input streams would be referencing the same file. When a read request comes in, it is matched against the pooled streams: if it is within N bytes forward from the current position of one of the streams, then a seek and read would be issued, synchronized on that stream. Otherwise, any random stream would be chosen and a positional read would be issued. Separately, we can track the last N positional reads: if we detect a sequential pattern in the positional reads, we can take one of the pooled input streams and seek to the next predicted offset, so that future reads get the sequential benefit.
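          The matching step of this pooled-stream idea could be sketched as below. Everything here (class name, pool representation, the WINDOW constant) is a hypothetical illustration, not an actual HBase or HDFS API; real pooled streams would also need per-stream locking, which is omitted:

          ```java
          // Hypothetical sketch of the pooled-stream matcher: a read at
          // `position` reuses a pooled stream whose cursor is within WINDOW
          // bytes behind it; otherwise the request falls back to a plain
          // lock-free positional read.
          class PooledPositionalReader {
              private static final long WINDOW = 256 * 1024;  // the "N bytes forward"; arbitrary
              private final long[] streamPositions;           // cursor of each pooled stream

              PooledPositionalReader(int poolSize) {
                  this.streamPositions = new long[poolSize];
              }

              // Returns the index of a pooled stream suitable for seek+read,
              // or -1 if the request should be served by pread instead.
              int pickStream(long position) {
                  for (int i = 0; i < streamPositions.length; i++) {
                      long cur = streamPositions[i];
                      if (position >= cur && position - cur <= WINDOW) {
                          streamPositions[i] = position;  // stream advances to the new offset
                          return i;
                      }
                  }
                  return -1;  // no near-sequential match: use pread
              }
          }
          ```

          A sequence of ascending reads would keep matching the same pooled stream, which is how ordinary get() calls in ascending row order would pick up the sequential-read benefit.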

          Lars Hofhansl added a comment -

          Can we use a scan's cacheBlocks flag as an indicator for whether it would benefit from its own input stream? Presumably only large scans would have caching disabled.

          HBASE-7336 has a simple ad hoc fix for the truly terrible read performance when two long-running scanners scan the same store file.

          Lars Hofhansl added a comment -

          I think the most important aspect is to allow the OS to do its disk scheduling. Right now only one thread can do seek+read on the same file, so other threads need to wait (before HBASE-7336 at least), which in turn leads to somewhat of a worst-case behavior... The head is moved to one location; when that request is done, the next thread comes in and moves it to a different location, then another thread comes in, etc. In the end each thread eats the latency of the seek time.
          If these requests work in parallel, the OS can do its job (elevator or whatever is configured) to optimize head movements.

          stack added a comment -

          The nice thing about the original Todd suggestion is that we can be preading while we are opening the scanner's own file. Once the open succeeds, we can then switch to reading from the scanner's own reader.

          Scanner will more often than not have to open multiple files – as many as there are under the store/column family.

          Things get interesting around flush and compaction. The scanner could listen for changes and adjust accordingly. It might have to slow to pread speeds for a while while it does new opens.

          Vladimir Rodionov added a comment -

          The advantage of pread is that it doesn't hold any locks, so multiple gets can proceed at the same time.

          Interesting... This is the code snippet from FSInputStream (Hadoop 2.2):

            @Override
            public int read(long position, byte[] buffer, int offset, int length)
              throws IOException {
              synchronized (this) {
                long oldPos = getPos();
                int nread = -1;
                try {
                  seek(position);
                  nread = read(buffer, offset, length);
                } finally {
                  seek(oldPos);
                }
                return nread;
              }
            }
          

          Positional read is implemented with a lock. DFSInputStream extends the above class and does not override the default implementation. You have no parallelism even for simple gets.

          Todd Lipcon added a comment -

          DFSInputStream extends the above class and does not override the default implementation. You have no parallelism even for simple gets.

          That's not true – DFSInputStream definitely overrides the positional read method and provides parallelism.

            @Override
            public int read(long position, byte[] buffer, int offset, int length)
              throws IOException {
              // sanity checks
              dfsClient.checkOpen();
              ...
          

          (and it has been that way since at least five years ago, when I started working on Hadoop, as far as I can remember)

          Vladimir Rodionov added a comment -

          My bad. You are right.


            People

            • Assignee: Unassigned
            • Reporter: Todd Lipcon
            • Votes: 1
            • Watchers: 20