diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index ec33dd2..6b70a88 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -76,7 +76,7 @@ public class ClientAsyncPrefetchScanner extends ClientScanner { protected void initCache() { // concurrent cache cacheCapacity = calcCacheCapacity(); - cache = new LinkedBlockingQueue(cacheCapacity); + cache = new LinkedBlockingQueue(); cacheSizeInBytes = new AtomicLong(0); exceptionsQueue = new ConcurrentLinkedQueue(); prefetchRunnable = new PrefetchRunnable(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 19b06b5..8862109 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -646,45 +649,72 @@ public class TestScannersFromClientSide { verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); } - /** - * Test from client side for async scan - * - * @throws Exception - */ @Test - public void testAsyncScanner() throws Exception { - TableName TABLE = TableName.valueOf("testAsyncScan"); - byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); - byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); - byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); + public void testAsyncScannerWithSmallData() throws Exception { + testAsyncScanner(TableName.valueOf("testAsyncScannerWithSmallData"), + 2, + 3, + 10); + } - Table ht = TEST_UTIL.createTable(TABLE, FAMILIES); + @Test + public void testAsyncScannerWithManyRows() throws Exception { + testAsyncScanner(TableName.valueOf("testAsyncScannerWithManyRows"), + 30000, + 1, + 1); + } - Put put; - Scan scan; - Result result; - boolean toLog = true; - List kvListExp, kvListScan; + private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, + int qualifierNumber) throws Exception { + assert rowNumber > 0; + assert familyNumber > 0; + assert qualifierNumber > 0; + byte[] row = Bytes.toBytes("r"); + byte[] family = Bytes.toBytes("f"); + byte[] qualifier = Bytes.toBytes("q"); + byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber); + byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber); + byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber); - kvListExp = new ArrayList(); + Table ht = TEST_UTIL.createTable(table, families); - for (int r=0; r < ROWS.length; r++) { - put = new Put(ROWS[r]); - for (int c=0; c < FAMILIES.length; c++) { - for (int q=0; q < QUALIFIERS.length; q++) { - KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + boolean toLog = true; + List kvListExp = new ArrayList<>(); + + List puts = new ArrayList<>(); + for (byte[] r : rows) { + Put put = new Put(r); + for (byte[] f : families) { + for (byte[] q : qualifiers) { + KeyValue kv = new KeyValue(r, f, q, 1, VALUE); put.add(kv); kvListExp.add(kv); } } - ht.put(put); + puts.add(put); + if (puts.size() > 1000) { + ht.put(puts); + puts.clear(); + } + } + if (!puts.isEmpty()) { + ht.put(puts); + puts.clear(); } - scan = new Scan(); + Scan scan = new Scan(); scan.setAsyncPrefetch(true); ResultScanner scanner = ht.getScanner(scan); - kvListScan = new ArrayList(); + List kvListScan = new ArrayList<>(); + Result result; + boolean first = true; while ((result = scanner.next()) != null) { + // waiting for cache. see HBASE-17376 + if (first) { + TimeUnit.SECONDS.sleep(1); + first = false; + } for (Cell kv : result.listCells()) { kvListScan.add(kv); } @@ -692,7 +722,20 @@ public class TestScannersFromClientSide { result = Result.create(kvListScan); assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); verifyResult(result, kvListExp, toLog, "Testing async scan"); + TEST_UTIL.deleteTable(table); + } + private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { + int maxLength = Integer.toString(n).length(); + byte [][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + int length = Integer.toString(i).length(); + StringBuilder buf = new StringBuilder(Integer.toString(i)); + IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0")); + byte[] tail = Bytes.toBytes(buf.toString()); + ret[i] = Bytes.add(base, tail); + } + return ret; } static void verifyResult(Result result, List expKvList, boolean toLog,