diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index ec33dd2..6b70a88 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -76,7 +76,7 @@ public class ClientAsyncPrefetchScanner extends ClientScanner { protected void initCache() { // concurrent cache cacheCapacity = calcCacheCapacity(); - cache = new LinkedBlockingQueue(cacheCapacity); + cache = new LinkedBlockingQueue(); cacheSizeInBytes = new AtomicLong(0); exceptionsQueue = new ConcurrentLinkedQueue(); prefetchRunnable = new PrefetchRunnable(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 19b06b5..d8b0311 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; +import java.util.stream.IntStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -646,33 +647,61 @@ public class TestScannersFromClientSide { verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); } - /** - * Test from client side for async scan - * - * @throws Exception - */ @Test - public void testAsyncScanner() throws Exception { - TableName TABLE = TableName.valueOf("testAsyncScan"); - byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); - byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); - byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); + public void testAsyncScannerWithSmallData() throws Exception { + testAsyncScanner(TableName.valueOf("testAsyncScannerWithSmallData"), + 2, + 3, + 10); + } - Table ht = TEST_UTIL.createTable(TABLE, FAMILIES); + @Test + public void testAsyncScannerWithMediumData() throws Exception { + testAsyncScanner(TableName.valueOf("testAsyncScannerWithMediumData"), + 10, + 15, + 50); + } + + @Test + public void testAsyncScannerWithManyRows() throws Exception { + testAsyncScanner(TableName.valueOf("testAsyncScannerWithManyRows"), + 30000, + 1, + 1); + } + + @Test + public void testAsyncScannerWithManyColumns() throws Exception { + testAsyncScanner(TableName.valueOf("testAsyncScannerWithManyColumns"), + 1, + 20, + 1000); + } + + private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, + int qualifierNumber) throws Exception { + assert rowNumber > 0; + assert familyNumber > 0; + assert qualifierNumber > 0; + byte[][] rows = makeNAsciiWithZeroPrefix(ROW, rowNumber); + byte[][] families = makeNAsciiWithZeroPrefix(FAMILY, familyNumber); + byte[][] qualifiers = makeNAsciiWithZeroPrefix(QUALIFIER, qualifierNumber); + + Table ht = TEST_UTIL.createTable(table, families); Put put; Scan scan; Result result; boolean toLog = true; - List kvListExp, kvListScan; - kvListExp = new ArrayList(); + List kvListExp = new ArrayList<>(); - for (int r=0; r < ROWS.length; r++) { - put = new Put(ROWS[r]); - for (int c=0; c < FAMILIES.length; c++) { - for (int q=0; q < QUALIFIERS.length; q++) { - KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + for (byte[] r : rows) { + put = new Put(r); + for (byte[] f : families) { + for (byte[] q : qualifiers) { + KeyValue kv = new KeyValue(r, f, q, 1, VALUE); put.add(kv); kvListExp.add(kv); } @@ -683,7 +712,7 @@ public class TestScannersFromClientSide { scan = new Scan(); scan.setAsyncPrefetch(true); ResultScanner scanner = ht.getScanner(scan); - kvListScan = new ArrayList(); + List kvListScan = new ArrayList<>(); while ((result = scanner.next()) != null) { for (Cell kv : result.listCells()) { kvListScan.add(kv); @@ -692,7 +721,20 @@ public class TestScannersFromClientSide { result = Result.create(kvListScan); assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); verifyResult(result, kvListExp, toLog, "Testing async scan"); + TEST_UTIL.deleteTable(table); + } + private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { + int maxLength = Integer.toString(n).length(); + byte [][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + int length = Integer.toString(i).length(); + StringBuilder buf = new StringBuilder(Integer.toString(i)); + IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0")); + byte[] tail = Bytes.toBytes(buf.toString()); + ret[i] = Bytes.add(base, tail); + } + return ret; } static void verifyResult(Result result, List expKvList, boolean toLog,