From 1cf43ad77e2dc8afb4e1bff3c78c3540c7345ed8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 23 Feb 2017 10:12:42 +0800 Subject: [PATCH] HBASE-17595 Add partial result support for small/limited scan --- .../apache/hadoop/hbase/client/ClientScanner.java | 22 +- .../hadoop/hbase/client/ConnectionUtils.java | 24 ++ .../org/apache/hadoop/hbase/client/HTable.java | 4 - .../org/apache/hadoop/hbase/client/Result.java | 23 +- .../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 2 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 2 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 33 ++- .../hadoop/hbase/regionserver/ScannerContext.java | 6 +- .../hbase/TestPartialResultsFromClientSide.java | 30 --- .../hbase/client/TestScannersFromClientSide.java | 71 ------ .../hbase/client/TestScannersFromClientSide2.java | 254 +++++++++++++++++++++ 11 files changed, 320 insertions(+), 151 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 9ff36f8..9bf367a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows; + import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -411,7 +413,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // If the lastRow is not partial, then we should start from the next row. As now we can // exclude the start row, the logic here is the same for both normal scan and reversed scan. // If lastResult is partial then include it, otherwise exclude it. - scan.withStartRow(lastResult.getRow(), lastResult.isPartial() || scan.getBatch() > 0); + scan.withStartRow(lastResult.getRow(), lastResult.hasMoreCellsInRow()); } if (e instanceof OutOfOrderScannerNextException) { if (retryAfterOutOfOrderException.isTrue()) { @@ -501,16 +503,16 @@ public abstract class ClientScanner extends AbstractClientScanner { } countdown--; this.lastResult = rs; - if (this.lastResult.isPartial() || scan.getBatch() > 0) { + if (this.lastResult.hasMoreCellsInRow()) { updateLastCellLoadedToCache(this.lastResult); } else { this.lastCellLoadedToCache = null; } } - if (scan.getLimit() > 0) { - int limit = scan.getLimit() - resultsToAddToCache.size(); - assert limit >= 0; - scan.setLimit(limit); + if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) { + int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache); + assert newLimit >= 0; + scan.setLimit(newLimit); } } if (scanExhausted(values)) { @@ -616,7 +618,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // In every RPC response there should be at most a single partial result. Furthermore, if // there is a partial result, it is guaranteed to be in the last position of the array. Result last = resultsFromServer[resultsFromServer.length - 1]; - Result partial = last.isPartial() ? last : null; + Result partial = last.hasMoreCellsInRow() ? last : null; if (LOG.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); @@ -662,7 +664,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // If the result is not a partial, it is a signal to us that it is the last Result we // need to form the complete Result client-side - if (!result.isPartial()) { + if (!result.hasMoreCellsInRow()) { resultsToAddToCache.add(Result.createCompleteResult(partialResults)); clearPartialResults(); } @@ -678,7 +680,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // It's possible that in one response from the server we receive the final partial for // one row and receive a partial for a different row. Thus, make sure that all Results // are added to the proper list - if (result.isPartial()) { + if (result.hasMoreCellsInRow()) { addToPartialResults(result); } else { resultsToAddToCache.add(result); @@ -820,6 +822,6 @@ public abstract class ClientScanner extends AbstractClientScanner { index++; } Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); - return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); + return Result.create(list, result.getExists(), result.isStale(), result.hasMoreCellsInRow()); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 8f496c9..a40d9aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -263,4 +264,27 @@ public class ConnectionUtils { // the region. return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; } + + /** + * Count the individual rows for the given result list. + *

+ * There are two reason why we need to use this method instead of a simple {@code results.length}. + *

    + *
  1. Server may return only part of the whole cells of a row for the last result, and if + * allowPartial is true, we will return the array to user directly. We should not count the last + * result.
  2. + *
  3. If this is a batched scan, a row may be split into several results, but they should be + * counted as one row. For example, a row with 15 cells will be split into 3 results with 5 cells + * each if {@code scan.getBatch()} is 5.
  4. + *
+ */ + public static int numberOfIndividualRows(List results) { + int count = 0; + for (Result result : results) { + if (!result.hasMoreCellsInRow()) { + count++; + } + } + return count; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 45ab7de..527dc72 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -779,10 +779,6 @@ public class HTable implements HTableInterface, RegionLocator { */ @Override public ResultScanner getScanner(Scan scan) throws IOException { - if (scan.getBatch() > 0 && scan.isSmall()) { - throw new IllegalArgumentException("Small scan should not be used with batching"); - } - if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index d27518e..fed91a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -83,10 +83,9 @@ public class Result implements CellScannable, CellScanner { private boolean stale = false; /** - * See {@link #mayHaveMoreCellsInRow()}. And please notice that, The client side implementation - * should also check for row key change to determine if a Result is the last one for a row. + * See {@link #hasMoreCellsInRow()}. */ - private boolean mayHaveMoreCellsInRow = false; + private boolean hasMoreCellsInRow = false; // We're not using java serialization. Transient here is just a marker to say // that this is where we cache row if we're ever asked for it. private transient byte [] row = null; @@ -193,7 +192,7 @@ public class Result implements CellScannable, CellScanner { this.cells = cells; this.exists = exists; this.stale = stale; - this.mayHaveMoreCellsInRow = mayHaveMoreCellsInRow; + this.hasMoreCellsInRow = mayHaveMoreCellsInRow; this.readonly = false; } @@ -886,7 +885,7 @@ public class Result implements CellScannable, CellScanner { // Result1: -1- -2- (2 cells, size limit reached, mark as partial) // Result2: -3- -4- (2 cells, size limit reached, mark as partial) // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) { + if (i != (partialResults.size() - 1) && !r.hasMoreCellsInRow()) { throw new IOException( "Cannot form complete result. Result is missing partial flag. " + "Partial Results: " + partialResults); @@ -973,28 +972,26 @@ public class Result implements CellScannable, CellScanner { * for a row and should be combined with a result representing the remaining cells in that row to * form a complete (non-partial) result. * @return Whether or not the result is a partial result - * @deprecated the word 'partial' ambiguous, use {@link #mayHaveMoreCellsInRow()} instead. + * @deprecated the word 'partial' ambiguous, use {@link #hasMoreCellsInRow()} instead. * Deprecated since 1.4.0. - * @see #mayHaveMoreCellsInRow() + * @see #hasMoreCellsInRow() */ @Deprecated public boolean isPartial() { - return mayHaveMoreCellsInRow; + return hasMoreCellsInRow; } /** * For scanning large rows, the RS may choose to return the cells chunk by chunk to prevent OOM. * This flag is used to tell you if the current Result is the last one of the current row. False - * means this Result is the last one. True means there may still be more cells for the current - * row. Notice that, 'may' have, not must have. This is because we may reach the size or time - * limit just at the last cell of row at RS, so we do not know if it is the last one. + * means this Result is the last one. True means there are be more cells for the current row. *

* The Scan configuration used to control the result size on the server is * {@link Scan#setMaxResultSize(long)} and the default value can be seen here: * {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE} */ - public boolean mayHaveMoreCellsInRow() { - return mayHaveMoreCellsInRow; + public boolean hasMoreCellsInRow() { + return hasMoreCellsInRow; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index bfdcfbd..17da2d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1438,7 +1438,7 @@ public final class ProtobufUtil { } builder.setStale(result.isStale()); - builder.setPartial(result.mayHaveMoreCellsInRow()); + builder.setPartial(result.hasMoreCellsInRow()); return builder.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c44d363..b616176 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6071,7 +6071,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows - if (!scannerContext.mayHaveMoreCellsInRow()) { + if (!scannerContext.hasMoreCellsInRow()) { resetFilters(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e14356a..fe82925 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -274,13 +274,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final String scannerName; private final RegionScanner s; private final Region r; - private final boolean allowPartial; - public RegionScannerHolder(String scannerName, RegionScanner s, Region r, boolean allowPartial) { + public RegionScannerHolder(String scannerName, RegionScanner s, Region r) { this.scannerName = scannerName; this.s = s; this.r = r; - this.allowPartial = allowPartial; } public long getNextCallSeq() { @@ -414,7 +412,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (isClientCellBlockSupport()) { for (Result res : results) { builder.addCellsPerResult(res.size()); - builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); + builder.addPartialFlagPerResult(res.hasMoreCellsInRow()); } controller.setCellScanner(CellUtil.createCellScanner(results)); } else { @@ -1114,11 +1112,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return lastBlock; } - private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r, - boolean allowPartial) throws LeaseStillHeldException { + private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) + throws LeaseStillHeldException { regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); - RegionScannerHolder rsh = new RegionScannerHolder(scannerName, s, r, allowPartial); + RegionScannerHolder rsh = new RegionScannerHolder(scannerName, s, r); RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; return rsh; @@ -2505,8 +2503,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.setMvccReadPoint(scanner.getMvccReadPoint()); builder.setTtl(scannerLeaseTimeoutPeriod); String scannerName = String.valueOf(scannerId); - return addScanner(scannerName, scanner, region, - !scan.isSmall() && !(request.hasLimitOfRows() && request.getLimitOfRows() > 0)); + return addScanner(scannerName, scanner, region); } private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) @@ -2563,7 +2560,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // return whether we have more results in region. private boolean scan(PayloadCarryingRpcController controller, ScanRequest request, - RegionScannerHolder rsh, long maxQuotaResultSize, int rows, List results, + RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, List results, ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) throws IOException { Region region = rsh.r; @@ -2595,8 +2592,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // correct ordering of partial results and so we prevent partial results from being // formed. boolean serverGuaranteesOrderOfPartials = results.isEmpty(); - boolean allowPartialResults = - clientHandlesPartials && serverGuaranteesOrderOfPartials && rsh.allowPartial; + boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials; boolean moreRows = false; // Heartbeat messages occur when the processing of the ScanRequest is exceeds a @@ -2626,7 +2622,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); boolean limitReached = false; - while (i < rows) { + while (i < maxResults) { // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The // batch limit is a limit on the number of cells per Result. Thus, if progress is // being tracked (i.e. scannerContext.keepProgress() is true) then we need to @@ -2638,7 +2634,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, moreRows = scanner.nextRaw(values, scannerContext); if (!values.isEmpty()) { - Result r = Result.create(values, null, stale, scannerContext.mayHaveMoreCellsInRow()); + Result r = Result.create(values, null, stale, scannerContext.hasMoreCellsInRow()); lastBlock.setValue(addSize(context, r, lastBlock.getValue())); results.add(r); i++; @@ -2646,7 +2642,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); - boolean rowLimitReached = i >= rows; + boolean rowLimitReached = i >= maxResults; limitReached = sizeLimitReached || timeLimitReached || rowLimitReached; if (limitReached || !moreRows) { @@ -2703,7 +2699,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // coprocessor postNext hook if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); + region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); } return builder.getMoreResultsInRegion(); } @@ -2856,8 +2852,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // with the old scan implementation where we just ignore the returned results if moreResults // is false. Can remove the isEmpty check after we get rid of the old implementation. moreResults = false; - } else if (limitOfRows > 0 && results.size() >= limitOfRows && - !results.get(results.size() - 1).mayHaveMoreCellsInRow()) { + } else if (limitOfRows > 0 && !results.isEmpty() && + !results.get(results.size() - 1).hasMoreCellsInRow() && + ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) { // if we have reached the limit of rows moreResults = false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 4e61f1b..67b2693 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -218,10 +218,10 @@ public class ScannerContext { } /** - * @return true when we may have more cells for the current row. This usually because we have - * reached a limit in the middle of a row + * @return true when we have more cells for the current row. This usually because we have reached + * a limit in the middle of a row */ - boolean mayHaveMoreCellsInRow() { + boolean hasMoreCellsInRow() { return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW || scannerState == NextState.BATCH_LIMIT_REACHED; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index 181d55a..8679e46 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -588,36 +588,6 @@ public class TestPartialResultsFromClientSide { } /** - * Small scans should not return partial results because it would prevent small scans from - * retrieving all of the necessary results in a single RPC request which is what makese small - * scans useful. Thus, ensure that even when {@link Scan#getAllowPartialResults()} is true, small - * scans do not return partial results - * @throws Exception - */ - @Test - public void testSmallScansDoNotAllowPartials() throws Exception { - Scan scan = new Scan(); - testSmallScansDoNotAllowPartials(scan); - scan.setReversed(true); - testSmallScansDoNotAllowPartials(scan); - } - - public void testSmallScansDoNotAllowPartials(Scan baseScan) throws Exception { - Scan scan = new Scan(baseScan); - scan.setAllowPartialResults(true); - scan.setSmall(true); - scan.setMaxResultSize(1); - ResultScanner scanner = TABLE.getScanner(scan); - Result r = null; - - while ((r = scanner.next()) != null) { - assertFalse(r.isPartial()); - } - - scanner.close(); - } - - /** * Make puts to put the input value into each combination of row, family, and qualifier * @param rows * @param families diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 259aea4..4e356e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -676,74 +675,4 @@ public class TestScannersFromClientSide { assertEquals(expKvList.size(), result.size()); } - - private void assertResultEquals(Result result, int i) { - assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); - assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - } - - private void testStartRowStopRowInclusive(Table table, int start, boolean startInclusive, - int stop, boolean stopInclusive) throws IOException { - int actualStart = startInclusive ? start : start + 1; - int actualStop = stopInclusive ? stop + 1 : stop; - int expectedCount = actualStop - actualStart; - Result[] results; - try (ResultScanner scanner = table.getScanner( - new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive) - .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive))) { - results = scanner.next(expectedCount); - } - assertEquals(expectedCount, results.length); - for (int i = 0; i < expectedCount; i++) { - assertResultEquals(results[i], actualStart + i); - } - } - - private void testReversedStartRowStopRowInclusive(Table table, int start, boolean startInclusive, - int stop, boolean stopInclusive) throws IOException { - int actualStart = startInclusive ? start : start - 1; - int actualStop = stopInclusive ? stop - 1 : stop; - int expectedCount = actualStart - actualStop; - Result[] results; - try (ResultScanner scanner = table.getScanner( - new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive) - .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive) - .setReversed(true))) { - results = scanner.next(expectedCount); - } - assertEquals(expectedCount, results.length); - for (int i = 0; i < expectedCount; i++) { - assertResultEquals(results[i], actualStart - i); - } - } - - @Test - public void testStartRowStopRowInclusive() throws IOException, InterruptedException { - TableName tableName = TableName.valueOf("testStartRowStopRowInclusive"); - byte[][] splitKeys = new byte[8][]; - for (int i = 11; i < 99; i += 11) { - splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i)); - } - Table table = TEST_UTIL.createTable(tableName, FAMILY, splitKeys); - TEST_UTIL.waitTableAvailable(tableName); - try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(tableName)) { - for (int i = 0; i < 100; i++) { - mutator.mutate(new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, QUALIFIER, - Bytes.toBytes(i))); - } - } - // from first region to last region - testStartRowStopRowInclusive(table, 1, true, 98, false); - testStartRowStopRowInclusive(table, 12, true, 34, true); - testStartRowStopRowInclusive(table, 23, true, 45, false); - testStartRowStopRowInclusive(table, 34, false, 56, true); - testStartRowStopRowInclusive(table, 45, false, 67, false); - - // from last region to first region - testReversedStartRowStopRowInclusive(table, 98, true, 1, false); - testReversedStartRowStopRowInclusive(table, 54, true, 32, true); - testReversedStartRowStopRowInclusive(table, 65, true, 43, false); - testReversedStartRowStopRowInclusive(table, 76, false, 54, true); - testReversedStartRowStopRowInclusive(table, 87, false, 65, false); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java new file mode 100644 index 0000000..728a8f9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Testcase for newly added feature in HBASE-17143, such as startRow and stopRow + * inclusive/exclusive, limit for rows, etc. + */ +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestScannersFromClientSide2 { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("scan"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] CQ1 = Bytes.toBytes("cq1"); + + private static byte[] CQ2 = Bytes.toBytes("cq2"); + + @Parameter(0) + public boolean batch; + + @Parameter(1) + public boolean smallResultSize; + + @Parameter(2) + public boolean allowPartial; + + @Parameters(name = "{index}: batch={0}, smallResultSize={1}, allowPartial={2}") + public static List params() { + List params = new ArrayList<>(); + boolean[] values = new boolean[] { false, true }; + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + for (int k = 0; k < 2; k++) { + params.add(new Object[] { values[i], values[j], values[k] }); + } + } + } + return params; + } + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); + List puts = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + puts.add(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))); + } + TEST_UTIL.waitTableAvailable(TABLE_NAME); + table.put(puts); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private Scan createScan() { + Scan scan = new Scan(); + if (batch) { + scan.setBatch(1); + } + if (smallResultSize) { + scan.setMaxResultSize(1); + } + if (allowPartial) { + scan.setAllowPartialResults(true); + } + return scan; + } + + private void assertResultEquals(Result result, int i) { + assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); + assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2))); + } + + private List doScan(Scan scan) throws IOException { + List results = new ArrayList<>(); + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME); + ResultScanner scanner = table.getScanner(scan)) { + for (Result r; (r = scanner.next()) != null;) { + results.add(r); + } + } + return assertAndCreateCompleteResults(results); + } + + private List assertAndCreateCompleteResults(List results) throws IOException { + if ((!batch && !allowPartial) || (allowPartial && !batch && !smallResultSize)) { + for (Result result : results) { + assertFalse("Should not have partial result", result.hasMoreCellsInRow()); + } + return results; + } + List completeResults = new ArrayList<>(); + List partialResults = new ArrayList<>(); + for (Result result : results) { + if (!result.hasMoreCellsInRow()) { + assertFalse("Should have partial result", partialResults.isEmpty()); + partialResults.add(result); + completeResults.add(Result.createCompleteResult(partialResults)); + partialResults.clear(); + } else { + partialResults.add(result); + } + } + assertTrue("Should not have orphan partial result", partialResults.isEmpty()); + return completeResults; + } + + private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, + int limit) throws Exception { + Scan scan = + createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); + if (limit > 0) { + scan.setLimit(limit); + } + List results = doScan(scan); + int actualStart = startInclusive ? start : start + 1; + int actualStop = stopInclusive ? stop + 1 : stop; + int count = actualStop - actualStart; + if (limit > 0) { + count = Math.min(count, limit); + } + assertEquals(count, results.size()); + for (int i = 0; i < count; i++) { + assertResultEquals(results.get(i), actualStart + i); + } + } + + private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive, + int limit) throws Exception { + Scan scan = createScan() + .withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true); + if (limit > 0) { + scan.setLimit(limit); + } + List results = doScan(scan); + int actualStart = startInclusive ? start : start - 1; + int actualStop = stopInclusive ? stop - 1 : stop; + int count = actualStart - actualStop; + if (limit > 0) { + count = Math.min(count, limit); + } + assertEquals(count, results.size()); + for (int i = 0; i < count; i++) { + assertResultEquals(results.get(i), actualStart - i); + } + } + + @Test + public void testScanWithLimit() throws Exception { + testScan(1, true, 998, false, 900); // from first region to last region + testScan(123, true, 345, true, 100); + testScan(234, true, 456, false, 100); + testScan(345, false, 567, true, 100); + testScan(456, false, 678, false, 100); + + } + + @Test + public void testScanWithLimitGreaterThanActualCount() throws Exception { + testScan(1, true, 998, false, 1000); // from first region to last region + testScan(123, true, 345, true, 200); + testScan(234, true, 456, false, 200); + testScan(345, false, 567, true, 200); + testScan(456, false, 678, false, 200); + } + + public void testReversedScanWithLimit() throws Exception { + testReversedScan(998, true, 1, false, 900); // from last region to first region + testReversedScan(543, true, 321, true, 100); + testReversedScan(654, true, 432, false, 100); + testReversedScan(765, false, 543, true, 100); + testReversedScan(876, false, 654, false, 100); + } + + @Test + public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { + testReversedScan(998, true, 1, false, 1000); // from last region to first region + testReversedScan(543, true, 321, true, 200); + testReversedScan(654, true, 432, false, 200); + testReversedScan(765, false, 543, true, 200); + testReversedScan(876, false, 654, false, 200); + } + + @Test + public void testStartRowStopRowInclusive() throws Exception { + testScan(1, true, 998, false, -1); // from first region to last region + testScan(123, true, 345, true, -1); + testScan(234, true, 456, false, -1); + testScan(345, false, 567, true, -1); + testScan(456, false, 678, false, -1); + } + + @Test + public void testReversedStartRowStopRowInclusive() throws Exception { + testReversedScan(998, true, 1, false, -1); // from last region to first region + testReversedScan(543, true, 321, true, -1); + testReversedScan(654, true, 432, false, -1); + testReversedScan(765, false, 543, true, -1); + testReversedScan(876, false, 654, false, -1); + } +} -- 2.7.4