From 7da9689c2ca3f572e52791cc9f4a98cd7d32dc45 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 11 Aug 2017 18:31:25 +0800 Subject: [PATCH] HBASE-18553 Expose scan cursor for asynchronous scanner --- .../AsyncScanSingleRegionRpcRetryingCaller.java | 2 +- .../hbase/client/AsyncTableResultScanner.java | 29 +++++- .../hadoop/hbase/client/RawScanResultConsumer.java | 11 ++- .../client/AbstractTestResultScannerCursor.java | 86 +++++++++++++++++ .../hbase/client/TestAsyncResultScannerCursor.java | 49 ++++++++++ .../hbase/client/TestRawAsyncScanCursor.java | 107 ++++++++++++--------- .../hbase/client/TestResultScannerCursor.java | 34 +++++++ .../apache/hadoop/hbase/client/TestScanCursor.java | 90 ----------------- 8 files changed, 265 insertions(+), 143 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 89b3afc..8be138e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -497,7 +497,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { if (results.length > 0) { updateNextStartRowWhenError(results[results.length - 1]); consumer.onNext(results, scanController); - } else if (resp.hasHeartbeatMessage() && resp.getHeartbeatMessage()) { + } else { consumer.onHeartbeat(scanController); } ScanControllerState state = scanController.destroy(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index 28a5568..7ddc579 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayDeque; +import java.util.Optional; import java.util.Queue; import org.apache.commons.logging.Log; @@ -46,8 +47,12 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private final long maxCacheSize; + private final Scan scan; + private final Queue queue = new ArrayDeque<>(); + private Result lastResult; + private ScanMetrics scanMetrics; private long cacheSize; @@ -61,6 +66,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { this.rawTable = table; this.maxCacheSize = maxCacheSize; + this.scan = scan; table.scan(scan, this); } @@ -92,12 +98,27 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { if (cacheSize >= maxCacheSize) { stopPrefetch(controller); } + lastResult = results[results.length - 1]; } @Override public synchronized void onHeartbeat(ScanController controller) { if (closed) { controller.terminate(); + return; + } + if (scan.isNeedCursorResult()) { + Optional cursor = controller.cursor(); + if (cursor.isPresent()) { + queue.add(Result.createCursorResult(cursor.get())); + } else if (lastResult != null) { + // size limit maybe + queue.add(Result.createCursorResult(new Cursor(lastResult.getRow()))); + } + // It is possible that cursor and lastResult are both null if we have not gotten anything yet. + // A possible way is to just use the startRow as a cursor, but I do not think it is useful. It + // is likely to arrive here again when user use the cursor to scan next time since the user + // just execute the same scan... } } @@ -143,9 +164,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { } } Result result = queue.poll(); - cacheSize -= calcEstimatedSize(result); - if (resumer != null && cacheSize <= maxCacheSize / 2) { - resumePrefetch(); + if (!result.isCursor()) { + cacheSize -= calcEstimatedSize(result); + if (resumer != null && cacheSize <= maxCacheSize / 2) { + resumePrefetch(); + } } return result; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 54d4887..4fedb0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -95,8 +95,15 @@ public interface RawScanResultConsumer { void onNext(Result[] results, ScanController controller); /** - * Indicate that there is an heartbeat message but we have not cumulated enough cells to call - * onNext. + * Indicate that there is a heartbeat message but we have not cumulated enough cells to call + * {@link #onNext(Result[], ScanController)}. + *

+ * Note that this method will always be called when RS returns something to us but we do not have + * enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be a + * 'heartbeat' message for RS, for example, we have a large row with many cells and size limit is + * exceeded before sending all the cells for this row. For RS it does send some data to us and the + * time limit has not been reached, but we can not return the data to client so here we call this + * method to tell client we have already received something. *

* This method give you a chance to terminate a slow scan operation. * @param controller used to suspend or terminate the scan. Notice that the {@code controller} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java new file mode 100644 index 0000000..f3520d3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Test; + +public abstract class AbstractTestResultScannerCursor extends AbstractTestScanCursor { + + protected abstract ResultScanner getScanner(Scan scan) throws Exception; + + @Test + public void testHeartbeatWithSparseFilter() throws Exception { + try (ResultScanner scanner = getScanner(createScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } + + @Test + public void testHeartbeatWithSparseFilterReversed() throws Exception { + try (ResultScanner scanner = getScanner(createReversedScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[0], r.getRow()); + } + num++; + } + } + } + + @Test + public void testSizeLimit() throws IOException { + try (ResultScanner scanner = + TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java new file mode 100644 index 0000000..5aebb4a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.ForkJoinPool; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncResultScannerCursor extends AbstractTestResultScannerCursor { + + private static AsyncConnection CONN; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + AbstractTestScanCursor.setUpBeforeClass(); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + public static void tearDownAfterClass() throws Exception { + if (CONN != null) { + CONN.close(); + } + AbstractTestScanCursor.tearDownAfterClass(); + } + + @Override + protected ResultScanner getScanner(Scan scan) throws Exception { + return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java index 9caf942..f681998 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java @@ -27,70 +27,83 @@ import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ MediumTests.class, ClientTests.class }) public class TestRawAsyncScanCursor extends AbstractTestScanCursor { + private static AsyncConnection CONN; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + AbstractTestScanCursor.setUpBeforeClass(); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + public static void tearDownAfterClass() throws Exception { + if (CONN != null) { + CONN.close(); + } + AbstractTestScanCursor.tearDownAfterClass(); + } + private void doTest(boolean reversed) throws InterruptedException, ExecutionException, IOException { CompletableFuture future = new CompletableFuture<>(); - try (AsyncConnection conn = - ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { - RawAsyncTable table = conn.getRawTable(TABLE_NAME); - table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), - new RawScanResultConsumer() { + RawAsyncTable table = CONN.getRawTable(TABLE_NAME); + table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), + new RawScanResultConsumer() { - private int count; + private int count; - @Override - public void onHeartbeat(ScanController controller) { - int row = count / NUM_FAMILIES / NUM_QUALIFIERS; - if (reversed) { - row = NUM_ROWS - 1 - row; - } - try { - assertArrayEquals(ROWS[row], controller.cursor().get().getRow()); - count++; - } catch (Throwable e) { - future.completeExceptionally(e); - throw e; - } + @Override + public void onHeartbeat(ScanController controller) { + int row = count / NUM_FAMILIES / NUM_QUALIFIERS; + if (reversed) { + row = NUM_ROWS - 1 - row; + } + try { + assertArrayEquals(ROWS[row], controller.cursor().get().getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; } + } - @Override - public void onNext(Result[] results, ScanController controller) { - try { - assertEquals(1, results.length); - assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); - // we will always provide a scan cursor if time limit is reached. - if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { - assertFalse(controller.cursor().isPresent()); - } else { - assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], - controller.cursor().get().getRow()); - } - assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); - count++; - } catch (Throwable e) { - future.completeExceptionally(e); - throw e; + @Override + public void onNext(Result[] results, ScanController controller) { + try { + assertEquals(1, results.length); + assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); + // we will always provide a scan cursor if time limit is reached. + if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { + assertFalse(controller.cursor().isPresent()); + } else { + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], + controller.cursor().get().getRow()); } + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; } + } - @Override - public void onError(Throwable error) { - future.completeExceptionally(error); - } + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } - @Override - public void onComplete() { - future.complete(null); - } - }); - future.get(); - } + @Override + public void onComplete() { + future.complete(null); + } + }); + future.get(); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java new file mode 100644 index 0000000..3b2ef2c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestResultScannerCursor extends AbstractTestResultScannerCursor { + + @Override + protected ResultScanner getScanner(Scan scan) throws IOException { + return TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(scan); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java deleted file mode 100644 index f7798f0..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; - -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ MediumTests.class, ClientTests.class }) -public class TestScanCursor extends AbstractTestScanCursor { - - @Test - public void testHeartbeatWithSparseFilter() throws Exception { - try (ResultScanner scanner = - TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter())) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], - r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } - - @Test - public void testHeartbeatWithSparseFilterReversed() throws Exception { - try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME) - .getScanner(createReversedScanWithSparseFilter())) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS], - r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[0], r.getRow()); - } - num++; - } - } - } - - @Test - public void testSizeLimit() throws IOException { - try (ResultScanner scanner = - TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit())) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], - r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } -} -- 2.7.4