Index: src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java	(revision 0)
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Helper class for custom client scanners.
+ * <p>
+ * Subclasses supply {@link #next()}, {@link #next(int)} and {@link #close()};
+ * this class derives {@link #iterator()} from {@link #next()}.
+ */
+public abstract class AbstractClientScanner implements ResultScanner {
+
+  /**
+   * Returns an iterator that pulls rows from this scanner via {@link #next()}.
+   * <p>
+   * An {@link IOException} thrown by {@link #next()} is rethrown wrapped in a
+   * {@link RuntimeException}, and {@link Iterator#remove()} is unsupported.
+   * Note that, unlike the general {@link Iterator} contract, the iterator's
+   * {@code next()} returns {@code null} instead of throwing
+   * {@code NoSuchElementException} once the scanner is exhausted.
+   */
+  @Override
+  public Iterator<Result> iterator() {
+    return new Iterator<Result>() {
+      // The next Result, possibly pre-read
+      Result next = null;
+
+      // return true if there is another item pending, false if there isn't.
+      // this method is where the actual advancing takes place, but you need
+      // to call next() to consume it. hasNext() will only advance if there
+      // isn't a pending next().
+      @Override
+      public boolean hasNext() {
+        if (next == null) {
+          try {
+            next = AbstractClientScanner.this.next();
+            return next != null;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return true;
+      }
+
+      // get the pending next item and advance the iterator. returns null if
+      // there is no next item.
+      @Override
+      public Result next() {
+        // since hasNext() does the real advancing, we call this to determine
+        // if there is a next before proceeding.
+        if (!hasNext()) {
+          return null;
+        }
+
+        // if we get to here, then hasNext() has given us an item to return.
+        // we want to return the item and then null out the next pointer, so
+        // we use a temporary variable.
+        Result temp = next;
+        next = null;
+        return temp;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java	(revision 1221860)
+++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java	(working copy)
@@ -148,6 +148,8 @@
   /**
    * Returns a scanner on the current table as specified by the {@link Scan}
    * object.
+   * Note that the passed {@link Scan}'s start row and caching properties
+   * may be changed.
    *
    * @param scan A configured {@link Scan} object.
    * @return A scanner.
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java	(revision 1221860)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -26,8 +26,6 @@
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -42,7 +40,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -50,21 +47,16 @@
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
  * <p>Used to communicate with a single HBase table.
@@ -111,7 +103,6 @@
   private static final Log LOG = LogFactory.getLog(HTable.class);
   private HConnection connection;
   private final byte [] tableName;
-  protected int scannerTimeout;
   private volatile Configuration configuration;
   private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
   private long writeBufferSize;
@@ -121,7 +112,6 @@
   protected int scannerCaching;
   private int maxKeyValueSize;
   private ExecutorService pool;  // For Multi
-  private long maxScannerResultSize;
   private boolean closed;
   private int operationTimeout;
   private static final int DOPUT_WB_CHECK = 10;    // i.e., doPut checks the writebuffer every X Puts.
@@ -158,7 +148,6 @@
     this.tableName = tableName;
     this.cleanupOnClose = true;
     if (conf == null) {
-      this.scannerTimeout = 0;
       this.connection = null;
       return;
     }
@@ -218,9 +207,6 @@
    */
   private void finishSetup() throws IOException {
     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
-    this.scannerTimeout = (int) this.configuration.getLong(
-        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-        HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
     this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
         : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
             HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@@ -232,9 +218,6 @@
 
     this.scannerCaching = this.configuration.getInt(
         "hbase.client.scanner.caching", 1);
-    this.maxScannerResultSize = this.configuration.getLong(
-        HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
-        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = this.configuration.getInt(
         "hbase.client.keyvalue.maxsize", -1);
     this.closed = false;
@@ -352,6 +335,8 @@
    * Gets the number of rows that a scanner will fetch at once.
    * <p>
    * The default value comes from {@code hbase.client.scanner.caching}.
+   * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
    */
+  @Deprecated
   public int getScannerCaching() {
     return scannerCaching;
@@ -366,6 +351,8 @@
    * {@code next()} is called on a scanner, at the expense of memory use
    * (since more rows will need to be maintained in memory by the scanners).
    * @param scannerCaching the number of rows a scanner will fetch at once.
+   * @deprecated Use {@link Scan#setCaching(int)}
    */
+  @Deprecated
   public void setScannerCaching(int scannerCaching) {
     this.scannerCaching = scannerCaching;
@@ -598,9 +585,13 @@
    */
   @Override
   public ResultScanner getScanner(final Scan scan) throws IOException {
-    ClientScanner s = new ClientScanner(scan);
-    s.initialize();
-    return s;
+    // Fall back to this table's scanner caching when the Scan leaves it
+    // unset.  Note that this mutates the caller's Scan.
+    if (scan.getCaching() <= 0) {
+      scan.setCaching(getScannerCaching());
+    }
+    return new ClientScanner(getConfiguration(), scan, getTableName(),
+        this.connection);
   }
 
   /**
@@ -1073,355 +1064,6 @@
   }
 
   /**
-   * Implements the scanner interface for the HBase client.
-   * If there are multiple regions in a table, this scanner will iterate
-   * through them all.
-   */
-  protected class ClientScanner implements ResultScanner {
-    private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
-    // HEADSUP: The scan internal start row can change as we move through table.
-    private Scan scan;
-    private boolean closed = false;
-    // Current region scanner is against.  Gets cleared if current region goes
-    // wonky: e.g. if it splits on us.
-    private HRegionInfo currentRegion = null;
-    private ScannerCallable callable = null;
-    private final LinkedList<Result> cache = new LinkedList<Result>();
-    private final int caching;
-    private long lastNext;
-    // Keep lastResult returned successfully in case we have to reset scanner.
-    private Result lastResult = null;
-    private ScanMetrics scanMetrics = null;
-
-    protected ClientScanner(final Scan scan) {
-      if (CLIENT_LOG.isDebugEnabled()) {
-        CLIENT_LOG.debug("Creating scanner over "
-            + Bytes.toString(getTableName())
-            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
-      }
-      this.scan = scan;
-      this.lastNext = System.currentTimeMillis();
-
-      // check if application wants to collect scan metrics
-      byte[] enableMetrics = scan.getAttribute(
-        Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
-      if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
-        scanMetrics = new ScanMetrics();
-      }
-
-      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
-      if (this.scan.getCaching() > 0) {
-        this.caching = this.scan.getCaching();
-      } else {
-        this.caching = HTable.this.scannerCaching;
-      }
-
-      // Removed filter validation.  We have a new format now, only one of all
-      // the current filters has a validate() method.  We can add it back,
-      // need to decide on what we're going to do re: filter redesign.
-      // Need, at the least, to break up family from qualifier as separate
-      // checks, I think it's important server-side filters are optimal in that
-      // respect.
-    }
-
-    public void initialize() throws IOException {
-      nextScanner(this.caching, false);
-    }
-
-    protected Scan getScan() {
-      return scan;
-    }
-
-    protected long getTimestamp() {
-      return lastNext;
-    }
-
-    // returns true if the passed region endKey
-    private boolean checkScanStopRow(final byte [] endKey) {
-      if (this.scan.getStopRow().length > 0) {
-        // there is a stop row, check to see if we are past it.
-        byte [] stopRow = scan.getStopRow();
-        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
-          endKey, 0, endKey.length);
-        if (cmp <= 0) {
-          // stopRow <= endKey (endKey is equals to or larger than stopRow)
-          // This is a stop.
-          return true;
-        }
-      }
-      return false; //unlikely.
-    }
-
-    /*
-     * Gets a scanner for the next region.  If this.currentRegion != null, then
-     * we will move to the endrow of this.currentRegion.  Else we will get
-     * scanner at the scan.getStartRow().  We will go no further, just tidy
-     * up outstanding scanners, if <code>currentRegion != null</code> and
-     * <code>done</code> is true.
-     * @param nbRows
-     * @param done Server-side says we're done scanning.
-     */
-    private boolean nextScanner(int nbRows, final boolean done)
-    throws IOException {
-      // Close the previous scanner if it's open
-      if (this.callable != null) {
-        this.callable.setClose();
-        getConnection().getRegionServerWithRetries(callable);
-        this.callable = null;
-      }
-
-      // Where to start the next scanner
-      byte [] localStartKey;
-
-      // if we're at end of table, close and return false to stop iterating
-      if (this.currentRegion != null) {
-        byte [] endKey = this.currentRegion.getEndKey();
-        if (endKey == null ||
-            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
-            checkScanStopRow(endKey) ||
-            done) {
-          close();
-          if (CLIENT_LOG.isDebugEnabled()) {
-            CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
-          }
-          return false;
-        }
-        localStartKey = endKey;
-        if (CLIENT_LOG.isDebugEnabled()) {
-          CLIENT_LOG.debug("Finished with region " + this.currentRegion);
-        }
-      } else {
-        localStartKey = this.scan.getStartRow();
-      }
-
-      if (CLIENT_LOG.isDebugEnabled()) {
-        CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
-          Bytes.toStringBinary(localStartKey) + "'");
-      }
-      try {
-        callable = getScannerCallable(localStartKey, nbRows);
-        // Open a scanner on the region server starting at the
-        // beginning of the region
-        getConnection().getRegionServerWithRetries(callable);
-        this.currentRegion = callable.getHRegionInfo();
-        if (this.scanMetrics != null) {
-          this.scanMetrics.countOfRegions.inc();
-        }
-      } catch (IOException e) {
-        close();
-        throw e;
-      }
-      return true;
-    }
-
-    protected ScannerCallable getScannerCallable(byte [] localStartKey,
-        int nbRows) {
-      scan.setStartRow(localStartKey);
-      ScannerCallable s = new ScannerCallable(getConnection(),
-        getTableName(), scan, this.scanMetrics);
-      s.setCaching(nbRows);
-      return s;
-    }
-
-    /**
-     * publish the scan metrics
-     * For now, we use scan.setAttribute to pass the metrics for application
-     * or TableInputFormat to consume
-     * Later, we could push it to other systems
-     * We don't use metrics framework because it doesn't support
-     * multi instances of the same metrics on the same machine; for scan/map
-     * reduce scenarios, we will have multiple scans running at the same time
-     */
-    private void writeScanMetrics() throws IOException
-    {
-      // by default, scanMetrics is null
-      // if application wants to collect scanMetrics, it can turn it on by
-      // calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,
-      // Bytes.toBytes(Boolean.TRUE))
-      if (this.scanMetrics == null) {
-        return;
-      }
-      final DataOutputBuffer d = new DataOutputBuffer();
-      scanMetrics.write(d);
-      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
-    }
-
-    public Result next() throws IOException {
-      // If the scanner is closed but there is some rows left in the cache,
-      // it will first empty it before returning null
-      if (cache.size() == 0 && this.closed) {
-        writeScanMetrics();
-        return null;
-      }
-      if (cache.size() == 0) {
-        Result [] values = null;
-        long remainingResultSize = maxScannerResultSize;
-        int countdown = this.caching;
-        // We need to reset it if it's a new callable that was created
-        // with a countdown in nextScanner
-        callable.setCaching(this.caching);
-        // This flag is set when we want to skip the result returned.  We do
-        // this when we reset scanner because it split under us.
-        boolean skipFirst = false;
-        do {
-          try {
-            if (skipFirst) {
-              // Skip only the first row (which was the last row of the last
-              // already-processed batch).
-              callable.setCaching(1);
-              values = getConnection().getRegionServerWithRetries(callable);
-              callable.setCaching(this.caching);
-              skipFirst = false;
-            }
-            // Server returns a null values if scanning is to stop.  Else,
-            // returns an empty array if scanning is to go on and we've just
-            // exhausted current region.
-            values = getConnection().getRegionServerWithRetries(callable);
-          } catch (DoNotRetryIOException e) {
-            if (e instanceof UnknownScannerException) {
-              long timeout = lastNext + scannerTimeout;
-              // If we are over the timeout, throw this exception to the client
-              // Else, it's because the region moved and we used the old id
-              // against the new region server; reset the scanner.
-              if (timeout < System.currentTimeMillis()) {
-                long elapsed = System.currentTimeMillis() - lastNext;
-                ScannerTimeoutException ex = new ScannerTimeoutException(
-                    elapsed + "ms passed since the last invocation, " +
-                        "timeout is currently set to " + scannerTimeout);
-                ex.initCause(e);
-                throw ex;
-              }
-            } else {
-              Throwable cause = e.getCause();
-              if (cause == null || (!(cause instanceof NotServingRegionException)
-                  && !(cause instanceof RegionServerStoppedException))) {
-                throw e;
-              }
-            }
-            // Else, its signal from depths of ScannerCallable that we got an
-            // NSRE on a next and that we need to reset the scanner.
-            if (this.lastResult != null) {
-              this.scan.setStartRow(this.lastResult.getRow());
-              // Skip first row returned.  We already let it out on previous
-              // invocation.
-              skipFirst = true;
-            }
-            // Clear region
-            this.currentRegion = null;
-            continue;
-          }
-          long currentTime = System.currentTimeMillis();
-          if (this.scanMetrics != null ) {
-            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(
-                currentTime-lastNext);
-          }
-          lastNext = currentTime;
-          if (values != null && values.length > 0) {
-            for (Result rs : values) {
-              cache.add(rs);
-              for (KeyValue kv : rs.raw()) {
-                  remainingResultSize -= kv.heapSize();
-              }
-              countdown--;
-              this.lastResult = rs;
-            }
-          }
-          // Values == null means server-side filter has determined we must STOP
-        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
-      }
-
-      if (cache.size() > 0) {
-        return cache.poll();
-      }
-      writeScanMetrics();
-      return null;
-    }
-
-    /**
-     * Get <param>nbRows</param> rows.
-     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
-     * setting (or hbase.client.scanner.caching in hbase-site.xml).
-     * @param nbRows number of rows to return
-     * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
-     * if returned array is of zero-length (We never return null).
-     * @throws IOException
-     */
-    public Result [] next(int nbRows) throws IOException {
-      // Collect values to be returned here
-      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-      for(int i = 0; i < nbRows; i++) {
-        Result next = next();
-        if (next != null) {
-          resultSets.add(next);
-        } else {
-          break;
-        }
-      }
-      return resultSets.toArray(new Result[resultSets.size()]);
-    }
-
-    public void close() {
-      if (callable != null) {
-        callable.setClose();
-        try {
-          getConnection().getRegionServerWithRetries(callable);
-        } catch (IOException e) {
-          // We used to catch this error, interpret, and rethrow. However, we
-          // have since decided that it's not nice for a scanner's close to
-          // throw exceptions. Chances are it was just an UnknownScanner
-          // exception due to lease time out.
-        }
-        callable = null;
-      }
-      closed = true;
-    }
-
-    public Iterator<Result> iterator() {
-      return new Iterator<Result>() {
-        // The next RowResult, possibly pre-read
-        Result next = null;
-
-        // return true if there is another item pending, false if there isn't.
-        // this method is where the actual advancing takes place, but you need
-        // to call next() to consume it. hasNext() will only advance if there
-        // isn't a pending next().
-        public boolean hasNext() {
-          if (next == null) {
-            try {
-              next = ClientScanner.this.next();
-              return next != null;
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          }
-          return true;
-        }
-
-        // get the pending next item and advance the iterator. returns null if
-        // there is no next item.
-        public Result next() {
-          // since hasNext() does the real advancing, we call this to determine
-          // if there is a next before proceeding.
-          if (!hasNext()) {
-            return null;
-          }
-
-          // if we get to here, then hasNext() has given us an item to return.
-          // we want to return the item and then null out the next pointer, so
-          // we use a temporary variable.
-          Result temp = next;
-          next = null;
-          return temp;
-        }
-
-        public void remove() {
-          throw new UnsupportedOperationException();
-        }
-      };
-    }
-  }
-
-  /**
    * The pool is used for mutli requests for this HTable
    * @return the pool used for mutli
    */
Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(revision 0)
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all.
+ * <p>
+ * A scanner is opened on the first region from within the constructor.
+ * Rows are then fetched in batches of the scan's caching size (falling back
+ * to {@code hbase.client.scanner.caching} when the scan does not set one)
+ * and buffered client side. The passed {@link Scan} is modified while
+ * scanning: its start row follows the region currently being scanned.
+ * This class performs no internal synchronization.
+ */
+public class ClientScanner extends AbstractClientScanner {
+    private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
+    // HEADSUP: The scan internal start row can change as we move through table.
+    private final Scan scan;
+    private boolean closed = false;
+    // Current region scanner is against.  Gets cleared if current region goes
+    // wonky: e.g. if it splits on us.
+    private HRegionInfo currentRegion = null;
+    private ScannerCallable callable = null;
+    private final LinkedList<Result> cache = new LinkedList<Result>();
+    private final int caching;
+    private long lastNext;
+    // Keep lastResult returned successfully in case we have to reset scanner.
+    private Result lastResult = null;
+    private ScanMetrics scanMetrics = null;
+    private final long maxScannerResultSize;
+    private final HConnection connection;
+    private final byte[] tableName;
+    private final int scannerTimeout;
+
+    /**
+     * Create a new ClientScanner for the specified table. An HConnection will be
+     * retrieved using the passed Configuration.
+     * Note that the passed {@link Scan}'s start row may be changed.
+     * <p>
+     * NOTE(review): the connection obtained from
+     * {@link HConnectionManager#getConnection(Configuration)} is never released
+     * by this scanner (not even in {@link #close()}); confirm whether callers
+     * are expected to release it.
+     *
+     * @param conf The {@link Configuration} to use.
+     * @param scan {@link Scan} to use in this scanner
+     * @param tableName The table that we wish to scan
+     * @throws IOException if setting up the scanner fails, e.g. the scanner on
+     *   the first region cannot be opened
+     */
+    public ClientScanner(final Configuration conf, final Scan scan,
+        final byte[] tableName) throws IOException {
+      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
+    }
+
+    /**
+     * Create a new ClientScanner for the specified table, using an existing
+     * connection. The scanner on the first region is opened before this
+     * constructor returns.
+     * Note that the passed {@link Scan}'s start row may be changed.
+     * @param conf The {@link Configuration} to use.
+     * @param scan {@link Scan} to use in this scanner
+     * @param tableName The table that we wish to scan
+     * @param connection Connection identifying the cluster
+     * @throws IOException if the scanner on the first region cannot be opened
+     */
+    public ClientScanner(final Configuration conf, final Scan scan,
+        final byte[] tableName, HConnection connection) throws IOException {
+      if (CLIENT_LOG.isDebugEnabled()) {
+        CLIENT_LOG.debug("Creating scanner over "
+            + Bytes.toString(tableName)
+            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
+      }
+      this.scan = scan;
+      this.tableName = tableName;
+      this.lastNext = System.currentTimeMillis();
+      this.connection = connection;
+      this.maxScannerResultSize = conf.getLong(
+        HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+      this.scannerTimeout = (int) conf.getLong(
+        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+        HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+
+      // check if application wants to collect scan metrics
+      byte[] enableMetrics = scan.getAttribute(
+        Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
+      if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
+        scanMetrics = new ScanMetrics();
+      }
+
+      // Use the caching from the Scan.  If not set, use the configured default.
+      if (this.scan.getCaching() > 0) {
+        this.caching = this.scan.getCaching();
+      } else {
+        this.caching = conf.getInt("hbase.client.scanner.caching", 1);
+      }
+
+      // initialize the scanner.  NOTE(review): this calls the overridable
+      // getScannerCallable() before any subclass constructor body has run.
+      nextScanner(this.caching, false);
+    }
+
+    protected HConnection getConnection() {
+      return this.connection;
+    }
+
+    protected byte[] getTableName() {
+      return this.tableName;
+    }
+
+    protected Scan getScan() {
+      return scan;
+    }
+
+    protected long getTimestamp() {
+      return lastNext;
+    }
+
+    // Returns true if the scan has a stop row and that stop row is at or
+    // before the passed region endKey, i.e. the scan ends within that region.
+    private boolean checkScanStopRow(final byte [] endKey) {
+      if (this.scan.getStopRow().length > 0) {
+        // there is a stop row, check to see if we are past it.
+        byte [] stopRow = scan.getStopRow();
+        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
+          endKey, 0, endKey.length);
+        if (cmp <= 0) {
+          // stopRow <= endKey (endKey is equals to or larger than stopRow)
+          // This is a stop.
+          return true;
+        }
+      }
+      return false; //unlikely.
+    }
+
+    /*
+     * Gets a scanner for the next region.  If this.currentRegion != null, then
+     * we will move to the endrow of this.currentRegion.  Else we will get
+     * scanner at the scan.getStartRow().  We will go no further, just tidy
+     * up outstanding scanners, if <code>currentRegion != null</code> and
+     * <code>done</code> is true.
+     * @param nbRows number of rows the new region scanner should fetch per RPC
+     * @param done Server-side says we're done scanning.
+     * @return true if a scanner was opened on the next region; false if the
+     *   scan is finished, in which case this scanner has been closed.
+     */
+    private boolean nextScanner(int nbRows, final boolean done)
+    throws IOException {
+      // Close the previous scanner if it's open
+      if (this.callable != null) {
+        this.callable.setClose();
+        getConnection().getRegionServerWithRetries(callable);
+        this.callable = null;
+      }
+
+      // Where to start the next scanner
+      byte [] localStartKey;
+
+      // if we're at end of table, close and return false to stop iterating
+      if (this.currentRegion != null) {
+        byte [] endKey = this.currentRegion.getEndKey();
+        if (endKey == null ||
+            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
+            checkScanStopRow(endKey) ||
+            done) {
+          close();
+          if (CLIENT_LOG.isDebugEnabled()) {
+            CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
+          }
+          return false;
+        }
+        localStartKey = endKey;
+        if (CLIENT_LOG.isDebugEnabled()) {
+          CLIENT_LOG.debug("Finished with region " + this.currentRegion);
+        }
+      } else {
+        localStartKey = this.scan.getStartRow();
+      }
+
+      if (CLIENT_LOG.isDebugEnabled()) {
+        CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
+          Bytes.toStringBinary(localStartKey) + "'");
+      }
+      try {
+        callable = getScannerCallable(localStartKey, nbRows);
+        // Open a scanner on the region server starting at the
+        // beginning of the region
+        getConnection().getRegionServerWithRetries(callable);
+        this.currentRegion = callable.getHRegionInfo();
+        if (this.scanMetrics != null) {
+          this.scanMetrics.countOfRegions.inc();
+        }
+      } catch (IOException e) {
+        close();
+        throw e;
+      }
+      return true;
+    }
+
+    /**
+     * Creates the callable used to scan the region starting at
+     * {@code localStartKey}. Sets the scan's start row as a side effect.
+     *
+     * @param localStartKey row at which the region scanner should start
+     * @param nbRows number of rows to fetch per RPC
+     * @return the {@link ScannerCallable} for that region
+     */
+    protected ScannerCallable getScannerCallable(byte [] localStartKey,
+        int nbRows) {
+      scan.setStartRow(localStartKey);
+      ScannerCallable s = new ScannerCallable(getConnection(),
+        getTableName(), scan, this.scanMetrics);
+      s.setCaching(nbRows);
+      return s;
+    }
+
+    /**
+     * publish the scan metrics
+     * For now, we use scan.setAttribute to pass the metrics for application
+     * or TableInputFormat to consume
+     * Later, we could push it to other systems
+     * We don't use metrics framework because it doesn't support
+     * multi instances of the same metrics on the same machine; for scan/map
+     * reduce scenarios, we will have multiple scans running at the same time
+     */
+    private void writeScanMetrics() throws IOException
+    {
+      // by default, scanMetrics is null
+      // if application wants to collect scanMetrics, it can turn it on by
+      // calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,
+      // Bytes.toBytes(Boolean.TRUE))
+      if (this.scanMetrics == null) {
+        return;
+      }
+      final DataOutputBuffer d = new DataOutputBuffer();
+      scanMetrics.write(d);
+      // getData() exposes the whole backing array; only the first
+      // getLength() bytes are valid, so copy exactly those.
+      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
+          Arrays.copyOf(d.getData(), d.getLength()));
+    }
+
+    /**
+     * Returns the next row of the scan, fetching another batch from the region
+     * servers when the client-side cache is empty.
+     *
+     * @return the next {@link Result}, or {@code null} once the scan is
+     *   exhausted (scan metrics, if enabled, are published at that point)
+     * @throws IOException if a region server call fails in a way that is not
+     *   handled by resetting the scanner
+     */
+    @Override
+    public Result next() throws IOException {
+      // If the scanner is closed but there is some rows left in the cache,
+      // it will first empty it before returning null
+      if (cache.size() == 0 && this.closed) {
+        writeScanMetrics();
+        return null;
+      }
+      if (cache.size() == 0) {
+        Result [] values = null;
+        long remainingResultSize = maxScannerResultSize;
+        int countdown = this.caching;
+        // We need to reset it if it's a new callable that was created
+        // with a countdown in nextScanner
+        callable.setCaching(this.caching);
+        // This flag is set when we want to skip the result returned.  We do
+        // this when we reset scanner because it split under us.
+        boolean skipFirst = false;
+        do {
+          try {
+            if (skipFirst) {
+              // Skip only the first row (which was the last row of the last
+              // already-processed batch).
+              callable.setCaching(1);
+              values = getConnection().getRegionServerWithRetries(callable);
+              callable.setCaching(this.caching);
+              skipFirst = false;
+            }
+            // Server returns a null values if scanning is to stop.  Else,
+            // returns an empty array if scanning is to go on and we've just
+            // exhausted current region.
+            values = getConnection().getRegionServerWithRetries(callable);
+          } catch (DoNotRetryIOException e) {
+            if (e instanceof UnknownScannerException) {
+              long timeout = lastNext + scannerTimeout;
+              // If we are over the timeout, throw this exception to the client
+              // Else, it's because the region moved and we used the old id
+              // against the new region server; reset the scanner.
+              if (timeout < System.currentTimeMillis()) {
+                long elapsed = System.currentTimeMillis() - lastNext;
+                ScannerTimeoutException ex = new ScannerTimeoutException(
+                    elapsed + "ms passed since the last invocation, " +
+                        "timeout is currently set to " + scannerTimeout);
+                ex.initCause(e);
+                throw ex;
+              }
+            } else {
+              Throwable cause = e.getCause();
+              if (cause == null || (!(cause instanceof NotServingRegionException)
+                  && !(cause instanceof RegionServerStoppedException))) {
+                throw e;
+              }
+            }
+            // Else, its signal from depths of ScannerCallable that we got an
+            // NSRE on a next and that we need to reset the scanner.
+            if (this.lastResult != null) {
+              this.scan.setStartRow(this.lastResult.getRow());
+              // Skip first row returned.  We already let it out on previous
+              // invocation.
+              skipFirst = true;
+            }
+            // Clear region
+            this.currentRegion = null;
+            continue;
+          }
+          long currentTime = System.currentTimeMillis();
+          if (this.scanMetrics != null ) {
+            this.scanMetrics.sumOfMillisSecBetweenNexts.inc(
+                currentTime-lastNext);
+          }
+          lastNext = currentTime;
+          if (values != null && values.length > 0) {
+            for (Result rs : values) {
+              cache.add(rs);
+              for (KeyValue kv : rs.raw()) {
+                  remainingResultSize -= kv.heapSize();
+              }
+              countdown--;
+              this.lastResult = rs;
+            }
+          }
+          // Values == null means server-side filter has determined we must STOP
+        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
+      }
+
+      if (cache.size() > 0) {
+        return cache.poll();
+      }
+      writeScanMetrics();
+      return null;
+    }
+
+    /**
+     * Get {@code nbRows} rows.
+     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+     * setting (or hbase.client.scanner.caching in hbase-site.xml).
+     * @param nbRows number of rows to return
+     * @return Between zero and {@code nbRows} RowResults.  Scan is done
+     * if returned array is of zero-length (We never return null).
+     * @throws IOException if fetching a row fails
+     */
+    @Override
+    public Result [] next(int nbRows) throws IOException {
+      // Collect values to be returned here
+      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+      for(int i = 0; i < nbRows; i++) {
+        Result next = next();
+        if (next != null) {
+          resultSets.add(next);
+        } else {
+          break;
+        }
+      }
+      return resultSets.toArray(new Result[resultSets.size()]);
+    }
+
+    /**
+     * Closes the current region scanner, if any, and marks this scanner as
+     * closed. Rows already buffered client side can still be returned by
+     * {@link #next()}. Errors while closing the region scanner are ignored.
+     */
+    @Override
+    public void close() {
+      if (callable != null) {
+        callable.setClose();
+        try {
+          getConnection().getRegionServerWithRetries(callable);
+        } catch (IOException e) {
+          // We used to catch this error, interpret, and rethrow. However, we
+          // have since decided that it's not nice for a scanner's close to
+          // throw exceptions. Chances are it was just an UnknownScanner
+          // exception due to lease time out.
+        }
+        callable = null;
+      }
+      closed = true;
+    }
+}