Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1418818) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3490,6 +3490,10 @@ this(scan, null); } + @Override + public long getMvccReadPoint() { + return this.readPt; + } /** * Reset both the filter and the old filter. */ @@ -3500,7 +3504,7 @@ } @Override - public synchronized boolean next(List outResults, int limit) + public boolean next(List outResults, int limit) throws IOException { return next(outResults, limit, null); } @@ -3520,30 +3524,42 @@ // This could be a new thread from the last time we called next(). MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); - results.clear(); - - boolean returnResult = nextInternal(limit, metric); - - outResults.addAll(results); - resetFilters(); - if (isFilterDone()) { - return false; - } - return returnResult; + return nextRaw(outResults, limit, metric); } finally { closeRegionOperation(); } } @Override - public synchronized boolean next(List outResults) + public boolean nextRaw(List outResults, String metric) throws IOException { + return nextRaw(outResults, batch, metric); + } + + @Override + public boolean nextRaw(List outResults, int limit, + String metric) throws IOException { + results.clear(); + + boolean returnResult = nextInternal(limit, metric); + + outResults.addAll(results); + resetFilters(); + if (isFilterDone()) { + return false; + } + return returnResult; + } + + @Override + public boolean next(List outResults) + throws IOException { // apply the batching limit by default return next(outResults, batch, null); } @Override - public synchronized boolean next(List outResults, String metric) + public boolean next(List outResults, String metric) throws IOException { // apply the batching limit by default return next(outResults, batch, metric); @@ 
-5245,7 +5261,7 @@ * @throws RegionTooBusyException if failed to get the lock in time * @throws InterruptedIOException if interrupted while waiting for a lock */ - private void startRegionOperation() + public void startRegionOperation() throws NotServingRegionException, RegionTooBusyException, InterruptedIOException { if (this.closing.get()) { throw new NotServingRegionException(regionInfo.getRegionNameAsString() + @@ -5263,7 +5279,7 @@ * Closes the lock. This needs to be called in the finally block corresponding * to the try block of #startRegionOperation */ - private void closeRegionOperation(){ + public void closeRegionOperation(){ lock.readLock().unlock(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1418818) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -165,6 +165,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; import org.codehaus.jackson.map.ObjectMapper; +import org.joda.time.field.MillisDurationField; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -2430,23 +2431,32 @@ } } - for (int i = 0; i < nbRows - && currentScanResultSize < maxScannerResultSize; i++) { - requestCount.incrementAndGet(); - // Collect values to be returned here - boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE); - if (!values.isEmpty()) { - for (KeyValue kv : values) { - currentScanResultSize += kv.heapSize(); + MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint()); + region.startRegionOperation(); + try { + int i = 0; + synchronized(s) { + for (; i < nbRows + && currentScanResultSize < maxScannerResultSize; i++) { + // Collect values to be returned here + boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE); + if 
(!values.isEmpty()) { + for (KeyValue kv : values) { + currentScanResultSize += kv.heapSize(); + } + results.add(new Result(values)); + } + if (!moreRows) { + break; + } + values.clear(); } - results.add(new Result(values)); } - if (!moreRows) { - break; - } - values.clear(); + requestCount.addAndGet(i); + region.readRequestsCount.add(i); + } finally { + region.closeRegionOperation(); } - // coprocessor postNext hook if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerNext(s, results, nbRows, true); Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (revision 1418818) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (working copy) @@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; + import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; /** * RegionScanner describes iterators over rows in an HRegion. @@ -49,4 +52,50 @@ */ public boolean reseek(byte[] row) throws IOException; + /** + * @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl} + */ + public long getMvccReadPoint(); + + /** + * Grab the next row's worth of values with the default limit on the number of values + * to return. + * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. + * Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object. 
+ * See {@link #nextRaw(List, int, String)} + * @param result return output array + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean nextRaw(List result, String metric) throws IOException; + + /** + * Grab the next row's worth of values with a limit on the number of values + * to return. + * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. + * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object. + * Example: + *
+   * HRegion region = ...;
+   * RegionScanner scanner = ...
+   * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+   * region.startRegionOperation();
+   * try {
+   *   synchronized(scanner) {
+   *     ...
+   *     boolean moreRows = scanner.nextRaw(values, limit, metric);
+   *     ...
+   *   }
+   * } finally {
+   *   region.closeRegionOperation();
+   * }
+   * 
+ * @param result return output array + * @param limit limit on row count to get + * @param metric the metric name + * @return true if more rows exist after this one, false if scanner is done + * @throws IOException e + */ + public boolean nextRaw(List result, int limit, String metric) throws IOException; } Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (revision 1418818) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (working copy) @@ -85,6 +85,18 @@ } @Override + public boolean nextRaw(List result, int limit, String metric) + throws IOException { + return delegate.nextRaw(result, limit, metric); + } + + @Override + public boolean nextRaw(List result, String metric) + throws IOException { + return delegate.nextRaw(result, metric); + } + + @Override public void close() throws IOException { delegate.close(); } @@ -104,6 +116,10 @@ return false; } + @Override + public long getMvccReadPoint() { + return delegate.getMvccReadPoint(); + } } public static class CoprocessorImpl extends BaseRegionObserver {