Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1234452) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -4375,6 +4375,33 @@ } + @Test + public void testScanSeekTo() throws Exception { + byte [] TABLENAME = Bytes.toBytes("testScanSeekTo"); + + HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY); + Put p = new Put(ROW); + for (int i=0; i<1000; i++) { + p.add(FAMILY, Bytes.toBytes(String.format("%03d", i)), VALUE); + } + t.put(p); + Scan s = new Scan(KeyValue.createFirstOnRow(ROW, FAMILY, Bytes.toBytes("500"))); + s.setBatch(2); + ResultScanner scanner = t.getScanner(s); + KeyValue[] kvs = scanner.next().raw(); + assertEquals(2, kvs.length); + assertEquals("500", Bytes.toString(kvs[0].getQualifier())); + assertEquals("501", Bytes.toString(kvs[1].getQualifier())); + + s = new Scan(KeyValue.createFirstOnRow(ROW, FAMILY, Bytes.toBytes("8990"))); + s.setBatch(2); + scanner = t.getScanner(s); + kvs = scanner.next().raw(); + assertEquals(2, kvs.length); + assertEquals("900", Bytes.toString(kvs[0].getQualifier())); + assertEquals("901", Bytes.toString(kvs[1].getQualifier())); + } + /** * Test ScanMetrics * @throws Exception Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1234452) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -22,12 +22,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableUtils; import java.io.DataInput; import java.io.DataOutput; @@ -84,9 +86,10 @@ private static final String RAW_ATTR = "_raw_"; private static final String ISOLATION_LEVEL = "_isolationlevel_"; - private static final byte SCAN_VERSION = (byte)2; + private static final byte SCAN_VERSION = (byte)3; private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; + private KeyValue seekTo = null; private int maxVersions = 1; private int batch = -1; // If application wants to collect scan metrics, it needs to @@ -128,6 +131,21 @@ } /** + * Create a Scan operation starting at the specified KeyValue. + * This is advanced use only. Batching should be enabled as well. + *
+ * If the specified KeyValue does not exist, the Scanner will start from the
+ * next closest KeyValue after it; start and stop rows are pinned to its row.
+ * @param seekTo KeyValue to seek to (inclusive); should carry no value
+ */
+ public Scan(KeyValue seekTo) {
+ assert seekTo.getValueLength() == 0; // do not specify a value
+ this.seekTo = seekTo;
+ this.startRow = seekTo.getRow();
+ this.stopRow = startRow;
+ }
+
+ /**
* Create a Scan operation for the range of rows specified.
* @param startRow row to start scanner at or after (inclusive)
* @param stopRow row to stop scanner before (exclusive)
@@ -265,6 +283,10 @@
* @return this
*/
public Scan setStartRow(byte [] startRow) {
+ if (seekTo != null && !seekTo.matchingRow(startRow)) { // seekTo pins the row; only that same row may be re-set
+ throw new IllegalArgumentException(
+ "Cannot change startRow when seekTo is already defined");
+ }
this.startRow = startRow;
return this;
}
@@ -276,11 +298,37 @@
* @return this
*/
public Scan setStopRow(byte [] stopRow) {
+ if (seekTo != null && !seekTo.matchingRow(stopRow)) { // like setStartRow: only the pinned row is accepted
+ throw new IllegalArgumentException(
+ "Cannot change stopRow when seekTo is already defined");
+ }
this.stopRow = stopRow;
return this;
}
/**
+ * Set the start KeyValue of the scan; an advanced feature for intra-row
+ * scanning that gives exact control over where to start. The KeyValue can be
+ * any type, including a delete marker, but should carry no value. The start
+ * and stop rows are pinned to its row; calling this again replaces it.
+ * @param seekTo KeyValue to start scan on (inclusive)
+ * @return this
+ * @throws IllegalArgumentException if startRow or stopRow was set explicitly
+ */
+ public Scan setSeekTo(KeyValue seekTo) {
+ if (this.seekTo == null && ((startRow != null && startRow.length > 0)
+ || (stopRow != null && stopRow.length > 0))) { // rows from a prior seekTo may be replaced
+ throw new IllegalArgumentException(
+ "Cannot set seekTo when startRow or stopRow is already defined");
+ }
+ assert seekTo.getValueLength() == 0; // do not specify a value
+ this.seekTo = seekTo;
+ this.startRow = seekTo.getRow();
+ this.stopRow = startRow;
+ return this;
+ }
+
+ /**
* Get all available versions.
* @return this
*/
@@ -392,6 +440,13 @@
}
/**
+ * @return the KeyValue this scan starts at, or null when no seekTo was set
+ */
+ public KeyValue getSeekTo() {
+ return this.seekTo;
+ }
+
+ /**
* @return the max number of versions to fetch
*/
public int getMaxVersions() {
@@ -579,6 +634,15 @@
if (version > 1) {
readAttributes(in);
}
+ if (version > 2) {
+ seekTo = null;
+ int len = WritableUtils.readVInt(in);
+ if (len > 0) {
+ byte [] buf = new byte[len];
+ in.readFully(buf);
+ seekTo = new KeyValue(buf);
+ }
+ }
}
public void write(final DataOutput out)
@@ -612,6 +676,12 @@
}
}
writeAttributes(out);
+ if (seekTo == null) {
+ WritableUtils.writeVInt(out, 0);
+ } else {
+ WritableUtils.writeVInt(out, seekTo.getLength());
+ out.write(seekTo.getBuffer(), seekTo.getOffset(), seekTo.getLength());
+ }
}
/**
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1234452)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy)
@@ -142,8 +142,18 @@
this.rowComparator = scanInfo.getComparator().getRawComparator();
this.deletes = new ScanDeleteTracker();
this.stopRow = scan.getStopRow();
- this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),
- scanInfo.getFamily());
+ // during intra-row scan store's family is past the seek point
+ assert scan.getSeekTo() == null
+ || Bytes.compareTo(scan.getSeekTo().getFamily(), scanInfo.getFamily()) <= 0;
+ if (scan.getSeekTo() != null
+ && scan.getSeekTo().matchingFamily(scanInfo.getFamily())) {
+ // if intra-row is for this family use the seek to key
+ this.startKey = scan.getSeekTo();
+ } else {
+ // otherwise use first key for row (in this store)
+ this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),
+ scanInfo.getFamily());
+ }
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
this.maxReadPointToTrackVersions = readPointToUse;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1234452)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -3290,11 +3290,15 @@
scanners.addAll(additionalScanners);
}
+ byte[] seekFamily = scan.getSeekTo() == null ? null : scan.getSeekTo().getFamily();
for (Map.Entry