Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1438850) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -291,6 +291,12 @@ } @Override + public boolean postScannerFilterRow(final ObserverContext e, + final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException { + return hasMore; + } + + @Override public void preScannerClose(final ObserverContext e, final InternalScanner s) throws IOException { } Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1438850) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -695,6 +695,27 @@ final InternalScanner s, final List result, final int limit, final boolean hasNext) throws IOException; + + /** + * This will be called by the scan flow when the current scanned row is being filtered out by the + * filter. The filter may be filtering out the row via any of the below scenarios + *
    + *
  1. + * boolean filterRowKey(byte [] buffer, int offset, int length) returning true
  2. + *
  3. + * boolean filterRow() returning true
  4. + *
  5. + * void filterRow(List kvs) removing all the kvs from the passed List
  6. + *
+ * @param c the environment provided by the region server + * @param s the scanner + * @param currentRow The current rowkey which got filtered out + * @param hasMore the 'has more' indication + * @return whether more rows are available for the scanner or not + * @throws IOException + */ + boolean postScannerFilterRow(final ObserverContext c, + final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException; /** * Called before the client closes a scanner. Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1438850) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1710,7 +1710,7 @@ protected RegionScanner instantiateRegionScanner(Scan scan, List additionalScanners) throws IOException { - return new RegionScannerImpl(scan, additionalScanners); + return new RegionScannerImpl(scan, additionalScanners, this); } /* @@ -3524,13 +3524,16 @@ private int isScan; private boolean filterClosed = false; private long readPt; + private HRegion region; public HRegionInfo getRegionInfo() { return regionInfo; } - RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { - //DebugPrint.println("HRegionScanner."); - + + RegionScannerImpl(Scan scan, List additionalScanners, HRegion region) + throws IOException { + // DebugPrint.println("HRegionScanner."); + this.region = region; this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -3581,8 +3584,8 @@ } } - RegionScannerImpl(Scan scan) throws IOException { - this(scan, null); + RegionScannerImpl(Scan scan, HRegion region) throws IOException { + this(scan, null, region); } @Override @@ -3760,7 +3763,8 @@ // Check if rowkey filter wants to exclude this row. If so, loop to next. // Techically, if we hit limits before on this row, we don't need this call. if (filterRowKey(currentRow, offset, length)) { - nextRow(currentRow, offset, length); + boolean moreRows = nextRow(currentRow, offset, length); + if (!moreRows) return false; results.clear(); continue; } @@ -3786,7 +3790,8 @@ } if (isEmptyRow || filterRow()) { - nextRow(currentRow, offset, length); + boolean moreRows = nextRow(currentRow, offset, length); + if (!moreRows) return false; results.clear(); // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. @@ -3825,7 +3830,8 @@ // Double check to prevent empty rows from appearing in result. It could be // the case when SingleValueExcludeFilter is used. if (results.isEmpty()) { - nextRow(currentRow, offset, length); + boolean moreRows = nextRow(currentRow, offset, length); + if (!moreRows) return false; if (!stopRow) continue; } @@ -3843,12 +3849,17 @@ && filter.filterRowKey(row, offset, length); } - protected void nextRow(byte [] currentRow, int offset, short length) throws IOException { + protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException { KeyValue next; while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) { this.storeHeap.next(MOCKED_LIST); } resetFilters(); + // Calling the hook in CP which allows it to do a fast forward + if (this.region.getCoprocessorHost() != null) { + return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow); + } + return true; } private boolean isStopRow(byte [] currentRow, int offset, short length) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1438850) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -1330,6 +1330,35 @@ } /** + * This will be called by the scan flow when the current scanned row is being filtered out by the + * filter. + * @param s the scanner + * @param currentRow The current rowkey which got filtered out + * @return whether more rows are available for the scanner or not + * @throws IOException + */ + public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow) + throws IOException { + boolean hasMore = true; // By default assume more rows there. + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow, + hasMore); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + return hasMore; + } + + /** * @param s the scanner * @return true if default behavior should be bypassed, false otherwise * @exception IOException Exception