Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(revision 1234269)
+++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(working copy)
@@ -4375,6 +4375,42 @@
 
   }
 
+  /**
+   * Test that a Scan created from a KeyValue starts at that KeyValue, or at
+   * the next existing one when the exact KeyValue is absent.
+   * @throws Exception
+   */
+  @Test
+  public void testScanSeekTo() throws Exception {
+    byte [] TABLENAME = Bytes.toBytes("testScanSeekTo");
+
+    HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    Put p = new Put(ROW);
+    for (int i=0; i<1000; i++) {
+      p.add(FAMILY, Bytes.toBytes(String.format("%03d", i)), VALUE);
+    }
+    t.put(p);
+    Scan s = new Scan(KeyValue.createFirstOnRow(ROW, FAMILY, Bytes.toBytes("500")));
+    s.setBatch(2);
+    ResultScanner scanner = t.getScanner(s);
+    KeyValue[] kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    assertEquals("500", Bytes.toString(kvs[0].getQualifier()));
+    assertEquals("501", Bytes.toString(kvs[1].getQualifier()));
+    scanner.close();
+
+    // "8990" is not a stored qualifier; it sorts between "899" and "900",
+    // so the scan must begin at the next existing KeyValue, "900".
+    s = new Scan(KeyValue.createFirstOnRow(ROW, FAMILY, Bytes.toBytes("8990")));
+    s.setBatch(2);
+    scanner = t.getScanner(s);
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    assertEquals("900", Bytes.toString(kvs[0].getQualifier()));
+    assertEquals("901", Bytes.toString(kvs[1].getQualifier()));
+    scanner.close();
+  }
+
   /**
    * Test ScanMetrics
    * @throws Exception
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java	(revision 1234269)
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java	(working copy)
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -84,9 +85,10 @@
   private static final String RAW_ATTR = "_raw_";
   private static final String ISOLATION_LEVEL = "_isolationlevel_";
 
-  private static final byte SCAN_VERSION = (byte)2;
+  private static final byte SCAN_VERSION = (byte)3;
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow = HConstants.EMPTY_END_ROW;
+  private KeyValue seekTo = null;
   private int maxVersions = 1;
   private int batch = -1;
   // If application wants to collect scan metrics, it needs to
@@ -128,6 +130,18 @@
   }
 
   /**
+   * Create a Scan operation starting at the specified KeyValue.
+   * This is advanced use only. Batching should be enabled as well.
+   * <p>
+   * If the specified KeyValue does not exist, the Scanner will start from the
+   * next closest KeyValue after the specified KeyValue.
+   * @param seekTo KeyValue to seek to
+   */
+  public Scan(KeyValue seekTo) {
+    this.seekTo = seekTo;
+  }
+
+  /**
    * Create a Scan operation for the range of rows specified.
    * @param startRow row to start scanner at or after (inclusive)
    * @param stopRow row to stop scanner before (exclusive)
@@ -265,6 +279,10 @@
    * @return this
    */
   public Scan setStartRow(byte [] startRow) {
+    if (seekTo != null) {
+      throw new IllegalArgumentException(
+          "Cannot set startRow when seekTo is already defined");
+    }
     this.startRow = startRow;
     return this;
   }
@@ -276,11 +294,37 @@
    * @return this
    */
   public Scan setStopRow(byte [] stopRow) {
+    if (seekTo != null) {
+      throw new IllegalArgumentException(
+          "Cannot set stopRow when seekTo is already defined");
+    }
     this.stopRow = stopRow;
     return this;
   }
 
   /**
+   * Set the start KeyValue of the scan.
+   * This is an advanced feature for intra-row scanning.
+   * The stopRow is automatically set and cannot be altered.
+   * @param seekTo KeyValue to start scan on (inclusive)
+   * @return this
+   * @throws IllegalArgumentException if startRow or stopRow was already set
+   */
+  public Scan setSeekTo(KeyValue seekTo) {
+    // A previous setSeekTo() call has already overwritten stopRow itself, so
+    // only reject explicitly configured row bounds when no seekTo is set yet.
+    if (this.seekTo == null
+        && (startRow != HConstants.EMPTY_START_ROW
+            || stopRow != HConstants.EMPTY_END_ROW)) {
+      throw new IllegalArgumentException(
+          "Cannot set seekTo when startRow or stopRow is already defined");
+    }
+    this.seekTo = seekTo;
+    this.stopRow = seekTo.getRow();
+    return this;
+  }
+
+  /**
    * Get all available versions.
    * @return this
    */
@@ -392,6 +436,13 @@
   }
 
   /**
+   * @return the KeyValue to seek to (null if none)
+   */
+  public KeyValue getSeekTo() {
+    return seekTo;
+  }
+
+  /**
    * @return the max number of versions to fetch
    */
   public int getMaxVersions() {
@@ -579,6 +630,16 @@
     if (version > 1) {
       readAttributes(in);
     }
+    // Always reset so a reused instance never keeps a stale seekTo.
+    seekTo = null;
+    if (version > 2) {
+      int length = in.readInt();
+      if (length > 0) {
+        byte [] buf = new byte[length];
+        in.readFully(buf);
+        seekTo = new KeyValue(buf);
+      }
+    }
   }
 
   public void write(final DataOutput out)
@@ -612,6 +673,12 @@
       }
     }
     writeAttributes(out);
+    if (seekTo == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(seekTo.getLength());
+      out.write(seekTo.getBuffer(), seekTo.getOffset(), seekTo.getLength());
+    }
   }
 
   /**
Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(revision 1234269)
+++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java	(working copy)
@@ -216,7 +216,9 @@
 
     protected ScannerCallable getScannerCallable(byte [] localStartKey,
         int nbRows) {
-      scan.setStartRow(localStartKey);
+      if (scan.getSeekTo() == null) {
+        scan.setStartRow(localStartKey);
+      }
       ScannerCallable s = new ScannerCallable(getConnection(),
         getTableName(), scan, this.scanMetrics);
       s.setCaching(nbRows);
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java	(revision 1234269)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java	(working copy)
@@ -142,8 +142,9 @@
     this.rowComparator = scanInfo.getComparator().getRawComparator();
     this.deletes = new ScanDeleteTracker();
     this.stopRow = scan.getStopRow();
-    this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),
-        scanInfo.getFamily());
+    this.startKey = scan.getSeekTo() != null ?
+        scan.getSeekTo() :
+        KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(), scanInfo.getFamily());
     this.filter = scan.getFilter();
     this.earliestPutTs = earliestPutTs;
     this.maxReadPointToTrackVersions = readPointToUse;