Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 1172331)
+++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy)
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -157,6 +158,17 @@
ResultScanner getScanner(Scan scan) throws IOException;
/**
+ * Returns a scanner that is restricted to a single region of this table.
+ * <p>
+ * In the {@link HTable} implementation the scan is copied and the copy's
+ * start row is replaced by the start key of {@code hri}; the scan's own
+ * start/end rows are not yet checked against the region boundaries.
+ *
+ * @param scan A configured {@link Scan} object.
+ * @param hri The {@link HRegionInfo} identifying the region to scan.
+ * @return A scanner over the given region.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ ResultScanner getScanner(final Scan scan, final HRegionInfo hri)
+ throws IOException;
+
+ /**
* Gets a scanner on the current table for the given family.
*
* @param family The column family to scan.
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1172331)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -53,7 +53,6 @@
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -576,12 +575,12 @@
/**
* {@inheritDoc}
*/
- @Override
- public ResultScanner getScanner(final Scan scan) throws IOException {
- ClientScanner s = new ClientScanner(scan);
- s.initialize();
- return s;
- }
+ @Override
+ public ResultScanner getScanner(final Scan scan) throws IOException {
+ ClientScanner s = new ClientScanner(scan); // walks all regions of the table
+ s.initialize();
+ return s;
+ }
/**
* {@inheritDoc}
@@ -607,6 +606,16 @@
/**
* {@inheritDoc}
*/
+  @Override
+  public ResultScanner getScanner(final Scan scan, final HRegionInfo hri)
+      throws IOException {
+    // TODO: Verify that scan's start/endRow - if specified - fall inside region
+    // The region is identified to the scanner solely by its start key.
+    final byte[] regionStartKey = hri.getStartKey();
+    return new ClientRegionScanner(scan, regionStartKey);
+  }
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public Result get(final Get get) throws IOException {
return connection.getRegionServerWithRetries(
@@ -1015,20 +1024,106 @@
return writeBuffer;
}
+  /**
+   * Common base for the client-side scanners of this table.
+   * It holds the state shared by the concrete scanners (the current
+   * {@link ScannerCallable} and the closed flag) and implements batch
+   * fetching, closing and iteration on top of the single-row
+   * <code>next()</code> that each subclass supplies.
+   */
+  protected abstract class AbstractClientScanner implements ResultScanner {
+    // Callable for the server-side scanner currently in use. It is null until
+    // a subclass creates one, and is reset to null by close().
+    protected ScannerCallable callable = null;
+    // Set by close(); subclasses consult it to stop fetching.
+    protected boolean closed = false;
+
+    /**
+     * Get nbRows rows.
+     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+     * setting (or hbase.client.scanner.caching in hbase-site.xml).
+     * @param nbRows number of rows to return
+     * @return Between zero and nbRows Results. Scan is done
+     * if returned array is of zero-length (We never return null).
+     * @throws IOException if fetching one of the rows fails
+     */
+    public Result [] next(int nbRows) throws IOException {
+      // Collect values to be returned here. The list must be typed so that
+      // toArray(Result[]) yields a Result[] rather than an Object[].
+      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+      for (int i = 0; i < nbRows; i++) {
+        Result next = next();
+        if (next == null) {
+          // The underlying scanner has nothing more; return what we have.
+          break;
+        }
+        resultSets.add(next);
+      }
+      return resultSets.toArray(new Result[resultSets.size()]);
+    }
+
+    /**
+     * Closes the server-side scanner, if one is in use, and marks this
+     * scanner as closed. Never throws: a failure of the close call is
+     * deliberately ignored.
+     */
+    public void close() {
+      if (callable != null) {
+        callable.setClose();
+        try {
+          getConnection().getRegionServerWithRetries(callable);
+        } catch (IOException e) {
+          // We used to catch this error, interpret, and rethrow. However, we
+          // have since decided that it's not nice for a scanner's close to
+          // throw exceptions. Chances are it was just an UnknownScanner
+          // exception due to lease time out.
+        }
+        callable = null;
+      }
+      closed = true;
+    }
+
+    /**
+     * Returns an iterator over the remaining results of this scanner.
+     * An {@link IOException} raised while fetching is rethrown wrapped in a
+     * {@link RuntimeException}. Removal is not supported.
+     * @return an iterator that pulls rows through <code>next()</code>
+     */
+    public Iterator<Result> iterator() {
+      return new Iterator<Result>() {
+        // The next Result, possibly pre-read
+        Result next = null;
+
+        // return true if there is another item pending, false if there isn't.
+        // this method is where the actual advancing takes place, but you need
+        // to call next() to consume it. hasNext() will only advance if there
+        // isn't a pending next().
+        @Override
+        public boolean hasNext() {
+          if (next == null) {
+            try {
+              next = AbstractClientScanner.this.next();
+              return next != null;
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          return true;
+        }
+
+        // get the pending next item and advance the iterator. returns null if
+        // there is no next item (callers rely on null rather than on a
+        // NoSuchElementException, so that behaviour is kept).
+        @Override
+        public Result next() {
+          // since hasNext() does the real advancing, we call this to determine
+          // if there is a next before proceeding.
+          if (!hasNext()) {
+            return null;
+          }
+
+          // if we get to here, then hasNext() has given us an item to return.
+          // we want to return the item and then null out the next pointer, so
+          // we use a temporary variable.
+          Result temp = next;
+          next = null;
+          return temp;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+  }
+
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
* through them all.
*/
- protected class ClientScanner implements ResultScanner {
+ protected class ClientScanner extends AbstractClientScanner {
private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
// HEADSUP: The scan internal start row can change as we move through table.
private Scan scan;
- private boolean closed = false;
// Current region scanner is against. Gets cleared if current region goes
// wonky: e.g. if it splits on us.
private HRegionInfo currentRegion = null;
- private ScannerCallable callable = null;
private final LinkedList cache = new LinkedList();
private final int caching;
private long lastNext;
@@ -1238,88 +1333,6 @@
return null;
}
- /**
- * Get nbRows rows.
- * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
- * setting (or hbase.client.scanner.caching in hbase-site.xml).
- * @param nbRows number of rows to return
- * @return Between zero and nbRows RowResults. Scan is done
- * if returned array is of zero-length (We never return null).
- * @throws IOException
- */
- public Result [] next(int nbRows) throws IOException {
- // Collect values to be returned here
- ArrayList resultSets = new ArrayList(nbRows);
- for(int i = 0; i < nbRows; i++) {
- Result next = next();
- if (next != null) {
- resultSets.add(next);
- } else {
- break;
- }
- }
- return resultSets.toArray(new Result[resultSets.size()]);
- }
-
- public void close() {
- if (callable != null) {
- callable.setClose();
- try {
- getConnection().getRegionServerWithRetries(callable);
- } catch (IOException e) {
- // We used to catch this error, interpret, and rethrow. However, we
- // have since decided that it's not nice for a scanner's close to
- // throw exceptions. Chances are it was just an UnknownScanner
- // exception due to lease time out.
- }
- callable = null;
- }
- closed = true;
- }
-
- public Iterator iterator() {
- return new Iterator() {
- // The next RowResult, possibly pre-read
- Result next = null;
-
- // return true if there is another item pending, false if there isn't.
- // this method is where the actual advancing takes place, but you need
- // to call next() to consume it. hasNext() will only advance if there
- // isn't a pending next().
- public boolean hasNext() {
- if (next == null) {
- try {
- next = ClientScanner.this.next();
- return next != null;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return true;
- }
-
- // get the pending next item and advance the iterator. returns null if
- // there is no next item.
- public Result next() {
- // since hasNext() does the real advancing, we call this to determine
- // if there is a next before proceeding.
- if (!hasNext()) {
- return null;
- }
-
- // if we get to here, then hasNext() has given us an item to return.
- // we want to return the item and then null out the next pointer, so
- // we use a temporary variable.
- Result temp = next;
- next = null;
- return temp;
- }
-
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
}
/**
@@ -1533,4 +1546,71 @@
return operationTimeout;
}
+  /**
+   * Implements the scanner interface for the HBase client.
+   * The scanner is used for exactly one region of this table.
+   * <p>
+   * The server-side scanner is not opened by the constructor but by the first
+   * call to {@link #next()}. Once {@link #close()} has been called,
+   * {@link #next()} hands out whatever is still cached and then returns null;
+   * it never opens a new server-side scanner.
+   */
+  protected class ClientRegionScanner extends AbstractClientScanner {
+    // Results already fetched from the region server but not yet returned.
+    private final LinkedList<Result> cache = new LinkedList<Result>();
+    // Number of rows asked for per fetch: the scan's own caching value if it
+    // is positive, otherwise the table-wide scannerCaching.
+    private final int caching;
+    // Private copy of the caller's scan whose start row is the region's key.
+    private final Scan scan;
+
+    /**
+     * @param scan The Scan to use; it is copied, the caller's instance is not
+     * modified
+     * @param startRow Row to identify the region
+     * @throws IOException if the scan cannot be copied
+     */
+    protected ClientRegionScanner(final Scan scan, final byte[] startRow)
+        throws IOException {
+      // clone the scan, as we're going to modify it.
+      this.scan = new Scan(scan);
+      this.scan.setStartRow(startRow);
+      if (this.scan.getCaching() > 0) {
+        this.caching = this.scan.getCaching();
+      } else {
+        this.caching = HTable.this.scannerCaching;
+      }
+    }
+
+    /**
+     * Get the next result.
+     * Exceptions are passed back to the caller.
+     * @return the next row, or null if there is none
+     * @throws IOException if opening the scanner or fetching rows fails
+     */
+    @Override
+    public Result next() throws IOException {
+      if (cache.isEmpty()) {
+        // Check for close() before touching the server: close() resets
+        // callable to null, so opening first would create a fresh
+        // server-side scanner that nobody would ever close.
+        if (this.closed) {
+          return null;
+        }
+        if (callable == null) {
+          callable = new ScannerCallable(getConnection(),
+              getTableName(), scan);
+          callable.setCaching(this.caching);
+          // open the scanner
+          getConnection().getRegionServerWithRetries(callable);
+        }
+        fillCache();
+      }
+      // poll() yields null when the cache is (still) empty.
+      return cache.poll();
+    }
+
+    // Fetches batches into the cache until the row budget (caching) or the
+    // size budget (maxScannerResultSize) is used up, or a fetch brings nothing.
+    private void fillCache() throws IOException {
+      Result [] values = null;
+      long remainingResultSize = maxScannerResultSize;
+      int countdown = this.caching;
+      do {
+        values = getConnection().getRegionServerWithRetries(callable);
+        if (values != null && values.length > 0) {
+          for (Result rs : values) {
+            cache.add(rs);
+            for (KeyValue kv : rs.raw()) {
+              remainingResultSize -= kv.heapSize();
+            }
+            countdown--;
+          }
+        }
+        // NOTE(review): a null or empty batch presumably means the region is
+        // exhausted; the server-side scanner stays open until close() is
+        // called - confirm whether it should be closed here.
+      } while (remainingResultSize > 0 && countdown > 0 && values != null
+          && values.length > 0);
+    }
+  }
}