Index: src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java (revision 0) @@ -0,0 +1,98 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +/** + * Helper class for custom client scanners. + */ +public abstract class AbstractClientScanner implements ResultScanner { + /** + * Get nbRows rows. + * How many RPCs are made is determined by the {@link Scan#setCaching(int)} + * setting (or hbase.client.scanner.caching in hbase-site.xml). + * @param nbRows number of rows to return + * @return Between zero and nbRows RowResults. Scan is done + * if returned array is of zero-length (We never return null). + * @throws IOException + */ + @Override + public Result [] next(int nbRows) throws IOException { + // Collect values to be returned here + ArrayList resultSets = new ArrayList(nbRows); + for(int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[resultSets.size()]); + } + + @Override + public Iterator iterator() { + return new Iterator() { + // The next RowResult, possibly pre-read + Result next = null; + + // return true if there is another item pending, false if there isn't. + // this method is where the actual advancing takes place, but you need + // to call next() to consume it. hasNext() will only advance if there + // isn't a pending next(). + public boolean hasNext() { + if (next == null) { + try { + next = AbstractClientScanner.this.next(); + return next != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return true; + } + + // get the pending next item and advance the iterator. returns null if + // there is no next item. + public Result next() { + // since hasNext() does the real advancing, we call this to determine + // if there is a next before proceeding. + if (!hasNext()) { + return null; + } + + // if we get to here, then hasNext() has given us an item to return. + // we want to return the item and then null out the next pointer, so + // we use a temporary variable. 
+ Result temp = next; + next = null; + return temp; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1175077) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -26,8 +26,6 @@ import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -42,7 +40,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -50,10 +47,7 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -109,7 +103,6 @@ private static final Log LOG = LogFactory.getLog(HTable.class); private HConnection connection; private final byte [] tableName; - protected final int scannerTimeout; private volatile Configuration configuration; private final ArrayList writeBuffer = new ArrayList(); private long writeBufferSize; @@ -119,7 +112,6 @@ protected int scannerCaching; private int maxKeyValueSize; private ExecutorService pool; // For Multi - private long maxScannerResultSize; private boolean closed; private int operationTimeout; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. @@ -185,13 +177,10 @@ throws IOException { this.tableName = tableName; if (conf == null) { - this.scannerTimeout = 0; this.connection = null; return; } this.connection = HConnectionManager.getConnection(conf); - this.scannerTimeout = - (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @@ -203,9 +192,6 @@ this.currentWriteBufferSize = 0; this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); @@ -333,6 +319,7 @@ * Gets the number of rows that a scanner will fetch at once. *
<p>
* The default value comes from {@code hbase.client.scanner.caching}. + * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()} */ public int getScannerCaching() { return scannerCaching; @@ -347,6 +334,7 @@ * {@code next()} is called on a scanner, at the expense of memory use * (since more rows will need to be maintained in memory by the scanners). * @param scannerCaching the number of rows a scanner will fetch at once. + * @deprecated Use {@link Scan#setCaching(int)} */ public void setScannerCaching(int scannerCaching) { this.scannerCaching = scannerCaching; @@ -578,8 +566,12 @@ */ @Override public ResultScanner getScanner(final Scan scan) throws IOException { - ClientScanner s = new ClientScanner(scan); - s.initialize(); + Scan copy = new Scan(scan); + if (copy.getCaching() <= 0) { + copy.setCaching(getScannerCaching()); + } + ClientScanner s = new ClientScanner(getConfiguration(), copy, + getTableName(), this.connection); return s; } @@ -1016,313 +1008,6 @@ } /** - * Implements the scanner interface for the HBase client. - * If there are multiple regions in a table, this scanner will iterate - * through them all. - */ - protected class ClientScanner implements ResultScanner { - private final Log CLIENT_LOG = LogFactory.getLog(this.getClass()); - // HEADSUP: The scan internal start row can change as we move through table. - private Scan scan; - private boolean closed = false; - // Current region scanner is against. Gets cleared if current region goes - // wonky: e.g. if it splits on us. - private HRegionInfo currentRegion = null; - private ScannerCallable callable = null; - private final LinkedList cache = new LinkedList(); - private final int caching; - private long lastNext; - // Keep lastResult returned successfully in case we have to reset scanner. - private Result lastResult = null; - - protected ClientScanner(final Scan scan) { - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Creating scanner over " - + Bytes.toString(getTableName()) - + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'"); - } - this.scan = scan; - this.lastNext = System.currentTimeMillis(); - - // Use the caching from the Scan. If not set, use the default cache setting for this table. - if (this.scan.getCaching() > 0) { - this.caching = this.scan.getCaching(); - } else { - this.caching = HTable.this.scannerCaching; - } - - // Removed filter validation. We have a new format now, only one of all - // the current filters has a validate() method. We can add it back, - // need to decide on what we're going to do re: filter redesign. - // Need, at the least, to break up family from qualifier as separate - // checks, I think it's important server-side filters are optimal in that - // respect. - } - - public void initialize() throws IOException { - nextScanner(this.caching, false); - } - - protected Scan getScan() { - return scan; - } - - protected long getTimestamp() { - return lastNext; - } - - // returns true if the passed region endKey - private boolean checkScanStopRow(final byte [] endKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. - byte [] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, - endKey, 0, endKey.length); - if (cmp <= 0) { - // stopRow <= endKey (endKey is equals to or larger than stopRow) - // This is a stop. - return true; - } - } - return false; //unlikely. - } - - /* - * Gets a scanner for the next region. 
If this.currentRegion != null, then - * we will move to the endrow of this.currentRegion. Else we will get - * scanner at the scan.getStartRow(). We will go no further, just tidy - * up outstanding scanners, if currentRegion != null and - * done is true. - * @param nbRows - * @param done Server-side says we're done scanning. - */ - private boolean nextScanner(int nbRows, final boolean done) - throws IOException { - // Close the previous scanner if it's open - if (this.callable != null) { - this.callable.setClose(); - getConnection().getRegionServerWithRetries(callable); - this.callable = null; - } - - // Where to start the next scanner - byte [] localStartKey; - - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null) { - byte [] endKey = this.currentRegion.getEndKey(); - if (endKey == null || - Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || - checkScanStopRow(endKey) || - done) { - close(); - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion); - } - return false; - } - localStartKey = endKey; - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Finished with region " + this.currentRegion); - } - } else { - localStartKey = this.scan.getStartRow(); - } - - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Advancing internal scanner to startKey at '" + - Bytes.toStringBinary(localStartKey) + "'"); - } - try { - callable = getScannerCallable(localStartKey, nbRows); - // Open a scanner on the region server starting at the - // beginning of the region - getConnection().getRegionServerWithRetries(callable); - this.currentRegion = callable.getHRegionInfo(); - } catch (IOException e) { - close(); - throw e; - } - return true; - } - - protected ScannerCallable getScannerCallable(byte [] localStartKey, - int nbRows) { - scan.setStartRow(localStartKey); - ScannerCallable s = new ScannerCallable(getConnection(), - getTableName(), scan); - s.setCaching(nbRows); - return s; - } - - public Result next() throws IOException { - // If the scanner is closed but there is some rows left in the cache, - // it will first empty it before returning null - if (cache.size() == 0 && this.closed) { - return null; - } - if (cache.size() == 0) { - Result [] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; - // We need to reset it if it's a new callable that was created - // with a countdown in nextScanner - callable.setCaching(this.caching); - // This flag is set when we want to skip the result returned. We do - // this when we reset scanner because it split under us. - boolean skipFirst = false; - do { - try { - if (skipFirst) { - // Skip only the first row (which was the last row of the last - // already-processed batch). - callable.setCaching(1); - values = getConnection().getRegionServerWithRetries(callable); - callable.setCaching(this.caching); - skipFirst = false; - } - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - values = getConnection().getRegionServerWithRetries(callable); - } catch (DoNotRetryIOException e) { - if (e instanceof UnknownScannerException) { - long timeout = lastNext + scannerTimeout; - // If we are over the timeout, throw this exception to the client - // Else, it's because the region moved and we used the old id - // against the new region server; reset the scanner. 
- if (timeout < System.currentTimeMillis()) { - long elapsed = System.currentTimeMillis() - lastNext; - ScannerTimeoutException ex = new ScannerTimeoutException( - elapsed + "ms passed since the last invocation, " + - "timeout is currently set to " + scannerTimeout); - ex.initCause(e); - throw ex; - } - } else { - Throwable cause = e.getCause(); - if (cause == null || !(cause instanceof NotServingRegionException)) { - throw e; - } - } - // Else, its signal from depths of ScannerCallable that we got an - // NSRE on a next and that we need to reset the scanner. - if (this.lastResult != null) { - this.scan.setStartRow(this.lastResult.getRow()); - // Skip first row returned. We already let it out on previous - // invocation. - skipFirst = true; - } - // Clear region - this.currentRegion = null; - continue; - } - lastNext = System.currentTimeMillis(); - if (values != null && values.length > 0) { - for (Result rs : values) { - cache.add(rs); - for (KeyValue kv : rs.raw()) { - remainingResultSize -= kv.heapSize(); - } - countdown--; - this.lastResult = rs; - } - } - // Values == null means server-side filter has determined we must STOP - } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null)); - } - - if (cache.size() > 0) { - return cache.poll(); - } - return null; - } - - /** - * Get nbRows rows. - * How many RPCs are made is determined by the {@link Scan#setCaching(int)} - * setting (or hbase.client.scanner.caching in hbase-site.xml). - * @param nbRows number of rows to return - * @return Between zero and nbRows RowResults. Scan is done - * if returned array is of zero-length (We never return null). - * @throws IOException - */ - public Result [] next(int nbRows) throws IOException { - // Collect values to be returned here - ArrayList resultSets = new ArrayList(nbRows); - for(int i = 0; i < nbRows; i++) { - Result next = next(); - if (next != null) { - resultSets.add(next); - } else { - break; - } - } - return resultSets.toArray(new Result[resultSets.size()]); - } - - public void close() { - if (callable != null) { - callable.setClose(); - try { - getConnection().getRegionServerWithRetries(callable); - } catch (IOException e) { - // We used to catch this error, interpret, and rethrow. However, we - // have since decided that it's not nice for a scanner's close to - // throw exceptions. Chances are it was just an UnknownScanner - // exception due to lease time out. - } - callable = null; - } - closed = true; - } - - public Iterator iterator() { - return new Iterator() { - // The next RowResult, possibly pre-read - Result next = null; - - // return true if there is another item pending, false if there isn't. - // this method is where the actual advancing takes place, but you need - // to call next() to consume it. hasNext() will only advance if there - // isn't a pending next(). - public boolean hasNext() { - if (next == null) { - try { - next = ClientScanner.this.next(); - return next != null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return true; - } - - // get the pending next item and advance the iterator. returns null if - // there is no next item. - public Result next() { - // since hasNext() does the real advancing, we call this to determine - // if there is a next before proceeding. - if (!hasNext()) { - return null; - } - - // if we get to here, then hasNext() has given us an item to return. - // we want to return the item and then null out the next pointer, so - // we use a temporary variable. 
- Result temp = next; - next = null; - return temp; - } - - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - } - - /** * The pool is used for mutli requests for this HTable * @return the pool used for mutli */ Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 0) @@ -0,0 +1,326 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Implements the scanner interface for the HBase client. If there are multiple + * regions in a table, this scanner will iterate through them all. + */ +public class ClientScanner extends AbstractClientScanner { + private static final Log LOG = LogFactory.getLog(ClientScanner.class); + + private Scan scan; + private boolean closed = false; + // Current region scanner is against. Gets cleared if current region goes + // wonky: e.g. if it splits on us. + private HRegionInfo currentRegion = null; + private ScannerCallable callable = null; + private final LinkedList cache = new LinkedList(); + private final int caching; + private long lastNext; + // Keep lastResult returned successfully in case we have to reset scanner. + private Result lastResult = null; + private final long maxScannerResultSize; + private final HConnection connection; + private final byte[] tableName; + private final int scannerTimeout; + + /** + * Create a new ClientScanner for the specified table. An HConnection will be + * retrieved using the passed Configuration. + * + * @param conf + * The {link Configuration} to use. 
+ * @param scan + * {link Scan} to use in this scanner + * @param tableName + * The table that we wish to scan + * @throws IOException + */ + public ClientScanner(final Configuration conf, final Scan scan, + final byte[] tableName) throws IOException { + this(conf, scan, tableName, HConnectionManager.getConnection(conf)); + + } + + /** + * Create a new ClientScanner for the specified table + * + * @param conf + * The {link Configuration} to use. + * @param scan + * {link Scan} to use in this scanner + * @param tableName + * The table that we wish to scan + * @param connection + * Connection identifying the cluster + * @throws IOException + */ + public ClientScanner(final Configuration conf, final Scan scan, + final byte[] tableName, HConnection connection) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating scanner over " + Bytes.toString(tableName) + + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + + "'"); + } + this.scan = new Scan(scan); + this.tableName = tableName; + this.lastNext = System.currentTimeMillis(); + this.maxScannerResultSize = conf.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + + // Use the caching from the Scan. If not set, use the default cache setting. + if (this.scan.getCaching() > 0) { + this.caching = this.scan.getCaching(); + } else { + this.caching = conf.getInt("hbase.client.scanner.caching", 1); + } + + this.scannerTimeout = (int) conf.getLong( + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); + + this.connection = connection; + + // initialize the scanner + nextScanner(this.caching, false); + + // Removed filter validation. We have a new format now, only one of all + // the current filters has a validate() method. We can add it back, + // need to decide on what we're going to do re: filter redesign. + // Need, at the least, to break up family from qualifier as separate + // checks, I think it's important server-side filters are optimal in that + // respect. + } + + protected HConnection getConnection() { + return this.connection; + } + + protected byte[] getTableName() { + return this.tableName; + } + + protected Scan getScan() { + return scan; + } + + protected long getTimestamp() { + return lastNext; + } + + // returns true if the passed region endKey + private boolean checkScanStopRow(final byte[] endKey) { + if (this.scan.getStopRow().length > 0) { + // there is a stop row, check to see if we are past it. + byte[] stopRow = scan.getStopRow(); + int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, + endKey.length); + if (cmp <= 0) { + // stopRow <= endKey (endKey is equals to or larger than stopRow) + // This is a stop. + return true; + } + } + return false; // unlikely. + } + + /* + * Gets a scanner for the next region. If this.currentRegion != null, then we + * will move to the endrow of this.currentRegion. Else we will get scanner at + * the scan.getStartRow(). We will go no further, just tidy up outstanding + * scanners, if currentRegion != null and done is + * true. + * + * @param nbRows + * + * @param done Server-side says we're done scanning. 
+ */ + private boolean nextScanner(int nbRows, final boolean done) + throws IOException { + // Close the previous scanner if it's open + if (this.callable != null) { + this.callable.setClose(); + getConnection().getRegionServerWithRetries(callable); + this.callable = null; + } + + // Where to start the next scanner + byte[] localStartKey; + + // if we're at end of table, close and return false to stop iterating + if (this.currentRegion != null) { + byte[] endKey = this.currentRegion.getEndKey(); + if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(endKey) || done) { + close(); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished with scanning at " + this.currentRegion); + } + return false; + } + localStartKey = endKey; + if (LOG.isDebugEnabled()) { + LOG.debug("Finished with region " + this.currentRegion); + } + } else { + localStartKey = this.scan.getStartRow(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Advancing internal scanner to startKey at '" + + Bytes.toStringBinary(localStartKey) + "'"); + } + try { + callable = getScannerCallable(localStartKey, nbRows); + // Open a scanner on the region server starting at the + // beginning of the region + getConnection().getRegionServerWithRetries(callable); + this.currentRegion = callable.getHRegionInfo(); + } catch (IOException e) { + close(); + throw e; + } + return true; + } + + protected ScannerCallable getScannerCallable(byte[] localStartKey, int nbRows) { + scan.setStartRow(localStartKey); + ScannerCallable s = new ScannerCallable(getConnection(), getTableName(), + scan); + s.setCaching(nbRows); + return s; + } + + public Result next() throws IOException { + // If the scanner is closed but there is some rows left in the cache, + // it will first empty it before returning null + if (cache.size() == 0 && this.closed) { + return null; + } + if (cache.size() == 0) { + Result[] values = null; + long remainingResultSize = maxScannerResultSize; + int countdown = this.caching; + // We need to reset it if it's a new callable that was created + // with a countdown in nextScanner + callable.setCaching(this.caching); + // This flag is set when we want to skip the result returned. We do + // this when we reset scanner because it split under us. + boolean skipFirst = false; + do { + try { + if (skipFirst) { + // Skip only the first row (which was the last row of the last + // already-processed batch). + callable.setCaching(1); + values = getConnection().getRegionServerWithRetries(callable); + callable.setCaching(this.caching); + skipFirst = false; + } + // Server returns a null values if scanning is to stop. Else, + // returns an empty array if scanning is to go on and we've just + // exhausted current region. + values = getConnection().getRegionServerWithRetries(callable); + } catch (DoNotRetryIOException e) { + if (e instanceof UnknownScannerException) { + long timeout = lastNext + scannerTimeout; + // If we are over the timeout, throw this exception to the client + // Else, it's because the region moved and we used the old id + // against the new region server; reset the scanner. 
+ if (timeout < System.currentTimeMillis()) { + long elapsed = System.currentTimeMillis() - lastNext; + ScannerTimeoutException ex = new ScannerTimeoutException(elapsed + + "ms passed since the last invocation, " + + "timeout is currently set to " + scannerTimeout); + ex.initCause(e); + throw ex; + } + } else { + Throwable cause = e.getCause(); + if (cause == null || !(cause instanceof NotServingRegionException)) { + throw e; + } + } + // Else, its signal from depths of ScannerCallable that we got an + // NSRE on a next and that we need to reset the scanner. + if (this.lastResult != null) { + this.scan.setStartRow(this.lastResult.getRow()); + // Skip first row returned. We already let it out on previous + // invocation. + skipFirst = true; + } + // Clear region + this.currentRegion = null; + continue; + } + lastNext = System.currentTimeMillis(); + if (values != null && values.length > 0) { + for (Result rs : values) { + cache.add(rs); + for (KeyValue kv : rs.raw()) { + remainingResultSize -= kv.heapSize(); + } + countdown--; + this.lastResult = rs; + } + } + // Values == null means server-side filter has determined we must STOP + } while (remainingResultSize > 0 && countdown > 0 + && nextScanner(countdown, values == null)); + } + + if (cache.size() > 0) { + return cache.poll(); + } + return null; + } + + public void close() { + if (callable != null) { + callable.setClose(); + try { + getConnection().getRegionServerWithRetries(callable); + } catch (IOException e) { + // We used to catch this error, interpret, and rethrow. However, we + // have since decided that it's not nice for a scanner's close to + // throw exceptions. Chances are it was just an UnknownScanner + // exception due to lease time out. + } + callable = null; + } + closed = true; + } +}
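Usage sketch (illustrative, not part of the patch): with ClientScanner promoted to a public top-level class, a scan can be driven without going through HTable. Caching now rides on the Scan itself via Scan.setCaching(int) rather than the deprecated HTable.setScannerCaching(int), and HTable.getScanner(Scan) copies the caller's Scan and fills in the table default when no caching is set. The table name "t1" and the caching value of 100 below are placeholders.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ClientScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ClientScannerUsage {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Scan scan = new Scan();
    scan.setCaching(100);   // per-scan caching, replaces the deprecated HTable.setScannerCaching(100)
    // The constructor opens the scanner on the first region; there is no separate initialize() call.
    ResultScanner scanner = new ClientScanner(conf, scan, Bytes.toBytes("t1"));
    try {
      // iterator() is inherited from AbstractClientScanner.
      for (Result result : scanner) {
        System.out.println(Bytes.toStringBinary(result.getRow()));
      }
    } finally {
      scanner.close();
    }
  }
}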
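Subclass sketch (illustrative, not part of the patch): AbstractClientScanner supplies next(int nbRows) and iterator() on top of the single-row next(), so a custom ResultScanner only has to implement next() and close(). The list-backed scanner below is hypothetical, purely to show the contract; returning null from next() signals the end of the scan.

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Result;

/** Hypothetical scanner over an in-memory list of Results. */
public class ListBackedScanner extends AbstractClientScanner {
  private final Iterator<Result> source;

  public ListBackedScanner(List<Result> results) {
    this.source = results.iterator();
  }

  @Override
  public Result next() throws IOException {
    // Returning null tells the inherited next(int)/iterator() that the scan is done.
    return source.hasNext() ? source.next() : null;
  }

  @Override
  public void close() {
    // Nothing to release for an in-memory source.
  }
}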