Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java	(revision 1440311)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -276,6 +276,11 @@
    */
   public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 1;
 
+  /**
+   * Conf key that enables parallel seeking of StoreFileScanners in StoreScanner (default false).
+   */
+  public static final String STORESCANNER_SEEK_PARALLEL_ENABLED =
+      "hbase.storescanner.seek.parallel.enabled";
 
   /** Conf key for the memstore size at which we flush the memstore */
   public static final String HREGION_MEMSTORE_FLUSH_SIZE =
Index: hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
===================================================================
--- hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java	(revision 1440311)
+++ hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java	(working copy)
@@ -95,6 +95,7 @@
       rows++;
     }
     assertEquals(0, rows);
+    ht.close();
   }
 
   @Test
@@ -118,6 +119,7 @@
       rows++;
     }
     assertEquals(0, rows);
+    ht.close();
   }
 
   private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
@@ -147,6 +149,7 @@
     for (BulkDeleteResponse response : result.values()) {
       noOfDeletedRows += response.getRowsDeleted();
     }
+    ht.close();
     return noOfDeletedRows;
   }
 
@@ -177,6 +180,7 @@
       rows++;
     }
     assertEquals(90, rows);
+    ht.close();
   }
 
   @Test
@@ -205,6 +209,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -235,6 +240,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -282,6 +288,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -328,6 +335,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   @Test
@@ -412,6 +420,7 @@
       rows++;
     }
     assertEquals(100, rows);
+    ht.close();
   }
 
   private HTable createTable(byte[] tableName) throws IOException {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(revision 1440311)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(working copy)
@@ -19,20 +19,30 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -66,6 +76,17 @@
 
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
+  /** Read once at class load via HBaseConfiguration.create(); enables parallel seeks. */
+  static final boolean parallelSeek = HBaseConfiguration.create().
+      getBoolean(HConstants.STORESCANNER_SEEK_PARALLEL_ENABLED, false);
+  /** Cached (unbounded) daemon pool used by parallelSeek(); null unless parallelSeek is true. */
+  static ExecutorService seekExecutor = null;
+  static {
+    if (parallelSeek) {
+      seekExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+          setNameFormat(StoreScanner.class.getName() + "-seek-%d").build());
+    }
+  }
 
   /** Used during unit testing to ensure that lazy seek does save seek ops */
   protected static boolean lazySeekEnabledGlobally =
@@ -127,8 +148,12 @@
         scanner.requestSeek(matcher.getStartKey(), false, true);
       }
     } else {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.seek(matcher.getStartKey());
+      if (!parallelSeek) {
+        for (KeyValueScanner scanner : scanners) {
+          scanner.seek(matcher.getStartKey());
+        }
+      } else {
+        parallelSeek(scanners, matcher.getStartKey());
       }
     }
 
@@ -193,8 +218,12 @@
         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
 
     // Seek all scanners to the initial key
-    for (KeyValueScanner scanner : scanners) {
-      scanner.seek(matcher.getStartKey());
+    if (!parallelSeek) {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.seek(matcher.getStartKey());
+      }
+    } else {
+      parallelSeek(scanners, matcher.getStartKey());
     }
     heap = new KeyValueHeap(scanners, scanInfo.getComparator());
   }
@@ -555,6 +584,81 @@
   }
 
   /**
+   * Seeks every scanner to {@code keyValue}. StoreFileScanner seeks are run concurrently on
+   * {@link #seekExecutor} to overlap their I/O latency; any other scanner is seeked inline on
+   * the calling thread first. Returns only after every seek has finished.
+   * @param scanners the scanners to seek
+   * @param keyValue the key to seek to
+   * @throws InterruptedIOException if interrupted while waiting for the parallel seeks
+   * @throws IOException if any seek fails; an IOException thrown by a worker is rethrown as-is
+   */
+  private void parallelSeek(final List<? extends KeyValueScanner>
+      scanners, final KeyValue keyValue) throws IOException {
+    if (scanners.isEmpty()) {
+      return;
+    }
+
+    // The MVCC read point is tracked per thread, so capture the caller's value and hand it to
+    // each worker; otherwise pool threads would seek with whatever read point they last held.
+    long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
+    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(scanners.size());
+    for (KeyValueScanner scanner : scanners) {
+      if (scanner instanceof StoreFileScanner) {
+        tasks.add(new ScannerSeekWorker(scanner, keyValue, readPoint));
+      } else {
+        scanner.seek(keyValue);
+      }
+    }
+    if (tasks.isEmpty()) {
+      return;
+    }
+
+    try {
+      // invokeAll blocks until every task has completed, so get() below never waits.
+      for (Future<Void> future : seekExecutor.invokeAll(tasks)) {
+        future.get();
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      InterruptedIOException iioe =
+          new InterruptedIOException("Interrupted during parallel seek to " + keyValue);
+      iioe.initCause(ie);
+      throw iioe;
+    } catch (ExecutionException ee) {
+      Throwable cause = ee.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException("Parallel seek to " + keyValue + " failed", cause);
+    } catch (CancellationException ce) {
+      throw new IOException("Parallel seek to " + keyValue + " was cancelled", ce);
+    }
+  }
+
+  /**
+   * Seeks one scanner on a pool thread after adopting the submitting thread's MVCC read point.
+   */
+  private static final class ScannerSeekWorker implements Callable<Void> {
+    private final KeyValueScanner scanner;
+    private final KeyValue keyValue;
+    private final long readPoint;
+
+    ScannerSeekWorker(KeyValueScanner scanner, KeyValue keyValue, long readPoint) {
+      this.scanner = scanner;
+      this.keyValue = keyValue;
+      this.readPoint = readPoint;
+    }
+
+    @Override
+    public Void call() throws IOException {
+      // Must happen here, on the worker thread; the constructor runs on the submitting thread.
+      MultiVersionConsistencyControl.setThreadReadPoint(readPoint);
+      scanner.seek(keyValue);
+      return null;
+    }
+  }
+
+  /**
    * Used in testing.
    * @return all scanners in no particular order
    */
Index: hbase-server/src/main/resources/hbase-default.xml
===================================================================
--- hbase-server/src/main/resources/hbase-default.xml	(revision 1440311)
+++ hbase-server/src/main/resources/hbase-default.xml	(working copy)
@@ -442,6 +442,14 @@
     </description>
   </property>
   <property>
+    <name>hbase.storescanner.seek.parallel.enabled</name>
+    <value>false</value>
+    <description>
+      Enables StoreFileScanner parallel-seeking in StoreScanner,
+      a feature which can reduce response latency under special conditions.
+    </description>
+  </property>
+  <property>
     <name>hbase.mapreduce.hfileoutputformat.blocksize</name>
     <value>65536</value>
     <description>The mapreduce HFileOutputFormat writes storefiles/hfiles.