From f153f6a6ad96d9784384ff6aa075ec1ded907c67 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 8 Aug 2017 10:26:49 +0800 Subject: [PATCH] HBASE-18489 Expose scan cursor in RawScanResultConsumer --- .../AsyncScanSingleRegionRpcRetryingCaller.java | 18 +- .../hadoop/hbase/client/RawScanResultConsumer.java | 20 ++- .../hadoop/hbase/regionserver/RSRpcServices.java | 13 +- .../hbase/regionserver/ReversedStoreScanner.java | 10 ++ .../hadoop/hbase/regionserver/ScannerContext.java | 12 +- .../hadoop/hbase/regionserver/StoreScanner.java | 20 ++- .../hbase/client/AbstractTestScanCursor.java | 146 ++++++++++++++++ .../hbase/client/TestRawAsyncScanCursor.java | 107 ++++++++++++ .../apache/hadoop/hbase/client/TestScanCursor.java | 90 ++++++++++ .../hbase/regionserver/TestScannerCursor.java | 191 --------------------- 10 files changed, 416 insertions(+), 211 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index e5448d9..89b3afc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -144,7 +145,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final class ScanControllerImpl implements RawScanResultConsumer.ScanController { // Make sure the methods are only called in this thread. - private final Thread callerThread = Thread.currentThread(); + private final Thread callerThread; + + private final Optional cursor; // INITIALIZED -> SUSPENDED -> DESTROYED // INITIALIZED -> TERMINATED -> DESTROYED @@ -154,6 +157,12 @@ class AsyncScanSingleRegionRpcRetryingCaller { private ScanResumerImpl resumer; + public ScanControllerImpl(ScanResponse resp) { + callerThread = Thread.currentThread(); + cursor = resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) + : Optional.empty(); + } + private void preCheck() { Preconditions.checkState(Thread.currentThread() == callerThread, "The current thread is %s, expected thread is %s, " + @@ -184,6 +193,11 @@ class AsyncScanSingleRegionRpcRetryingCaller { this.state = ScanControllerState.DESTROYED; return state; } + + @Override + public Optional cursor() { + return cursor; + } } private enum ScanResumerState { @@ -479,7 +493,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { return; } - ScanControllerImpl scanController = new ScanControllerImpl(); + ScanControllerImpl scanController = new ScanControllerImpl(resp); if (results.length > 0) { updateNextStartRowWhenError(results[results.length - 1]); consumer.onNext(results, scanController); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 820960b..d9a6fb5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Optional; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; @@ -47,14 +49,14 @@ public interface RawScanResultConsumer { } /** - * Used to suspend or stop a scan. + * Used to suspend or stop a scan, or get a scan cursor if available. *

- * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A - * IllegalStateException will be thrown if you call them at other places. + * Notice that, you should only call the {@link #suspend()} or {@link #terminate()} inside onNext + * or onHeartbeat method. A IllegalStateException will be thrown if you call them at other places. *

- * You can only call one of the methods below, i.e., call suspend or terminate(of course you are - * free to not call them both), and the methods are not reentrant. A IllegalStateException will be - * thrown if you have already called one of the methods. + * You can only call one of the {@link #suspend()} and {@link #terminate()} methods(of course you + * are free to not call them both), and the methods are not reentrant. A IllegalStateException + * will be thrown if you have already called one of the methods. */ @InterfaceAudience.Public interface ScanController { @@ -75,6 +77,12 @@ public interface RawScanResultConsumer { * or you want to stop the scan in onHeartbeat method because it has spent too many time. */ void terminate(); + + /** + * Get the scan cursor if available. + * @return The scan cursor. + */ + Optional cursor(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bc196a6..05b2740 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -390,6 +390,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final RpcCallback shippedCallback; private byte[] rowOfLastPartialResult; private boolean needCursor; + private Cell lastScanCursorCell; public RegionScannerHolder(String scannerName, RegionScanner s, Region r, RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) { @@ -3041,6 +3042,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); + if (rsh.needCursor) { + // Record the last scan cursor cell here to prevent setting a smaller scan cursor when + // crossing families. + scannerContext.setScanCursorCell(rsh.lastScanCursorCell); + } boolean limitReached = false; while (numOfResults < maxResults) { // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The @@ -3108,9 +3114,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Heartbeat messages occur when the time limit has been reached. builder.setHeartbeatMessage(timeLimitReached); if (timeLimitReached && rsh.needCursor) { - Cell readingCell = scannerContext.getPeekedCellInHeartbeat(); - if (readingCell != null ) { - builder.setCursor(ProtobufUtil.toCursor(readingCell)); + Cell cursorCell = scannerContext.getScanCursorCell(); + if (cursorCell != null ) { + builder.setCursor(ProtobufUtil.toCursor(cursorCell)); + rsh.lastScanCursorCell = cursorCell; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 07f98ad..1930c24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -98,6 +98,16 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { } @Override + protected boolean shouldSetScanCursor(ScannerContext ctx, CellComparator comparator) { + // It is not likely to have a null comparator when we arrive here, but let's do the sanity + // check. + if (ctx.getScanCursorCell() == null || comparator == null) { + return true; + } + return comparator.compare(prevCell, ctx.getScanCursorCell()) < 0; + } + + @Override protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator) throws IOException { // Check that the heap gives us KVs in an increasing order for same row and diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 2bab82e..5116fec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -22,10 +22,12 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; +import org.apache.hadoop.hbase.util.Bytes; /** * ScannerContext instances encapsulate limit tracking AND progress towards those limits during @@ -97,7 +99,7 @@ public class ScannerContext { boolean keepProgress; private static boolean DEFAULT_KEEP_PROGRESS = false; - private Cell peekedCellInHeartbeat = null; + private Cell scanCursorCell = null; /** * Tracks the relevant server side metrics during scans. null when metrics should not be tracked @@ -333,12 +335,12 @@ public class ScannerContext { || checkTimeLimit(checkerScope); } - public Cell getPeekedCellInHeartbeat() { - return peekedCellInHeartbeat; + Cell getScanCursorCell() { + return scanCursorCell; } - public void setPeekedCellInHeartbeat(Cell peekedCellInHeartbeat) { - this.peekedCellInHeartbeat = peekedCellInHeartbeat; + void setScanCursorCell(Cell peekedCellInHeartbeat) { + this.scanCursorCell = peekedCellInHeartbeat; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 969d485..01990cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -48,11 +48,10 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatc import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - /** * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> * for a single row. @@ -105,7 +104,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * KVs skipped via seeking to next row/column. TODO: estimate them? */ private long kvsScanned = 0; - private Cell prevCell = null; + protected Cell prevCell = null; private final long preadMaxBytes; private long bytesRead; @@ -537,6 +536,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return next(outResult, NoLimitScannerContext.getInstance()); } + protected boolean shouldSetScanCursor(ScannerContext ctx, CellComparator comparator) { + // It is not likely to have a null comparator when we arrive here, but let's do the sanity + // check. + if (ctx.getScanCursorCell() == null || comparator == null) { + return true; + } + return comparator.compare(prevCell, ctx.getScanCursorCell()) > 0; + } + /** * Get the next row of values from this Store. * @param outResult @@ -593,7 +601,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { scannerContext.updateTimeProgress(); if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { - scannerContext.setPeekedCellInHeartbeat(prevCell); + if (shouldSetScanCursor(scannerContext, comparator)) { + // Here we do a copy to avoid providing corrupt data if we release the prevCell in + // shipped method. This is not on the critical path so it is OK to do a copy. + scannerContext.setScanCursorCell(KeyValueUtil.toNewKeyCell(prevCell)); + } return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java new file mode 100644 index 0000000..ffd8c01 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public abstract class AbstractTestScanCursor { + + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * Table configuration + */ + protected static TableName TABLE_NAME = TableName.valueOf("TestScanCursor"); + + protected static int NUM_ROWS = 5; + protected static byte[] ROW = Bytes.toBytes("testRow"); + protected static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + protected static int NUM_FAMILIES = 2; + protected static byte[] FAMILY = Bytes.toBytes("testFamily"); + protected static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + protected static int NUM_QUALIFIERS = 2; + protected static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + protected static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + protected static int VALUE_SIZE = 10; + protected static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + protected static final int TIMEOUT = 4000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT); + + // Check the timeout condition after every cell + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); + TEST_UTIL.startMiniCluster(1); + + createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + private static void createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + TEST_UTIL.createTable(name, families).put(createPuts(rows, families, qualifiers, cellValue)); + } + + private static List createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + List puts = new ArrayList<>(); + for (int row = 0; row < rows.length; row++) { + Put put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + return puts; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + public static final class SparseFilter extends FilterBase { + + private final boolean reversed; + + public SparseFilter(boolean reversed) { + this.reversed = reversed; + } + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + Threads.sleep(TIMEOUT / 2 + 100); + return Bytes.equals(CellUtil.cloneRow(v), ROWS[reversed ? 0 : NUM_ROWS - 1]) + ? ReturnCode.INCLUDE + : ReturnCode.SKIP; + } + + @Override + public byte[] toByteArray() throws IOException { + return reversed ? new byte[] { 1 } : new byte[] { 0 }; + } + + public static Filter parseFrom(final byte[] pbBytes) { + return new SparseFilter(pbBytes[0] != 0); + } + } + + protected Scan createScanWithSparseFilter() { + return new Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE) + .setNeedCursorResult(true).setAllowPartialResults(true).setFilter(new SparseFilter(false)); + } + + protected Scan createReversedScanWithSparseFilter() { + return new Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE) + .setReversed(true).setNeedCursorResult(true).setAllowPartialResults(true) + .setFilter(new SparseFilter(true)); + } + + protected Scan createScanWithSizeLimit() { + return new Scan().setMaxResultSize(1).setCaching(Integer.MAX_VALUE).setNeedCursorResult(true); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java new file mode 100644 index 0000000..9caf942 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestRawAsyncScanCursor extends AbstractTestScanCursor { + + private void doTest(boolean reversed) + throws InterruptedException, ExecutionException, IOException { + CompletableFuture future = new CompletableFuture<>(); + try (AsyncConnection conn = + ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { + RawAsyncTable table = conn.getRawTable(TABLE_NAME); + table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), + new RawScanResultConsumer() { + + private int count; + + @Override + public void onHeartbeat(ScanController controller) { + int row = count / NUM_FAMILIES / NUM_QUALIFIERS; + if (reversed) { + row = NUM_ROWS - 1 - row; + } + try { + assertArrayEquals(ROWS[row], controller.cursor().get().getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; + } + } + + @Override + public void onNext(Result[] results, ScanController controller) { + try { + assertEquals(1, results.length); + assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); + // we will always provide a scan cursor if time limit is reached. + if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { + assertFalse(controller.cursor().isPresent()); + } else { + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], + controller.cursor().get().getRow()); + } + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; + } + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(null); + } + }); + future.get(); + } + } + + @Test + public void testHeartbeatWithSparseFilter() + throws IOException, InterruptedException, ExecutionException { + doTest(false); + } + + @Test + public void testHeartbeatWithSparseFilterReversed() + throws IOException, InterruptedException, ExecutionException { + doTest(true); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java new file mode 100644 index 0000000..f7798f0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestScanCursor extends AbstractTestScanCursor { + + @Test + public void testHeartbeatWithSparseFilter() throws Exception { + try (ResultScanner scanner = + TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } + + @Test + public void testHeartbeatWithSparseFilterReversed() throws Exception { + try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME) + .getScanner(createReversedScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[0], r.getRow()); + } + num++; + } + } + } + + @Test + public void testSizeLimit() throws IOException { + try (ResultScanner scanner = + TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java deleted file mode 100644 index e40b808..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTestConst; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(MediumTests.class) -public class TestScannerCursor { - - private static final Log LOG = - LogFactory.getLog(TestScannerCursor.class); - - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static Table TABLE = null; - - /** - * Table configuration - */ - private static TableName TABLE_NAME = TableName.valueOf("TestScannerCursor"); - - private static int NUM_ROWS = 5; - private static byte[] ROW = Bytes.toBytes("testRow"); - private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); - - private static int NUM_FAMILIES = 2; - private static byte[] FAMILY = Bytes.toBytes("testFamily"); - private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); - - private static int NUM_QUALIFIERS = 2; - private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); - private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); - - private static int VALUE_SIZE = 10; - private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); - - private static final int TIMEOUT = 4000; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT); - - // Check the timeout condition after every cell - conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); - TEST_UTIL.startMiniCluster(1); - - TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); - - } - - static Table createTestTable(TableName name, byte[][] rows, byte[][] families, - byte[][] qualifiers, byte[] cellValue) throws IOException { - Table ht = TEST_UTIL.createTable(name, families); - List puts = createPuts(rows, families, qualifiers, cellValue); - ht.put(puts); - return ht; - } - - static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, - byte[] value) throws IOException { - Put put; - ArrayList puts = new ArrayList<>(); - - for (int row = 0; row < rows.length; row++) { - put = new Put(rows[row]); - for (int fam = 0; fam < families.length; fam++) { - for (int qual = 0; qual < qualifiers.length; qual++) { - KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); - put.add(kv); - } - } - puts.add(put); - } - - return puts; - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - public static class SparseFilter extends FilterBase { - - @Override - public ReturnCode filterKeyValue(Cell v) throws IOException { - Threads.sleep(TIMEOUT / 2 + 100); - return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE - : ReturnCode.SKIP; - } - - public static Filter parseFrom(final byte[] pbBytes) { - return new SparseFilter(); - } - } - - @Test - public void testHeartbeatWithSparseFilter() throws Exception { - Scan scan = new Scan(); - scan.setMaxResultSize(Long.MAX_VALUE); - scan.setCaching(Integer.MAX_VALUE); - scan.setNeedCursorResult(true); - scan.setAllowPartialResults(true); - scan.setFilter(new SparseFilter()); - try(ResultScanner scanner = TABLE.getScanner(scan)) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - - if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } - - @Test - public void testSizeLimit() throws IOException { - Scan scan = new Scan(); - scan.setMaxResultSize(1); - scan.setCaching(Integer.MAX_VALUE); - scan.setNeedCursorResult(true); - try (ResultScanner scanner = TABLE.getScanner(scan)) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - - if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS)-1) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } - -} -- 2.7.4