diff --git src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java index c6946bb..2bca1c4 100644 --- src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java +++ src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java @@ -64,7 +64,8 @@ public class ColumnCountGetFilter extends FilterBase { @Override public ReturnCode filterKeyValue(KeyValue v) { this.count++; - return filterAllRemaining() ? ReturnCode.SKIP: ReturnCode.INCLUDE; + return filterAllRemaining() ? ReturnCode.NEXT_COL: + ReturnCode.INCLUDE_AND_NEXT_COL; } @Override diff --git src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java index d6b1280..1214a1a 100644 --- src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java +++ src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java @@ -30,7 +30,8 @@ import com.google.common.base.Preconditions; /** * A filter, based on the ColumnCountGetFilter, takes two arguments: limit and offset. * This filter can be used for row-based indexing, where references to other tables are stored across many columns, - * in order to efficient lookups and paginated results for end users. + * in order to efficient lookups and paginated results for end users. Only most recent versions are considered + * for pagination. */ public class ColumnPaginationFilter extends FilterBase { @@ -76,7 +77,8 @@ public class ColumnPaginationFilter extends FilterBase return ReturnCode.NEXT_ROW; } - ReturnCode code = count < offset ? ReturnCode.SKIP : ReturnCode.INCLUDE; + ReturnCode code = count < offset ? 
ReturnCode.NEXT_COL : + ReturnCode.INCLUDE_AND_NEXT_COL; count++; return code; } diff --git src/main/java/org/apache/hadoop/hbase/filter/Filter.java src/main/java/org/apache/hadoop/hbase/filter/Filter.java index 02ea5f5..ac1079f 100644 --- src/main/java/org/apache/hadoop/hbase/filter/Filter.java +++ src/main/java/org/apache/hadoop/hbase/filter/Filter.java @@ -113,6 +113,10 @@ public interface Filter extends Writable { */ INCLUDE, /** + * Include the KeyValue and seek to the next column, skipping older versions. + */ + INCLUDE_AND_NEXT_COL, + /** * Skip this KeyValue */ SKIP, diff --git src/main/java/org/apache/hadoop/hbase/filter/FilterList.java src/main/java/org/apache/hadoop/hbase/filter/FilterList.java index c3064e9..8e12d39 100644 --- src/main/java/org/apache/hadoop/hbase/filter/FilterList.java +++ src/main/java/org/apache/hadoop/hbase/filter/FilterList.java @@ -201,6 +201,9 @@ public class FilterList implements Filter { } ReturnCode code = filter.filterKeyValue(v); switch (code) { + // Override INCLUDE and continue to evaluate. + case INCLUDE_AND_NEXT_COL: + rc = ReturnCode.INCLUDE_AND_NEXT_COL; case INCLUDE: continue; default: @@ -213,7 +216,12 @@ public class FilterList implements Filter { switch (filter.filterKeyValue(v)) { case INCLUDE: - rc = ReturnCode.INCLUDE; + if (rc != ReturnCode.INCLUDE_AND_NEXT_COL) { + rc = ReturnCode.INCLUDE; + } + break; + case INCLUDE_AND_NEXT_COL: + rc = ReturnCode.INCLUDE_AND_NEXT_COL; // must continue here to evaluate all filters case NEXT_ROW: case SKIP: diff --git src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index 3435d62..d1cab8e 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -337,8 +337,9 @@ public class ScanQueryMatcher { * counter for even that KV which may be discarded later on by Filter.
This * would lead to incorrect results in certain cases. */ + ReturnCode filterResponse = ReturnCode.SKIP; if (filter != null) { - ReturnCode filterResponse = filter.filterKeyValue(kv); + filterResponse = filter.filterKeyValue(kv); if (filterResponse == ReturnCode.SKIP) { return MatchCode.SKIP; } else if (filterResponse == ReturnCode.NEXT_COL) { @@ -360,6 +361,9 @@ public class ScanQueryMatcher { */ if (colChecker == MatchCode.SEEK_NEXT_ROW) { stickyNextRow = true; + } else if (filter != null && colChecker == MatchCode.INCLUDE && + filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL) { + return MatchCode.INCLUDE_AND_SEEK_NEXT_COL; } return colChecker; diff --git src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java index e26ce7e..720d882 100644 --- src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java +++ src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java @@ -81,7 +81,7 @@ public class TestColumnPaginationFilter extends TestCase private void basicFilterTests(ColumnPaginationFilter filter) throws Exception { KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1); - assertTrue("basicFilter1", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE); + assertTrue("basicFilter1", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE_AND_NEXT_COL); } /** diff --git src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java index 130d84d..d583700 100644 --- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java @@ -1471,8 +1471,14 @@ public class TestFilter extends HBaseTestCase { public void testColumnPaginationFilter() throws Exception { + // Test that the filter skips multiple column versions. 
+ Put p = new Put(ROWS_ONE[0]); + p.setWriteToWAL(false); + p.add(FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]); + this.region.put(p); + this.region.flushcache(); - // Set of KVs (page: 1; pageSize: 1) - the first set of 1 column per row + // Set of KVs (page: 1; pageSize: 1) - the first set of 1 column per row KeyValue [] expectedKVs = { // testRowOne-0 new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), diff --git src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java index 2b69a6f..05c0269 100644 --- src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java @@ -234,6 +234,77 @@ public class TestFilterList extends TestCase { } /** + * Test filterKeyValue logic. + * @throws Exception + */ + public void testFilterKeyValue() throws Exception { + Filter includeFilter = new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(KeyValue v) { + return Filter.ReturnCode.INCLUDE; + } + + @Override + public void readFields(DataInput arg0) throws IOException {} + + @Override + public void write(DataOutput arg0) throws IOException {} + }; + + Filter alternateFilter = new FilterBase() { + boolean returnInclude = true; + + @Override + public Filter.ReturnCode filterKeyValue(KeyValue v) { + Filter.ReturnCode returnCode = returnInclude ? Filter.ReturnCode.INCLUDE : + Filter.ReturnCode.SKIP; + returnInclude = !returnInclude; + return returnCode; + } + + @Override + public void readFields(DataInput arg0) throws IOException {} + + @Override + public void write(DataOutput arg0) throws IOException {} + }; + + Filter alternateIncludeFilter = new FilterBase() { + boolean returnIncludeOnly = false; + + @Override + public Filter.ReturnCode filterKeyValue(KeyValue v) { + Filter.ReturnCode returnCode = returnIncludeOnly ? 
Filter.ReturnCode.INCLUDE : + Filter.ReturnCode.INCLUDE_AND_NEXT_COL; + returnIncludeOnly = !returnIncludeOnly; + return returnCode; + } + + @Override + public void readFields(DataInput arg0) throws IOException {} + + @Override + public void write(DataOutput arg0) throws IOException {} + }; + + // Check must pass one filter. + FilterList mpOnefilterList = new FilterList(Operator.MUST_PASS_ONE, + Arrays.asList(new Filter[] { includeFilter, alternateIncludeFilter, alternateFilter })); + // INCLUDE, INCLUDE, INCLUDE_AND_NEXT_COL. + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, mpOnefilterList.filterKeyValue(null)); + // INCLUDE, SKIP, INCLUDE. + assertEquals(Filter.ReturnCode.INCLUDE, mpOnefilterList.filterKeyValue(null)); + + // Check must pass all filter. + FilterList mpAllfilterList = new FilterList(Operator.MUST_PASS_ALL, + Arrays.asList(new Filter[] { includeFilter, alternateIncludeFilter, alternateFilter })); + // INCLUDE, INCLUDE, INCLUDE_AND_NEXT_COL. + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, mpAllfilterList.filterKeyValue(null)); + // INCLUDE, SKIP, INCLUDE. + assertEquals(Filter.ReturnCode.SKIP, mpAllfilterList.filterKeyValue(null)); + } + + /** * Test pass-thru of hints. */ public void testHintPassThru() throws Exception {