Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1234452) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -4375,6 +4375,33 @@ } + @Test + public void testScanSeekTo() throws Exception { + byte [] TABLENAME = Bytes.toBytes("testScanSeekTo"); + + HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY); + Put p = new Put(ROW); /* a single row holding qualifiers "000".."999" */ + for (int i=0; i<1000; i++) { + p.add(FAMILY, Bytes.toBytes(String.format("%03d", i)), VALUE); + } + t.put(p); + Scan s = new Scan(KeyValue.createFirstOnRow(ROW, FAMILY, Bytes.toBytes("500"))); /* seek to a qualifier that exists */ + s.setBatch(2); + ResultScanner scanner = t.getScanner(s); + KeyValue[] kvs = scanner.next().raw(); + assertEquals(2, kvs.length); + assertEquals("500", Bytes.toString(kvs[0].getQualifier())); + assertEquals("501", Bytes.toString(kvs[1].getQualifier())); + + s = new Scan(KeyValue.createFirstOnRow(ROW, FAMILY, Bytes.toBytes("8990"))); /* "8990" is not stored; it sorts between "899" and "900", so the scan is expected to start at "900" */ + s.setBatch(2); + scanner = t.getScanner(s); + kvs = scanner.next().raw(); + assertEquals(2, kvs.length); + assertEquals("900", Bytes.toString(kvs[0].getQualifier())); + assertEquals("901", Bytes.toString(kvs[1].getQualifier())); + } + /** * Test ScanMetrics * @throws Exception Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1234452) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -22,12 +22,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableUtils; import java.io.DataInput; import java.io.DataOutput; @@ -84,9 +86,10 @@ private static final String RAW_ATTR = "_raw_"; private static final String ISOLATION_LEVEL = "_isolationlevel_"; - private static final byte SCAN_VERSION = (byte)2; + private static final byte SCAN_VERSION = (byte)3; // version 3 adds the optional seekTo KeyValue private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; + private KeyValue seekTo = null; // exact start position for intra-row scans; null when unused private int maxVersions = 1; private int batch = -1; // If application wants to collect scan metrics, it needs to @@ -128,6 +131,21 @@ } /** + * Create a Scan operation starting at the specified KeyValue. + * This is advanced use only. Batching should be enabled as well. + * <p>

+ * If the specified KeyValue does not exist, the Scanner will start from the + * next closest KeyValue after the specified KeyValue. + * @param seekTo KeyValue to seek to + */ + public Scan(KeyValue seekTo) { + this.seekTo = seekTo; + this.startRow = seekTo.getRow(); + this.stopRow = startRow; + // stopRow == startRow keeps the scan within the row of seekTo + } + + /** * Create a Scan operation for the range of rows specified. * @param startRow row to start scanner at or after (inclusive) * @param stopRow row to stop scanner before (exclusive) @@ -265,6 +283,10 @@ * @return this */ public Scan setStartRow(byte [] startRow) { + if (seekTo != null && !seekTo.matchingRow(startRow)) { + throw new IllegalArgumentException( + "Cannot change startRow when seekTo is already defined"); + } this.startRow = startRow; return this; } @@ -276,11 +298,37 @@ * @return this */ public Scan setStopRow(byte [] stopRow) { + if (seekTo != null) { + throw new IllegalArgumentException( + "Cannot change stopRow when seekTo is already defined"); + } this.stopRow = stopRow; return this; } /** + * Set the start KeyValue of the scan. + * Using a KeyValue gives exact control over where to start scanning. + * The KeyValue can be of any type, including a delete marker. + * This is an advanced feature for intra-row scanning. + * The stopRow is automatically set and cannot be altered. + * @param seekTo KeyValue to start scan on (inclusive) + * @return this + */ + public Scan setSeekTo(KeyValue seekTo) { + if ((startRow != null && startRow.length > 0) || (stopRow != null + && stopRow.length > 0)) { + throw new IllegalArgumentException( + "Cannot set seekTo when startRow or stopRow is already defined"); + } + assert seekTo.getValueLength() == 0; // do not specify a value + this.seekTo = seekTo; + this.startRow = seekTo.getRow(); + this.stopRow = startRow; + return this; + } + + /** * Get all available versions. 
* @return this */ @@ -392,6 +440,13 @@ } /** + * @return the KeyValue to seek to (null if none) + */ + public KeyValue getSeekTo() { + return seekTo; + } + + /** * @return the max number of versions to fetch */ public int getMaxVersions() { @@ -579,6 +634,15 @@ if (version > 1) { readAttributes(in); } + seekTo = null; // reset even for pre-version-3 streams so a reused Scan never keeps a stale seekTo + if (version > 2) { + int len = WritableUtils.readVInt(in); + if (len > 0) { + byte [] buf = new byte[len]; + in.readFully(buf); + seekTo = new KeyValue(buf); + } + } } public void write(final DataOutput out) @@ -612,6 +676,12 @@ } } writeAttributes(out); + if (seekTo == null) { // a length of 0 encodes "no seekTo" + WritableUtils.writeVInt(out, 0); + } else { + WritableUtils.writeVInt(out, seekTo.getLength()); + out.write(seekTo.getBuffer(), seekTo.getOffset(), seekTo.getLength()); + } } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1234452) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -142,8 +142,18 @@ this.rowComparator = scanInfo.getComparator().getRawComparator(); this.deletes = new ScanDeleteTracker(); this.stopRow = scan.getStopRow(); - this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(), - scanInfo.getFamily()); + // during an intra-row scan this store's family is at or after the seek family + assert scan.getSeekTo() == null + || Bytes.compareTo(scan.getSeekTo().getFamily(), scanInfo.getFamily()) <= 0; + if (scan.getSeekTo() != null + && scan.getSeekTo().matchingFamily(scanInfo.getFamily())) { + // the seek point lies in this family: start exactly at the seekTo key + this.startKey = scan.getSeekTo(); + } else { + // otherwise use first key for row (in this store) + this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(), + scanInfo.getFamily()); + } this.filter = scan.getFilter(); this.earliestPutTs = earliestPutTs; 
this.maxReadPointToTrackVersions = readPointToUse; Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1234452) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3290,11 +3290,15 @@ scanners.addAll(additionalScanners); } + byte[] seekFamily = scan.getSeekTo() == null ? null : scan.getSeekTo().getFamily(); for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - StoreScanner scanner = store.getScanner(scan, entry.getValue()); - scanners.add(scanner); + // if we're seeking inside a row, only include the seek family and the families that sort after it + if (seekFamily == null || Bytes.compareTo(seekFamily, store.getFamily().getName()) <= 0) { + StoreScanner scanner = store.getScanner(scan, entry.getValue()); + scanners.add(scanner); + } } this.storeHeap = new KeyValueHeap(scanners, comparator); }