Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 1172331) +++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy) @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -157,6 +158,17 @@ ResultScanner getScanner(Scan scan) throws IOException; /** + * Returns a scanner restricted to a single region of this table. + * + * @param scan A configured {@link Scan} object. + * @param hri The {@link HRegionInfo} identifying the region to scan. + * @return A scanner over that region. + * @throws IOException if a remote or network exception occurs. + */ + ResultScanner getScanner(final Scan scan, final HRegionInfo hri) + throws IOException; + + /** * Gets a scanner on the current table for the given family. * * @param family The column family to scan. 
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1172331) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -576,12 +575,12 @@ /** * {@inheritDoc} */ - @Override - public ResultScanner getScanner(final Scan scan) throws IOException { - ClientScanner s = new ClientScanner(scan); - s.initialize(); - return s; - } + @Override + public ResultScanner getScanner(final Scan scan) throws IOException { + ClientScanner s = new ClientScanner(scan); + s.initialize(); + return s; + } /** * {@inheritDoc} @@ -607,6 +606,16 @@ /** * {@inheritDoc} */ + @Override + public ResultScanner getScanner(final Scan scan, final HRegionInfo hri) + throws IOException { + // TODO: Verify scan's start/endRow fall in region (only start key is used) + return new ClientRegionScanner(scan, hri.getStartKey()); + } + + /** + * {@inheritDoc} + */ @Override public Result get(final Get get) throws IOException { return connection.getRegionServerWithRetries( @@ -1015,20 +1024,106 @@ return writeBuffer; } + protected abstract class AbstractClientScanner implements ResultScanner { + protected ScannerCallable callable = null; + protected boolean closed = false; + + /** + * Get up to nbRows rows by calling {@link #next()} repeatedly. + * How many RPCs are made is determined by the {@link Scan#setCaching(int)} + * setting (or hbase.client.scanner.caching in hbase-site.xml). 
+ * @param nbRows number of rows to return + * @return Between zero and nbRows RowResults. Scan is done + * if returned array is of zero-length (We never return null). + * @throws IOException if a call to {@link #next()} fails + */ + public Result [] next(int nbRows) throws IOException { + // Collect values to be returned here + ArrayList resultSets = new ArrayList(nbRows); + for(int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[resultSets.size()]); + } + + public void close() { + if (callable != null) { + callable.setClose(); + try { + getConnection().getRegionServerWithRetries(callable); + } catch (IOException e) { + // We used to catch this error, interpret, and rethrow. However, we + // have since decided that it's not nice for a scanner's close to + // throw exceptions. Chances are it was just an UnknownScanner + // exception due to lease time out. + } + callable = null; + } + closed = true; + } + + public Iterator iterator() { + return new Iterator() { + // The next RowResult, possibly pre-read + Result next = null; + + // return true if there is another item pending, false if there isn't. + // this method is where the actual advancing takes place, but you need + // to call next() to consume it. hasNext() will only advance if there + // isn't a pending next(). + public boolean hasNext() { + if (next == null) { + try { + next = AbstractClientScanner.this.next(); + return next != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return true; + } + + // get the pending next item and advance the iterator. returns null if + // there is no next item. + public Result next() { + // since hasNext() does the real advancing, we call this to determine + // if there is a next before proceeding. + if (!hasNext()) { + return null; + } + + // if we get to here, then hasNext() has given us an item to return. 
+ // we want to return the item and then null out the next pointer, so + // we use a temporary variable. + Result temp = next; + next = null; + return temp; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + /** * Implements the scanner interface for the HBase client. * If there are multiple regions in a table, this scanner will iterate * through them all. */ - protected class ClientScanner implements ResultScanner { + protected class ClientScanner extends AbstractClientScanner { private final Log CLIENT_LOG = LogFactory.getLog(this.getClass()); // HEADSUP: The scan internal start row can change as we move through table. private Scan scan; - private boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. private HRegionInfo currentRegion = null; - private ScannerCallable callable = null; private final LinkedList cache = new LinkedList(); private final int caching; private long lastNext; @@ -1238,88 +1333,6 @@ return null; } - /** - * Get nbRows rows. - * How many RPCs are made is determined by the {@link Scan#setCaching(int)} - * setting (or hbase.client.scanner.caching in hbase-site.xml). - * @param nbRows number of rows to return - * @return Between zero and nbRows RowResults. Scan is done - * if returned array is of zero-length (We never return null). - * @throws IOException - */ - public Result [] next(int nbRows) throws IOException { - // Collect values to be returned here - ArrayList resultSets = new ArrayList(nbRows); - for(int i = 0; i < nbRows; i++) { - Result next = next(); - if (next != null) { - resultSets.add(next); - } else { - break; - } - } - return resultSets.toArray(new Result[resultSets.size()]); - } - - public void close() { - if (callable != null) { - callable.setClose(); - try { - getConnection().getRegionServerWithRetries(callable); - } catch (IOException e) { - // We used to catch this error, interpret, and rethrow. 
However, we - // have since decided that it's not nice for a scanner's close to - // throw exceptions. Chances are it was just an UnknownScanner - // exception due to lease time out. - } - callable = null; - } - closed = true; - } - - public Iterator iterator() { - return new Iterator() { - // The next RowResult, possibly pre-read - Result next = null; - - // return true if there is another item pending, false if there isn't. - // this method is where the actual advancing takes place, but you need - // to call next() to consume it. hasNext() will only advance if there - // isn't a pending next(). - public boolean hasNext() { - if (next == null) { - try { - next = ClientScanner.this.next(); - return next != null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return true; - } - - // get the pending next item and advance the iterator. returns null if - // there is no next item. - public Result next() { - // since hasNext() does the real advancing, we call this to determine - // if there is a next before proceeding. - if (!hasNext()) { - return null; - } - - // if we get to here, then hasNext() has given us an item to return. - // we want to return the item and then null out the next pointer, so - // we use a temporary variable. - Result temp = next; - next = null; - return temp; - } - - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } } /** @@ -1533,4 +1546,71 @@ return operationTimeout; } + /** + * Implements the scanner interface for the HBase client. + * The scanner is used for exactly one region of this table. 
+   */
+  protected class ClientRegionScanner extends AbstractClientScanner {
+    private final LinkedList<Result> cache = new LinkedList<Result>();
+    private final int caching;
+    private final Scan scan;
+
+    /**
+     * @param scan The Scan to use; it is copied, so the caller's is untouched
+     * @param startRow Row to identify the region; becomes the copy's start row
+     * @throws IOException passed on from the calls that prepare the copy
+     */
+    protected ClientRegionScanner(final Scan scan, final byte[] startRow)
+    throws IOException {
+      // clone the scan, as we're going to modify it.
+      this.scan = new Scan(scan);
+      this.scan.setStartRow(startRow);
+      if (this.scan.getCaching() > 0) {
+        this.caching = this.scan.getCaching();
+      } else {
+        this.caching = HTable.this.scannerCaching;
+      }
+    }
+
+    /**
+     * Get the next result, refilling the local cache when it runs dry.
+     * Exceptions are passed back to the caller.
+     * @return the next row, or null once the scanner is closed or the server
+     * hands back no more rows
+     */
+    @Override
+    public Result next() throws IOException {
+      if (!cache.isEmpty()) {
+        return cache.poll();
+      }
+      // Test closed before opening: otherwise next() after close() would open
+      // a new scanner on a client scanner the caller has already closed.
+      if (this.closed) {
+        return null;
+      }
+      if (callable == null) {
+        callable = new ScannerCallable(getConnection(), getTableName(), scan);
+        callable.setCaching(this.caching);
+        // open the scanner
+        getConnection().getRegionServerWithRetries(callable);
+      }
+      Result [] values = null;
+      long remainingResultSize = maxScannerResultSize;
+      int countdown = this.caching;
+      do {
+        values = getConnection().getRegionServerWithRetries(callable);
+        if (values != null && values.length > 0) {
+          for (Result rs : values) {
+            cache.add(rs);
+            for (KeyValue kv : rs.raw()) {
+              remainingResultSize -= kv.heapSize();
+            }
+            countdown--;
+          }
+        }
+      } while (remainingResultSize > 0 && countdown > 0 && values != null
+          && values.length > 0);
+      return cache.poll();
+    }
+  }
 }