Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1437965) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +34,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.MemStore.MemStoreScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -127,9 +130,7 @@ scanner.requestSeek(matcher.getStartKey(), false, true); } } else { - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } + parallelSeek(scanners, matcher.getStartKey()); } // set storeLimit @@ -166,9 +167,7 @@ scanners = selectScannersFrom(scanners); // Seek all scanners to the initial key - for(KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } + parallelSeek(scanners, matcher.getStartKey()); // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.comparator); @@ -193,9 +192,7 @@ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); // Seek all scanners to the initial key - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } + parallelSeek(scanners, matcher.getStartKey()); heap = new KeyValueHeap(scanners, scanInfo.getComparator()); } @@ -511,11 +508,8 @@ * to get a limited set of scanners? 
We did so in the constructor and we * could have done it now by storing the scan object from the constructor */ List scanners = getScannersNoCompaction(); + parallelSeek(scanners, lastTopKey); - for(KeyValueScanner scanner : scanners) { - scanner.seek(lastTopKey); - } - // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.comparator);
@@ -555,6 +549,120 @@
   }
 
   /**
+   * Seeks every scanner to the given key. {@link StoreFileScanner}s are
+   * seeked concurrently, one thread per scanner; all other scanners are
+   * seeked on the calling thread once the workers have finished.
+   *
+   * The calling thread's MVCC read point is captured here and installed by
+   * each worker on its own thread before it seeks.
+   *
+   * @param scanners scanners to position; nothing is done if empty
+   * @param keyValue key to seek every scanner to
+   * @throws IOException if any seek fails (a worker's failure is attached as
+   *           the cause) or if the calling thread is interrupted while
+   *           waiting; in the latter case the interrupt status is restored
+   *           and the scanners must not be used
+   */
+  private void parallelSeek(final List<? extends KeyValueScanner> scanners,
+      final KeyValue keyValue) throws IOException {
+    if (scanners.isEmpty()) return;
+
+    // Read on the calling thread; each worker installs it on its own thread.
+    final long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
+
+    List<KeyValueScanner> fileScanners = new ArrayList<KeyValueScanner>();
+    for (KeyValueScanner scanner : scanners) {
+      if (scanner instanceof StoreFileScanner) {
+        fileScanners.add(scanner);
+      }
+    }
+
+    CountDownLatch latch = new CountDownLatch(fileScanners.size());
+    List<ScannerSeekWorker> workers =
+        new ArrayList<ScannerSeekWorker>(fileScanners.size());
+    for (KeyValueScanner scanner : fileScanners) {
+      workers.add(new ScannerSeekWorker(scanner, keyValue, latch, readPoint));
+    }
+    for (ScannerSeekWorker worker : workers) {
+      worker.start();
+    }
+
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      // Workers may still be seeking, so fail instead of using the scanners.
+      Thread.currentThread().interrupt();
+      java.io.InterruptedIOException iioe = new java.io.InterruptedIOException(
+          "Interrupted while waiting for parallel seek to " + keyValue);
+      iioe.initCause(e);
+      throw iioe;
+    }
+
+    for (ScannerSeekWorker worker : workers) {
+      if (!worker.isSuccess()) {
+        throw new IOException("Failed to seek store file scanner to " + keyValue,
+            worker.getFailure());
+      }
+    }
+
+    // Not only MemStoreScanner: any scanner that was not handed to a worker
+    // still has to be seeked, as the sequential loop this replaces did.
+    for (KeyValueScanner scanner : scanners) {
+      if (!(scanner instanceof StoreFileScanner)) {
+        scanner.seek(keyValue);
+      }
+    }
+  }
+
+  /**
+   * Thread that seeks a single scanner to a key under the read point of the
+   * thread that requested the seek.
+   */
+  private static class ScannerSeekWorker extends Thread {
+    private final KeyValueScanner scanner;
+    private final KeyValue keyValue;
+    private final CountDownLatch latch;
+    private final long readPoint;
+    // Written by run() before countDown(); read by the caller after await().
+    private boolean isSuccess = false;
+    private Throwable failure = null;
+
+    public ScannerSeekWorker(KeyValueScanner scanner,
+        KeyValue keyValue, CountDownLatch latch, long readPoint) {
+      this.scanner = scanner;
+      this.keyValue = keyValue;
+      this.latch = latch;
+      this.readPoint = readPoint;
+    }
+
+    public boolean isSuccess() {
+      return isSuccess;
+    }
+
+    /** @return what made the seek fail, or null if it did not fail */
+    public Throwable getFailure() {
+      return failure;
+    }
+
+    @Override
+    public void run() {
+      try {
+        // Set here, not in the constructor: the constructor runs on the
+        // requesting thread, and the read point is per thread (see
+        // MultiVersionConsistencyControl).
+        MultiVersionConsistencyControl.setThreadReadPoint(readPoint);
+        scanner.seek(keyValue);
+        isSuccess = true;
+      } catch (IOException e) {
+        failure = e;
+      } catch (RuntimeException e) {
+        failure = e;
+      } finally {
+        latch.countDown();
+      }
+    }
+  }
+
+  /**
    * Used in testing.
    * @return all scanners in no particular order
    */