* The default value comes from {@code hbase.client.scanner.caching}.
+ * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
*/
public int getScannerCaching() {
return scannerCaching;
@@ -347,6 +334,7 @@
* {@code next()} is called on a scanner, at the expense of memory use
* (since more rows will need to be maintained in memory by the scanners).
* @param scannerCaching the number of rows a scanner will fetch at once.
+ * @deprecated Use {@link Scan#setCaching(int)}
*/
public void setScannerCaching(int scannerCaching) {
this.scannerCaching = scannerCaching;
@@ -578,8 +566,12 @@
*/
@Override
public ResultScanner getScanner(final Scan scan) throws IOException {
- ClientScanner s = new ClientScanner(scan);
- s.initialize();
+ Scan copy = new Scan(scan);
+ if (copy.getCaching() <= 0) {
+ copy.setCaching(getScannerCaching());
+ }
+ ClientScanner s = new ClientScanner(getConfiguration(), copy,
+ getTableName(), this.connection);
return s;
}
@@ -1016,313 +1008,6 @@
}
/**
- * Implements the scanner interface for the HBase client.
- * If there are multiple regions in a table, this scanner will iterate
- * through them all.
- */
- protected class ClientScanner implements ResultScanner {
- private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
- // HEADSUP: The scan internal start row can change as we move through table.
- private Scan scan;
- private boolean closed = false;
- // Current region scanner is against. Gets cleared if current region goes
- // wonky: e.g. if it splits on us.
- private HRegionInfo currentRegion = null;
- private ScannerCallable callable = null;
- private final LinkedListcurrentRegion != null and
- * done is true.
- * @param nbRows
- * @param done Server-side says we're done scanning.
- */
- private boolean nextScanner(int nbRows, final boolean done)
- throws IOException {
- // Close the previous scanner if it's open
- if (this.callable != null) {
- this.callable.setClose();
- getConnection().getRegionServerWithRetries(callable);
- this.callable = null;
- }
-
- // Where to start the next scanner
- byte [] localStartKey;
-
- // if we're at end of table, close and return false to stop iterating
- if (this.currentRegion != null) {
- byte [] endKey = this.currentRegion.getEndKey();
- if (endKey == null ||
- Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
- checkScanStopRow(endKey) ||
- done) {
- close();
- if (CLIENT_LOG.isDebugEnabled()) {
- CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
- }
- return false;
- }
- localStartKey = endKey;
- if (CLIENT_LOG.isDebugEnabled()) {
- CLIENT_LOG.debug("Finished with region " + this.currentRegion);
- }
- } else {
- localStartKey = this.scan.getStartRow();
- }
-
- if (CLIENT_LOG.isDebugEnabled()) {
- CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
- Bytes.toStringBinary(localStartKey) + "'");
- }
- try {
- callable = getScannerCallable(localStartKey, nbRows);
- // Open a scanner on the region server starting at the
- // beginning of the region
- getConnection().getRegionServerWithRetries(callable);
- this.currentRegion = callable.getHRegionInfo();
- } catch (IOException e) {
- close();
- throw e;
- }
- return true;
- }
-
- protected ScannerCallable getScannerCallable(byte [] localStartKey,
- int nbRows) {
- scan.setStartRow(localStartKey);
- ScannerCallable s = new ScannerCallable(getConnection(),
- getTableName(), scan);
- s.setCaching(nbRows);
- return s;
- }
-
- public Result next() throws IOException {
- // If the scanner is closed but there is some rows left in the cache,
- // it will first empty it before returning null
- if (cache.size() == 0 && this.closed) {
- return null;
- }
- if (cache.size() == 0) {
- Result [] values = null;
- long remainingResultSize = maxScannerResultSize;
- int countdown = this.caching;
- // We need to reset it if it's a new callable that was created
- // with a countdown in nextScanner
- callable.setCaching(this.caching);
- // This flag is set when we want to skip the result returned. We do
- // this when we reset scanner because it split under us.
- boolean skipFirst = false;
- do {
- try {
- if (skipFirst) {
- // Skip only the first row (which was the last row of the last
- // already-processed batch).
- callable.setCaching(1);
- values = getConnection().getRegionServerWithRetries(callable);
- callable.setCaching(this.caching);
- skipFirst = false;
- }
- // Server returns a null values if scanning is to stop. Else,
- // returns an empty array if scanning is to go on and we've just
- // exhausted current region.
- values = getConnection().getRegionServerWithRetries(callable);
- } catch (DoNotRetryIOException e) {
- if (e instanceof UnknownScannerException) {
- long timeout = lastNext + scannerTimeout;
- // If we are over the timeout, throw this exception to the client
- // Else, it's because the region moved and we used the old id
- // against the new region server; reset the scanner.
- if (timeout < System.currentTimeMillis()) {
- long elapsed = System.currentTimeMillis() - lastNext;
- ScannerTimeoutException ex = new ScannerTimeoutException(
- elapsed + "ms passed since the last invocation, " +
- "timeout is currently set to " + scannerTimeout);
- ex.initCause(e);
- throw ex;
- }
- } else {
- Throwable cause = e.getCause();
- if (cause == null || !(cause instanceof NotServingRegionException)) {
- throw e;
- }
- }
- // Else, its signal from depths of ScannerCallable that we got an
- // NSRE on a next and that we need to reset the scanner.
- if (this.lastResult != null) {
- this.scan.setStartRow(this.lastResult.getRow());
- // Skip first row returned. We already let it out on previous
- // invocation.
- skipFirst = true;
- }
- // Clear region
- this.currentRegion = null;
- continue;
- }
- lastNext = System.currentTimeMillis();
- if (values != null && values.length > 0) {
- for (Result rs : values) {
- cache.add(rs);
- for (KeyValue kv : rs.raw()) {
- remainingResultSize -= kv.heapSize();
- }
- countdown--;
- this.lastResult = rs;
- }
- }
- // Values == null means server-side filter has determined we must STOP
- } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
- }
-
- if (cache.size() > 0) {
- return cache.poll();
- }
- return null;
- }
-
- /**
- * Get nbRows rows.
- * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
- * setting (or hbase.client.scanner.caching in hbase-site.xml).
- * @param nbRows number of rows to return
- * @return Between zero and nbRows RowResults. Scan is done
- * if returned array is of zero-length (We never return null).
- * @throws IOException
- */
- public Result [] next(int nbRows) throws IOException {
- // Collect values to be returned here
- ArrayListcurrentRegion != null and done is
+ * true.
+ *
+ * @param nbRows
+ *
+ * @param done Server-side says we're done scanning.
+ */
+ private boolean nextScanner(int nbRows, final boolean done)
+ throws IOException {
+ // Close the previous scanner if it's open
+ if (this.callable != null) {
+ this.callable.setClose();
+ getConnection().getRegionServerWithRetries(callable);
+ this.callable = null;
+ }
+
+ // Where to start the next scanner
+ byte[] localStartKey;
+
+ // if we're at end of table, close and return false to stop iterating
+ if (this.currentRegion != null) {
+ byte[] endKey = this.currentRegion.getEndKey();
+ if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
+ || checkScanStopRow(endKey) || done) {
+ close();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished with scanning at " + this.currentRegion);
+ }
+ return false;
+ }
+ localStartKey = endKey;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished with region " + this.currentRegion);
+ }
+ } else {
+ localStartKey = this.scan.getStartRow();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Advancing internal scanner to startKey at '"
+ + Bytes.toStringBinary(localStartKey) + "'");
+ }
+ try {
+ callable = getScannerCallable(localStartKey, nbRows);
+ // Open a scanner on the region server starting at the
+ // beginning of the region
+ getConnection().getRegionServerWithRetries(callable);
+ this.currentRegion = callable.getHRegionInfo();
+ } catch (IOException e) {
+ close();
+ throw e;
+ }
+ return true;
+ }
+
+ protected ScannerCallable getScannerCallable(byte[] localStartKey, int nbRows) {
+ scan.setStartRow(localStartKey);
+ ScannerCallable s = new ScannerCallable(getConnection(), getTableName(),
+ scan);
+ s.setCaching(nbRows);
+ return s;
+ }
+
+ public Result next() throws IOException {
+ // If the scanner is closed but there is some rows left in the cache,
+ // it will first empty it before returning null
+ if (cache.size() == 0 && this.closed) {
+ return null;
+ }
+ if (cache.size() == 0) {
+ Result[] values = null;
+ long remainingResultSize = maxScannerResultSize;
+ int countdown = this.caching;
+ // We need to reset it if it's a new callable that was created
+ // with a countdown in nextScanner
+ callable.setCaching(this.caching);
+ // This flag is set when we want to skip the result returned. We do
+ // this when we reset scanner because it split under us.
+ boolean skipFirst = false;
+ do {
+ try {
+ if (skipFirst) {
+ // Skip only the first row (which was the last row of the last
+ // already-processed batch).
+ callable.setCaching(1);
+ values = getConnection().getRegionServerWithRetries(callable);
+ callable.setCaching(this.caching);
+ skipFirst = false;
+ }
+ // Server returns a null values if scanning is to stop. Else,
+ // returns an empty array if scanning is to go on and we've just
+ // exhausted current region.
+ values = getConnection().getRegionServerWithRetries(callable);
+ } catch (DoNotRetryIOException e) {
+ if (e instanceof UnknownScannerException) {
+ long timeout = lastNext + scannerTimeout;
+ // If we are over the timeout, throw this exception to the client
+ // Else, it's because the region moved and we used the old id
+ // against the new region server; reset the scanner.
+ if (timeout < System.currentTimeMillis()) {
+ long elapsed = System.currentTimeMillis() - lastNext;
+ ScannerTimeoutException ex = new ScannerTimeoutException(elapsed
+ + "ms passed since the last invocation, "
+ + "timeout is currently set to " + scannerTimeout);
+ ex.initCause(e);
+ throw ex;
+ }
+ } else {
+ Throwable cause = e.getCause();
+ if (cause == null || !(cause instanceof NotServingRegionException)) {
+ throw e;
+ }
+ }
+ // Else, its signal from depths of ScannerCallable that we got an
+ // NSRE on a next and that we need to reset the scanner.
+ if (this.lastResult != null) {
+ this.scan.setStartRow(this.lastResult.getRow());
+ // Skip first row returned. We already let it out on previous
+ // invocation.
+ skipFirst = true;
+ }
+ // Clear region
+ this.currentRegion = null;
+ continue;
+ }
+ lastNext = System.currentTimeMillis();
+ if (values != null && values.length > 0) {
+ for (Result rs : values) {
+ cache.add(rs);
+ for (KeyValue kv : rs.raw()) {
+ remainingResultSize -= kv.heapSize();
+ }
+ countdown--;
+ this.lastResult = rs;
+ }
+ }
+ // Values == null means server-side filter has determined we must STOP
+ } while (remainingResultSize > 0 && countdown > 0
+ && nextScanner(countdown, values == null));
+ }
+
+ if (cache.size() > 0) {
+ return cache.poll();
+ }
+ return null;
+ }
+
+ public void close() {
+ if (callable != null) {
+ callable.setClose();
+ try {
+ getConnection().getRegionServerWithRetries(callable);
+ } catch (IOException e) {
+ // We used to catch this error, interpret, and rethrow. However, we
+ // have since decided that it's not nice for a scanner's close to
+ // throw exceptions. Chances are it was just an UnknownScanner
+ // exception due to lease time out.
+ }
+ callable = null;
+ }
+ closed = true;
+ }
+}