Index: src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java	(revision 1441715)
+++ src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java	(working copy)
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -162,4 +163,9 @@
   public Leases getLeases() {
     return null;
   }
+
+  @Override
+  public ThreadPoolExecutor getParallelSFSeekExecutor() {
+    return null;
+  }
 }
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java	(revision 1441715)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java	(working copy)
@@ -550,14 +550,14 @@
     List<KeyValueScanner> scanners = scanFixture(kvs);
     Scan scan = new Scan();
     scan.setMaxVersions(2);
-    Store.ScanInfo scanInfo = new Store.ScanInfo(Bytes.toBytes("cf"),
+    Store.ScanInfo newScanInfo = new Store.ScanInfo(Bytes.toBytes("cf"),
        0 /* minVersions */,
        2 /* maxVersions */, 500 /* ttl */,
        false /* keepDeletedCells */,
        200, /* timeToPurgeDeletes */
        KeyValue.COMPARATOR);
     StoreScanner scanner =
-      new StoreScanner(scan, scanInfo,
+      new StoreScanner(scan, newScanInfo,
          ScanType.MAJOR_COMPACT, null, scanners,
          HConstants.OLDEST_TIMESTAMP);
     List<KeyValue> results = new ArrayList<KeyValue>();
@@ -571,6 +571,7 @@
       assertEquals(kvs[14], results.get(5));
       assertEquals(kvs[15], results.get(6));
       assertEquals(7, results.size());
+      scanner.close();
     }finally{
       EnvironmentEdgeManagerTestHelper.reset();
     }
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java	(revision 1441715)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java	(working copy)
@@ -89,6 +89,7 @@
       rows++;
     }
     assertEquals(0, rows);
+    ht.close();
   }
 
   @Test
@@ -112,6 +113,7 @@
       rows++;
     }
     assertEquals(0, rows);
+    ht.close();
   }
 
   private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
@@ -129,6 +131,7 @@
     for (BulkDeleteResponse response : result.values()) {
       noOfDeletedRows += response.getRowsDeleted();
     }
+    ht.close();
     return noOfDeletedRows;
   }
 
@@ -159,6 +162,7 @@
       rows++;
     }
     assertEquals(90, rows);
+    ht.close();
   }
 
   @Test
@@ -187,6 +191,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -217,6 +222,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -264,6 +270,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -310,6 +317,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -385,6 +393,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   private HTable createTable(byte[] tableName) throws IOException {
@@ -404,4 +413,4 @@
     put.add(FAMILY1, QUALIFIER3, value.getBytes());
     return put;
   }
-}
\ No newline at end of file
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(revision 1441715)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(working copy)
@@ -21,9 +21,15 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,6 +64,11 @@
   private final boolean isGet;
   private final boolean explicitColumnQuery;
   private final boolean useRowColBloom;
+  /**
+   * A flag that enables StoreFileScanner parallel-seeking
+   */
+  protected boolean isParallelSeekEnabled = false;
+  protected ExecutorService seekExecutor;
   private final Scan scan;
   private final NavigableSet<byte[]> columns;
   private final long oldestUnexpiredTS;
@@ -69,6 +80,8 @@
 
   /** Used during unit testing to ensure that lazy seek does save seek ops */
   private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;
+  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
+      "hbase.storescanner.parallel.seek.enable";
 
   // if heap == null and lastTop != null, you need to reseek given the key below
   private KeyValue lastTop = null;
@@ -91,6 +104,17 @@
     // for multi-row (non-"get") scans because this is not done in
     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
+    // Parallel-seeking is enabled only when:
+    // 1) the config value is true, and
+    // 2) the store has more than one store file
+    if (store != null && store.getHRegion() != null
+        && store.getStorefilesCount() > 1) {
+      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
+      if (rsService == null || !rsService.getConfiguration().getBoolean(
+          STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
+      isParallelSeekEnabled = true;
+      seekExecutor = rsService.getParallelSFSeekExecutor();
+    }
   }
 
   /**
@@ -127,8 +151,12 @@
         scanner.requestSeek(matcher.getStartKey(), false, true);
       }
     } else {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.seek(matcher.getStartKey());
+      if (!isParallelSeekEnabled) {
+        for (KeyValueScanner scanner : scanners) {
+          scanner.seek(matcher.getStartKey());
+        }
+      } else {
+        parallelSeek(scanners, matcher.getStartKey());
       }
     }
 
@@ -161,8 +189,12 @@
     scanners = selectScannersFrom(scanners);
 
     // Seek all scanners to the initial key
-    for (KeyValueScanner scanner : scanners) {
-      scanner.seek(matcher.getStartKey());
+    if (!isParallelSeekEnabled) {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.seek(matcher.getStartKey());
+      }
+    } else {
+      parallelSeek(scanners, matcher.getStartKey());
     }
 
     // Combine all seeked scanners with a heap
@@ -189,8 +221,12 @@
         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
 
     // Seek all scanners to the initial key
-    for (KeyValueScanner scanner : scanners) {
-      scanner.seek(matcher.getStartKey());
+    if (!isParallelSeekEnabled) {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.seek(matcher.getStartKey());
+      }
+    } else {
+      parallelSeek(scanners, matcher.getStartKey());
     }
     heap = new KeyValueHeap(scanners, scanInfo.getComparator());
   }
@@ -510,8 +546,12 @@
      * could have done it now by storing the scan object from the constructor
      */
     List<KeyValueScanner> scanners = getScannersNoCompaction();
-    for (KeyValueScanner scanner : scanners) {
-      scanner.seek(lastTopKey);
+    if (!isParallelSeekEnabled) {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.seek(lastTopKey);
+      }
+    } else {
+      parallelSeek(scanners, lastTopKey);
     }
 
     // Combine all seeked scanners with a heap
@@ -551,6 +591,58 @@
     return 0;
   }
 
+  /**
+   * Seek store files in parallel to optimize IO latency as much as possible
+   * @param scanners the list of {@link KeyValueScanner}s to be read from
+   * @param kv the KeyValue on which the operation is being requested
+   * @throws IOException
+   */
+  private void parallelSeek(final List<? extends KeyValueScanner> scanners,
+      final KeyValue kv) throws IOException {
+    if (scanners.isEmpty()) return;
+    int storeFileScannerCount = scanners.size();
+    List<Future<Void>> futures =
+        new ArrayList<Future<Void>>(storeFileScannerCount);
+    for (KeyValueScanner scanner : scanners) {
+      if (scanner instanceof StoreFileScanner) {
+        Callable<Void> task = new ScannerSeekWorker(scanner, kv,
+            MultiVersionConsistencyControl.getThreadReadPoint());
+        futures.add(seekExecutor.submit(task));
+      } else {
+        scanner.seek(kv);
+      }
+    }
+    try {
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+    } catch (InterruptedException ie) {
+      throw new InterruptedIOException(ie.getMessage());
+    } catch (ExecutionException e) {
+      throw new IOException(e.getMessage());
+    } catch (CancellationException ce) {
+      throw new IOException(ce.getMessage());
+    }
+  }
+
+  private static class ScannerSeekWorker implements Callable<Void> {
+    private KeyValueScanner scanner;
+    private KeyValue keyValue;
+    private long readPoint;
+
+    public ScannerSeekWorker(KeyValueScanner scanner, KeyValue keyValue,
+        long readPoint) {
+      this.scanner = scanner;
+      this.keyValue = keyValue;
+      this.readPoint = readPoint;
+    }
+
+    public Void call() throws IOException {
+      MultiVersionConsistencyControl.setThreadReadPoint(readPoint);
+      scanner.seek(keyValue);
+      return null;
+    }
+  }
+
   /**
    * Used in testing.
    * @return all scanners in no particular order
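Note on the hunk above: parallelSeek() submits one task per StoreFileScanner and then joins the futures, so every scanner is positioned before the KeyValueHeap is built; the caller's MVCC read point is captured up front and re-installed inside each worker because the read point is thread-local. The self-contained sketch below isolates that fan-out-and-join pattern under simplified assumptions: Seekable and READ_POINT are illustrative stand-ins for KeyValueScanner and MultiVersionConsistencyControl, not code from this patch.

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class ParallelSeekSketch {
  interface Seekable {
    void seek(long key) throws IOException;
  }

  // Stand-in for MultiVersionConsistencyControl's per-thread read point.
  private static final ThreadLocal<Long> READ_POINT = new ThreadLocal<Long>();

  static void parallelSeek(ExecutorService pool, List<Seekable> scanners,
      final long key) throws IOException {
    // Capture the caller's read point once, on the calling thread.
    final Long callerReadPoint = READ_POINT.get();
    List<Future<Void>> futures = new ArrayList<Future<Void>>(scanners.size());
    for (final Seekable s : scanners) {
      futures.add(pool.submit(new Callable<Void>() {
        public Void call() throws IOException {
          // Propagate the caller's read point into the worker thread.
          READ_POINT.set(callerReadPoint);
          s.seek(key);
          return null;
        }
      }));
    }
    try {
      // Join: all seeks must complete (or fail) before returning.
      for (Future<Void> f : futures) {
        f.get();
      }
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(ie.getMessage());
    } catch (ExecutionException ee) {
      throw new IOException(ee.getMessage());
    }
  }
}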
Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java	(revision 1441715)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java	(working copy)
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
@@ -89,4 +90,9 @@
    * @return The RegionServer's "Leases" service
    */
   public Leases getLeases();
+
+  /**
+   * @return the StoreFileScanner parallel-seeking executor
+   */
+  public ThreadPoolExecutor getParallelSFSeekExecutor();
 }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1441715)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -47,6 +47,8 @@
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -173,6 +175,7 @@
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -206,6 +209,8 @@
   protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
   private HFileSystem fs;
   private boolean useHBaseChecksum; // verify hbase checksums?
+  /** Used for StoreFileScanner parallel-seeking */
+  private ThreadPoolExecutor parallelSFSeekExecutor;
   private Path rootDir;
   private final Random rand = new Random();
 
@@ -418,6 +423,11 @@
     this.abortRequested = false;
     this.stopped = false;
 
+    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
+      parallelSFSeekExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
+              "StoreScannerSeek-%d").build());
+    }
     // Server to handle client requests.
     String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
@@ -837,6 +847,9 @@
     this.hbaseMaster = null;
     this.rpcEngine.close();
     this.leases.close();
+    if (parallelSFSeekExecutor != null) {
+      parallelSFSeekExecutor.shutdownNow();
+    }
 
     if (!killed) {
       join();
@@ -3681,6 +3694,13 @@
     return service;
   }
 
+  /**
+   * @return the StoreFileScanner parallel-seeking executor
+   */
+  public ThreadPoolExecutor getParallelSFSeekExecutor() {
+    return parallelSFSeekExecutor;
+  }
+
   //
   // Main program and support routines
   //
Index: src/main/resources/hbase-default.xml
===================================================================
--- src/main/resources/hbase-default.xml	(revision 1441715)
+++ src/main/resources/hbase-default.xml	(working copy)
@@ -448,6 +448,14 @@
     </description>
   </property>
   <property>
+    <name>hbase.storescanner.parallel.seek.enable</name>
+    <value>false</value>
+    <description>
+      Enables StoreFileScanner parallel-seeking in StoreScanner,
+      a feature which can reduce response latency when a Store has more than one store file.
+    </description>
+  </property>
+  <property>
    <name>hbase.mapreduce.hfileoutputformat.blocksize</name>
    <value>65536</value>
    <description>The mapreduce HFileOutputFormat writes storefiles/hfiles.
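To exercise the feature, the new key can be overridden in hbase-site.xml on each region server. A minimal sketch, using only the property this patch defines (everything else is the stock HBase configuration mechanism):

  <property>
    <name>hbase.storescanner.parallel.seek.enable</name>
    <value>true</value>
  </property>

A region server restart is required for the change to take effect, since the seek executor is created in the HRegionServer constructor only when the flag is set.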