From 9a10e73dc70ae151ff5aa8ed3df2d2a12255e232 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 5 Feb 2017 21:30:55 +0800 Subject: [PATCH] Add inclusive/exclusive support for startRow and endRow of scan for sync client --- .../hbase/client/ClientAsyncPrefetchScanner.java | 2 +- .../apache/hadoop/hbase/client/ClientScanner.java | 197 +++++++-------------- .../hadoop/hbase/client/ClientSimpleScanner.java | 27 ++- .../hbase/client/ConnectionImplementation.java | 2 +- .../hadoop/hbase/client/ReversedClientScanner.java | 124 ++----------- .../hbase/client/ReversedScannerCallable.java | 48 ++--- .../hadoop/hbase/client/RpcRetryingCallerImpl.java | 1 + .../hadoop/hbase/client/ScannerCallable.java | 13 +- .../hbase/client/ScannerCallableWithReplicas.java | 4 - .../hadoop/hbase/client/TestClientScanner.java | 41 +---- .../hbase/TestMetaTableAccessorNoCluster.java | 9 +- .../hbase/client/AbstractTestAsyncTableScan.java | 2 +- .../hadoop/hbase/client/TestFromClientSide.java | 1 + .../hbase/client/TestScannersFromClientSide.java | 82 ++++++++- 14 files changed, 205 insertions(+), 348 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index ee323a9..f632bcb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Threads; * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}. 
*/ @InterfaceAudience.Private -public class ClientAsyncPrefetchScanner extends ClientScanner { +public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024; private static final int DEFAULT_QUEUE_CAPACITY = 1024; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 313cb63..e222906 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; import com.google.common.annotations.VisibleForTesting; @@ -64,7 +62,7 @@ public abstract class ClientScanner extends AbstractClientScanner { private static final Log LOG = LogFactory.getLog(ClientScanner.class); - protected Scan scan; + protected final Scan scan; protected boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. @@ -162,8 +160,6 @@ public abstract class ClientScanner extends AbstractClientScanner { initCache(); } - protected abstract void initCache(); - protected ClusterConnection getConnection() { return this.connection; } @@ -209,21 +205,6 @@ public abstract class ClientScanner extends AbstractClientScanner { return maxScannerResultSize; } - // returns true if the passed region endKey - protected boolean checkScanStopRow(final byte[] endKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. 
- byte[] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, endKey.length); - if (cmp <= 0) { - // stopRow <= endKey (endKey is equals to or larger than stopRow) - // This is a stop. - return true; - } - } - return false; // unlikely. - } - protected final void closeScanner() throws IOException { if (this.callable != null) { this.callable.setClose(); @@ -232,66 +213,47 @@ public abstract class ClientScanner extends AbstractClientScanner { } } - /** - * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the - * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). - * @param nbRows the caching option of the scan - * @return the results fetched when open scanner, or null which means terminate the scan. - */ - protected Result[] nextScanner(int nbRows) throws IOException { + protected abstract boolean setNewStartKey(); + + protected abstract ScannerCallable createScannerCallable(); + + // return false if we should terminate the scan + // protected only because TestClientScanner needs to override this method.
+ @VisibleForTesting + protected boolean moveToNextRegion() { // Close the previous scanner if it's open - closeScanner(); - - // Where to start the next scanner - byte[] localStartKey; - - // if we're at end of table, close and return null to stop iterating - if (this.currentRegion != null) { - byte[] endKey = this.currentRegion.getEndKey(); - if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || - checkScanStopRow(endKey)) { - close(); - if (LOG.isTraceEnabled()) { - LOG.trace("Finished " + this.currentRegion); - } - return null; + try { + closeScanner(); + } catch (IOException e) { + // not a big deal continue + LOG.warn("close scanner for " + currentRegion + " failed", e); + } + if (currentRegion != null) { + if (!setNewStartKey()) { + return false; } - localStartKey = endKey; - // clear mvcc read point if we are going to switch regions scan.resetMvccReadPoint(); if (LOG.isTraceEnabled()) { LOG.trace("Finished " + this.currentRegion); } - } else { - localStartKey = this.scan.getStartRow(); } - if (LOG.isDebugEnabled() && this.currentRegion != null) { // Only worth logging if NOT first region in scan. LOG.debug( - "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); + "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) + + "', " + (scan.includeStartRow() ? "inclusive" : "exclusive")); } - try { - callable = getScannerCallable(localStartKey, nbRows); - // Open a scanner on the region server starting at the - // beginning of the region - Result[] rrs = call(callable, caller, scannerTimeout); - this.currentRegion = callable.getHRegionInfo(); - if (this.scanMetrics != null) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { - // no results for the scan, return null to terminate the scan. 
- closed = true; - callable = null; - currentRegion = null; - return null; - } - return rrs; - } catch (IOException e) { - closeScanner(); - throw e; + // clear the current region, we will set a new value to it after the first call of the new + // callable. + this.currentRegion = null; + this.callable = + new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, + primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); + this.callable.setCaching(this.caching); + if (this.scanMetrics != null) { + this.scanMetrics.countOfRegions.incrementAndGet(); } + return true; } @VisibleForTesting @@ -306,18 +268,11 @@ public abstract class ClientScanner extends AbstractClientScanner { } // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // we do a callWithRetries - return caller.callWithoutRetries(callable, scannerTimeout); - } - - @InterfaceAudience.Private - protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, int nbRows) { - scan.setStartRow(localStartKey); - ScannerCallable s = new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), s, - pool, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller); - return sr; + Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout); + if (currentRegion == null) { + currentRegion = callable.getHRegionInfo(); + } + return rrs; } /** @@ -367,9 +322,7 @@ public abstract class ClientScanner extends AbstractClientScanner { } private boolean scanExhausted(Result[] values) { - // This means the server tells us the whole scan operation is done. Usually decided by filter or - // limit. 
- return values == null || callable.moreResultsForScan() == MoreResults.NO; + return callable.moreResultsForScan() == MoreResults.NO; } private boolean regionExhausted(Result[] values) { @@ -377,8 +330,8 @@ public abstract class ClientScanner extends AbstractClientScanner { // old time we always return empty result for a open scanner operation so we add a check here to // keep compatible with the old logic. Should remove the isOpenScanner in the future. // 2. Server tells us that it has no more results for this region. - return (values.length == 0 && !callable.isHeartbeatMessage() && !callable.isOpenScanner()) - || callable.moreResultsInRegion() == MoreResults.NO; + return (values.length == 0 && !callable.isHeartbeatMessage()) || + callable.moreResultsInRegion() == MoreResults.NO; } private void closeScannerIfExhausted(boolean exhausted) throws IOException { @@ -394,16 +347,6 @@ public abstract class ClientScanner extends AbstractClientScanner { } } - private Result[] nextScannerWithRetries(int nbRows) throws IOException { - for (;;) { - try { - return nextScanner(nbRows); - } catch (DoNotRetryIOException e) { - handleScanError(e, null); - } - } - } - private void handleScanError(DoNotRetryIOException e, MutableBoolean retryAfterOutOfOrderException) throws DoNotRetryIOException { // An exception was thrown which makes any partial results that we were collecting @@ -431,26 +374,15 @@ public abstract class ClientScanner extends AbstractClientScanner { // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. 
// (ScannerCallable by now has "relocated" the correct region) - if (!this.lastResult.isPartial() && scan.getBatch() < 0) { - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); - } else { - scan.setStartRow(createClosestRowAfter(lastResult.getRow())); - } - } else { - // we need rescan this row because we only loaded partial row before - scan.setStartRow(lastResult.getRow()); - } + scan.withStartRow(lastResult.getRow(), lastResult.isPartial() || scan.getBatch() > 0); } if (e instanceof OutOfOrderScannerNextException) { - if (retryAfterOutOfOrderException != null) { - if (retryAfterOutOfOrderException.isTrue()) { - retryAfterOutOfOrderException.setValue(false); - } else { - // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? - throw new DoNotRetryIOException( - "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); - } + if (retryAfterOutOfOrderException.isTrue()) { + retryAfterOutOfOrderException.setValue(false); + } else { + // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? + throw new DoNotRetryIOException( + "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); } } // Clear region. @@ -468,31 +400,26 @@ public abstract class ClientScanner extends AbstractClientScanner { if (closed) { return; } - Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; // This is possible if we just stopped at the boundary of a region in the previous call. if (callable == null) { - values = nextScannerWithRetries(countdown); - if (values == null) { + if (!moveToNextRegion()) { return; } } - // We need to reset it if it's a new callable that was created with a countdown in nextScanner - callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. 
MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true); for (;;) { + Result[] values; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. // now we will also fetch data when openScanner, so do not make a next call again if values // is already non-null. - if (values == null) { - values = call(callable, caller, scannerTimeout); - } + values = call(callable, caller, scannerTimeout); // When the replica switch happens, we need to do certain operations again. // The callable will openScanner with the right startkey but we need to pick up // from there. Bypass the rest of the loop and let the catch-up happen in the beginning @@ -502,19 +429,12 @@ public abstract class ClientScanner extends AbstractClientScanner { // openScanner with the correct startkey and we must pick up from there clearPartialResults(); this.currentRegion = callable.getHRegionInfo(); - // Now we will also fetch data when openScanner so usually we should not get a null - // result, but at some places we still use null to indicate the scan is terminated, so add - // a sanity check here. Should be removed later. - if (values == null) { - continue; - } } retryAfterOutOfOrderException.setValue(true); } catch (DoNotRetryIOException e) { handleScanError(e, retryAfterOutOfOrderException); // reopen the scanner - values = nextScannerWithRetries(countdown); - if (values == null) { + if (!moveToNextRegion()) { break; } continue; @@ -595,17 +515,13 @@ public abstract class ClientScanner extends AbstractClientScanner { if (!partialResults.isEmpty()) { // XXX: continue if there are partial results. But in fact server should not set // hasMoreResults to false if there are partial results. 
- LOG.warn("Server tells us there is no more results for this region but we still have" - + " partialResults, this should not happen, retry on the current scanner anyway"); - values = null; // reset values for the next call + LOG.warn("Server tells us there is no more results for this region but we still have" + + " partialResults, this should not happen, retry on the current scanner anyway"); continue; } - values = nextScannerWithRetries(countdown); - if (values == null) { + if (!moveToNextRegion()) { break; } - } else { - values = null; // reset values for the next call } } } @@ -876,4 +792,13 @@ public abstract class ClientScanner extends AbstractClientScanner { Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); } + + protected void initCache() { + initSyncCache(); + } + + @Override + public Result next() throws IOException { + return nextWithSyncCache(); + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index ecf083b..a365239 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; +import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import java.io.IOException; -import 
java.util.concurrent.ExecutorService; - /** * ClientSimpleScanner implements a sync scanner behaviour. * The cache is a simple list. @@ -41,12 +44,22 @@ public class ClientSimpleScanner extends ClientScanner { } @Override - protected void initCache() { - initSyncCache(); + protected boolean setNewStartKey() { + if (noMoreResultsForScan(scan, currentRegion)) { + return false; + } + scan.withStartRow(currentRegion.getEndKey(), true); + return true; } @Override - public Result next() throws IOException { - return nextWithSyncCache(); + protected ScannerCallable createScannerCallable() { + if (!scan.includeStartRow()) { + // we have not implemented locate to next row for sync client yet, so here we change the + // inclusive of start row to true. + scan.withStartRow(createClosestRowAfter(scan.getStartRow()), true); + } + return new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + this.rpcControllerFactory); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index f0b755f..0fb9758 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -787,7 +787,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { Scan s = new Scan(); s.setReversed(true); - s.setStartRow(metaKey); + s.withStartRow(metaKey); s.addFamily(HConstants.CATALOG_FAMILY); s.setOneRowLimit(); if (this.useMetaReplicas) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index c308dd4..d67f936 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -18,32 +18,25 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; import java.io.IOException; import java.util.concurrent.ExecutorService; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ExceptionUtil; /** * A reversed client scanner which support backward scanning */ @InterfaceAudience.Private -public class ReversedClientScanner extends ClientSimpleScanner { - private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); +public class ReversedClientScanner extends ClientScanner { /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed. + * Create a new ReversedClientScanner for the specified table. Note that the passed + * {@link Scan}'s start row may be changed.
* @param conf * @param scan * @param tableName @@ -52,111 +45,26 @@ public class ReversedClientScanner extends ClientSimpleScanner { * @param primaryOperationTimeout * @throws IOException */ - public ReversedClientScanner(Configuration conf, Scan scan, - TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, - ExecutorService pool, int primaryOperationTimeout) throws IOException { + public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, + ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) + throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout); } @Override - protected Result[] nextScanner(int nbRows) throws IOException { - // Close the previous scanner if it's open - closeScanner(); - - // Where to start the next scanner - byte[] localStartKey; - boolean locateTheClosestFrontRow = true; - // if we're at start of table, close and return false to stop iterating - if (this.currentRegion != null) { - byte[] startKey = this.currentRegion.getStartKey(); - if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(startKey)) { - close(); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished " + this.currentRegion); - } - return null; - } - localStartKey = startKey; - // clear mvcc read point if we are going to switch regions - scan.resetMvccReadPoint(); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished " + this.currentRegion); - } - } else { - localStartKey = this.scan.getStartRow(); - if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { - locateTheClosestFrontRow = false; - } - } - - if (LOG.isDebugEnabled() && this.currentRegion != null) { - // Only worth logging if NOT first region in scan. 
- LOG.debug("Advancing internal scanner to startKey at '" - + Bytes.toStringBinary(localStartKey) + "'"); - } - try { - // In reversed scan, we want to locate the previous region through current - // region's start key. In order to get that previous region, first we - // create a closest row before the start key of current region, then - // locate all the regions from the created closest row to start key of - // current region, thus the last one of located regions should be the - // previous region of current region. The related logic of locating - // regions is implemented in ReversedScannerCallable - byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey) - : null; - callable = getScannerCallable(localStartKey, nbRows, locateStartRow); - // Open a scanner on the region server starting at the - // beginning of the region - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - Result[] rrs = this.caller.callWithoutRetries(callable, scannerTimeout); - this.currentRegion = callable.getHRegionInfo(); - if (this.scanMetrics != null) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { - // no results for the scan, return null to terminate the scan. 
- return null; - } - return rrs; - } catch (IOException e) { - ExceptionUtil.rethrowIfInterrupt(e); - close(); - throw e; + protected boolean setNewStartKey() { + if (noMoreResultsForReverseScan(scan, currentRegion)) { + return false; } - } - - protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, - int nbRows, byte[] locateStartRow) { - scan.setStartRow(localStartKey); - ScannerCallable s = - new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - locateStartRow, this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = - new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool, - primaryOperationTimeout, scan, getRetries(), getScannerTimeout(), caching, getConf(), - caller); - return sr; + scan.withStartRow(currentRegion.getStartKey(), false); + return true; } @Override - // returns true if stopRow >= passed region startKey - protected boolean checkScanStopRow(final byte[] startKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. - byte[] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0, - startKey.length); - if (cmp >= 0) { - // stopRow >= startKey (stopRow is equals to or larger than endKey) - // This is a stop. - return true; - } - } - return false; // unlikely. 
+ protected ReversedScannerCallable createScannerCallable() { + return new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + this.rpcControllerFactory); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 195bcba..2d1d552 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -18,6 +18,9 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -39,26 +42,18 @@ import org.apache.hadoop.hbase.util.Bytes; */ @InterfaceAudience.Private public class ReversedScannerCallable extends ScannerCallable { - /** - * The start row for locating regions. 
In reversed scanner, may locate the - * regions for a range of keys when doing - * {@link ReversedClientScanner#nextScanner(int)} - */ - protected final byte[] locateStartRow; /** * @param connection * @param tableName * @param scan * @param scanMetrics - * @param locateStartRow The start row for locating regions - * @param rpcFactory to create an {@link com.google.protobuf.RpcController} - * to talk to the regionserver + * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the + * regionserver */ public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, - ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) { + ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) { super(connection, tableName, scan, scanMetrics, rpcFactory); - this.locateStartRow = locateStartRow; } /** @@ -66,28 +61,13 @@ public class ReversedScannerCallable extends ScannerCallable { * @param tableName * @param scan * @param scanMetrics - * @param locateStartRow The start row for locating regions - * @param rpcFactory to create an {@link com.google.protobuf.RpcController} - * to talk to the regionserver + * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the + * regionserver * @param replicaId the replica id */ public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, - ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory, - int replicaId) { + ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) { super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId); - this.locateStartRow = locateStartRow; - } - - /** - * @deprecated use - * {@link #ReversedScannerCallable(ClusterConnection, TableName, Scan, - * ScanMetrics, byte[], RpcControllerFactory )} - */ - @Deprecated - public ReversedScannerCallable(ClusterConnection connection, TableName tableName, - Scan scan, 
ScanMetrics scanMetrics, byte[] locateStartRow) { - this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory - .instantiate(connection.getConfiguration())); } /** @@ -100,12 +80,15 @@ public class ReversedScannerCallable extends ScannerCallable { throw new InterruptedIOException(); } if (!instantiated || reload) { - if (locateStartRow == null) { + // we should use range locate if + // 1. we do not want the start row + // 2. the start row is empty which means we need to locate to the last region. + if (scan.includeStartRow() && !isEmptyStartRow(getRow())) { // Just locate the region with the row RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id, getConnection(), getTableName(), getRow()); this.location = id < rl.size() ? rl.getRegionLocation(id) : null; - if (this.location == null) { + if (location == null || location.getServerName() == null) { throw new IOException("Failed to find location, tableName=" + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload=" + reload); @@ -113,6 +96,7 @@ public class ReversedScannerCallable extends ScannerCallable { } else { // Need to locate the regions with the range, and the target location is // the last one which is the previous region of last region scanner + byte[] locateStartRow = createClosestRowBefore(getRow()); List locatedRegions = locateRegionsInRange( locateStartRow, getRow(), reload); if (locatedRegions.isEmpty()) { @@ -177,7 +161,7 @@ public class ReversedScannerCallable extends ScannerCallable { @Override public ScannerCallable getScannerCallableForReplica(int id) { ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(), - this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id); + this.getScan(), this.scanMetrics, rpcControllerFactory, id); r.setCaching(this.getCaching()); return r; } diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 6450adf..f954627 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -108,6 +108,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { } catch (PreemptiveFastFailException e) { throw e; } catch (Throwable t) { + t.printStackTrace(); ExceptionUtil.rethrowIfInterrupt(t); // translateException throws exception when should not retry: i.e. when request is bad. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 3ef68ef..0682a7a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -67,7 +67,7 @@ public class ScannerCallable extends ClientServiceCallable { protected boolean instantiated = false; protected boolean closed = false; protected boolean renew = false; - private Scan scan; + protected final Scan scan; private int caching = 1; protected ScanMetrics scanMetrics; private boolean logScannerActivity = false; @@ -82,7 +82,6 @@ public class ScannerCallable extends ClientServiceCallable { private MoreResults moreResultsInRegion; private MoreResults moreResultsForScan; - private boolean openScanner; /** * Saves whether or not the most recent response from the server was a heartbeat message. 
* Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} @@ -253,10 +252,8 @@ public class ScannerCallable extends ClientServiceCallable { } ScanResponse response; if (this.scannerId == -1L) { - this.openScanner = true; response = openScanner(); } else { - this.openScanner = false; response = next(); } long timestamp = System.currentTimeMillis(); @@ -469,12 +466,4 @@ public class ScannerCallable extends ClientServiceCallable { void setMoreResultsForScan(MoreResults moreResults) { this.moreResultsForScan = moreResults; } - - /** - * Whether the previous call is openScanner. This is used to keep compatible with the old - * implementation that we always returns empty result for openScanner. - */ - boolean isOpenScanner() { - return openScanner; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index c99fe9a..f7285c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -122,10 +122,6 @@ class ScannerCallableWithReplicas implements RetryingCallable { return currentScannerCallable.moreResultsForScan(); } - public boolean isOpenScanner() { - return currentScannerCallable.isOpenScanner(); - } - @Override public Result [] call(int timeout) throws IOException { // If the active replica callable was closed somewhere, invoke the RPC to diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index cf0e995..2e957d17 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -85,7 +85,7 @@ public class 
TestClientScanner { } } - private static class MockClientScanner extends ClientScanner { + private static class MockClientScanner extends ClientSimpleScanner { private boolean rpcFinished = false; private boolean rpcFinishedFired = false; @@ -100,50 +100,26 @@ public class TestClientScanner { } @Override - protected Result[] nextScanner(int nbRows) throws IOException { + protected boolean moveToNextRegion() { if (!initialized) { initialized = true; - return super.nextScanner(nbRows); + return super.moveToNextRegion(); } if (!rpcFinished) { - return super.nextScanner(nbRows); + return super.moveToNextRegion(); } - // Enforce that we don't short-circuit more than once if (rpcFinishedFired) { throw new RuntimeException("Expected nextScanner to only be called once after " + " short-circuit was triggered."); } rpcFinishedFired = true; - return null; - } - - @Override - protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey, - int nbRows) { - scan.setStartRow(localStartKey); - ScannerCallable s = - new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(), - s, pool, primaryOperationTimeout, scan, - getRetries(), scannerTimeout, caching, conf, caller); - return sr; + return false; } public void setRpcFinished(boolean rpcFinished) { this.rpcFinished = rpcFinished; } - - @Override - protected void initCache() { - initSyncCache(); - } - - @Override public Result next() throws IOException { - return nextWithSyncCache(); - } } @Test @@ -172,7 +148,7 @@ public class TestClientScanner { case 1: // detect no more results case 2: // close count++; - return null; + return new Result[0]; default: throw new RuntimeException("Expected only 2 invocations"); } @@ -192,10 +168,9 @@ public class TestClientScanner { scanner.loadCache(); - // One for initializeScannerInConstruction() // One for fetching 
the results - // One for fetching null results and quit as we do not have moreResults hint. - inOrder.verify(caller, Mockito.times(3)).callWithoutRetries( + // One for fetching empty results and quit as we do not have moreResults hint. + inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( Mockito.any(RetryingCallable.class), Mockito.anyInt()); assertEquals(1, scanner.cache.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java index 414ffa7..870ebb3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java @@ -162,14 +162,13 @@ public class TestMetaTableAccessorNoCluster { .thenThrow(new ServiceException("Server not running (1 of 3)")) .thenThrow(new ServiceException("Server not running (2 of 3)")) .thenThrow(new ServiceException("Server not running (3 of 3)")) - .thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build()) .thenAnswer(new Answer() { public ScanResponse answer(InvocationOnMock invocation) throws Throwable { ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil .createCellScanner(cellScannables)); - return builder.setScannerId(1234567890L).build(); + return builder.setScannerId(1234567890L).setMoreResults(false).build(); } - }).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build()); + }); // Associate a spied-upon Connection with UTIL.getConfiguration. Need // to shove this in here first so it gets picked up all over; e.g. by // HTable. 
@@ -198,8 +197,8 @@ public class TestMetaTableAccessorNoCluster { assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO)); assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow())); // Finally verify that scan was called four times -- three times - // with exception and then on 4th, 5th and 6th attempt we succeed - Mockito.verify(implementation, Mockito.times(6)). + // with exception and then on 4th attempt we succeed + Mockito.verify(implementation, Mockito.times(4)). scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any()); } finally { if (connection != null && !connection.isClosed()) connection.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index b80efae..d0c9806 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -175,7 +175,7 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testReversedScanWithStartKeyAndStopKey() throws Exception { - testReversedScan(998, true, 1, false); // from first region to first region + testReversedScan(998, true, 1, false); // from last region to first region testReversedScan(543, true, 321, true); testReversedScan(654, true, 432, false); testReversedScan(765, false, 543, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 74df9cb..f28256b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -146,6 +146,7 @@ public class TestFromClientSide { 
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MultiRowMutationEndpoint.class.getName()); conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); // We need more than one region server in this test TEST_UTIL.startMiniCluster(SLAVES); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index c48ec31..d971f4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -43,7 +44,6 @@ import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -142,7 +142,7 @@ public class TestScannersFromClientSide { ht.delete(delete); // without batch - scan = new Scan(ROW); + scan = new Scan().withStartRow(ROW); scan.setMaxVersions(); scanner = ht.getScanner(scan); @@ -156,7 +156,7 @@ public class TestScannersFromClientSide { verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); // with batch - scan = new Scan(ROW); + scan = new Scan().withStartRow(ROW); scan.setMaxVersions(); scan.setBatch(2); scanner = 
ht.getScanner(scan); @@ -267,7 +267,7 @@ public class TestScannersFromClientSide { * @param columns * @throws Exception */ - public void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { + private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { Scan baseScan = new Scan(); baseScan.setReversed(reversed); baseScan.setSmall(true); @@ -294,9 +294,7 @@ public class TestScannersFromClientSide { Result r = null; while ((r = scanner.next()) != null) { rowCount++; - for (Cell c : r.rawCells()) { - cellCount++; - } + cellCount += r.rawCells().length; } assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, @@ -599,7 +597,7 @@ public class TestScannersFromClientSide { } ht.put(put); - scan = new Scan(ROW); + scan = new Scan().withStartRow(ROW); scanner = ht.getScanner(scan); HRegionLocation loc; @@ -764,5 +762,84 @@ public class TestScannersFromClientSide { assertEquals(expKvList.size(), result.size()); } + private void assertResultEquals(Result result, int i) { + assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + } + + /** + * Runs a forward scan between the given rows with the given inclusiveness and asserts that it + * returns exactly the expected rows, in order, and nothing past the stop row. + */ + private void testStartRowStopRowInclusive(Table table, int start, boolean startInclusive, + int stop, boolean stopInclusive) throws IOException { + int actualStart = startInclusive ? start : start + 1; + int actualStop = stopInclusive ? 
stop + 1 : stop; + int expectedCount = actualStop - actualStart; + Result[] results; + try (ResultScanner scanner = table.getScanner( + new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive))) { + results = scanner.next(expectedCount); + assertTrue("Scanner returned rows beyond the expected stop row", scanner.next() == null); + } + assertEquals(expectedCount, results.length); + for (int i = 0; i < expectedCount; i++) { + assertResultEquals(results[i], actualStart + i); + } + } + + /** + * Runs a reversed scan between the given rows with the given inclusiveness and asserts that it + * returns exactly the expected rows, in order, and nothing past the stop row. + */ + private void testReversedStartRowStopRowInclusive(Table table, int start, boolean startInclusive, + int stop, boolean stopInclusive) throws IOException { + int actualStart = startInclusive ? start : start - 1; + int actualStop = stopInclusive ? stop - 1 : stop; + int expectedCount = actualStart - actualStop; + Result[] results; + try (ResultScanner scanner = table.getScanner( + new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive) + .setReversed(true))) { + results = scanner.next(expectedCount); + assertTrue("Scanner returned rows beyond the expected stop row", scanner.next() == null); + } + assertEquals(expectedCount, results.length); + for (int i = 0; i < expectedCount; i++) { + assertResultEquals(results[i], actualStart - i); + } + } + + @Test + public void testStartRowStopRowInclusive() throws IOException, InterruptedException { + TableName tableName = TableName.valueOf("testStartRowStopRowInclusive"); + byte[][] splitKeys = new byte[8][]; + for (int i = 11; i < 99; i += 11) { + splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i)); + } + Table table = TEST_UTIL.createTable(tableName, FAMILY, 
splitKeys); + TEST_UTIL.waitTableAvailable(tableName); + try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(tableName)) { + for (int i = 0; i < 100; i++) { + mutator.mutate(new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, QUALIFIER, + Bytes.toBytes(i))); + } + } + // from first region to last region + 
testStartRowStopRowInclusive(table, 1, true, 98, false); + testStartRowStopRowInclusive(table, 12, true, 34, true); + testStartRowStopRowInclusive(table, 23, true, 45, false); + testStartRowStopRowInclusive(table, 34, false, 56, true); + testStartRowStopRowInclusive(table, 45, false, 67, false); + + // from last region to first region + testReversedStartRowStopRowInclusive(table, 98, true, 1, false); + testReversedStartRowStopRowInclusive(table, 54, true, 32, true); + testReversedStartRowStopRowInclusive(table, 65, true, 43, false); + testReversedStartRowStopRowInclusive(table, 76, false, 54, true); + testReversedStartRowStopRowInclusive(table, 87, false, 65, false); + } } -- 1.9.1