Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1437340) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -127,9 +128,7 @@ scanner.requestSeek(matcher.getStartKey(), false, true); } } else { - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } + parallelSeek(scanners, matcher.getStartKey()); } // set storeLimit @@ -166,9 +165,7 @@ scanners = selectScannersFrom(scanners); // Seek all scanners to the initial key - for(KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } + parallelSeek(scanners, matcher.getStartKey()); // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.comparator); @@ -193,9 +190,7 @@ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); // Seek all scanners to the initial key - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } + parallelSeek(scanners, matcher.getStartKey()); heap = new KeyValueHeap(scanners, scanInfo.getComparator()); } @@ -511,11 +506,8 @@ * to get a limited set of scanners? We did so in the constructor and we * could have done it now by storing the scan object from the constructor */ List scanners = getScannersNoCompaction(); + parallelSeek(scanners, lastTopKey); - for(KeyValueScanner scanner : scanners) { - scanner.seek(lastTopKey); - } - // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.comparator); @@ -555,6 +547,66 @@ } /** + * Do scanner.seek() in parallel + * @throws IOException + */ + private void parallelSeek(final List + scanners, final KeyValue keyValue) throws IOException { + if (scanners.isEmpty()) return; + CountDownLatch latch = new CountDownLatch(scanners.size()); + List workers = + new ArrayList(scanners.size()); + for (KeyValueScanner scanner : scanners) { + ScannerSeekWorker worker = + new ScannerSeekWorker(scanner, keyValue, latch); + workers.add(worker); + } + for (ScannerSeekWorker worker : workers) { + worker.start(); + } + + try { + latch.await(); + } catch (InterruptedException e) { + LOG.error("", e); + } + for (ScannerSeekWorker worker : workers) { + if (!worker.isSuccess()) throw new IOException(); + } + } + + private static class ScannerSeekWorker extends Thread { + KeyValueScanner scanner; + KeyValue keyValue; + CountDownLatch latch; + boolean isSuccess; + + public ScannerSeekWorker(KeyValueScanner scanner, + KeyValue keyValue, CountDownLatch latch) { + this.scanner = scanner; + this.keyValue = keyValue; + this.latch = latch; + isSuccess = false; + } + + public boolean isSuccess() { + return isSuccess; + } + + @Override + public void run() { + try { + scanner.seek(keyValue); + isSuccess = true; + } catch (IOException e) { + LOG.info("", e); + } finally { + latch.countDown(); + } + } + } + + /** * Used in testing. * @return all scanners in no particular order */