Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1428320) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -291,6 +291,12 @@ } @Override + public boolean postScannerFilterRow(final ObserverContext e, + final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException { + return hasMore; + } + + @Override public void preScannerClose(final ObserverContext e, final InternalScanner s) throws IOException { } Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1428320) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -697,6 +697,27 @@ throws IOException; /** + * This will be called by the scan flow when the current scanned row is being filtered out by the + * filter. The filter may be filtering out the row via any of the below scenarios + *
    + *
  1. + * boolean filterRowKey(byte [] buffer, int offset, int length) returning true
  2. + *
  3. + * boolean filterRow() returning true
  4. + *
  5. + * void filterRow(List kvs) removing all the kvs from the passed List
  6. + *
+ * @param c the environment provided by the region server + * @param s the scanner + * @param currentRow The current rowkey which got filtered out. + * @param hasMore the 'has more' indication + * @return Returns whether more rows are available for the scanner or not. + * @throws IOException + */ + boolean postScannerFilterRow(final ObserverContext c, + final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException; + + /** * Called before the client closes a scanner. *

* Call CoprocessorEnvironment#bypass to skip default actions Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1428320) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1697,7 +1697,7 @@ protected RegionScanner instantiateRegionScanner(Scan scan, List additionalScanners) throws IOException { - return new RegionScannerImpl(scan, additionalScanners); + return new RegionScannerImpl(scan, additionalScanners, this); } /* @@ -3474,13 +3474,16 @@ private int isScan; private boolean filterClosed = false; private long readPt; + private HRegion region; public HRegionInfo getRegionInfo() { return regionInfo; } - RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { - //DebugPrint.println("HRegionScanner."); - + + RegionScannerImpl(Scan scan, List additionalScanners, HRegion region) + throws IOException { + // DebugPrint.println("HRegionScanner."); + this.region = region; this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -3520,8 +3523,8 @@ this.storeHeap = new KeyValueHeap(scanners, comparator); } - RegionScannerImpl(Scan scan) throws IOException { - this(scan, null); + RegionScannerImpl(Scan scan, HRegion region) throws IOException { + this(scan, null, region); } @Override @@ -3636,7 +3639,8 @@ return false; } else if (filterRowKey(currentRow, offset, length)) { - nextRow(currentRow, offset, length); + boolean moreRows = nextRow(currentRow, offset, length); + if (!moreRows) return false; } else { KeyValue nextKv; do { @@ -3666,8 +3670,9 @@ // the reasons for calling this method are: // 1. reset the filters. // 2. provide a hook to fast forward the row (used by subclasses) - nextRow(currentRow, offset, length); - + boolean moreRows = nextRow(currentRow, offset, length); + if (!moreRows) return false; + // This row was totally filtered out, if this is NOT the last row, // we should continue on. @@ -3687,13 +3692,18 @@ && filter.filterRowKey(row, offset, length); } - protected void nextRow(byte [] currentRow, int offset, short length) throws IOException { + protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException { KeyValue next; while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) { this.storeHeap.next(MOCKED_LIST); } results.clear(); resetFilters(); + // Calling the hook in CP which allows it to do a fast forward + if (this.region.getCoprocessorHost() != null) { + return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow); + } + return true; } private boolean isStopRow(byte [] currentRow, int offset, short length) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1428320) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -1329,6 +1329,32 @@ } /** + * + * @param s + * @param currentRow + * @return + * @throws IOException + */ + public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow) + throws IOException { + boolean hasMore = true; // By default assume more rows there. + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow, + hasMore); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + return hasMore; + } + + /** * @param s the scanner * @return true if default behavior should be bypassed, false otherwise * @exception IOException Exception