diff --git a/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java b/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java index 1214a1a..3bb76f7 100644 --- a/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java +++ b/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; import com.google.common.base.Preconditions; /** @@ -36,17 +37,27 @@ import com.google.common.base.Preconditions; public class ColumnPaginationFilter extends FilterBase { private int limit = 0; - private int offset = 0; + private int offset = -1; + private byte[] columnOffset = null; private int count = 0; - /** - * Used during serialization. Do not use. - */ + /** + * Used during serialization. Do not use. + */ public ColumnPaginationFilter() { super(); } + /** + * Initializes filter with an integer offset and limit. The offset is arrived at + * scanning sequentially and skipping entries. @limit number of columns are + * then retrieved. If multiple column families are involved, the columns may be spread + * across them. + * + * @param limit Max number of columns to return. + * @param offset The integer offset where to start pagination. + */ public ColumnPaginationFilter(final int limit, final int offset) { Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit); @@ -56,6 +67,25 @@ public class ColumnPaginationFilter extends FilterBase } /** + * Initializes filter with a string/bookmark based offset and limit. The offset is arrived + * at by seeking to it using scanner hints. If multiple column families are involved, + * pagination starts at the first column family which contains @columnOffset. 
Columns are + * then retrieved sequentially up to @limit number of columns which may be spread across + * multiple column families, depending on how the scan is set up. + * + * @param limit Max number of columns to return. + * @param columnOffset The string/bookmark offset on where to start pagination. + */ + public ColumnPaginationFilter(final int limit, final byte[] columnOffset) { + Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit); + Preconditions.checkArgument(columnOffset != null, + "columnOffset must be non-null %s", + columnOffset); + this.limit = limit; + this.columnOffset = columnOffset; + } + + /** * @return limit */ public int getLimit() { @@ -72,15 +102,47 @@ public class ColumnPaginationFilter extends FilterBase @Override public ReturnCode filterKeyValue(KeyValue v) { - if(count >= offset + limit) - { - return ReturnCode.NEXT_ROW; + if (columnOffset != null) { + if (count >= limit) { + return ReturnCode.NEXT_ROW; + } + byte[] buffer = v.getBuffer(); + if (buffer == null) { + return ReturnCode.SEEK_NEXT_USING_HINT; + } + int cmp = 0; + // Only compare if no KV's have been seen so far. + if (count == 0) { + cmp = Bytes.compareTo(buffer, + v.getQualifierOffset(), + v.getQualifierLength(), + this.columnOffset, + 0, + this.columnOffset.length); + } + if (cmp < 0) { + return ReturnCode.SEEK_NEXT_USING_HINT; + } else { + count++; + return ReturnCode.INCLUDE_AND_NEXT_COL; + } } + else { + if (count >= offset + limit) { + return ReturnCode.NEXT_ROW; + } - ReturnCode code = count < offset ? 
ReturnCode.NEXT_COL : + ReturnCode.INCLUDE_AND_NEXT_COL; + count++; + return code; + } + } + + public KeyValue getNextKeyHint(KeyValue kv) { + return KeyValue.createFirstOnRow( + kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(), + kv.getFamilyOffset(), kv.getFamilyLength(), columnOffset, 0, columnOffset.length); } @Override @@ -101,12 +163,17 @@ public class ColumnPaginationFilter extends FilterBase { this.limit = in.readInt(); this.offset = in.readInt(); + this.columnOffset = Bytes.readByteArray(in); + if (this.columnOffset.length == 0) { + this.columnOffset = null; + } } public void write(DataOutput out) throws IOException { out.writeInt(this.limit); out.writeInt(this.offset); + Bytes.writeByteArray(out, this.columnOffset); } @Override diff --git a/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java b/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java index 720d882..fad15f3 100644 --- a/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java +++ b/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPaginationFilter.java @@ -41,18 +41,24 @@ public class TestColumnPaginationFilter extends TestCase private static final byte[] ROW = Bytes.toBytes("row_1_test"); private static final byte[] COLUMN_FAMILY = Bytes.toBytes("test"); private static final byte[] VAL_1 = Bytes.toBytes("a"); - private static final byte [] COLUMN_QUALIFIER = Bytes.toBytes("foo"); + private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("foo"); + private Filter columnPaginationFilterOffset; private Filter columnPaginationFilter; @Override protected void setUp() throws Exception { super.setUp(); + columnPaginationFilterOffset = getColumnPaginationFilterOffset(); columnPaginationFilter = getColumnPaginationFilter(); + } + private Filter getColumnPaginationFilterOffset() { + return new ColumnPaginationFilter(1, COLUMN_QUALIFIER); } + private Filter getColumnPaginationFilter() { - return new 
ColumnPaginationFilter(1,0); + return new ColumnPaginationFilter(1, 0); } private Filter serializationTest(Filter filter) throws Exception { @@ -91,6 +97,9 @@ public class TestColumnPaginationFilter extends TestCase public void testSerialization() throws Exception { Filter newFilter = serializationTest(columnPaginationFilter); basicFilterTests((ColumnPaginationFilter)newFilter); + + Filter newFilterOffset = serializationTest(columnPaginationFilterOffset); + basicFilterTests((ColumnPaginationFilter)newFilterOffset); } diff --git a/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java b/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java index d583700..80d2fb4 100644 --- a/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java +++ b/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java @@ -1402,7 +1402,7 @@ public class TestFilter extends HBaseTestCase { " total but already scanned " + (results.size() + idx) + (results.isEmpty() ? "" : "(" + results.get(0).toString() + ")"), kvs.length >= idx + results.size()); - for(KeyValue kv : results) { + for (KeyValue kv : results) { LOG.info("row=" + row + ", result=" + kv.toString() + ", match=" + kvs[idx].toString()); assertTrue("Row mismatch", @@ -1469,6 +1469,90 @@ public class TestFilter extends HBaseTestCase { kvs.length, idx); } + public void testColumnPaginationFilterColumnOffset() throws Exception { + KeyValue [] expectedKVs = { + // testRowOne-0 + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + // testRowOne-2 + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + // testRowOne-3 + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + }; + KeyValue [] expectedKVs1 
= { + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]) + }; + KeyValue [] expectedKVs2 = { + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]) + }; + KeyValue [] expectedKVs3 = { + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + }; + Scan s = new Scan(); + + // Page size 1. 
+ long expectedRows = 6; + long expectedKeys = 1; + s.setFilter(new ColumnPaginationFilter(1, QUALIFIERS_ONE[1])); + verifyScan(s, expectedRows, expectedKeys); + this.verifyScanFull(s, expectedKVs); + + // Page size 2. + expectedRows = 3; + expectedKeys = 2; + s = new Scan(); + s.setFilter(new ColumnPaginationFilter(2, QUALIFIERS_TWO[2])); + verifyScan(s, expectedRows, expectedKeys); + this.verifyScanFull(s, expectedKVs1); + + // Page size 3 across multiple column families. + expectedRows = 3; + expectedKeys = 3; + s = new Scan(); + s.setFilter(new ColumnPaginationFilter(3, QUALIFIERS_TWO[2])); + verifyScan(s, expectedRows, expectedKeys); + this.verifyScanFull(s, expectedKVs2); + + // Page size 2 restricted to one column family. + expectedRows = 3; + expectedKeys = 2; + s = new Scan(); + s.addFamily(FAMILIES[1]); + s.setFilter(new ColumnPaginationFilter(2, QUALIFIERS_TWO[2])); + this.verifyScanFull(s, expectedKVs3); + } public void testColumnPaginationFilter() throws Exception { // Test that the filter skips multiple column versions. @@ -1494,7 +1578,6 @@ public class TestFilter extends HBaseTestCase { new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]) }; - // Set of KVs (page: 3; pageSize: 1) - the third set of 1 column per row KeyValue [] expectedKVs2 = { // testRowOne-0 @@ -1566,7 +1649,7 @@ public class TestFilter extends HBaseTestCase { expectedRows = 0; verifyScan(s, expectedRows, 0); this.verifyScanFull(s, expectedKVs4); - } + } public void testKeyOnlyFilter() throws Exception {