From 1e7448d69b1ddb68acd99249449fd2940e3b899e Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 10 Jan 2017 14:37:15 +0800 Subject: [PATCH] HBASE-17045 Unify the implementation of small scan and regular scan --- .../hadoop/hbase/client/AsyncClientScanner.java | 24 +- .../hbase/client/AsyncNonMetaRegionLocator.java | 3 +- .../client/AsyncRpcRetryingCallerFactory.java | 69 +- .../AsyncScanSingleRegionRpcRetryingCaller.java | 22 +- .../client/AsyncSmallScanRpcRetryingCaller.java | 183 ----- .../apache/hadoop/hbase/client/AsyncTableBase.java | 23 +- .../apache/hadoop/hbase/client/AsyncTableImpl.java | 5 +- .../hadoop/hbase/client/RawAsyncTableImpl.java | 50 +- .../hadoop/hbase/client/RawScanResultConsumer.java | 4 +- .../java/org/apache/hadoop/hbase/client/Scan.java | 62 +- .../hadoop/hbase/client/ScannerCallable.java | 2 +- .../hbase/shaded/protobuf/RequestConverter.java | 24 +- .../shaded/protobuf/generated/ClientProtos.java | 290 +++++-- .../src/main/protobuf/Client.proto | 2 + .../hbase/protobuf/generated/ClientProtos.java | 293 +++++-- hbase-protocol/src/main/protobuf/Client.proto | 2 + .../hadoop/hbase/regionserver/RSRpcServices.java | 885 +++++++++++---------- .../hadoop/hbase/client/TestAsyncTableScanAll.java | 130 +++ .../hbase/client/TestAsyncTableSmallScan.java | 109 --- .../hadoop/hbase/client/TestRawAsyncTableScan.java | 5 - 20 files changed, 1147 insertions(+), 1040 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index d7a3ed1..1483cba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; /** * The asynchronous client scanner implementation. 
@@ -85,12 +86,16 @@ class AsyncClientScanner { public final ClientService.Interface stub; - public final long scannerId; + public final HBaseRpcController controller; - public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) { + public final ScanResponse resp; + + public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller, + ScanResponse resp) { this.loc = loc; this.stub = stub; - this.scannerId = scannerId; + this.controller = controller; + this.resp = resp; } } @@ -98,14 +103,14 @@ class AsyncClientScanner { HRegionLocation loc, ClientService.Interface stub) { CompletableFuture future = new CompletableFuture<>(); try { - ScanRequest request = - RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, 0, false); + ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), + scan, scan.getCaching(), false); stub.scan(controller, request, resp -> { if (controller.failed()) { future.completeExceptionally(controller.getFailed()); return; } - future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId())); + future.complete(new OpenScannerResponse(loc, stub, controller, resp)); }); } catch (IOException e) { future.completeExceptionally(e); @@ -114,10 +119,11 @@ class AsyncClientScanner { } private void startScan(OpenScannerResponse resp) { - conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub) - .setScan(scan).consumer(consumer).resultCache(resultCache) + conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) + .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start().whenComplete((hasMore, error) -> { + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start(resp.controller, resp.resp) + .whenComplete((hasMore, error) -> { if (error != null) { consumer.onError(error); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index ae79b65..6b1d1d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -376,7 +376,8 @@ class AsyncNonMetaRegionLocator { metaKey = createRegionName(tableName, req.row, NINES, false); } conn.getRawTable(META_TABLE_NAME) - .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) + .scanAll( + new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY).setLimit(1)) .whenComplete((results, error) -> onScanComplete(tableName, req, results, error)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 55c56ab..c393afe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -29,7 +29,9 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import 
org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; /** * Factory to create an AsyncRpcRetryCaller. @@ -114,66 +116,6 @@ class AsyncRpcRetryingCallerFactory { return new SingleRequestCallerBuilder<>(); } - public class SmallScanCallerBuilder { - - private TableName tableName; - - private Scan scan; - - private int limit; - - private long scanTimeoutNs = -1L; - - private long rpcTimeoutNs = -1L; - - public SmallScanCallerBuilder table(TableName tableName) { - this.tableName = tableName; - return this; - } - - public SmallScanCallerBuilder setScan(Scan scan) { - this.scan = scan; - return this; - } - - public SmallScanCallerBuilder limit(int limit) { - this.limit = limit; - return this; - } - - public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(scanTimeout); - return this; - } - - public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { - this.rpcTimeoutNs = unit.toNanos(rpcTimeout); - return this; - } - - public AsyncSmallScanRpcRetryingCaller build() { - TableName tableName = checkNotNull(this.tableName, "tableName is null"); - Scan scan = checkNotNull(this.scan, "scan is null"); - checkArgument(limit > 0, "invalid limit %d", limit); - return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs, - rpcTimeoutNs); - } - - /** - * Shortcut for {@code build().call()} - */ - public CompletableFuture> call() { - return build().call(); - } - } - - /** - * Create retry caller for small scan. - */ - public SmallScanCallerBuilder smallScan() { - return new SmallScanCallerBuilder(); - } - public class ScanSingleRegionCallerBuilder { private long scannerId = -1L; @@ -244,10 +186,11 @@ class AsyncRpcRetryingCallerFactory { } /** - * Short cut for {@code build().start()}. + * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}. */ - public CompletableFuture start() { - return build().start(); + public CompletableFuture start(HBaseRpcController controller, + ScanResponse respWhenOpen) { + return build().start(controller, respWhenOpen); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index dae88a7..41a1e16 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -245,7 +245,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } } - private void onComplete(ScanResponse resp) { + private void onComplete(HBaseRpcController controller, ScanResponse resp) { if (controller.failed()) { onError(controller.getFailed()); return; @@ -288,6 +288,13 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeNoMoreResults(); return; } + if (scan.getLimit() > 0) { + // The RS should have set the moreResults field in ScanResponse to false when we have reached + // the limit. 
+ int limit = scan.getLimit() - results.length; + assert limit > 0; + scan.setLimit(limit); + } // as in 2.0 this value will always be set if (!resp.getMoreResultsInRegion()) { completeWhenNoMoreResultsInRegion.run(); @@ -299,8 +306,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void call() { resetController(controller, rpcTimeoutNs); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, - nextCallSeq, false, false); - stub.scan(controller, req, this::onComplete); + nextCallSeq, false, false, scan.getLimit()); + stub.scan(controller, req, resp -> onComplete(controller, resp)); } private void next() { @@ -312,10 +319,15 @@ class AsyncScanSingleRegionRpcRetryingCaller { } /** + * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also + * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the + * open scanner request is also needed because we may have some data in the CellScanner which is + * contained in the controller. * @return {@code true} if we should continue, otherwise {@code false}. */ - public CompletableFuture start() { - next(); + public CompletableFuture start(HBaseRpcController controller, + ScanResponse respWhenOpen) { + onComplete(controller, respWhenOpen); return future; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java deleted file mode 100644 index 6ffa30a..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.client; - -import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; -import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; -import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; - -/** - * Retry caller for smaller scan. - */ -@InterfaceAudience.Private -class AsyncSmallScanRpcRetryingCaller { - - private final AsyncConnectionImpl conn; - - private final TableName tableName; - - private final Scan scan; - - private final int limit; - - private final long scanTimeoutNs; - - private final long rpcTimeoutNs; - - private final Function nextScan; - - private final List resultList; - - private final CompletableFuture> future; - - public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan, - int limit, long scanTimeoutNs, long rpcTimeoutNs) { - this.conn = conn; - this.tableName = tableName; - this.scan = scan; - this.limit = limit; - this.scanTimeoutNs = scanTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; - if (scan.isReversed()) { - this.nextScan = this::reversedNextScan; - } else { - this.nextScan = this::nextScan; - } - this.resultList = new ArrayList<>(); - this.future = new CompletableFuture<>(); - } - - private static final class SmallScanResponse { - - public final Result[] results; - - public final HRegionInfo currentRegion; - - public final boolean hasMoreResultsInRegion; - - public SmallScanResponse(Result[] results, HRegionInfo currentRegion, - boolean hasMoreResultsInRegion) { - this.results = results; - this.currentRegion = currentRegion; - this.hasMoreResultsInRegion = hasMoreResultsInRegion; - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "Findbugs seems to be confused by lambda expression.") - private CompletableFuture scan(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub) { - CompletableFuture future = new CompletableFuture<>(); - ScanRequest req; - try { - req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, - limit - resultList.size(), true); - } catch (IOException e) { - future.completeExceptionally(e); - return future; - } - stub.scan(controller, req, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - try { - Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp); - future.complete( - new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion())); - } catch (IOException e) { - future.completeExceptionally(e); - } - } - }); - return future; - } - - private void onComplete(SmallScanResponse resp) { - 
resultList.addAll(Arrays.asList(resp.results)); - if (resultList.size() == limit) { - future.complete(resultList); - return; - } - if (resp.hasMoreResultsInRegion) { - if (resp.results.length > 0) { - scan.withStartRow(resp.results[resp.results.length - 1].getRow(), false); - } - scan(); - return; - } - if (!nextScan.apply(resp.currentRegion)) { - future.complete(resultList); - } - } - - private void scan() { - conn.callerFactory. single().table(tableName).row(scan.getStartRow()) - .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::scan).call() - .whenComplete((resp, error) -> { - if (error != null) { - future.completeExceptionally(error); - } else { - onComplete(resp); - } - }); - } - - public CompletableFuture> call() { - scan(); - return future; - } - - private boolean nextScan(HRegionInfo info) { - if (noMoreResultsForScan(scan, info)) { - return false; - } else { - scan.withStartRow(info.getEndKey()); - scan(); - return true; - } - } - - private boolean reversedNextScan(HRegionInfo info) { - if (noMoreResultsForReverseScan(scan, info)) { - return false; - } else { - scan.withStartRow(info.getStartKey(), false); - scan(); - return true; - } - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index 19a22c0..b9050df 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -331,26 +331,15 @@ public interface AsyncTableBase { CompareOp compareOp, byte[] value, RowMutations mutation); /** - * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}. - * @see #smallScan(Scan, int) - */ - default CompletableFuture> smallScan(Scan scan) { - return smallScan(scan, Integer.MAX_VALUE); - } - - /** - * Return all the results that match the given scan object. The number of the returned results - * will not be greater than {@code limit}. + * Return all the results that match the given scan object. *

- * Notice that the scan must be small, and should not use batch or allowPartialResults. The - * {@code caching} property of the scan object is also ignored as we will use {@code limit} - * instead. + * Notice that usually you should use method with a {@link Scan} object that has limit set, + * otherwise it is likely to cause OOM. * @param scan A configured {@link Scan} object. - * @param limit the limit of results count * @return The results of this small scan operation. The return value will be wrapped by a * {@link CompletableFuture}. */ - CompletableFuture> smallScan(Scan scan, int limit); + CompletableFuture> scanAll(Scan scan); /** * Extracts certain cells from the given rows, in batch. @@ -386,8 +375,8 @@ public interface AsyncTableBase { * @return A list of {@link CompletableFuture}s that represent the existence for each get. */ default List> exists(List gets) { - return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists())) - .collect(toList()); + return get(toCheckExistenceOnly(gets)).stream() + .> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 7281185..9b3b177 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -159,8 +159,8 @@ class AsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture> smallScan(Scan scan, int limit) { - return wrap(rawTable.smallScan(scan, limit)); + public CompletableFuture> scanAll(Scan scan) { + return wrap(rawTable.scanAll(scan)); } private long resultSize2CacheSize(long maxResultSize) { @@ -197,4 +197,5 @@ class AsyncTableImpl implements AsyncTable { public List> batch(List actions) { return rawTable. 
batch(actions).stream().map(this::wrap).collect(Collectors.toList()); } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 347c85b..fc5bbbe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -57,8 +57,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceAudience.Private class RawAsyncTableImpl implements RawAsyncTable { - private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class); - private final AsyncConnectionImpl conn; private final TableName tableName; @@ -320,12 +318,6 @@ class RawAsyncTableImpl implements RawAsyncTable { .call(); } - private CompletableFuture failedFuture(Throwable error) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(error); - return future; - } - private Scan setDefaultScanConfig(Scan scan) { // always create a new scan object as we may reset the start row later. Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); @@ -339,26 +331,35 @@ class RawAsyncTableImpl implements RawAsyncTable { } @Override - public CompletableFuture> smallScan(Scan scan, int limit) { - if (!scan.isSmall()) { - return failedFuture(new IllegalArgumentException("Only small scan is allowed")); - } - if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { - return failedFuture( - new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); - } - return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) - .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + public CompletableFuture> scanAll(Scan scan) { + CompletableFuture> future = new CompletableFuture<>(); + List scanResults = new ArrayList<>(); + scan(scan, new RawScanResultConsumer() { + + @Override + public boolean onNext(Result[] results) { + scanResults.addAll(Arrays.asList(results)); + return true; + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(scanResults); + } + }); + return future; } public void scan(Scan scan, RawScanResultConsumer consumer) { - if (scan.isSmall()) { + if (scan.isSmall() || scan.getLimit() > 0) { if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { consumer.onError( new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); - } else { - LOG.warn("This is small scan " + scan + ", consider using smallScan directly?"); } } scan = setDefaultScanConfig(scan); @@ -413,4 +414,5 @@ class RawAsyncTableImpl implements RawAsyncTable { .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS) 
.writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); } + } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 7f0514c..2e5d422 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -47,7 +47,9 @@ public interface RawScanResultConsumer { * This method give you a chance to terminate a slow scan operation. * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} */ - boolean onHeartbeat(); + default boolean onHeartbeat() { + return true; + } /** * Indicate that we hit an unrecoverable error and the scan operation is terminated. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index a2d9037..685f940 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -86,9 +86,9 @@ public class Scan extends Query { private static final String RAW_ATTR = "_raw_"; - private byte [] startRow = HConstants.EMPTY_START_ROW; + private byte[] startRow = HConstants.EMPTY_START_ROW; private boolean includeStartRow = true; - private byte [] stopRow = HConstants.EMPTY_END_ROW; + private byte[] stopRow = HConstants.EMPTY_END_ROW; private boolean includeStopRow = false; private int maxVersions = 1; private int batch = -1; @@ -172,6 +172,12 @@ public class Scan extends Query { private long mvccReadPoint = -1L; /** + * The number of rows we want for this scan. We will terminate the scan if the number of return + * rows reaches this value. + */ + private int limit = -1; + + /** * Create a Scan operation across all rows. */ public Scan() {} @@ -257,6 +263,7 @@ public class Scan extends Query { setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } this.mvccReadPoint = scan.getMvccReadPoint(); + this.limit = scan.getLimit(); } /** @@ -974,23 +981,20 @@ public class Scan extends Query { /** * Set whether this scan is a small scan *

- * Small scan should use pread and big scan can use seek + read - * - * seek + read is fast but can cause two problem (1) resource contention (2) - * cause too much network io - * - * [89-fb] Using pread for non-compaction read request - * https://issues.apache.org/jira/browse/HBASE-7266 - * - * On the other hand, if setting it true, we would do - * openScanner,next,closeScanner in one RPC call. It means the better - * performance for small scan. [HBASE-9488]. - * - * Generally, if the scan range is within one data block(64KB), it could be - * considered as a small scan. - * + * Small scan should use pread and big scan can use seek + read seek + read is fast but can cause + * two problem (1) resource contention (2) cause too much network io [89-fb] Using pread for + * non-compaction read request https://issues.apache.org/jira/browse/HBASE-7266 On the other hand, + * if setting it true, we would do openScanner,next,closeScanner in one RPC call. It means the + * better performance for small scan. [HBASE-9488]. Generally, if the scan range is within one + * data block(64KB), it could be considered as a small scan. * @param small + * @deprecated use {@link #setLimit(int)} instead. Now we always use pread for a user scan. And + * for the one rpc optimization, now we will also fetch data when openScanner, and if + * the number of rows reaches the limit then we will close the scanner automatically + * which means we will fall back to one rpc. + * @see #setLimit(int) */ + @Deprecated public Scan setSmall(boolean small) { this.small = small; return this; @@ -999,7 +1003,9 @@ public class Scan extends Query { /** * Get whether this scan is a small scan * @return true if small scan + * @deprecated see the comment of {@link #setSmall(boolean)} */ + @Deprecated public boolean isSmall() { return small; } @@ -1081,6 +1087,28 @@ public class Scan extends Query { } /** + * @return the limit of rows for this scan + */ + public int getLimit() { + return limit; + } + + /** + * Set the limit of rows for this scan. We will terminate the scan if the number of returned rows + * reaches this value. + *

+   * This condition will be tested last, after all other conditions such as stopRow and filter.
+   * <p>
+ * Can not be used together with batch and allowPartial. + * @param limit the limit of rows for this scan + * @return + */ + public Scan setLimit(int limit) { + this.limit = limit; + return this; + } + + /** * Get the mvcc read point used to open a scanner. */ long getMvccReadPoint() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 642fae0..f867acb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -194,7 +194,7 @@ public class ScannerCallable extends ClientServiceCallable { try { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, - this.scanMetrics != null, renew); + this.scanMetrics != null, renew, -1); ScanResponse response = null; response = getStub().scan(getRpcController(), request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 4acb525..15df026 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -463,11 +463,10 @@ public final class RequestConverter { * @return a scan request * @throws IOException */ - public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan, - final int numberOfRows, final boolean closeScanner) throws IOException { + public static ScanRequest buildScanRequest(byte[] regionName, Scan scan, int numberOfRows, + boolean closeScanner) throws IOException { ScanRequest.Builder builder = ScanRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); + RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); builder.setRegion(region); @@ -475,19 +474,21 @@ public final class RequestConverter { builder.setClientHandlesPartials(true); builder.setClientHandlesHeartbeats(true); builder.setTrackScanMetrics(scan.isScanMetricsEnabled()); + if (scan.getLimit() > 0) { + builder.setLimitOfRows(scan.getLimit()); + } return builder.build(); } /** * Create a protocol buffer ScanRequest for a scanner id - * * @param scannerId * @param numberOfRows * @param closeScanner * @return a scan request */ - public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final boolean trackMetrics) { + public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner, + boolean trackMetrics) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); @@ -500,16 +501,14 @@ public final class RequestConverter { /** * Create a protocol buffer ScanRequest for a scanner id - * * @param scannerId * @param numberOfRows * @param closeScanner * @param nextCallSeq * @return a scan request */ - public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics, - 
final boolean renew) { + public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner, + long nextCallSeq, boolean trackMetrics, boolean renew, int limitOfRows) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); @@ -519,6 +518,9 @@ public final class RequestConverter { builder.setClientHandlesHeartbeats(true); builder.setTrackScanMetrics(trackMetrics); builder.setRenew(renew); + if (limitOfRows > 0) { + builder.setLimitOfRows(limitOfRows); + } return builder.build(); } diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java index ef44295..694ec82 100644 --- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java +++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java @@ -17898,6 +17898,23 @@ public final class ClientProtos { * optional bool renew = 10 [default = false]; */ boolean getRenew(); + + /** + *

+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     *
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     */
+    boolean hasLimitOfRows();
+    /**
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     *
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     */
+    int getLimitOfRows();
   }
   /**
    *
@@ -17930,6 +17947,7 @@ public final class ClientProtos {
       clientHandlesHeartbeats_ = false;
       trackScanMetrics_ = false;
       renew_ = false;
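+      // proto default for limit_of_rows; 0 reads as "no row limit"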
+      limitOfRows_ = 0;
     }
 
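For context, the field above backs the new client API in this patch: AsyncTableBase.smallScan(Scan, int) is removed in favor of scanAll(Scan), with the row limit carried on the Scan itself. A minimal usage sketch in the style of the AsyncNonMetaRegionLocator change earlier in the patch; it assumes a RawAsyncTable named metaTable obtained elsewhere, with metaKey and CATALOG_FAMILY as in the locator code:

    // Editor's illustration, not part of the patch.
    // Old: metaTable.smallScan(new Scan(metaKey).setReversed(true).setSmall(true), 1)
    // New: the limit lives on the Scan, and setSmall(true) is no longer needed.
    CompletableFuture<List<Result>> results = metaTable.scanAll(
        new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY).setLimit(1));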
     @java.lang.Override
@@ -18026,6 +18044,11 @@ public final class ClientProtos {
               renew_ = input.readBool();
               break;
             }
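+            // Tag 88 decodes as field 11 (limit_of_rows) with wire type 0 (varint):
+            // (11 << 3) | 0 == 88.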
+            case 88: {
+              bitField0_ |= 0x00000400;
+              limitOfRows_ = input.readUInt32();
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -18213,6 +18236,29 @@ public final class ClientProtos {
       return renew_;
     }
 
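The accessors added below are what the server-side half of this patch reads (the RSRpcServices rewrite is included in the patch but not excerpted here). A hedged sketch of the intended check, with request and rowsReturned as assumed local names:

    // Editor's sketch only. RequestConverter sets limit_of_rows solely when Scan#getLimit() > 0,
    // so the proto default of 0 reads as "no row limit".
    int limitOfRows = request.hasLimitOfRows() ? request.getLimitOfRows() : 0;
    if (limitOfRows > 0 && rowsReturned >= limitOfRows) {
      // Reached the limit: report moreResults = false and close the scanner, which is what
      // the client-side limit bookkeeping earlier in this patch relies on.
    }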
+    public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11;
+    private int limitOfRows_;
+    /**
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     *
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     */
+    public boolean hasLimitOfRows() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    /**
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public int getLimitOfRows() { + return limitOfRows_; + } + private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; @@ -18267,6 +18313,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeBool(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeUInt32(11, limitOfRows_); + } unknownFields.writeTo(output); } @@ -18315,6 +18364,10 @@ public final class ClientProtos { size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream .computeBoolSize(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream + .computeUInt32Size(11, limitOfRows_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -18382,6 +18435,11 @@ public final class ClientProtos { result = result && (getRenew() == other.getRenew()); } + result = result && (hasLimitOfRows() == other.hasLimitOfRows()); + if (hasLimitOfRows()) { + result = result && (getLimitOfRows() + == other.getLimitOfRows()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -18440,6 +18498,10 @@ public final class ClientProtos { hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashBoolean( getRenew()); } + if (hasLimitOfRows()) { + hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER; + hash = (53 * hash) + getLimitOfRows(); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -18599,6 +18661,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000100); renew_ = false; bitField0_ = (bitField0_ & ~0x00000200); + limitOfRows_ = 0; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -18671,6 +18735,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000200; } result.renew_ = renew_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.limitOfRows_ = limitOfRows_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18743,6 +18811,9 @@ public final class ClientProtos { if (other.hasRenew()) { setRenew(other.getRenew()); } + if (other.hasLimitOfRows()) { + setLimitOfRows(other.getLimitOfRows()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -19272,6 +19343,54 @@ public final class ClientProtos { onChanged(); return this; } + + private int limitOfRows_ ; + /** + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       *
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       */
+      public boolean hasLimitOfRows() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       *
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       */
+      public int getLimitOfRows() {
+        return limitOfRows_;
+      }
+      /**
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       *
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       */
+      public Builder setLimitOfRows(int value) {
+        bitField0_ |= 0x00000400;
+        limitOfRows_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public Builder clearLimitOfRows() { + bitField0_ = (bitField0_ & ~0x00000400); + limitOfRows_ = 0; + onChanged(); + return this; + } public final Builder setUnknownFields( final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFields(unknownFields); @@ -40850,7 +40969,7 @@ public final class ClientProtos { "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024", " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" + - "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" + + "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\300\002\n\013Sc" + "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" + "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" + "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" + @@ -40858,89 +40977,90 @@ public final class ClientProtos { "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" + "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" + "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " + - "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per", - "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" + - "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" + - "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" + - "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" + - "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" + - "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" + - ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" + - ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" + - " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" + - "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq", - "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" + - "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" + - "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " + - "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" + - "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" + - "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" + - "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" + - "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" + - "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" + - ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re", - "gionSpecifier\"-\n\027PrepareBulkLoadResponse" + - "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" + - "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" + - "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" + - "BulkLoadResponse\"a\n\026CoprocessorServiceCa" + - "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" + - "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" + - "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" + - ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" + - "rviceRequest\022)\n\006region\030\001 
\002(\0132\031.hbase.pb.", - "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" + - ".CoprocessorServiceCall\"o\n\032CoprocessorSe" + - "rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" + - ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." + - "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" + - "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" + - "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" + - "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" + - "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" + - "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002", - " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" + - "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" + - "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" + - "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" + - "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" + - "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" + - "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" + - " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" + - "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" + - "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co", - "processorServiceResult\0220\n\tloadStats\030\005 \001(" + - "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" + - "onActionResult\0226\n\021resultOrException\030\001 \003(" + - "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" + - "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" + - "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" + - ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" + - "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" + - "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" + - "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc", - "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." + - "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" + - "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" + - "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." + - "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" + - "b.MutateRequest\032\030.hbase.pb.MutateRespons" + - "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" + - ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" + - "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" + - "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .", - "hbase.pb.PrepareBulkLoadRequest\032!.hbase." + - "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" + - "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" + - "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" + - "ecService\022#.hbase.pb.CoprocessorServiceR" + - "equest\032$.hbase.pb.CoprocessorServiceResp" + - "onse\022d\n\027ExecRegionServerService\022#.hbase." + - "pb.CoprocessorServiceRequest\032$.hbase.pb." 
+ - "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" + - "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp", - "onseBI\n1org.apache.hadoop.hbase.shaded.p" + - "rotobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "\001(\010:\005false\022\030\n\rlimit_of_rows\030\013 \001(\r:\0010\"\266\002\n", + "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" + + "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + + "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase." + + "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag" + + "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + + "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + + "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + + "s\022\032\n\017mvcc_read_point\030\013 \001(\004:\0010\"\240\002\n\024BulkLo" + + "adHFileRequest\022)\n\006region\030\001 \002(\0132\031.hbase.p" + + "b.RegionSpecifier\022>\n\013family_path\030\002 \003(\0132)", + ".hbase.pb.BulkLoadHFileRequest.FamilyPat" + + "h\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n\010fs_token\030\004 " + + "\001(\0132\031.hbase.pb.DelegationToken\022\022\n\nbulk_t" + + "oken\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(\010:\005false\032*\n\n" + + "FamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t" + + "\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(" + + "\010\"V\n\017DelegationToken\022\022\n\nidentifier\030\001 \001(\014" + + "\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007serv" + + "ice\030\004 \001(\t\"l\n\026PrepareBulkLoadRequest\022\'\n\nt" + + "able_name\030\001 \002(\0132\023.hbase.pb.TableName\022)\n\006", + "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" + + "-\n\027PrepareBulkLoadResponse\022\022\n\nbulk_token" + + "\030\001 \002(\t\"W\n\026CleanupBulkLoadRequest\022\022\n\nbulk" + + "_token\030\001 \002(\t\022)\n\006region\030\002 \001(\0132\031.hbase.pb." 
+ + "RegionSpecifier\"\031\n\027CleanupBulkLoadRespon" + + "se\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(" + + "\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003" + + " \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServ" + + "iceResult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.Name" + + "BytesPair\"v\n\031CoprocessorServiceRequest\022)", + "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" + + "r\022.\n\004call\030\002 \002(\0132 .hbase.pb.CoprocessorSe" + + "rviceCall\"o\n\032CoprocessorServiceResponse\022" + + ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + + "er\022&\n\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPa" + + "ir\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation" + + "\030\002 \001(\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003" + + " \001(\0132\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(" + + "\0132 .hbase.pb.CoprocessorServiceCall\"k\n\014R" + + "egionAction\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R", + "egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action" + + "\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017RegionLoadSt" + + "ats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccu" + + "pancy\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001" + + "(\005:\0010\"j\n\024MultiRegionLoadStats\022)\n\006region\030" + + "\001 \003(\0132\031.hbase.pb.RegionSpecifier\022\'\n\004stat" + + "\030\002 \003(\0132\031.hbase.pb.RegionLoadStats\"\336\001\n\021Re" + + "sultOrException\022\r\n\005index\030\001 \001(\r\022 \n\006result" + + "\030\002 \001(\0132\020.hbase.pb.Result\022*\n\texception\030\003 " + + "\001(\0132\027.hbase.pb.NameBytesPair\022:\n\016service_", + "result\030\004 \001(\0132\".hbase.pb.CoprocessorServi" + + "ceResult\0220\n\tloadStats\030\005 \001(\0132\031.hbase.pb.R" + + "egionLoadStatsB\002\030\001\"x\n\022RegionActionResult" + + "\0226\n\021resultOrException\030\001 \003(\0132\033.hbase.pb.R" + + "esultOrException\022*\n\texception\030\002 \001(\0132\027.hb" + + "ase.pb.NameBytesPair\"x\n\014MultiRequest\022,\n\014" + + "regionAction\030\001 \003(\0132\026.hbase.pb.RegionActi" + + "on\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(" + + "\0132\023.hbase.pb.Condition\"\226\001\n\rMultiResponse" + + "\0228\n\022regionActionResult\030\001 \003(\0132\034.hbase.pb.", + "RegionActionResult\022\021\n\tprocessed\030\002 \001(\010\0228\n" + + "\020regionStatistics\030\003 \001(\0132\036.hbase.pb.Multi" + + "RegionLoadStats*\'\n\013Consistency\022\n\n\006STRONG" + + "\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientService\0222\n\003Ge" + + "t\022\024.hbase.pb.GetRequest\032\025.hbase.pb.GetRe" + + "sponse\022;\n\006Mutate\022\027.hbase.pb.MutateReques" + + "t\032\030.hbase.pb.MutateResponse\0225\n\004Scan\022\025.hb" + + "ase.pb.ScanRequest\032\026.hbase.pb.ScanRespon" + + "se\022P\n\rBulkLoadHFile\022\036.hbase.pb.BulkLoadH" + + "FileRequest\032\037.hbase.pb.BulkLoadHFileResp", + "onse\022V\n\017PrepareBulkLoad\022 .hbase.pb.Prepa" + + "reBulkLoadRequest\032!.hbase.pb.PrepareBulk" + + "LoadResponse\022V\n\017CleanupBulkLoad\022 .hbase." 
+ + "pb.CleanupBulkLoadRequest\032!.hbase.pb.Cle" + + "anupBulkLoadResponse\022X\n\013ExecService\022#.hb" + + "ase.pb.CoprocessorServiceRequest\032$.hbase" + + ".pb.CoprocessorServiceResponse\022d\n\027ExecRe" + + "gionServerService\022#.hbase.pb.Coprocessor" + + "ServiceRequest\032$.hbase.pb.CoprocessorSer" + + "viceResponse\0228\n\005Multi\022\026.hbase.pb.MultiRe", + "quest\032\027.hbase.pb.MultiResponseBI\n1org.ap" + + "ache.hadoop.hbase.shaded.protobuf.genera" + + "tedB\014ClientProtosH\001\210\001\001\240\001\001" }; org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -41048,7 +41168,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_hbase_pb_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", }); internal_static_hbase_pb_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 2793b89..bfdb5b3 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -282,6 +282,8 @@ message ScanRequest { optional bool client_handles_heartbeats = 8; optional bool track_scan_metrics = 9; optional bool renew = 10 [default = false]; + // if we have returned limit_of_rows rows to client, then close the scanner. + optional uint32 limit_of_rows = 11 [default = 0]; } /** diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 087576c..e1f49c4 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -17573,6 +17573,24 @@ public final class ClientProtos { * optional bool renew = 10 [default = false]; */ boolean getRenew(); + + // optional uint32 limit_of_rows = 11 [default = 0]; + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     */
+    boolean hasLimitOfRows();
+    /**
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     *
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+ */ + int getLimitOfRows(); } /** * Protobuf type {@code hbase.pb.ScanRequest} @@ -17704,6 +17722,11 @@ public final class ClientProtos { renew_ = input.readBool(); break; } + case 88: { + bitField0_ |= 0x00000400; + limitOfRows_ = input.readUInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -17916,6 +17939,30 @@ public final class ClientProtos { return renew_; } + // optional uint32 limit_of_rows = 11 [default = 0]; + public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11; + private int limitOfRows_; + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     */
+    public boolean hasLimitOfRows() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    /**
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     *
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+ */ + public int getLimitOfRows() { + return limitOfRows_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -17927,6 +17974,7 @@ public final class ClientProtos { clientHandlesHeartbeats_ = false; trackScanMetrics_ = false; renew_ = false; + limitOfRows_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -17982,6 +18030,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeBool(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeUInt32(11, limitOfRows_); + } getUnknownFields().writeTo(output); } @@ -18031,6 +18082,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(11, limitOfRows_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18104,6 +18159,11 @@ public final class ClientProtos { result = result && (getRenew() == other.getRenew()); } + result = result && (hasLimitOfRows() == other.hasLimitOfRows()); + if (hasLimitOfRows()) { + result = result && (getLimitOfRows() + == other.getLimitOfRows()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18157,6 +18217,10 @@ public final class ClientProtos { hash = (37 * hash) + RENEW_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getRenew()); } + if (hasLimitOfRows()) { + hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER; + hash = (53 * hash) + getLimitOfRows(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -18309,6 +18373,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000100); renew_ = false; bitField0_ = (bitField0_ & ~0x00000200); + limitOfRows_ = 0; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -18385,6 +18451,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000200; } result.renew_ = renew_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.limitOfRows_ = limitOfRows_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18431,6 +18501,9 @@ public final class ClientProtos { if (other.hasRenew()) { setRenew(other.getRenew()); } + if (other.hasLimitOfRows()) { + setLimitOfRows(other.getLimitOfRows()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -18968,6 +19041,55 @@ public final class ClientProtos { return this; } + // optional uint32 limit_of_rows = 11 [default = 0]; + private int limitOfRows_ ; + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       */
+      public boolean hasLimitOfRows() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       *
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       */
+      public int getLimitOfRows() {
+        return limitOfRows_;
+      }
+      /**
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       *
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+ */ + public Builder setLimitOfRows(int value) { + bitField0_ |= 0x00000400; + limitOfRows_ = value; + onChanged(); + return this; + } + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+ */ + public Builder clearLimitOfRows() { + bitField0_ = (bitField0_ & ~0x00000400); + limitOfRows_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ScanRequest) } @@ -39928,7 +40050,7 @@ public final class ClientProtos { "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024", " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" + - "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" + + "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\300\002\n\013Sc" + "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" + "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" + "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" + @@ -39936,89 +40058,90 @@ public final class ClientProtos { "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" + "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" + "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " + - "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per", - "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" + - "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" + - "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" + - "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" + - "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" + - "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" + - ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" + - ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" + - " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" + - "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq", - "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" + - "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" + - "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " + - "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" + - "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" + - "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" + - "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" + - "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" + - "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" + - ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re", - "gionSpecifier\"-\n\027PrepareBulkLoadResponse" + - "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" + - "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" + - "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" + - "BulkLoadResponse\"a\n\026CoprocessorServiceCa" + - "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" + - "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" + - "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" + - ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" + - "rviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.", - "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" + - ".CoprocessorServiceCall\"o\n\032CoprocessorSe" + - 
"rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" + - ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." + - "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" + - "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" + - "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" + - "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" + - "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" + - "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002", - " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" + - "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" + - "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" + - "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" + - "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" + - "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" + - "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" + - " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" + - "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" + - "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co", - "processorServiceResult\0220\n\tloadStats\030\005 \001(" + - "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" + - "onActionResult\0226\n\021resultOrException\030\001 \003(" + - "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" + - "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" + - "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" + - ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" + - "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" + - "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" + - "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc", - "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." + - "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" + - "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" + - "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." + - "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" + - "b.MutateRequest\032\030.hbase.pb.MutateRespons" + - "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" + - ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" + - "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" + - "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .", - "hbase.pb.PrepareBulkLoadRequest\032!.hbase." + - "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" + - "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" + - "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" + - "ecService\022#.hbase.pb.CoprocessorServiceR" + - "equest\032$.hbase.pb.CoprocessorServiceResp" + - "onse\022d\n\027ExecRegionServerService\022#.hbase." + - "pb.CoprocessorServiceRequest\032$.hbase.pb." + - "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" + - "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp", - "onseBB\n*org.apache.hadoop.hbase.protobuf" + - ".generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "\001(\010:\005false\022\030\n\rlimit_of_rows\030\013 \001(\r:\0010\"\266\002\n", + "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" + + "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + + "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase." 
+ + "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag" + + "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + + "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + + "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + + "s\022\032\n\017mvcc_read_point\030\013 \001(\004:\0010\"\240\002\n\024BulkLo" + + "adHFileRequest\022)\n\006region\030\001 \002(\0132\031.hbase.p" + + "b.RegionSpecifier\022>\n\013family_path\030\002 \003(\0132)", + ".hbase.pb.BulkLoadHFileRequest.FamilyPat" + + "h\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n\010fs_token\030\004 " + + "\001(\0132\031.hbase.pb.DelegationToken\022\022\n\nbulk_t" + + "oken\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(\010:\005false\032*\n\n" + + "FamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t" + + "\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(" + + "\010\"V\n\017DelegationToken\022\022\n\nidentifier\030\001 \001(\014" + + "\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007serv" + + "ice\030\004 \001(\t\"l\n\026PrepareBulkLoadRequest\022\'\n\nt" + + "able_name\030\001 \002(\0132\023.hbase.pb.TableName\022)\n\006", + "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" + + "-\n\027PrepareBulkLoadResponse\022\022\n\nbulk_token" + + "\030\001 \002(\t\"W\n\026CleanupBulkLoadRequest\022\022\n\nbulk" + + "_token\030\001 \002(\t\022)\n\006region\030\002 \001(\0132\031.hbase.pb." + + "RegionSpecifier\"\031\n\027CleanupBulkLoadRespon" + + "se\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(" + + "\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003" + + " \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServ" + + "iceResult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.Name" + + "BytesPair\"v\n\031CoprocessorServiceRequest\022)", + "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" + + "r\022.\n\004call\030\002 \002(\0132 .hbase.pb.CoprocessorSe" + + "rviceCall\"o\n\032CoprocessorServiceResponse\022" + + ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + + "er\022&\n\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPa" + + "ir\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation" + + "\030\002 \001(\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003" + + " \001(\0132\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(" + + "\0132 .hbase.pb.CoprocessorServiceCall\"k\n\014R" + + "egionAction\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R", + "egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action" + + "\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017RegionLoadSt" + + "ats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccu" + + "pancy\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001" + + "(\005:\0010\"j\n\024MultiRegionLoadStats\022)\n\006region\030" + + "\001 \003(\0132\031.hbase.pb.RegionSpecifier\022\'\n\004stat" + + "\030\002 \003(\0132\031.hbase.pb.RegionLoadStats\"\336\001\n\021Re" + + "sultOrException\022\r\n\005index\030\001 \001(\r\022 \n\006result" + + "\030\002 \001(\0132\020.hbase.pb.Result\022*\n\texception\030\003 " + + "\001(\0132\027.hbase.pb.NameBytesPair\022:\n\016service_", + "result\030\004 \001(\0132\".hbase.pb.CoprocessorServi" + + "ceResult\0220\n\tloadStats\030\005 \001(\0132\031.hbase.pb.R" + + "egionLoadStatsB\002\030\001\"x\n\022RegionActionResult" + + 
"\0226\n\021resultOrException\030\001 \003(\0132\033.hbase.pb.R" + + "esultOrException\022*\n\texception\030\002 \001(\0132\027.hb" + + "ase.pb.NameBytesPair\"x\n\014MultiRequest\022,\n\014" + + "regionAction\030\001 \003(\0132\026.hbase.pb.RegionActi" + + "on\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(" + + "\0132\023.hbase.pb.Condition\"\226\001\n\rMultiResponse" + + "\0228\n\022regionActionResult\030\001 \003(\0132\034.hbase.pb.", + "RegionActionResult\022\021\n\tprocessed\030\002 \001(\010\0228\n" + + "\020regionStatistics\030\003 \001(\0132\036.hbase.pb.Multi" + + "RegionLoadStats*\'\n\013Consistency\022\n\n\006STRONG" + + "\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientService\0222\n\003Ge" + + "t\022\024.hbase.pb.GetRequest\032\025.hbase.pb.GetRe" + + "sponse\022;\n\006Mutate\022\027.hbase.pb.MutateReques" + + "t\032\030.hbase.pb.MutateResponse\0225\n\004Scan\022\025.hb" + + "ase.pb.ScanRequest\032\026.hbase.pb.ScanRespon" + + "se\022P\n\rBulkLoadHFile\022\036.hbase.pb.BulkLoadH" + + "FileRequest\032\037.hbase.pb.BulkLoadHFileResp", + "onse\022V\n\017PrepareBulkLoad\022 .hbase.pb.Prepa" + + "reBulkLoadRequest\032!.hbase.pb.PrepareBulk" + + "LoadResponse\022V\n\017CleanupBulkLoad\022 .hbase." + + "pb.CleanupBulkLoadRequest\032!.hbase.pb.Cle" + + "anupBulkLoadResponse\022X\n\013ExecService\022#.hb" + + "ase.pb.CoprocessorServiceRequest\032$.hbase" + + ".pb.CoprocessorServiceResponse\022d\n\027ExecRe" + + "gionServerService\022#.hbase.pb.Coprocessor" + + "ServiceRequest\032$.hbase.pb.CoprocessorSer" + + "viceResponse\0228\n\005Multi\022\026.hbase.pb.MultiRe", + "quest\032\027.hbase.pb.MultiResponseBB\n*org.ap" + + "ache.hadoop.hbase.protobuf.generatedB\014Cl" + + "ientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -40114,7 +40237,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", }); internal_static_hbase_pb_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index ae932f7..ad1f3a6 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -282,6 +282,8 @@ message ScanRequest { optional bool client_handles_heartbeats = 8; optional bool track_scan_metrics = 9; optional bool renew = 10 [default = false]; + // if we have returned limit_of_rows rows to client, then close the scanner. 
+ optional uint32 limit_of_rows = 11 [default = 0]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f550267..409d4fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import org.apache.commons.lang.mutable.MutableObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -282,11 +283,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * An Rpc callback for closing a RegionScanner. */ - static class RegionScannerCloseCallBack implements RpcCallback { + private static final class RegionScannerCloseCallBack implements RpcCallback { private final RegionScanner scanner; - public RegionScannerCloseCallBack(RegionScanner scanner){ + public RegionScannerCloseCallBack(RegionScanner scanner) { this.scanner = scanner; } @@ -348,27 +349,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. */ - private static class RegionScannerHolder { - private AtomicLong nextCallSeq = new AtomicLong(0); - private RegionScanner s; - private Region r; - final RpcCallback closeCallBack; - final RpcCallback shippedCallback; - - public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack, - RpcCallback shippedCallback) { + private static final class RegionScannerHolder { + + private final AtomicLong nextCallSeq = new AtomicLong(0); + private final String scannerName; + private final RegionScanner s; + private final Region r; + private final RpcCallback closeCallBack; + private final RpcCallback shippedCallback; + + public RegionScannerHolder(String scannerName, RegionScanner s, Region r, + RpcCallback closeCallBack, RpcCallback shippedCallback) { + this.scannerName = scannerName; this.s = s; this.r = r; this.closeCallBack = closeCallBack; this.shippedCallback = shippedCallback; } - private long getNextCallSeq() { + public long getNextCallSeq() { return nextCallSeq.get(); } - private void incNextCallSeq() { - nextCallSeq.incrementAndGet(); + public boolean incNextCallSeq(long currentSeq) { + // Use CAS to prevent multiple scan request running on the same scanner. + return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); } } @@ -477,19 +482,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - private void addResults(final ScanResponse.Builder builder, final List results, - final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { + private void addResults(ScanResponse.Builder builder, List results, + HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { builder.setStale(!isDefaultRegion); - if (results == null || results.isEmpty()) return; + if (results.isEmpty()) return; if (clientCellBlockSupported) { for (Result res : results) { builder.addCellsPerResult(res.size()); builder.addPartialFlagPerResult(res.isPartial()); } - ((HBaseRpcController)controller). 
- setCellScanner(CellUtil.createCellScanner(results)); + controller.setCellScanner(CellUtil.createCellScanner(results)); } else { - for (Result res: results) { + for (Result res : results) { ClientProtos.Result pbr = ProtobufUtil.toResult(res); builder.addResults(pbr); } @@ -1196,10 +1200,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return lastBlock; } - RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) + private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) throws LeaseStillHeldException { Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, - new ScannerListener(scannerName)); + new ScannerListener(scannerName)); RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease); RpcCallback closeCallback; if (s instanceof RpcCallback) { @@ -1207,7 +1211,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else { closeCallback = new RegionScannerCloseCallBack(s); } - RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback); + RegionScannerHolder rsh = + new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback); RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; return rsh; @@ -2644,6 +2649,272 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private RegionScannerHolder getRegionScanner(ScanRequest request) + throws UnknownScannerException, NotServingRegionException, OutOfOrderScannerNextException { + String scannerName = Long.toString(request.getScannerId()); + RegionScannerHolder rsh = scanners.get(scannerName); + if (rsh == null) { + LOG.warn("Client tried to access missing scanner " + scannerName); + throw new UnknownScannerException( + "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " + + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + + "long wait between consecutive client checkins, c) Server may be closing down, " + + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + + "possible fix would be increasing the value of" + + "'hbase.client.scanner.timeout.period' configuration."); + } + HRegionInfo hri = rsh.s.getRegionInfo(); + // Yes, should be the same instance + if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { + String msg = "Region was re-opened after the scanner" + scannerName + " was created: " + + hri.getRegionNameAsString(); + scanners.remove(scannerName); + try { + RegionScanner scanner = rsh.s; + LOG.warn(msg + ", closing..."); + scanner.close(); + regionServer.leases.cancelLease(scannerName); + } catch (IOException e) { + LOG.warn("Getting exception closing " + scannerName, e); + } + throw new NotServingRegionException(msg); + } + return rsh; + } + + private Pair newRegionScanner(ScanRequest request, + ScanResponse.Builder builder) throws IOException { + Region region = getRegion(request.getRegion()); + ClientProtos.Scan protoScan = request.getScan(); + boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); + Scan scan = ProtobufUtil.toScan(protoScan); + // if the request doesn't set this, get the default region setting. 
+ if (!isLoadingCfsOnDemandSet) { + scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); + } + + if (!scan.hasFamilies()) { + // Adding all families to scanner + for (byte[] family : region.getTableDesc().getFamiliesKeys()) { + scan.addFamily(family); + } + } + RegionScanner scanner = null; + if (region.getCoprocessorHost() != null) { + scanner = region.getCoprocessorHost().preScannerOpen(scan); + } + if (scanner == null) { + scanner = region.getScanner(scan); + } + if (region.getCoprocessorHost() != null) { + scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); + } + long scannerId = this.scannerIdGen.incrementAndGet(); + builder.setScannerId(scannerId); + builder.setMvccReadPoint(scanner.getMvccReadPoint()); + builder.setTtl(scannerLeaseTimeoutPeriod); + String scannerName = String.valueOf(scannerId); + return Pair.newPair(addScanner(scannerName, scanner, region), scan.isSmall()); + } + + private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) + throws OutOfOrderScannerNextException { + // if nextCallSeq does not match throw Exception straight away. This needs to be + // performed even before checking of Lease. + // See HBASE-5974 + if (request.hasNextCallSeq()) { + long callSeq = request.getNextCallSeq(); + if (!rsh.incNextCallSeq(callSeq)) { + throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq() + + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request=" + + TextFormat.shortDebugString(request)); + } + } + } + + private void addScannerLeaseBack(Leases.Lease lease) { + try { + regionServer.leases.addLease(lease); + } catch (LeaseStillHeldException e) { + // should not happen as the scanner id is unique. + throw new AssertionError(e); + } + } + + private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) { + // Set the time limit to be half of the more restrictive timeout value (one of the + // timeout values must be positive). In the event that both values are positive, the + // more restrictive of the two is used to calculate the limit. + if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { + long timeLimitDelta; + if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { + timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); + } else { + timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; + } + if (controller != null) { + timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout()); + } + // Use half of whichever timeout value was more restrictive... But don't allow + // the time limit to be less than the allowable minimum (could cause an + // immediatate timeout before scanning any data). + timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); + return EnvironmentEdgeManager.currentTime() + timeLimitDelta; + } + // Default value of timeLimit is negative to indicate no timeLimit should be + // enforced. 
+ return -1L; + } + + private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, + boolean isSmallScan, long maxQuotaResultSize, int rows, List results, + ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) + throws IOException { + Region region = rsh.r; + RegionScanner scanner = rsh.s; + long maxResultSize; + if (scanner.getMaxResultSize() > 0) { + maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); + } else { + maxResultSize = maxQuotaResultSize; + } + // This is cells inside a row. Default size is 10 so if many versions or many cfs, + // then we'll resize. Resizings show in profiler. Set it higher than 10. For now + // arbitrary 32. TODO: keep record of general size of results being returned. + List values = new ArrayList(32); + region.startRegionOperation(Operation.SCAN); + try { + int i = 0; + long before = EnvironmentEdgeManager.currentTime(); + synchronized (scanner) { + boolean stale = (region.getRegionInfo().getReplicaId() != 0); + boolean clientHandlesPartials = + request.hasClientHandlesPartials() && request.getClientHandlesPartials(); + boolean clientHandlesHeartbeats = + request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); + + // On the server side we must ensure that the correct ordering of partial results is + // returned to the client to allow them to properly reconstruct the partial results. + // If the coprocessor host is adding to the result list, we cannot guarantee the + // correct ordering of partial results and so we prevent partial results from being + // formed. + boolean serverGuaranteesOrderOfPartials = results.isEmpty(); + boolean allowPartialResults = + clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; + boolean moreRows = false; + + // Heartbeat messages occur when the processing of the ScanRequest is exceeds a + // certain time threshold on the server. When the time threshold is exceeded, the + // server stops the scan and sends back whatever Results it has accumulated within + // that time period (may be empty). Since heartbeat messages have the potential to + // create partial Results (in the event that the timeout occurs in the middle of a + // row), we must only generate heartbeat messages when the client can handle both + // heartbeats AND partials + boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; + + // Default value of timeLimit is negative to indicate no timeLimit should be + // enforced. + long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); + + final LimitScope sizeScope = + allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + final LimitScope timeScope = + allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + + boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); + + // Configure with limits for this RPC. Set keep progress true since size progress + // towards size limit should be kept between calls to nextRaw + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); + contextBuilder.setSizeLimit(sizeScope, maxResultSize); + contextBuilder.setBatchLimit(scanner.getBatch()); + contextBuilder.setTimeLimit(timeScope, timeLimit); + contextBuilder.setTrackMetrics(trackMetrics); + ScannerContext scannerContext = contextBuilder.build(); + boolean limitReached = false; + while (i < rows) { + // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. 
The + // batch limit is a limit on the number of cells per Result. Thus, if progress is + // being tracked (i.e. scannerContext.keepProgress() is true) then we need to + // reset the batch progress between nextRaw invocations since we don't want the + // batch progress from previous calls to affect future calls + scannerContext.setBatchProgress(0); + + // Collect values to be returned here + moreRows = scanner.nextRaw(values, scannerContext); + + if (!values.isEmpty()) { + final boolean partial = scannerContext.partialResultFormed(); + Result r = Result.create(values, null, stale, partial); + lastBlock.setValue(addSize(context, r, lastBlock.getValue())); + results.add(r); + i++; + } + + boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); + boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); + boolean rowLimitReached = i >= rows; + limitReached = sizeLimitReached || timeLimitReached || rowLimitReached; + + if (limitReached || !moreRows) { + if (LOG.isTraceEnabled()) { + LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows + + " scannerContext: " + scannerContext); + } + // We only want to mark a ScanResponse as a heartbeat message in the event that + // there are more values to be read server side. If there aren't more values, + // marking it as a heartbeat is wasteful because the client will need to issue + // another ScanRequest only to realize that they already have all the values + if (moreRows) { + // Heartbeat messages occur when the time limit has been reached. + builder.setHeartbeatMessage(timeLimitReached); + } + break; + } + values.clear(); + } + + if (limitReached || moreRows) { + // We stopped prematurely + builder.setMoreResultsInRegion(true); + } else { + // We didn't get a single batch + builder.setMoreResultsInRegion(false); + } + + // Check to see if the client requested that we track metrics server side. If the + // client requested metrics, retrieve the metrics from the scanner context. + if (trackMetrics) { + Map metrics = scannerContext.getMetrics().getMetricsMap(); + ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); + NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); + + for (Entry entry : metrics.entrySet()) { + pairBuilder.setName(entry.getKey()); + pairBuilder.setValue(entry.getValue()); + metricBuilder.addMetrics(pairBuilder.build()); + } + + builder.setScanMetrics(metricBuilder.build()); + } + } + region.updateReadRequestsCount(i); + long end = EnvironmentEdgeManager.currentTime(); + long responseCellSize = context != null ? context.getResponseCellSize() : 0; + region.getMetrics().updateScanTime(end - before); + if (regionServer.metricsRegionServer != null) { + regionServer.metricsRegionServer.updateScanSize(responseCellSize); + regionServer.metricsRegionServer.updateScanTime(end - before); + } + } finally { + region.closeRegionOperation(); + } + // coprocessor postNext hook + if (region != null && region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); + } + } + /** * Scan data in a table. 
* @@ -2653,435 +2924,206 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ @Override public ScanResponse scan(final RpcController controller, final ScanRequest request) - throws ServiceException { - OperationQuota quota = null; - Leases.Lease lease = null; - String scannerName = null; + throws ServiceException { + if (controller != null && !(controller instanceof HBaseRpcController)) { + throw new UnsupportedOperationException( + "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller); + } + if (!request.hasScannerId() && !request.hasScan()) { + throw new ServiceException( + new DoNotRetryIOException("Missing required input: scannerId or scan")); + } try { - if (!request.hasScannerId() && !request.hasScan()) { - throw new DoNotRetryIOException( - "Missing required input: scannerId or scan"); - } - long scannerId = -1; + checkOpen(); + } catch (IOException e) { if (request.hasScannerId()) { - scannerId = request.getScannerId(); - scannerName = String.valueOf(scannerId); - } - try { - checkOpen(); - } catch (IOException e) { - // If checkOpen failed, server not running or filesystem gone, - // cancel this lease; filesystem is gone or we're closing or something. - if (scannerName != null) { - LOG.debug("Server shutting down and client tried to access missing scanner " - + scannerName); - if (regionServer.leases != null) { - try { - regionServer.leases.cancelLease(scannerName); - } catch (LeaseException le) { - // No problem, ignore - if (LOG.isTraceEnabled()) { - LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); - } - } + String scannerName = Long.toString(request.getScannerId()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Server shutting down and client tried to access missing scanner " + scannerName); + } + if (regionServer.leases != null) { + try { + regionServer.leases.cancelLease(scannerName); + } catch (LeaseException le) { + // No problem, ignore + if (LOG.isTraceEnabled()) { + LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); + } } } - throw e; - } - requestCount.increment(); - rpcScanRequestCount.increment(); - - int ttl = 0; - Region region = null; - RegionScanner scanner = null; - RegionScannerHolder rsh = null; - boolean moreResults = true; - boolean closeScanner = false; - boolean isSmallScan = false; - ScanResponse.Builder builder = ScanResponse.newBuilder(); - if (request.hasCloseScanner()) { - closeScanner = request.getCloseScanner(); - } - int rows = closeScanner ? 0 : 1; - if (request.hasNumberOfRows()) { - rows = request.getNumberOfRows(); } + throw new ServiceException(e); + } + requestCount.increment(); + rpcScanRequestCount.increment(); + RegionScannerHolder rsh; + ScanResponse.Builder builder = ScanResponse.newBuilder(); + boolean isSmallScan; + try { if (request.hasScannerId()) { - rsh = scanners.get(scannerName); - if (rsh == null) { - LOG.warn("Client tried to access missing scanner " + scannerName); - throw new UnknownScannerException( - "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " - + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " - + "long wait between consecutive client checkins, c) Server may be closing down, " - + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " - + "possible fix would be increasing the value of" - + "'hbase.client.scanner.timeout.period' configuration."); - } - scanner = rsh.s; - HRegionInfo hri = scanner.getRegionInfo(); - region = regionServer.getRegion(hri.getRegionName()); - if (region != rsh.r) { // Yes, should be the same instance - throw new NotServingRegionException("Region was re-opened after the scanner" - + scannerName + " was created: " + hri.getRegionNameAsString()); - } + rsh = getRegionScanner(request); + isSmallScan = false; } else { - region = getRegion(request.getRegion()); - ClientProtos.Scan protoScan = request.getScan(); - boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); - Scan scan = ProtobufUtil.toScan(protoScan); - // if the request doesn't set this, get the default region setting. - if (!isLoadingCfsOnDemandSet) { - scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); - } - - isSmallScan = scan.isSmall(); - if (!scan.hasFamilies()) { - // Adding all families to scanner - for (byte[] family: region.getTableDesc().getFamiliesKeys()) { - scan.addFamily(family); - } - } - - if (region.getCoprocessorHost() != null) { - scanner = region.getCoprocessorHost().preScannerOpen(scan); - } - if (scanner == null) { - scanner = region.getScanner(scan); - } - if (region.getCoprocessorHost() != null) { - scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); - } - scannerId = this.scannerIdGen.incrementAndGet(); - scannerName = String.valueOf(scannerId); - rsh = addScanner(scannerName, scanner, region); - ttl = this.scannerLeaseTimeoutPeriod; - builder.setMvccReadPoint(scanner.getMvccReadPoint()); - } - if (request.hasRenew() && request.getRenew()) { - rsh = scanners.get(scannerName); - lease = regionServer.leases.removeLease(scannerName); - if (lease != null && rsh != null) { - regionServer.leases.addLease(lease); - // Increment the nextCallSeq value which is the next expected from client. - rsh.incNextCallSeq(); - } - return builder.build(); + Pair pair = newRegionScanner(request, builder); + rsh = pair.getFirst(); + isSmallScan = pair.getSecond().booleanValue(); } - RpcCallContext context = RpcServer.getCurrentCall(); - Object lastBlock = null; - + } catch (IOException e) { + throw new ServiceException(e); + } + Region region = rsh.r; + String scannerName = rsh.scannerName; + Leases.Lease lease; + try { + // Remove lease while its being processed in server; protects against case + // where processing of request takes > lease expiration time. 
+ lease = regionServer.leases.removeLease(scannerName); + } catch (LeaseException e) { + throw new ServiceException(e); + } + if (request.hasRenew() && request.getRenew()) { + // add back and return + addScannerLeaseBack(lease); + try { + checkScanNextCallSeq(request, rsh); + } catch (OutOfOrderScannerNextException e) { + throw new ServiceException(e); + } + return builder.build(); + } + OperationQuota quota; + try { quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); - long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); - + } catch (IOException e) { + addScannerLeaseBack(lease); + throw new ServiceException(e); + }; + try { + checkScanNextCallSeq(request, rsh); + } catch (OutOfOrderScannerNextException e) { + addScannerLeaseBack(lease); + throw new ServiceException(e); + } + // Now we have increased the next call sequence. If we give client an error, the retry will + // never success. So we'd better close the scanner and return a DoNotRetryIOException to client + // and then client will try to open a new scanner. + boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; + int rows; // this is scan.getCaching + if (request.hasNumberOfRows()) { + rows = request.getNumberOfRows(); + } else { + rows = closeScanner ? 0 : 1; + } + RpcCallContext context = RpcServer.getCurrentCall(); + // now let's do the real scan. + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); + RegionScanner scanner = rsh.s; + boolean moreResults = true; + // this is the limit of rows for this scan, if we the number of rows reach this value, we will + // close the scanner. + int limitOfRows; + if (request.hasLimitOfRows()) { + limitOfRows = request.getLimitOfRows(); + rows = Math.min(rows, limitOfRows); + } else { + limitOfRows = -1; + } + MutableObject lastBlock = new MutableObject(); + boolean scannerClosed = false; + try { + List results = new ArrayList<>(); if (rows > 0) { - // if nextCallSeq does not match throw Exception straight away. This needs to be - // performed even before checking of Lease. - // See HBASE-5974 - if (request.hasNextCallSeq()) { - if (rsh != null) { - if (request.getNextCallSeq() != rsh.getNextCallSeq()) { - throw new OutOfOrderScannerNextException( - "Expected nextCallSeq: " + rsh.getNextCallSeq() - + " But the nextCallSeq got from client: " + request.getNextCallSeq() + - "; request=" + TextFormat.shortDebugString(request)); - } - // Increment the nextCallSeq value which is the next expected from client. - rsh.incNextCallSeq(); - } - } - boolean scannerClosed = false; - try { - // Remove lease while its being processed in server; protects against case - // where processing of request takes > lease expiration time. - lease = regionServer.leases.removeLease(scannerName); - List results = new ArrayList(); - - boolean done = false; - // Call coprocessor. Get region info from scanner. - if (region != null && region.getCoprocessorHost() != null) { - Boolean bypass = region.getCoprocessorHost().preScannerNext( - scanner, results, rows); - if (!results.isEmpty()) { - for (Result r : results) { - lastBlock = addSize(context, r, lastBlock); - } - } - if (bypass != null && bypass.booleanValue()) { - done = true; - } - } - - if (!done) { - long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); - if (maxResultSize <= 0) { - maxResultSize = maxQuotaResultSize; - } - // This is cells inside a row. 
Default size is 10 so if many versions or many cfs, - // then we'll resize. Resizings show in profiler. Set it higher than 10. For now - // arbitrary 32. TODO: keep record of general size of results being returned. - List values = new ArrayList(32); - region.startRegionOperation(Operation.SCAN); - try { - int i = 0; - long before = EnvironmentEdgeManager.currentTime(); - synchronized(scanner) { - boolean stale = (region.getRegionInfo().getReplicaId() != 0); - boolean clientHandlesPartials = - request.hasClientHandlesPartials() && request.getClientHandlesPartials(); - boolean clientHandlesHeartbeats = - request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); - - // On the server side we must ensure that the correct ordering of partial results is - // returned to the client to allow them to properly reconstruct the partial results. - // If the coprocessor host is adding to the result list, we cannot guarantee the - // correct ordering of partial results and so we prevent partial results from being - // formed. - boolean serverGuaranteesOrderOfPartials = results.isEmpty(); - boolean allowPartialResults = - clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; - boolean moreRows = false; - - // Heartbeat messages occur when the processing of the ScanRequest is exceeds a - // certain time threshold on the server. When the time threshold is exceeded, the - // server stops the scan and sends back whatever Results it has accumulated within - // that time period (may be empty). Since heartbeat messages have the potential to - // create partial Results (in the event that the timeout occurs in the middle of a - // row), we must only generate heartbeat messages when the client can handle both - // heartbeats AND partials - boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; - - // Default value of timeLimit is negative to indicate no timeLimit should be - // enforced. - long timeLimit = -1; - - // Set the time limit to be half of the more restrictive timeout value (one of the - // timeout values must be positive). In the event that both values are positive, the - // more restrictive of the two is used to calculate the limit. - if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) { - long timeLimitDelta; - if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) { - timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout); - } else { - timeLimitDelta = - scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; - } - if (controller != null) { - if (controller instanceof HBaseRpcController) { - HBaseRpcController pRpcController = - (HBaseRpcController)controller; - if (pRpcController.getCallTimeout() > 0) { - timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout()); - } - } else { - throw new UnsupportedOperationException("We only do " + - "HBaseRpcControllers! FIX IF A PROBLEM: " + controller); - } - } - // Use half of whichever timeout value was more restrictive... But don't allow - // the time limit to be less than the allowable minimum (could cause an - // immediatate timeout before scanning any data). - timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); - timeLimit = System.currentTimeMillis() + timeLimitDelta; - } - - final LimitScope sizeScope = - allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; - final LimitScope timeScope = - allowHeartbeatMessages ? 
LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; - - boolean trackMetrics = - request.hasTrackScanMetrics() && request.getTrackScanMetrics(); - - // Configure with limits for this RPC. Set keep progress true since size progress - // towards size limit should be kept between calls to nextRaw - ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); - contextBuilder.setSizeLimit(sizeScope, maxResultSize); - contextBuilder.setBatchLimit(scanner.getBatch()); - contextBuilder.setTimeLimit(timeScope, timeLimit); - contextBuilder.setTrackMetrics(trackMetrics); - ScannerContext scannerContext = contextBuilder.build(); - boolean limitReached = false; - while (i < rows) { - // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The - // batch limit is a limit on the number of cells per Result. Thus, if progress is - // being tracked (i.e. scannerContext.keepProgress() is true) then we need to - // reset the batch progress between nextRaw invocations since we don't want the - // batch progress from previous calls to affect future calls - scannerContext.setBatchProgress(0); - - // Collect values to be returned here - moreRows = scanner.nextRaw(values, scannerContext); - - if (!values.isEmpty()) { - final boolean partial = scannerContext.partialResultFormed(); - Result r = Result.create(values, null, stale, partial); - lastBlock = addSize(context, r, lastBlock); - results.add(r); - i++; - } - - boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); - boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); - boolean rowLimitReached = i >= rows; - limitReached = sizeLimitReached || timeLimitReached || rowLimitReached; - - if (limitReached || !moreRows) { - if (LOG.isTraceEnabled()) { - LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " - + moreRows + " scannerContext: " + scannerContext); - } - // We only want to mark a ScanResponse as a heartbeat message in the event that - // there are more values to be read server side. If there aren't more values, - // marking it as a heartbeat is wasteful because the client will need to issue - // another ScanRequest only to realize that they already have all the values - if (moreRows) { - // Heartbeat messages occur when the time limit has been reached. - builder.setHeartbeatMessage(timeLimitReached); - } - break; - } - values.clear(); - } - - if (limitReached || moreRows) { - // We stopped prematurely - builder.setMoreResultsInRegion(true); - } else { - // We didn't get a single batch - builder.setMoreResultsInRegion(false); - } - - // Check to see if the client requested that we track metrics server side. If the - // client requested metrics, retrieve the metrics from the scanner context. - if (trackMetrics) { - Map metrics = scannerContext.getMetrics().getMetricsMap(); - ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); - NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); - - for (Entry entry : metrics.entrySet()) { - pairBuilder.setName(entry.getKey()); - pairBuilder.setValue(entry.getValue()); - metricBuilder.addMetrics(pairBuilder.build()); - } - - builder.setScanMetrics(metricBuilder.build()); - } - } - region.updateReadRequestsCount(i); - long end = EnvironmentEdgeManager.currentTime(); - long responseCellSize = context != null ? 
context.getResponseCellSize() : 0; - region.getMetrics().updateScanTime(end - before); - if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScanSize(responseCellSize); - regionServer.metricsRegionServer.updateScanTime(end - before); - } - } finally { - region.closeRegionOperation(); - } - // coprocessor postNext hook - if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); + boolean done = false; + // Call coprocessor. Get region info from scanner. + if (region != null && region.getCoprocessorHost() != null) { + Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); + if (!results.isEmpty()) { + for (Result r : results) { + lastBlock.setValue(addSize(context, r, lastBlock.getValue())); } } - - quota.addScanResult(results); - - // If the scanner's filter - if any - is done with the scan - // and wants to tell the client to stop the scan. This is done by passing - // a null result, and setting moreResults to false. - if (scanner.isFilterDone() && results.isEmpty()) { - moreResults = false; - results = null; - } else { - addResults(builder, results, controller, - RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), - isClientCellBlockSupport(context)); - } - } catch (IOException e) { - // The scanner state might be left in a dirty state, so we will tell the Client to - // fail this RPC and close the scanner while opening up another one from the start of - // row that the client has last seen. - closeScanner(region, scanner, scannerName, context); - // scanner is closed here - scannerClosed = true; - - // If it is a CorruptHFileException or a FileNotFoundException, throw the - // DoNotRetryIOException. This can avoid the retry in ClientScanner. - if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) { - throw new DoNotRetryIOException(e); - } - // We closed the scanner already. Instead of throwing the IOException, and client - // retrying with the same scannerId only to get USE on the next RPC, we directly throw - // a special exception to save an RPC. - if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { - // 1.4.0+ clients know how to handle - throw new ScannerResetException("Scanner is closed on the server-side", e); - } else { - // older clients do not know about SRE. Just throw USE, which they will handle - throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" - + " scanner state for clients older than 1.3.", e); - } - } finally { - // If the scanner is not closed, set the shipped callback - if (!scannerClosed) { - if (context != null) { - context.setCallBack(rsh.shippedCallback); - } - - // Adding resets expiration time on lease. - if (scanners.containsKey(scannerName)) { - ttl = this.scannerLeaseTimeoutPeriod; - // When context != null, adding back the lease will be done in callback set above. 
- if (context == null) { - if (lease != null) regionServer.leases.addLease(lease); - } - } + if (bypass != null && bypass.booleanValue()) { + done = true; } } + if (!done) { + scan((HBaseRpcController) controller, request, rsh, isSmallScan, maxQuotaResultSize, rows, + results, builder, lastBlock, context); + } } - if (!moreResults || closeScanner) { - ttl = 0; + quota.addScanResult(results); + + if (scanner.isFilterDone()) { + // If the scanner's filter - if any - is done with the scan + moreResults = false; + } else if (limitOfRows > 0 && results.size() >= limitOfRows + && !results.get(results.size() - 1).isPartial()) { + // if we have reached the limit of rows moreResults = false; - if (closeScanner(region, scanner, scannerName, context)) { - return builder.build(); // bypass - } } - - if (ttl > 0) { - builder.setTtl(ttl); + addResults(builder, results, (HBaseRpcController) controller, + RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), + isClientCellBlockSupport(context)); + if (!moreResults || closeScanner) { + closeScanner(region, scanner, scannerName, context); } - builder.setScannerId(scannerId); builder.setMoreResults(moreResults); return builder.build(); - } catch (IOException ie) { - if (scannerName != null && ie instanceof NotServingRegionException) { - RegionScannerHolder rsh = scanners.remove(scannerName); - if (rsh != null) { - try { - RegionScanner scanner = rsh.s; - LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ..."); - scanner.close(); - regionServer.leases.cancelLease(scannerName); - } catch (IOException e) { - LOG.warn("Getting exception closing " + scannerName, e); - } + } catch (Exception e) { + try { + // The scanner state might be left in a dirty state, so we will tell the Client to + // fail this RPC and close the scanner while opening up another one from the start of + // row that the client has last seen. + closeScanner(region, scanner, scannerName, context); + // scanner is closed here + scannerClosed = true; + + // If it is a CorruptHFileException or a FileNotFoundException, throw the + // DoNotRetryIOException. This can avoid the retry in ClientScanner. + if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) { + throw new DoNotRetryIOException(e); } + // We closed the scanner already. Instead of throwing the IOException, and client + // retrying with the same scannerId only to get USE on the next RPC, we directly throw + // a special exception to save an RPC. + if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { + // 1.4.0+ clients know how to handle + throw new ScannerResetException("Scanner is closed on the server-side", e); + } else { + // older clients do not know about SRE. Just throw USE, which they will handle + throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" + + " scanner state for clients older than 1.3.", e); + } + } catch (IOException ioe) { + throw new ServiceException(ioe); } - throw new ServiceException(ie); } finally { - if (quota != null) { - quota.close(); + if (!scannerClosed) { + // the closeCallBack will be set in closeScanner so here we only care about shippedCallback + if (context != null) { + context.setCallBack(rsh.shippedCallback); + } + + // Adding resets expiration time on lease. + if (scanners.containsKey(scannerName)) { + // When context != null, adding back the lease will be done in callback set above. 
+ if (context == null) { + addScannerLeaseBack(lease); + } + } } + quota.close(); } } - private boolean closeScanner(Region region, RegionScanner scanner, String scannerName, + private void closeScanner(Region region, RegionScanner scanner, String scannerName, RpcCallContext context) throws IOException { if (region != null && region.getCoprocessorHost() != null) { if (region.getCoprocessorHost().preScannerClose(scanner)) { - return true; // bypass + // bypass the actual close. + return; } } RegionScannerHolder rsh = scanners.remove(scannerName); @@ -3103,7 +3145,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, region.getCoprocessorHost().postScannerClose(scanner); } } - return false; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java new file mode 100644 index 0000000..f51e8ac --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { + + @Parameter(0) + public String tableType; + + @Parameter(1) + public Supplier getTable; + + @Parameter(2) + public String scanType; + + @Parameter(3) + public Supplier scanCreator; + + private static RawAsyncTable getRawTable() { + return ASYNC_CONN.getRawTable(TABLE_NAME); + } + + private static AsyncTable getTable() { + return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + } + + private static Scan createNormalScan() { + return new Scan(); + } + + // test if we can handle partial result when open scanner. 
+ private static Scan createSmallResultSizeScan() { + return new Scan().setMaxResultSize(1); + } + + @Parameters(name = "{index}: table={0}, scan={2}") + public static List params() { + Supplier rawTable = TestAsyncTableScanAll::getRawTable; + Supplier normalTable = TestAsyncTableScanAll::getTable; + Supplier normalScan = TestAsyncTableScanAll::createNormalScan; + Supplier smallResultSizeScan = TestAsyncTableScanAll::createSmallResultSizeScan; + return Arrays.asList(new Object[] { "raw", rawTable, "normal", normalScan }, + new Object[] { "raw", rawTable, "smallResultSize", smallResultSizeScan }, + new Object[] { "normal", normalTable, "normal", normalScan }, + new Object[] { "normal", normalTable, "smallResultSize", smallResultSizeScan }); + } + + @Test + public void testScanWithLimit() throws InterruptedException, ExecutionException { + int start = 111; + int stop = 888; + int limit = 300; + List results = getTable.get() + .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start))) + .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)) + .get(); + assertEquals(limit, results.size()); + IntStream.range(0, limit).forEach(i -> { + Result result = results.get(i); + int actualIndex = start + i; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1))); + }); + } + + @Test + public void testReversedScanWithLimit() throws InterruptedException, ExecutionException { + int start = 888; + int stop = 111; + int limit = 300; + List results = getTable.get() + .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start))) + .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit) + .setReversed(true)) + .get(); + assertEquals(limit, results.size()); + IntStream.range(0, limit).forEach(i -> { + Result result = results.get(i); + int actualIndex = start - i; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1))); + }); + } + + @Override + protected Scan createScan() { + return scanCreator.get(); + } + + @Override + protected List doScan(Scan scan) throws Exception { + return getTable.get().scanAll(scan).get(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java deleted file mode 100644 index 3737af2..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
-
-  @Parameter
-  public Supplier<AsyncTableBase> getTable;
-
-  private static RawAsyncTable getRawTable() {
-    return ASYNC_CONN.getRawTable(TABLE_NAME);
-  }
-
-  private static AsyncTable getTable() {
-    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
-  }
-
-  @Parameters
-  public static List<Object[]> params() {
-    return Arrays.asList(new Supplier[] { TestAsyncTableSmallScan::getRawTable },
-      new Supplier[] { TestAsyncTableSmallScan::getTable });
-  }
-
-  @Test
-  public void testScanWithLimit() throws InterruptedException, ExecutionException {
-    AsyncTableBase table = getTable.get();
-    int start = 111;
-    int stop = 888;
-    int limit = 300;
-    List<Result> results =
-        table
-            .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
-                .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
-              limit)
-            .get();
-    assertEquals(limit, results.size());
-    IntStream.range(0, limit).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start + i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
-    });
-  }
-
-  @Test
-  public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
-    AsyncTableBase table = getTable.get();
-    int start = 888;
-    int stop = 111;
-    int limit = 300;
-    List<Result> results = table.smallScan(
-      new Scan(Bytes.toBytes(String.format("%03d", start)))
-          .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true),
-      limit).get();
-    assertEquals(limit, results.size());
-    IntStream.range(0, limit).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start - i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
-    });
-  }
-
-  @Override
-  protected Scan createScan() {
-    return new Scan().setSmall(true);
-  }
-
-  @Override
-  protected List<Result> doScan(Scan scan) throws Exception {
-    return getTable.get().smallScan(scan).get();
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 270e3e1..ca1cbfc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -58,11 +58,6 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
     }
 
     @Override
-    public boolean onHeartbeat() {
-      return true;
-    }
-
-    @Override
    public synchronized void onError(Throwable error) {
      finished = true;
      this.error = error;
-- 
2.7.4
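
Note for callers migrating off the removed API: the dedicated small-scan entry points (smallScan, Scan.setSmall) are folded into the regular scan path, with Scan.setLimit bounding the row count and scanAll collecting everything into one future, as the test changes above show. A minimal sketch of the new usage, assuming an already-open AsyncConnection and a table named "test" (both illustrative, not part of this patch):

    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanAllSketch {
      // Fetch at most 100 rows of [start, stop), like the old small scan did.
      static List<Result> firstRows(AsyncConnection conn) throws Exception {
        Scan scan = new Scan().withStartRow(Bytes.toBytes("111"))
            .withStopRow(Bytes.toBytes("888")).setLimit(100);
        // scanAll gathers every matching Result into a single CompletableFuture.
        return conn.getRawTable(TableName.valueOf("test")).scanAll(scan).get();
      }
    }

Because the limit now rides on the Scan itself, the same Scan object works unchanged for scanAll, the streaming scan consumer, and the synchronous client.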