Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1333121) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3231,6 +3231,7 @@ private int isScan; private boolean filterClosed = false; private long readPt; + private long maxResultSize; public HRegionInfo getRegionInfo() { return regionInfo; @@ -3238,6 +3239,7 @@ RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); + this.maxResultSize = scan.getMaxResultSize(); this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -3281,6 +3283,11 @@ this(scan, null); } + @Override + public long getMaxResultSize() { + return maxResultSize; + } + /** * Reset both the filter and the old filter. */ Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (revision 1333121) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Scan; /** * RegionScanner describes iterators over rows in an HRegion. @@ -49,4 +50,8 @@ */ public boolean reseek(byte[] row) throws IOException; + /** + * @return The preferred max buffer size. 
See {@link Scan#setMaxResultSize(long)} + */ + public long getMaxResultSize(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1333121) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2385,9 +2385,13 @@ : results.toArray(new Result[0]); } } - - for (int i = 0; i < nbRows - && currentScanResultSize < maxScannerResultSize; i++) { + long maxResultSize; + if (s.getMaxResultSize() > 0) { + maxResultSize = s.getMaxResultSize(); + } else { + maxResultSize = maxScannerResultSize; + } + for (int i = 0; i < nbRows && currentScanResultSize < maxResultSize; i++) { requestCount.incrementAndGet(); // Collect values to be returned here boolean moreRows = s.next(values); Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1333121) +++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -95,9 +95,13 @@ this.tableName = tableName; this.lastNext = System.currentTimeMillis(); this.connection = connection; - this.maxScannerResultSize = conf.getLong( + if (scan.getMaxResultSize() > 0) { + this.maxScannerResultSize = scan.getMaxResultSize(); + } else { + this.maxScannerResultSize = conf.getLong( HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + } this.scannerTimeout = (int) conf.getLong( HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1333121) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -51,8 +51,11 @@ * To scan everything for each row, instantiate a Scan object. *

* To modify scanner caching for just this scan, use {@link #setCaching(int) setCaching}. - * If caching is NOT set, we will use the caching value of the hosting - * {@link HTable}. See {@link HTable#setScannerCaching(int)}. + * If caching is NOT set, we will use the caching value of the hosting {@link HTable}. See + * {@link HTable#setScannerCaching(int)}. In addition to row caching, it is possible to specify a + * maximum result size, using {@link #setMaxResultSize(long)}. When both are used, + * single server requests are limited by either number of rows or maximum result size, whichever + * limit comes first. *

* To further define the scope of what to get when scanning, perform additional * methods as outlined below. @@ -84,7 +87,7 @@ private static final String RAW_ATTR = "_raw_"; private static final String ISOLATION_LEVEL = "_isolationlevel_"; - private static final byte SCAN_VERSION = (byte)2; + private static final byte SCAN_VERSION = (byte)3; private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; private int maxVersions = 1; @@ -100,6 +103,7 @@ * -1 means no caching */ private int caching = -1; + private long maxResultSize = -1; private boolean cacheBlocks = true; private Filter filter = null; private TimeRange tr = new TimeRange(); @@ -149,6 +153,7 @@ maxVersions = scan.getMaxVersions(); batch = scan.getBatch(); caching = scan.getCaching(); + maxResultSize = scan.getMaxResultSize(); cacheBlocks = scan.getCacheBlocks(); filter = scan.getFilter(); // clone? TimeRange ctr = scan.getTimeRange(); @@ -323,6 +328,24 @@ } /** + * @return the maximum result size in bytes. See {@link #setMaxResultSize(long)} + */ + public long getMaxResultSize() { + return maxResultSize; + } + + /** + * Set the maximum result size. The default is -1; this means that no specific + * maximum result size will be set for this scan, and the global configured + * value will be used instead. (Defaults to unlimited). + * + * @param maxResultSize The maximum result size in bytes. + */ + public void setMaxResultSize(long maxResultSize) { + this.maxResultSize = maxResultSize; + } + + /** * Apply the specified server-side filter when performing the Scan. 
* @param filter filter to run on the server * @return this @@ -500,6 +523,7 @@ map.put("maxVersions", this.maxVersions); map.put("batch", this.batch); map.put("caching", this.caching); + map.put("maxResultSize", this.maxResultSize); map.put("cacheBlocks", this.cacheBlocks); List timeRange = new ArrayList(); timeRange.add(this.tr.getMin()); @@ -582,6 +606,9 @@ if (version > 1) { readAttributes(in); } + if (version > 2) { + this.maxResultSize = in.readLong(); + } } public void write(final DataOutput out) @@ -615,6 +642,7 @@ } } writeAttributes(out); + out.writeLong(maxResultSize); } /** Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (revision 1333121) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (working copy) @@ -89,6 +89,11 @@ return false; } + @Override + public long getMaxResultSize() { + return delegate.getMaxResultSize(); + } + } public static class CoprocessorImpl extends BaseRegionObserver {