From 8a17c285d5d6ac850ddfcf9ef4956f6b0376b9b4 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 30 Jan 2017 23:42:11 +0800 Subject: [PATCH] HBASE-17508 Unify the implementation of small scan and regular scan for sync client --- .../org/apache/hadoop/hbase/MetaTableAccessor.java | 14 +- .../apache/hadoop/hbase/client/ClientScanner.java | 95 +++--- .../hbase/client/ClientSmallReversedScanner.java | 336 -------------------- .../hadoop/hbase/client/ClientSmallScanner.java | 306 ------------------ .../hbase/client/ConnectionImplementation.java | 6 +- .../org/apache/hadoop/hbase/client/HTable.java | 18 +- .../hadoop/hbase/client/ReversedClientScanner.java | 19 +- .../hbase/client/ReversedScannerCallable.java | 2 +- .../hadoop/hbase/client/ScannerCallable.java | 265 ++++++++-------- .../hbase/client/ScannerCallableWithReplicas.java | 17 +- .../hadoop/hbase/client/TestClientScanner.java | 112 +++---- .../client/TestClientSmallReversedScanner.java | 345 --------------------- .../hbase/client/TestClientSmallScanner.java | 335 -------------------- .../hadoop/hbase/regionserver/RSRpcServices.java | 34 +- .../java/org/apache/hadoop/hbase/tool/Canary.java | 7 +- .../hbase/TestMetaTableAccessorNoCluster.java | 2 +- .../hbase/TestPartialResultsFromClientSide.java | 15 +- .../regionserver/TestScannerWithBulkload.java | 6 +- .../security/access/TestAccessController.java | 3 +- .../security/access/TestAccessController2.java | 3 + 20 files changed, 308 insertions(+), 1632 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java delete mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java delete mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 1cc7963..5a37afc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -560,16 +560,12 @@ public class MetaTableAccessor { // Stop key appends the smallest possible char to the table name byte[] stopKey = getTableStopRowForMeta(tableName, QueryType.REGION); - Scan scan = getMetaScan(connection); + Scan scan = getMetaScan(connection, -1); scan.setStartRow(startKey); scan.setStopRow(stopKey); return scan; } - private static Scan getMetaScan(Connection connection) { - return getMetaScan(connection, Integer.MAX_VALUE); - } - private static Scan getMetaScan(Connection connection, int rowUpperLimit) { Scan scan = new Scan(); int scannerCaching = connection.getConfiguration() @@ -579,11 +575,11 @@ public class MetaTableAccessor { HConstants.DEFAULT_USE_META_REPLICAS)) { scan.setConsistency(Consistency.TIMELINE); } - if (rowUpperLimit <= scannerCaching) { - scan.setSmall(true); + if (rowUpperLimit > 0) { + scan.setLimit(rowUpperLimit); + scan.setReadType(Scan.ReadType.PREAD); } - int rows = Math.min(rowUpperLimit, scannerCaching); - scan.setCaching(rows); + scan.setCaching(scannerCaching); return scan; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index ea91100..3186f42 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -158,16 +159,10 @@ public abstract class ClientScanner extends AbstractClientScanner { this.conf = conf; initCache(); - initializeScannerInConstruction(); } protected abstract void initCache(); - protected void initializeScannerInConstruction() throws IOException { - // initialize the scanner - nextScanner(this.caching, false); - } - protected ClusterConnection getConnection() { return this.connection; } @@ -235,15 +230,15 @@ public abstract class ClientScanner extends AbstractClientScanner { this.callable = null; } } - /* + + /** * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no * further, just tidy up outstanding scanners, if currentRegion != null and * done is true. * @param nbRows - * @param done Server-side says we're done scanning. */ - protected boolean nextScanner(int nbRows, final boolean done) throws IOException { + protected Result[] nextScanner(int nbRows) throws IOException { // Close the previous scanner if it's open closeScanner(); @@ -254,12 +249,12 @@ public abstract class ClientScanner extends AbstractClientScanner { if (this.currentRegion != null) { byte[] endKey = this.currentRegion.getEndKey(); if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(endKey) || done) { + || checkScanStopRow(endKey)) { close(); if (LOG.isTraceEnabled()) { LOG.trace("Finished " + this.currentRegion); } - return false; + return null; } localStartKey = endKey; // clear mvcc read point if we are going to switch regions @@ -280,16 +275,20 @@ public abstract class ClientScanner extends AbstractClientScanner { callable = getScannerCallable(localStartKey, nbRows); // Open a scanner on the region server starting at the // beginning of the region - call(callable, caller, scannerTimeout); + Result[] rrs = call(callable, caller, scannerTimeout); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } + if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { + // no results for the scan, return null to terminate the scan. + return null; + } + return rrs; } catch (IOException e) { close(); throw e; } - return true; } @VisibleForTesting @@ -297,8 +296,8 @@ public abstract class ClientScanner extends AbstractClientScanner { return callable.isAnyRPCcancelled(); } - Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller caller, - int scannerTimeout) throws IOException, RuntimeException { + private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller caller, + int scannerTimeout) throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -364,19 +363,22 @@ public abstract class ClientScanner extends AbstractClientScanner { return cache != null ? cache.size() : 0; } + private boolean scanExhausted(Result[] values) { + // This means the server tells us the whole scan operation is done. Usually decided by filter or + // limit. + return values == null || callable.moreResultsForScan() == MoreResults.NO; + } + private boolean regionExhausted(Result[] values) { - // This means the server tells us the whole scan operation is done. Usually decided by filter. - if (values == null) { - return true; - } // Not a heartbeat message and we get nothing, this means the region is exhausted - if (values.length == 0 && !callable.isHeartbeatMessage()) { + // And in the old time we always return empty result for a open scanner operation so we add a + // check here to keep compatible with the old logic. Should remove the isOpenScanner in the + // future. + if (values.length == 0 && !callable.isHeartbeatMessage() && !callable.isOpenScanner()) { return true; } - // Server tells us that it has no more results for this region. Notice that this flag is get - // from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter - // one is false then we will get a null values and quit in the first condition of this method. - if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) { + // Server tells us that it has no more results for this region. + if (callable.moreResultsInRegion() == MoreResults.NO) { return true; } return false; @@ -406,7 +408,8 @@ public abstract class ClientScanner extends AbstractClientScanner { int countdown = this.caching; // This is possible if we just stopped at the boundary of a region in the previous call. if (callable == null) { - if (!nextScanner(countdown, false)) { + values = nextScanner(countdown); + if (values == null) { return; } } @@ -420,18 +423,26 @@ public abstract class ClientScanner extends AbstractClientScanner { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. - values = call(callable, caller, scannerTimeout); + // now we will also fetch data when openScanner, so do not make a next call again if values + // is already non-null. + if (values == null) { + values = call(callable, caller, scannerTimeout); + } // When the replica switch happens, we need to do certain operations again. // The callable will openScanner with the right startkey but we need to pick up // from there. Bypass the rest of the loop and let the catch-up happen in the beginning // of the loop as it happens for the cases where we see exceptions. - // Since only openScanner would have happened, values would be null - if (values == null && callable.switchedToADifferentReplica()) { + if (callable.switchedToADifferentReplica()) { // Any accumulated partial results are no longer valid since the callable will // openScanner with the correct startkey and we must pick up from there clearPartialResults(); this.currentRegion = callable.getHRegionInfo(); - continue; + // Now we will also fetch data when openScanner so usually we should not get a null + // result, but at some places we still use null to indicate the scan is terminated, so add + // a sanity check here. Should be removed later. + if (values == null) { + continue; + } } retryAfterOutOfOrderException = true; } catch (DoNotRetryIOException e) { @@ -487,7 +498,8 @@ public abstract class ClientScanner extends AbstractClientScanner { // the exception we got was UnknownScanner or the Server is going down. callable = null; // reopen the scanner - if (!nextScanner(countdown, false)) { + values = nextScanner(countdown); + if (values == null) { break; } continue; @@ -521,8 +533,17 @@ public abstract class ClientScanner extends AbstractClientScanner { this.lastCellLoadedToCache = null; } } + if (scan.getLimit() > 0) { + int limit = scan.getLimit() - resultsToAddToCache.size(); + assert limit >= 0; + scan.setLimit(limit); + } } - boolean exhausted = regionExhausted(values); + if (scanExhausted(values)) { + closeScanner(); + break; + } + boolean regionExhausted = regionExhausted(values); if (callable.isHeartbeatMessage()) { if (!cache.isEmpty()) { // Caller of this method just wants a Result. If we see a heartbeat message, it means @@ -540,12 +561,12 @@ public abstract class ClientScanner extends AbstractClientScanner { } if (countdown <= 0) { // we have enough result. - closeScannerIfExhausted(exhausted); + closeScannerIfExhausted(regionExhausted); break; } if (remainingResultSize <= 0) { if (!cache.isEmpty()) { - closeScannerIfExhausted(exhausted); + closeScannerIfExhausted(regionExhausted); break; } else { // we have reached the max result size but we still can not find anything to return to the @@ -554,17 +575,21 @@ public abstract class ClientScanner extends AbstractClientScanner { } } // we are done with the current region - if (exhausted) { + if (regionExhausted) { if (!partialResults.isEmpty()) { // XXX: continue if there are partial results. But in fact server should not set // hasMoreResults to false if there are partial results. LOG.warn("Server tells us there is no more results for this region but we still have" + " partialResults, this should not happen, retry on the current scanner anyway"); + values = null; // reset values for the next call continue; } - if (!nextScanner(countdown, values == null)) { + values = nextScanner(countdown); + if (values == null) { break; } + } else { + values = null; // reset values for the next call } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java deleted file mode 100644 index 8f0c2f8..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ /dev/null @@ -1,336 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; - -import com.google.common.annotations.VisibleForTesting; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.Bytes; - -/** - *

- * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the - * scan results, unless the results cross multiple regions or the row count of - * results exceed the caching. - *

- * For small scan, it will get better performance than {@link ReversedClientScanner} - */ -@InterfaceAudience.Private -public class ClientSmallReversedScanner extends ReversedClientScanner { - private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); - private ScannerCallableWithReplicas smallReversedScanCallable = null; - private SmallReversedScannerCallableFactory callableFactory; - - /** - * Create a new ReversibleClientScanner for the specified table. Take note that the passed - * {@link Scan} 's start row maybe changed changed. - * - * @param conf - * The {@link Configuration} to use. - * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @throws IOException - * If the remote call fails - */ - public ClientSmallReversedScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout, new SmallReversedScannerCallableFactory()); - } - - /** - * Create a new ReversibleClientScanner for the specified table. Take note that the passed - * {@link Scan}'s start row may be changed. - * - * @param conf - * The {@link Configuration} to use. - * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @param callableFactory - * Factory used to create the {@link SmallScannerCallable} - * @throws IOException - * If the remote call fails - */ - @VisibleForTesting - ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - SmallReversedScannerCallableFactory callableFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); - this.callableFactory = callableFactory; - } - - /** - * Gets a scanner for following scan. Move to next region or continue from the last result or - * start from the start row. - * - * @param nbRows - * @param done - * true if Server-side says we're done scanning. - * @param currentRegionDone - * true if scan is over on current region - * @return true if has next scanner - * @throws IOException - */ - private boolean nextScanner(int nbRows, final boolean done, - boolean currentRegionDone) throws IOException { - // Where to start the next getter - byte[] localStartKey; - int cacheNum = nbRows; - boolean regionChanged = true; - boolean isFirstRegionToLocate = false; - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null && currentRegionDone) { - byte[] startKey = this.currentRegion.getStartKey(); - if (startKey == null - || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(startKey) || done) { - close(); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished with small scan at " + this.currentRegion); - } - return false; - } - // We take the row just under to get to the previous region. - localStartKey = createClosestRowBefore(startKey); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished with region " + this.currentRegion); - } - } else if (this.lastResult != null) { - regionChanged = false; - localStartKey = createClosestRowBefore(lastResult.getRow()); - } else { - isFirstRegionToLocate = true; - localStartKey = this.scan.getStartRow(); - } - - if (!isFirstRegionToLocate - && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) { - // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan. - // otherwise, maybe infinity results with RowKey=0x00 will return. - return false; - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Advancing internal small scanner to startKey at '" - + Bytes.toStringBinary(localStartKey) + "'"); - } - - smallReversedScanCallable = - callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), - localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), - getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate); - - if (this.scanMetrics != null && regionChanged) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - return true; - } - - @Override - public Result next() throws IOException { - // If the scanner is closed and there's nothing left in the cache, next is a - // no-op. - if (cache.isEmpty() && this.closed) { - return null; - } - if (cache.isEmpty()) { - loadCache(); - } - - if (cache.size() > 0) { - return cache.poll(); - } - // if we exhausted this scanner before calling close, write out the scan - // metrics - writeScanMetrics(); - return null; - } - - @Override - protected void loadCache() throws IOException { - Result[] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; - boolean currentRegionDone = false; - // Values == null means server-side filter has determined we must STOP - while (remainingResultSize > 0 && countdown > 0 - && nextScanner(countdown, values == null, currentRegionDone)) { - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout); - this.currentRegion = smallReversedScanCallable.getHRegionInfo(); - long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null) { - this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - - lastNext); - } - lastNext = currentTime; - if (values != null && values.length > 0) { - for (int i = 0; i < values.length; i++) { - Result rs = values[i]; - cache.add(rs); - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - } - } - if (smallReversedScanCallable.hasMoreResultsContext()) { - currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults(); - } else { - currentRegionDone = countdown > 0; - } - } - } - - @Override - protected void initializeScannerInConstruction() throws IOException { - // No need to initialize the scanner when constructing instance, do it when - // calling next(). Do nothing here. - } - - @Override - public void close() { - if (!scanMetricsPublished) writeScanMetrics(); - closed = true; - } - - /** - * A reversed ScannerCallable which supports backward small scanning. - */ - static class SmallReversedScannerCallable extends ReversedScannerCallable { - - public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan, - ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory, - int caching, int replicaId) { - super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId); - this.setCaching(caching); - } - - @Override - protected Result[] rpcCall() throws Exception { - if (this.closed) return null; - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } - ClientProtos.ScanRequest request = RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true); - ClientProtos.ScanResponse response = null; - response = getStub().scan(getRpcController(), request); - Result[] results = ResponseConverter.getResults(getRpcControllerCellScanner(), response); - if (response.hasMoreResultsInRegion()) { - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - setHasMoreResultsContext(false); - } - // We need to update result metrics since we are overriding call() - updateResultsMetrics(results); - return results; - } - - @Override - public ScannerCallable getScannerCallableForReplica(int id) { - return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(), - scanMetrics, locateStartRow, rpcControllerFactory, getCaching(), id); - } - } - - @VisibleForTesting - protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) { - this.callableFactory = callableFactory; - } - - protected static class SmallReversedScannerCallableFactory { - - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller, - boolean isFirstRegionToLocate) { - byte[] locateStartRow = null; - if (isFirstRegionToLocate - && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) { - // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to - // locate a region list, and the last one in region list is the region where our scan start. - locateStartRow = ConnectionUtils.MAX_BYTE_ARRAY; - } - - scan.setStartRow(localStartKey); - SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan, - scanMetrics, locateStartRow, controllerFactory, cacheNum, 0); - ScannerCallableWithReplicas scannerCallableWithReplicas = - new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan, - retries, scannerTimeout, cacheNum, conf, caller); - return scannerCallableWithReplicas; - } - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java deleted file mode 100644 index 52a291b..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Client scanner for small scan. Generally, only one RPC is called to fetch the - * scan results, unless the results cross multiple regions or the row count of - * results excess the caching. - * - * For small scan, it will get better performance than {@link ClientScanner} - */ -@InterfaceAudience.Private -public class ClientSmallScanner extends ClientSimpleScanner { - private static final Log LOG = LogFactory.getLog(ClientSmallScanner.class); - private ScannerCallableWithReplicas smallScanCallable = null; - private SmallScannerCallableFactory callableFactory; - - /** - * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan} - * 's start row maybe changed changed. - * - * @param conf - * The {@link Configuration} to use. - * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @throws IOException - * If the remote call fails - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout, new SmallScannerCallableFactory()); - } - - /** - * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan} - * 's start row maybe changed changed. Intended for unit tests to provide their own - * {@link SmallScannerCallableFactory} implementation/mock. - * - * @param conf - * The {@link Configuration} to use. - * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @param callableFactory - * Factory used to create the {@link SmallScannerCallable} - * @throws IOException - */ - @VisibleForTesting - ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - SmallScannerCallableFactory callableFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); - this.callableFactory = callableFactory; - } - - @Override - protected void initializeScannerInConstruction() throws IOException { - // No need to initialize the scanner when constructing instance, do it when - // calling next(). Do nothing here. - } - - /** - * Gets a scanner for following scan. Move to next region or continue from the - * last result or start from the start row. - * @param nbRows - * @param done true if Server-side says we're done scanning. - * @param currentRegionDone true if scan is over on current region - * @return true if has next scanner - * @throws IOException - */ - private boolean nextScanner(int nbRows, final boolean done, - boolean currentRegionDone) throws IOException { - // Where to start the next getter - byte[] localStartKey; - int cacheNum = nbRows; - boolean regionChanged = true; - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null && currentRegionDone) { - byte[] endKey = this.currentRegion.getEndKey(); - if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(endKey) || done) { - close(); - if (LOG.isTraceEnabled()) { - LOG.trace("Finished with small scan at " + this.currentRegion); - } - return false; - } - localStartKey = endKey; - if (LOG.isTraceEnabled()) { - LOG.trace("Finished with region " + this.currentRegion); - } - } else if (this.lastResult != null) { - regionChanged = false; - localStartKey = Bytes.add(lastResult.getRow(), new byte[1]); - } else { - localStartKey = this.scan.getStartRow(); - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Advancing internal small scanner to startKey at '" - + Bytes.toStringBinary(localStartKey) + "'"); - } - smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, - getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), - getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); - if (this.scanMetrics != null && regionChanged) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - return true; - } - - static class SmallScannerCallable extends ScannerCallable { - public SmallScannerCallable( - ClusterConnection connection, TableName table, Scan scan, - ScanMetrics scanMetrics, RpcControllerFactory controllerFactory, int caching, int id) { - super(connection, table, scan, scanMetrics, controllerFactory, id); - this.setCaching(caching); - } - - @Override - protected Result[] rpcCall() throws Exception { - if (this.closed) return null; - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } - ScanRequest request = RequestConverter.buildScanRequest(getLocation() - .getRegionInfo().getRegionName(), getScan(), getCaching(), true); - ScanResponse response = null; - response = getStub().scan(getRpcController(), request); - Result[] results = ResponseConverter.getResults(getRpcControllerCellScanner(), response); - if (response.hasMoreResultsInRegion()) { - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - setHasMoreResultsContext(false); - } - // We need to update result metrics since we are overriding call() - updateResultsMetrics(results); - return results; - } - - @Override - public ScannerCallable getScannerCallableForReplica(int id) { - return new SmallScannerCallable((ClusterConnection)getConnection(), getTableName(), getScan(), - scanMetrics, rpcControllerFactory, getCaching(), id); - } - } - - @Override - public Result next() throws IOException { - // If the scanner is closed and there's nothing left in the cache, next is a - // no-op. - if (cache.isEmpty() && this.closed) { - return null; - } - if (cache.isEmpty()) { - loadCache(); - } - - if (cache.size() > 0) { - return cache.poll(); - } - // if we exhausted this scanner before calling close, write out the scan - // metrics - writeScanMetrics(); - return null; - } - - @Override - protected void loadCache() throws IOException { - Result[] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; - boolean currentRegionDone = false; - // Values == null means server-side filter has determined we must STOP - while (remainingResultSize > 0 && countdown > 0 - && nextScanner(countdown, values == null, currentRegionDone)) { - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); - this.currentRegion = smallScanCallable.getHRegionInfo(); - long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null) { - this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - - lastNext); - } - lastNext = currentTime; - if (values != null && values.length > 0) { - for (int i = 0; i < values.length; i++) { - Result rs = values[i]; - cache.add(rs); - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - } - } - if (smallScanCallable.hasMoreResultsContext()) { - // If the server has more results, the current region is not done - currentRegionDone = !smallScanCallable.getServerHasMoreResults(); - } else { - // not guaranteed to get the context in older versions, fall back to checking countdown - currentRegionDone = countdown > 0; - } - } - } - - public void close() { - if (!scanMetricsPublished) writeScanMetrics(); - closed = true; - } - - @VisibleForTesting - protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) { - this.callableFactory = callableFactory; - } - - @InterfaceAudience.Private - protected static class SmallScannerCallableFactory { - - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller) { - scan.setStartRow(localStartKey); - SmallScannerCallable s = new SmallScannerCallable( - connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0); - ScannerCallableWithReplicas scannerCallableWithReplicas = - new ScannerCallableWithReplicas(table, connection, - s, pool, primaryOperationTimeout, scan, retries, - scannerTimeout, cacheNum, conf, caller); - return scannerCallableWithReplicas; - } - } -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ca21365..c5911ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -789,8 +789,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { s.setReversed(true); s.setStartRow(metaKey); s.addFamily(HConstants.CATALOG_FAMILY); - s.setSmall(true); - s.setCaching(1); + s.setLimit(1); + s.setReadType(Scan.ReadType.PREAD); if (this.useMetaReplicas) { s.setConsistency(Consistency.TIMELINE); } @@ -820,7 +820,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { Result regionInfoRow = null; ReversedClientScanner rcs = null; try { - rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, + rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0); regionInfoRow = rcs.next(); } finally { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 72d71eb..1fa33b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -350,21 +350,9 @@ public class HTable implements Table { } if (scan.isReversed()) { - if (scan.isSmall()) { - return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); - } else { - return new ReversedClientScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); - } - } - - if (scan.isSmall()) { - return new ClientSmallScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ReversedClientScanner(getConfiguration(), scan, getName(), + this.connection, this.rpcCallerFactory, this.rpcControllerFactory, + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { if (async) { return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index e1a522a..4211253 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -60,8 +61,7 @@ public class ReversedClientScanner extends ClientSimpleScanner { } @Override - protected boolean nextScanner(int nbRows, final boolean done) - throws IOException { + protected Result[] nextScanner(int nbRows) throws IOException { // Close the previous scanner if it's open closeScanner(); @@ -71,14 +71,13 @@ public class ReversedClientScanner extends ClientSimpleScanner { // if we're at start of table, close and return false to stop iterating if (this.currentRegion != null) { byte[] startKey = this.currentRegion.getStartKey(); - if (startKey == null - || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(startKey) || done) { + if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(startKey)) { close(); if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } - return false; + return null; } localStartKey = startKey; if (LOG.isDebugEnabled()) { @@ -111,17 +110,21 @@ public class ReversedClientScanner extends ClientSimpleScanner { // beginning of the region // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // we do a callWithRetries - this.caller.callWithoutRetries(callable, scannerTimeout); + Result[] rrs = this.caller.callWithoutRetries(callable, scannerTimeout); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } + if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { + // no results for the scan, return null to terminate the scan. + return null; + } + return rrs; } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); close(); throw e; } - return true; } protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index c7d78c6..195bcba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -42,7 +42,7 @@ public class ReversedScannerCallable extends ScannerCallable { /** * The start row for locating regions. In reversed scanner, may locate the * regions for a range of keys when doing - * {@link ReversedClientScanner#nextScanner(int, boolean)} + * {@link ReversedClientScanner#nextScanner(int)} */ protected final byte[] locateStartRow; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index f867acb..c1ff262 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -42,13 +42,12 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; /** @@ -75,9 +74,15 @@ public class ScannerCallable extends ClientServiceCallable { private int logCutOffLatency = 1000; private static String myAddress; protected final int id; - protected boolean serverHasMoreResultsContext; - protected boolean serverHasMoreResults; + enum MoreResults { + YES, NO, UNKNOWN + } + + private MoreResults moreResultsInRegion; + private MoreResults moreResultsForScan; + + private boolean isOpenScanner; /** * Saves whether or not the most recent response from the server was a heartbeat message. * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} @@ -174,120 +179,121 @@ public class ScannerCallable extends ClientServiceCallable { } } + private ScanResponse next() throws IOException { + // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server + setHeartbeatMessage(false); + incRPCcallsMetrics(); + ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, + this.scanMetrics != null, renew, scan.getLimit()); + try { + ScanResponse response = getStub().scan(getRpcController(), request); + nextCallSeq++; + return response; + } catch (Exception e) { + IOException ioe = ProtobufUtil.handleRemoteException(e); + if (logScannerActivity) { + LOG.info( + "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(), + e); + } + if (logScannerActivity) { + if (ioe instanceof UnknownScannerException) { + try { + HRegionLocation location = + getConnection().relocateRegion(getTableName(), scan.getStartRow()); + LOG.info("Scanner=" + scannerId + " expired, current region location is " + + location.toString()); + } catch (Throwable t) { + LOG.info("Failed to relocate region", t); + } + } else if (ioe instanceof ScannerResetException) { + LOG.info("Scanner=" + scannerId + " has received an exception, and the server " + + "asked us to reset the scanner state.", + ioe); + } + } + // The below convertion of exceptions into DoNotRetryExceptions is a little strange. + // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want + // ServerCallable#withRetries to just retry when it gets these exceptions. In here in + // a scan when doing a next in particular, we want to break out and get the scanner to + // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly, + // yeah and hard to follow and in need of a refactor). + if (ioe instanceof NotServingRegionException) { + // Throw a DNRE so that we break out of cycle of calling NSRE + // when what we need is to open scanner against new location. + // Attach NSRE to signal client that it needs to re-setup scanner. + if (this.scanMetrics != null) { + this.scanMetrics.countOfNSRE.incrementAndGet(); + } + throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); + } else if (ioe instanceof RegionServerStoppedException) { + // Throw a DNRE so that we break out of cycle of the retries and instead go and + // open scanner against new location. + throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); + } else { + // The outer layers will retry + throw ioe; + } + } + } + + private void setAlreadyClosed() { + this.scannerId = -1L; + this.closed = true; + } + @Override - protected Result [] rpcCall() throws Exception { + protected Result[] rpcCall() throws Exception { if (Thread.interrupted()) { throw new InterruptedIOException(); } - if (this.closed) { - if (this.scannerId != -1) { - close(); + if (closed) { + close(); + return null; + } + ScanResponse response; + if (this.scannerId == -1L) { + this.isOpenScanner = true; + response = openScanner(); + } else { + this.isOpenScanner = false; + response = next(); + } + long timestamp = System.currentTimeMillis(); + setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); + Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); + if (logScannerActivity) { + long now = System.currentTimeMillis(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 0 : rrs.length; + LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + + scannerId); + } + } + updateServerSideMetrics(response); + // moreResults is only used for the case where a filter exhausts all elements + if (response.hasMoreResults()) { + if (response.getMoreResults()) { + setMoreResultsForScan(MoreResults.YES); + } else { + setMoreResultsForScan(MoreResults.NO); + setAlreadyClosed(); } } else { - if (this.scannerId == -1L) { - this.scannerId = openScanner(); + setMoreResultsForScan(MoreResults.UNKNOWN); + } + if (response.hasMoreResultsInRegion()) { + if (response.getMoreResultsInRegion()) { + setMoreResultsInRegion(MoreResults.YES); } else { - Result [] rrs = null; - ScanRequest request = null; - // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server - setHeartbeatMessage(false); - try { - incRPCcallsMetrics(); - request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, - this.scanMetrics != null, renew, -1); - ScanResponse response = null; - response = getStub().scan(getRpcController(), request); - // Client and RS maintain a nextCallSeq number during the scan. Every next() call - // from client to server will increment this number in both sides. Client passes this - // number along with the request and at RS side both the incoming nextCallSeq and its - // nextCallSeq will be matched. In case of a timeout this increment at the client side - // should not happen. If at the server side fetching of next batch of data was over, - // there will be mismatch in the nextCallSeq number. Server will throw - // OutOfOrderScannerNextException and then client will reopen the scanner with startrow - // as the last successfully retrieved row. - // See HBASE-5974 - nextCallSeq++; - long timestamp = System.currentTimeMillis(); - setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); - rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); - if (logScannerActivity) { - long now = System.currentTimeMillis(); - if (now - timestamp > logCutOffLatency) { - int rows = rrs == null ? 0 : rrs.length; - LOG.info("Took " + (now-timestamp) + "ms to fetch " - + rows + " rows from scanner=" + scannerId); - } - } - updateServerSideMetrics(response); - // moreResults is only used for the case where a filter exhausts all elements - if (response.hasMoreResults() && !response.getMoreResults()) { - this.scannerId = -1L; - this.closed = true; - // Implied that no results were returned back, either. - return null; - } - // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due - // to size or quantity of results in the response. - if (response.hasMoreResultsInRegion()) { - // Set what the RS said - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - // Server didn't respond whether it has more results or not. - setHasMoreResultsContext(false); - } - updateResultsMetrics(rrs); - } catch (IOException e) { - if (logScannerActivity) { - LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " + - getLocation(), e); - } - IOException ioe = e; - if (e instanceof RemoteException) { - ioe = ((RemoteException) e).unwrapRemoteException(); - } - if (logScannerActivity) { - if (ioe instanceof UnknownScannerException) { - try { - HRegionLocation location = - getConnection().relocateRegion(getTableName(), scan.getStartRow()); - LOG.info("Scanner=" + scannerId + " expired, current region location is " + - location.toString()); - } catch (Throwable t) { - LOG.info("Failed to relocate region", t); - } - } else if (ioe instanceof ScannerResetException) { - LOG.info("Scanner=" + scannerId + " has received an exception, and the server " - + "asked us to reset the scanner state.", ioe); - } - } - // The below convertion of exceptions into DoNotRetryExceptions is a little strange. - // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want - // ServerCallable#withRetries to just retry when it gets these exceptions. In here in - // a scan when doing a next in particular, we want to break out and get the scanner to - // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly, - // yeah and hard to follow and in need of a refactor). - if (ioe instanceof NotServingRegionException) { - // Throw a DNRE so that we break out of cycle of calling NSRE - // when what we need is to open scanner against new location. - // Attach NSRE to signal client that it needs to re-setup scanner. - if (this.scanMetrics != null) { - this.scanMetrics.countOfNSRE.incrementAndGet(); - } - throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); - } else if (ioe instanceof RegionServerStoppedException) { - // Throw a DNRE so that we break out of cycle of the retries and instead go and - // open scanner against new location. - throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); - } else { - // The outer layers will retry - throw ioe; - } - } - return rrs; + setMoreResultsInRegion(MoreResults.NO); + setAlreadyClosed(); } + } else { + setMoreResultsInRegion(MoreResults.UNKNOWN); } - return null; + updateResultsMetrics(rrs); + return rrs; } /** @@ -296,11 +302,11 @@ public class ScannerCallable extends ClientServiceCallable { * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid * timeouts during long running scan operations. */ - protected boolean isHeartbeatMessage() { + boolean isHeartbeatMessage() { return heartbeatMessage; } - protected void setHeartbeatMessage(boolean heartbeatMessage) { + private void setHeartbeatMessage(boolean heartbeatMessage) { this.heartbeatMessage = heartbeatMessage; } @@ -367,10 +373,10 @@ public class ScannerCallable extends ClientServiceCallable { this.scannerId = -1L; } - protected long openScanner() throws IOException { + private ScanResponse openScanner() throws IOException { incRPCcallsMetrics(); ScanRequest request = RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); + getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false); try { ScanResponse response = getStub().scan(getRpcController(), request); long id = response.getScannerId(); @@ -381,7 +387,8 @@ public class ScannerCallable extends ClientServiceCallable { if (response.hasMvccReadPoint()) { this.scan.setMvccReadPoint(response.getMvccReadPoint()); } - return id; + this.scannerId = id; + return response; } catch (Exception e) { throw ProtobufUtil.handleRemoteException(e); } @@ -443,27 +450,31 @@ public class ScannerCallable extends ClientServiceCallable { /** * Should the client attempt to fetch more results from this region - * @return True if the client should attempt to fetch more results, false otherwise. */ - protected boolean getServerHasMoreResults() { - assert serverHasMoreResultsContext; - return this.serverHasMoreResults; + MoreResults moreResultsInRegion() { + return moreResultsInRegion; } - protected void setServerHasMoreResults(boolean serverHasMoreResults) { - this.serverHasMoreResults = serverHasMoreResults; + void setMoreResultsInRegion(MoreResults moreResults) { + this.moreResultsInRegion = moreResults; } /** - * Did the server respond with information about whether more results might exist. - * Not guaranteed to respond with older server versions - * @return True if the server responded with information about more results. + * Should the client attempt to fetch more results for the whole scan. */ - protected boolean hasMoreResultsContext() { - return serverHasMoreResultsContext; + MoreResults moreResultsForScan() { + return moreResultsForScan; + } + + void setMoreResultsForScan(MoreResults moreResults) { + this.moreResultsForScan = moreResults; } - protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { - this.serverHasMoreResultsContext = serverHasMoreResultsContext; + /** + * Whether the previous call is openScanner. This is used to keep compatible with the old + * implementation that we always returns empty result for openScanner. + */ + boolean isOpenScanner() { + return isOpenScanner; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index e04fd6e..c99fe9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.util.Pair; /** @@ -113,20 +114,16 @@ class ScannerCallableWithReplicas implements RetryingCallable { return currentScannerCallable.getHRegionInfo(); } - public boolean getServerHasMoreResults() { - return currentScannerCallable.getServerHasMoreResults(); + public MoreResults moreResultsInRegion() { + return currentScannerCallable.moreResultsInRegion(); } - public void setServerHasMoreResults(boolean serverHasMoreResults) { - currentScannerCallable.setServerHasMoreResults(serverHasMoreResults); + public MoreResults moreResultsForScan() { + return currentScannerCallable.moreResultsForScan(); } - public boolean hasMoreResultsContext() { - return currentScannerCallable.hasMoreResultsContext(); - } - - public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { - currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext); + public boolean isOpenScanner() { + return currentScannerCallable.isOpenScanner(); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 4319b9a..cf0e995 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hbase.client; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.After; @@ -86,6 +89,7 @@ public class TestClientScanner { private boolean rpcFinished = false; private boolean rpcFinishedFired = false; + private boolean initialized = false; public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, @@ -96,9 +100,13 @@ public class TestClientScanner { } @Override - protected boolean nextScanner(int nbRows, final boolean done) throws IOException { + protected Result[] nextScanner(int nbRows) throws IOException { + if (!initialized) { + initialized = true; + return super.nextScanner(nbRows); + } if (!rpcFinished) { - return super.nextScanner(nbRows, done); + return super.nextScanner(nbRows); } // Enforce that we don't short-circuit more than once @@ -107,7 +115,7 @@ public class TestClientScanner { " short-circuit was triggered."); } rpcFinishedFired = true; - return false; + return null; } @Override @@ -158,14 +166,13 @@ public class TestClientScanner { ScannerCallableWithReplicas.class); switch (count) { case 0: // initialize - case 2: // detect no more results - case 3: // close - count++; - return null; - case 1: count++; - callable.setHasMoreResultsContext(false); + callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN); return results; + case 1: // detect no more results + case 2: // close + count++; + return null; default: throw new RuntimeException("Expected only 2 invocations"); } @@ -221,15 +228,13 @@ public class TestClientScanner { ScannerCallableWithReplicas.class); switch (count) { case 0: // initialize - case 2: // close - count++; - return null; - case 1: count++; - callable.setHasMoreResultsContext(true); - // if we set false here the implementation will trigger a close - callable.setServerHasMoreResults(true); + // if we set no here the implementation will trigger a close + callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); return results; + case 1: // close + count++; + return null; default: throw new RuntimeException("Expected only 2 invocations"); } @@ -245,16 +250,11 @@ public class TestClientScanner { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - // Due to initializeScannerInConstruction() - Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class), - Mockito.anyInt()); - InOrder inOrder = Mockito.inOrder(caller); scanner.loadCache(); - inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( + inOrder.verify(caller, Mockito.times(1)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); assertEquals(1, scanner.cache.size()); @@ -289,15 +289,13 @@ public class TestClientScanner { ScannerCallableWithReplicas.class); switch (count) { case 0: // initialize - case 2: // close - count++; - return null; - case 1: count++; - callable.setHasMoreResultsContext(true); - // if we set false here the implementation will trigger a close - callable.setServerHasMoreResults(true); + // if we set no here the implementation will trigger a close + callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); return results; + case 1: // close + count++; + return null; default: throw new RuntimeException("Expected only 2 invocations"); } @@ -313,18 +311,11 @@ public class TestClientScanner { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - // Due to initializeScannerInConstruction() - Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class), - Mockito.anyInt()); - InOrder inOrder = Mockito.inOrder(caller); scanner.loadCache(); - // Ensures that possiblyNextScanner isn't called at the end which would trigger - // another call to callWithoutRetries - inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( + inOrder.verify(caller, Mockito.times(1)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); assertEquals(3, scanner.cache.size()); @@ -371,14 +362,12 @@ public class TestClientScanner { ScannerCallableWithReplicas.class); switch (count) { case 0: // initialize - case 2: // close - count++; - return null; - case 1: count++; - callable.setHasMoreResultsContext(true); - callable.setServerHasMoreResults(false); + callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); return results; + case 1: // close + count++; + return null; default: throw new RuntimeException("Expected only 2 invocations"); } @@ -393,18 +382,13 @@ public class TestClientScanner { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - // Due to initializeScannerInConstruction() - Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class), - Mockito.anyInt()); - scanner.setRpcFinished(true); InOrder inOrder = Mockito.inOrder(caller); scanner.loadCache(); - inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( + inOrder.verify(caller, Mockito.times(1)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); assertEquals(1, scanner.cache.size()); @@ -443,22 +427,19 @@ public class TestClientScanner { ScannerCallableWithReplicas.class); switch (count) { case 0: // initialize - case 3: // close - count++; - return null; - case 1: count++; - callable.setHasMoreResultsContext(true); - callable.setServerHasMoreResults(true); + callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); return results1; - case 2: + case 1: count++; // The server reports back false WRT more results - callable.setHasMoreResultsContext(true); - callable.setServerHasMoreResults(false); + callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); return results2; + case 2: // close + count++; + return null; default: - throw new RuntimeException("Expected only 2 invocations"); + throw new RuntimeException("Expected only 3 invocations"); } } }); @@ -469,17 +450,12 @@ public class TestClientScanner { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - // Due to initializeScannerInConstruction() - Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class), - Mockito.anyInt()); - InOrder inOrder = Mockito.inOrder(caller); scanner.setRpcFinished(true); scanner.loadCache(); - inOrder.verify(caller, Mockito.times(3)).callWithoutRetries( + inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); assertEquals(2, scanner.cache.size()); @@ -524,8 +500,8 @@ public class TestClientScanner { iter.next(); } fail("Should have failed with RetriesExhaustedException"); - } catch (RetriesExhaustedException expected) { - + } catch (RuntimeException expected) { + assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class)); } } @@ -560,7 +536,5 @@ public class TestClientScanner { } }; } - } - } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java deleted file mode 100644 index 090c55a..0000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java +++ /dev/null @@ -1,345 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test the ClientSmallReversedScanner. - */ -@Category(SmallTests.class) -public class TestClientSmallReversedScanner { - - Scan scan; - ExecutorService pool; - Configuration conf; - - ClusterConnection clusterConn; - RpcRetryingCallerFactory rpcFactory; - RpcControllerFactory controllerFactory; - RpcRetryingCaller caller; - - @Before - @SuppressWarnings({"deprecation", "unchecked"}) - public void setup() throws IOException { - clusterConn = Mockito.mock(ClusterConnection.class); - rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); - controllerFactory = Mockito.mock(RpcControllerFactory.class); - pool = Executors.newSingleThreadExecutor(); - scan = new Scan(); - conf = new Configuration(); - Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); - // Mock out the RpcCaller - caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory. newCaller()).thenReturn(caller); - } - - @After - public void teardown() { - if (null != pool) { - pool.shutdownNow(); - } - } - - /** - * Create a simple Answer which returns true the first time, and false every time after. - */ - private Answer createTrueThenFalseAnswer() { - return new Answer() { - boolean first = true; - - @Override - public Boolean answer(InvocationOnMock invocation) { - if (first) { - first = false; - return true; - } - return false; - } - }; - } - - private SmallReversedScannerCallableFactory getFactory( - final ScannerCallableWithReplicas callableWithReplicas) { - return new SmallReversedScannerCallableFactory() { - @Override - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller, - boolean isFirstRegionToLocate) { - return callableWithReplicas; - } - }; - } - - @Test - public void testContextPresent() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // Mock out the RpcCaller - @SuppressWarnings("unchecked") - RpcRetryingCaller caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory. newCaller()).thenReturn(caller); - - // Intentionally leave a "default" caching size in the Scan. No matter the value, we - // should continue based on the server context - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenAnswer(new Answer() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv3}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - results = new Result[] {Result.create(new Cell[] {kv1})}; - } else { - results = new Result[0]; - } - count++; - return results; - } - }); - - // Pass back the context always - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer( - createTrueThenFalseAnswer()); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - Queue results = csrs.cache; - Iterator iter = results.iterator(); - assertEquals(3, results.size()); - for (int i = 3; i >= 1 && iter.hasNext(); i--) { - Result result = iter.next(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - assertTrue(csrs.closed); - } - } - - @Test - public void testNoContextFewerRecords() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server returns 2 records per batch, we expect more records. - scan.setCaching(2); - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenAnswer(new Answer() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv3}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - // Return fewer records than expected (2) - results = new Result[] {Result.create(new Cell[] {kv1})}; - } else { - throw new RuntimeException("Should not fetch a third batch from the server"); - } - count++; - return results; - } - }); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // getServerHasMoreResults shouldn't be called when hasMoreResultsContext returns false - Mockito.when(callableWithReplicas.getServerHasMoreResults()) - .thenThrow(new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - Queue results = csrs.cache; - Iterator iter = results.iterator(); - assertEquals(2, results.size()); - for (int i = 3; i >= 2 && iter.hasNext(); i--) { - Result result = iter.next(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - - // "consume" the Results - results.clear(); - - csrs.loadCache(); - - assertEquals(1, results.size()); - Result result = results.peek(); - assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - - assertTrue(csrs.closed); - } - } - - @Test - public void testNoContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server return 2 records per RPC, we expect there to be more records. - scan.setCaching(2); - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()) - .thenThrow(new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - assertEquals(0, csrs.cache.size()); - assertTrue(csrs.closed); - } - } - - @Test - public void testContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()) - .thenReturn(false); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - assertEquals(0, csrs.cache.size()); - assertTrue(csrs.closed); - } - } -} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java deleted file mode 100644 index 318fbe7..0000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test the ClientSmallScanner. - */ -@Category(SmallTests.class) -public class TestClientSmallScanner { - - Scan scan; - ExecutorService pool; - Configuration conf; - - ClusterConnection clusterConn; - RpcRetryingCallerFactory rpcFactory; - RpcControllerFactory controllerFactory; - RpcRetryingCaller caller; - - @Before - @SuppressWarnings({"deprecation", "unchecked"}) - public void setup() throws IOException { - clusterConn = Mockito.mock(ClusterConnection.class); - rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); - controllerFactory = Mockito.mock(RpcControllerFactory.class); - pool = Executors.newSingleThreadExecutor(); - scan = new Scan(); - conf = new Configuration(); - Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); - // Mock out the RpcCaller - caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory. newCaller()).thenReturn(caller); - } - - @After - public void teardown() { - if (null != pool) { - pool.shutdownNow(); - } - } - - /** - * Create a simple Answer which returns true the first time, and false every time after. - */ - private Answer createTrueThenFalseAnswer() { - return new Answer() { - boolean first = true; - - @Override - public Boolean answer(InvocationOnMock invocation) { - if (first) { - first = false; - return true; - } - return false; - } - }; - } - - private SmallScannerCallableFactory getFactory( - final ScannerCallableWithReplicas callableWithReplicas) { - return new SmallScannerCallableFactory() { - @Override - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, - int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, - RpcRetryingCaller caller) { - return callableWithReplicas; - } - }; - } - - @Test - public void testContextPresent() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // Mock out the RpcCaller - @SuppressWarnings("unchecked") - RpcRetryingCaller caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory. newCaller()).thenReturn(caller); - - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - // Intentionally leave a "default" caching size in the Scan. No matter the value, we - // should continue based on the server context - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenAnswer(new Answer() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv1}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - results = new Result[] {Result.create(new Cell[] {kv3})}; - } else { - results = new Result[0]; - } - count++; - return results; - } - }); - - // Pass back the context always - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer( - createTrueThenFalseAnswer()); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - Queue results = css.cache; - assertEquals(3, results.size()); - for (int i = 1; i <= 3; i++) { - Result result = results.poll(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - - assertTrue(css.closed); - } - } - - @Test - public void testNoContextFewerRecords() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server returns 2 records per batch, we expect more records. - scan.setCaching(2); - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenAnswer(new Answer() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv1}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - // Return fewer records than expected (2) - results = new Result[] {Result.create(new Cell[] {kv3})}; - } else { - throw new RuntimeException("Should not fetch a third batch from the server"); - } - count++; - return results; - } - }); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow( - new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - Queue results = css.cache; - assertEquals(2, results.size()); - for (int i = 1; i <= 2; i++) { - Result result = results.poll(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - - // "consume" the results we verified - results.clear(); - - css.loadCache(); - - assertEquals(1, results.size()); - Result result = results.peek(); - assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - assertTrue(css.closed); - } - } - - @Test - public void testNoContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server return 2 records per RPC, we expect there to be more records. - scan.setCaching(2); - - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow( - new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - assertEquals(0, css.cache.size()); - assertTrue(css.closed); - } - } - - @Test - public void testContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - assertEquals(0, css.cache.size()); - assertTrue(css.closed); - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7307372..8f98607 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -208,8 +208,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; - /** * Implements the regionserver RPC services. */ @@ -354,14 +352,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final String scannerName; private final RegionScanner s; private final Region r; + private final boolean allowPartial; private final RpcCallback closeCallBack; private final RpcCallback shippedCallback; - public RegionScannerHolder(String scannerName, RegionScanner s, Region r, + public RegionScannerHolder(String scannerName, RegionScanner s, Region r, boolean allowPartial, RpcCallback closeCallBack, RpcCallback shippedCallback) { this.scannerName = scannerName; this.s = s; this.r = r; + this.allowPartial = allowPartial; this.closeCallBack = closeCallBack; this.shippedCallback = shippedCallback; } @@ -1212,8 +1212,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return lastBlock; } - private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) - throws LeaseStillHeldException { + private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r, + boolean allowPartial) throws LeaseStillHeldException { Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease); @@ -1224,7 +1224,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, closeCallback = new RegionScannerCloseCallBack(s); } RegionScannerHolder rsh = - new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback); + new RegionScannerHolder(scannerName, s, r, allowPartial, closeCallback, shippedCallback); RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; return rsh; @@ -2682,8 +2682,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return rsh; } - private Pair newRegionScanner(ScanRequest request, - ScanResponse.Builder builder) throws IOException { + private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) + throws IOException { Region region = getRegion(request.getRegion()); ClientProtos.Scan protoScan = request.getScan(); boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); @@ -2714,7 +2714,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.setMvccReadPoint(scanner.getMvccReadPoint()); builder.setTtl(scannerLeaseTimeoutPeriod); String scannerName = String.valueOf(scannerId); - return Pair.newPair(addScanner(scannerName, scanner, region), scan.isSmall()); + return addScanner(scannerName, scanner, region, + !scan.isSmall() && !(request.hasLimitOfRows() && request.getLimitOfRows() > 0)); } private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) @@ -2770,9 +2771,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // return whether we have more results in region. private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, - boolean isSmallScan, long maxQuotaResultSize, int rows, List results, - ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) - throws IOException { + long maxQuotaResultSize, int rows, List results, ScanResponse.Builder builder, + MutableObject lastBlock, RpcCallContext context) throws IOException { Region region = rsh.r; RegionScanner scanner = rsh.s; long maxResultSize; @@ -2803,7 +2803,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // formed. boolean serverGuaranteesOrderOfPartials = results.isEmpty(); boolean allowPartialResults = - clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; + clientHandlesPartials && serverGuaranteesOrderOfPartials && rsh.allowPartial; boolean moreRows = false; // Heartbeat messages occur when the processing of the ScanRequest is exceeds a @@ -2960,15 +2960,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rpcScanRequestCount.increment(); RegionScannerHolder rsh; ScanResponse.Builder builder = ScanResponse.newBuilder(); - boolean isSmallScan; try { if (request.hasScannerId()) { rsh = getRegionScanner(request); - isSmallScan = false; } else { - Pair pair = newRegionScanner(request, builder); - rsh = pair.getFirst(); - isSmallScan = pair.getSecond().booleanValue(); + rsh = newRegionScanner(request, builder); } } catch (IOException e) { if (e == SCANNER_ALREADY_CLOSED) { @@ -3055,7 +3051,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } if (!done) { - moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, isSmallScan, + moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, results, builder, lastBlock, context); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index c6d3e80..ff5d898 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType; import org.apache.hadoop.hbase.util.Bytes; @@ -367,7 +368,8 @@ public final class Canary implements Tool { scan.setFilter(new FirstKeyOnlyFilter()); scan.addFamily(column.getName()); scan.setMaxResultSize(1L); - scan.setSmall(true); + scan.setLimit(1); + scan.setReadType(Scan.ReadType.PREAD); } if (LOG.isDebugEnabled()) { @@ -502,7 +504,8 @@ public final class Canary implements Tool { scan.setFilter(new FirstKeyOnlyFilter()); scan.setCaching(1); scan.setMaxResultSize(1L); - scan.setSmall(true); + scan.setLimit(1); + scan.setReadType(Scan.ReadType.PREAD); stopWatch.start(); ResultScanner s = table.getScanner(scan); s.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java index 368f050..414ffa7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java @@ -167,7 +167,7 @@ public class TestMetaTableAccessorNoCluster { public ScanResponse answer(InvocationOnMock invocation) throws Throwable { ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil .createCellScanner(cellScannables)); - return builder.build(); + return builder.setScannerId(1234567890L).build(); } }).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build()); // Associate a spied-upon Connection with UTIL.getConfiguration. Need diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index 18a4d86..e6299f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -100,7 +100,7 @@ public class TestPartialResultsFromClientSide { // getCellHeapSize(). private static long CELL_HEAP_SIZE = -1; - private static long timeout = 10000; + private static long timeout = 10000000; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -609,11 +609,11 @@ public class TestPartialResultsFromClientSide { scan.setAllowPartialResults(true); scan.setSmall(true); scan.setMaxResultSize(1); - ResultScanner scanner = TABLE.getScanner(scan); Result r = null; while ((r = scanner.next()) != null) { + System.out.println(r); assertFalse(r.isPartial()); } @@ -733,11 +733,13 @@ public class TestPartialResultsFromClientSide { byte[] value = Bytes.createMaxByteArray(100); Table tmpTable = createTestTable(testName, rows, families, qualifiers, value); - // Open scanner before deletes ResultScanner scanner = tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); - + // now the openScanner will also fetch data and will be executed lazily, i.e, only openScanner + // when you call next, so here we need to make a next call to open scanner. The maxResultSize + // limit can make sure that we will not fetch all the data at once, so the test sill works. + int scannerCount = scanner.next().rawCells().length; Delete delete1 = new Delete(rows[0]); delete1.addColumn(families[0], qualifiers[0], 0); tmpTable.delete(delete1); @@ -747,7 +749,7 @@ public class TestPartialResultsFromClientSide { tmpTable.delete(delete2); // Should see all cells because scanner was opened prior to deletes - int scannerCount = countCellsFromScanner(scanner); + scannerCount += countCellsFromScanner(scanner); int expectedCount = numRows * numFamilies * numQualifiers; assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount, scannerCount == expectedCount); @@ -760,6 +762,7 @@ public class TestPartialResultsFromClientSide { scannerCount == expectedCount); scanner = tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); + scannerCount = scanner.next().rawCells().length; // Put in 2 new rows. The timestamps differ from the deleted rows Put put1 = new Put(rows[0]); put1.add(new KeyValue(rows[0], families[0], qualifiers[0], 1, value)); @@ -770,7 +773,7 @@ public class TestPartialResultsFromClientSide { tmpTable.put(put2); // Scanner opened prior to puts. Cell count shouldn't have changed - scannerCount = countCellsFromScanner(scanner); + scannerCount += countCellsFromScanner(scanner); expectedCount = numRows * numFamilies * numQualifiers - 2; assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount, scannerCount == expectedCount); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index a15cbb3..178b537 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -45,12 +45,11 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import junit.framework.Assert; - @Category({RegionServerTests.class, MediumTests.class}) public class TestScannerWithBulkload { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -213,6 +212,7 @@ public class TestScannerWithBulkload { final Admin admin = TEST_UTIL.getAdmin(); createTable(admin, tableName); Scan scan = createScan(); + scan.setCaching(1); final Table table = init(admin, l, scan, tableName); // use bulkload final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/", @@ -221,6 +221,7 @@ public class TestScannerWithBulkload { conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); ResultScanner scanner = table.getScanner(scan); + Result result = scanner.next(); // Create a scanner and then do bulk load final CountDownLatch latch = new CountDownLatch(1); new Thread() { @@ -242,7 +243,6 @@ public class TestScannerWithBulkload { latch.await(); // By the time we do next() the bulk loaded files are also added to the kv // scanner - Result result = scanner.next(); scanAfterBulkLoad(scanner, result, "version1"); scanner.close(); table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 2efc5ff..c9afbfa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -898,7 +898,7 @@ public class TestAccessController extends SecureTestUtil { private void verifyRead(AccessTestAction action) throws Exception { verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER, USER_CREATE, USER_RW, USER_RO, USER_GROUP_READ); - verifyDenied(action, USER_NONE, USER_GROUP_CREATE, USER_GROUP_ADMIN, USER_GROUP_WRITE); + verifyDenied(action, USER_NONE); } private void verifyReadWrite(AccessTestAction action) throws Exception { @@ -937,7 +937,6 @@ public class TestAccessController extends SecureTestUtil { for (Result r = scanner.next(); r != null; r = scanner.next()) { // do nothing } - } catch (IOException e) { } finally { scanner.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java index b939156..6e1e09c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java @@ -384,6 +384,7 @@ public class TestAccessController2 extends SecureTestUtil { Scan s1 = new Scan(); s1.addFamily(TEST_FAMILY_2); try (ResultScanner scanner1 = table.getScanner(s1);) { + scanner1.next(); } } return null; @@ -414,6 +415,7 @@ public class TestAccessController2 extends SecureTestUtil { Scan s1 = new Scan(); s1.addFamily(TEST_FAMILY_2); try (ResultScanner scanner1 = table.getScanner(s1);) { + scanner1.next(); } } return null; @@ -428,6 +430,7 @@ public class TestAccessController2 extends SecureTestUtil { Scan s1 = new Scan(); s1.addColumn(TEST_FAMILY, Q2); try (ResultScanner scanner1 = table.getScanner(s1);) { + scanner1.next(); } } return null; -- 1.9.1