/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Verifies that two scans issued concurrently against the same region actually
 * run in parallel: the table is loaded with rows, scanned once serially (one
 * full-range scan) and once as two half-range scans submitted to a thread
 * pool, and the parallel pass is asserted to be measurably faster.
 */
@Category(MediumTests.class)
public class TestParallelScan {
  @Test
  public void testParallelScanAgainstSameRegion() throws Exception {
    testParallelScanAgainstSameRegion(null);
  }

  @Test
  public void testParallelScanAgainstSameRegionWithCoProc() throws Exception {
    // This fails. It appears that there's some locking going on for the region
    // that's preventing another invocation for the same region.
    testParallelScanAgainstSameRegion(ParallelScanTestCoProc.class.getName());
  }

  /**
   * Loads {@code nRows} rows, scans them once serially and once split into two
   * halves on a 5-thread pool, then asserts the parallel pass is at least 25%
   * faster than the serial one.
   *
   * @param coProcClassName region-observer class to attach to the table, or
   *          {@code null} to run without a coprocessor
   */
  public void testParallelScanAgainstSameRegion(String coProcClassName) throws Exception {
    Configuration config = HBaseConfiguration.create();
    // Disable the info servers so concurrent test runs don't fight over ports.
    config.setInt("hbase.master.info.port", -1);
    config.setInt("hbase.regionserver.info.port", -1);
    HBaseTestingUtility util = new HBaseTestingUtility(config);
    util.startMiniCluster();
    HBaseAdmin admin = new HBaseAdmin(config);
    HConnection connection = HConnectionManager.createConnection(config);
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
    ThreadPoolExecutor exec = new ThreadPoolExecutor(5, 5, 1000, TimeUnit.MILLISECONDS, queue,
        threadFactory);
    try {
      byte[] tableName = Bytes.toBytes("foo");
      byte[] cf = Bytes.toBytes("a");
      byte[] cq = cf;
      int nRows = 100000;
      HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
      HTableDescriptor descriptor = new HTableDescriptor(tableName);
      descriptor.addFamily(columnDescriptor);
      if (coProcClassName != null) {
        descriptor.addCoprocessor(coProcClassName, new Path("nojar.jar"), 1, null);
      }
      try {
        // Split point is past the last key, so all rows land in one region.
        admin.createTable(descriptor, new byte[][] { Bytes.toBytes(nRows) });
      } catch (TableExistsException ignored) {
        // Leftover table from a previous run is fine; reuse it as-is.
      }

      HTable table = new HTable(tableName, connection, exec);
      try {
        byte[] value = new byte[0];
        int batchSize = 5000;
        List<Put> puts = new ArrayList<Put>(batchSize);
        for (int i = 0; i < nRows; i++) {
          byte[] key = Bytes.toBytes(i);
          Put put = new Put(key);
          put.add(new KeyValue(key, cf, cq, value));
          puts.add(put);
          // Flush only once a full batch has accumulated. (The original
          // flushed on i % batchSize == 0, which sent a 1-row batch at i==0.)
          if (puts.size() == batchSize) {
            table.batch(puts);
            puts.clear();
            System.out.println("Put batch");
          }
        }
        table.batch(puts);
        System.out.println("Last key: " + Bytes.toStringBinary(Bytes.toBytes(nRows - 1)));

        Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(HConstants.EMPTY_START_ROW,
            HConstants.EMPTY_END_ROW);
        List<Pair<byte[], byte[]>> everything = Collections.singletonList(pair);
        // Warm-up pass so cache effects don't skew the two timed runs below.
        scanTable(exec, connection, tableName, everything);

        long serialTime = scanTable(exec, connection, tableName, everything);
        List<Pair<byte[], byte[]>> halfAndHalf = new ArrayList<Pair<byte[], byte[]>>();
        byte[] halfPoint = Bytes.toBytes(nRows / 2);
        System.out.println("Half key: " + Bytes.toStringBinary(halfPoint));
        halfAndHalf.add(new Pair<byte[], byte[]>(HConstants.EMPTY_START_ROW, halfPoint));
        halfAndHalf.add(new Pair<byte[], byte[]>(halfPoint, HConstants.EMPTY_END_ROW));
        long parallelTime = scanTable(exec, connection, tableName, halfAndHalf);
        // Somewhat arbitrary, but parallel time must be at least 25% less than serial time
        assertTrue(parallelTime * 5 / 4 < serialTime);
      } finally {
        table.close();
      }
    } finally {
      // Tear everything down even on assertion failure so later tests start
      // from a clean slate (the original leaked all of these).
      connection.close();
      admin.close();
      exec.shutdown();
      util.shutdownMiniCluster();
    }
  }

  /**
   * Scans the given key ranges concurrently on {@code exec} and returns the
   * wall-clock time for all scans to finish.
   *
   * @param splitPoints start/stop row pairs; an empty array means open-ended
   * @return elapsed milliseconds until every submitted scan completed
   */
  private long scanTable(final ThreadPoolExecutor exec, final HConnection conn,
      final byte[] tableName, List<Pair<byte[], byte[]>> splitPoints) throws Exception {
    final List<Future<?>> futures = new ArrayList<Future<?>>();
    for (final Pair<byte[], byte[]> splitPoint : splitPoints) {
      Future<?> future = exec.submit(new Runnable() {

        @Override
        public void run() {
          try {
            HTable table = new HTable(tableName, conn, exec);
            try {
              Scan scan = new Scan();
              byte[] startRow = splitPoint.getFirst();
              if (startRow.length > 0) {
                scan.setStartRow(startRow);
              }
              byte[] stopRow = splitPoint.getSecond();
              if (stopRow.length > 0) {
                scan.setStopRow(stopRow);
              }
              scan.setCaching(1000);
              ResultScanner scanner = table.getScanner(scan);
              try {
                // Drain the scanner; only the timing matters, not the rows.
                while (scanner.next() != null) {
                }
              } finally {
                scanner.close();
              }
            } finally {
              table.close();
            }
          } catch (IOException e) {
            // Propagate so future.get() fails the test instead of silently
            // timing a broken scan (the original only printed the trace).
            throw new RuntimeException(e);
          }
        }
      });
      futures.add(future);
    }

    System.out.println("Query start");
    long startTime = System.currentTimeMillis();
    for (Future<?> future : futures) {
      future.get();
    }
    long time = System.currentTimeMillis() - startTime;
    System.out.println("Query took: " + time);
    return time;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

/**
 * Test region observer used by TestParallelScan: when a scanner is opened it
 * eagerly drains the real scanner inside {@code postScannerOpen} and hands the
 * client a single-shot wrapper that returns only the first KeyValue of the
 * last batch seen. This simulates a coprocessor that does its own scanning of
 * the region while another scan of the same region may be in flight.
 */
public class ParallelScanTestCoProc extends BaseRegionObserver {
  @Override
  public final RegionScanner postScannerOpen(
      final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
      final RegionScanner s) throws IOException {
    // Drain the delegate scanner completely, remembering the first KeyValue
    // of the most recent non-empty batch.
    boolean hasMore = false;
    KeyValue keyValue = null;
    do {
      List<KeyValue> results = new ArrayList<KeyValue>();
      hasMore = s.next(results) && !s.isFilterDone();
      if (!results.isEmpty()) {
        keyValue = results.get(0);
      }
    } while (hasMore);

    final KeyValue lastKeyValue = keyValue;
    // Single-shot scanner: emits lastKeyValue once, then reports done. All
    // next(...) variants funnel into next(List).
    RegionScanner scanner = new RegionScanner() {
      // Already exhausted if the drained region produced nothing.
      private boolean done = lastKeyValue == null;

      @Override
      public long getMaxResultSize() {
        return 100;
      }

      @Override
      public boolean next(List<KeyValue> results, String metric) throws IOException {
        return next(results);
      }

      @Override
      public boolean nextRaw(List<KeyValue> results) throws IOException {
        return next(results);
      }

      @Override
      public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException {
        return next(result);
      }

      @Override
      public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
        return next(result);
      }

      @Override
      public HRegionInfo getRegionInfo() {
        // Delegate so the client still sees the real region's metadata.
        return s.getRegionInfo();
      }

      @Override
      public boolean isFilterDone() {
        return done;
      }

      @Override
      public long getMvccReadPoint() {
        // Make every edit visible to this reader thread.
        MultiVersionConsistencyControl.setThreadReadPoint(Long.MAX_VALUE);
        return Long.MAX_VALUE;
      }

      @Override
      public void close() throws IOException {
        // Close the wrapped (already drained) scanner.
        s.close();
      }

      @Override
      public boolean next(List<KeyValue> results) throws IOException {
        if (done) return false;
        done = true;
        results.add(lastKeyValue);
        // false: no further rows remain after this one.
        return false;
      }

      @Override
      public boolean next(List<KeyValue> result, int limit) throws IOException {
        return next(result);
      }

      @Override
      public boolean reseek(byte[] row) throws IOException {
        // Single-shot scanner has no position to reseek to.
        throw new DoNotRetryIOException("Unsupported");
      }
    };
    return scanner;
  }
}