Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1298701) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -200,7 +200,7 @@ // Registered region protocol handlers private ClassToInstanceMap protocolHandlers = MutableClassToInstanceMap.create(); - + private Map> protocolHandlerNames = Maps.newHashMap(); @@ -342,7 +342,7 @@ * number of operations. */ public static final ConcurrentMap> - timeVaryingMetrics = new ConcurrentHashMap>(); public static void incrNumericMetric(String key, long amount) { @@ -958,7 +958,7 @@ CompletionService> completionService = new ExecutorCompletionService>( storeCloserThreadPool); - + // close each store in parallel for (final Store store : stores.values()) { completionService @@ -2903,7 +2903,7 @@ return currentEditSeqId; } finally { status.cleanup(); - if (reader != null) { + if (reader != null) { reader.close(); } } @@ -3483,6 +3483,21 @@ KeyValueHeap getStoreHeapForTesting() { return storeHeap; } + + @Override + public synchronized boolean reseek(KeyValue kv) throws IOException { + if (kv == null) { + throw new IllegalArgumentException("Row cannot be null."); + } + startRegionOperation(); + try { + // This could be a new thread from the last time we called next(). + MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); + return this.storeHeap.reseek(kv); + } finally { + closeRegionOperation(); + } + } } // Utility methods Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (revision 1296079) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (working copy) @@ -19,8 +19,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; /** * RegionScanner describes iterators over rows in an HRegion. @@ -37,4 +40,23 @@ * further rows. */ public boolean isFilterDone(); + + /** + * Do a reseek to the required Keyvalue. This reseek will be useful in the + * coprocessor while scanning. The kv that is required to seek must be given + * explicitly for reseek. Should not be used to seek to a key which may come + * before the current position. + * Note : Recommended to use knowing how seek can be done on different kvs. + * Suggested to seek to row boundaries like start of a row or end of a row. + * Seeking to the middle of a row may lead to inconsistencies across stores. + * Check the test cases in + * TestFilter.testRegionScannerReseek + * TestFilter.testRegionScannerReseekWithDiffKVs + * + * @throws IOException + * @throws IllegalArgumentException if row is null + * + */ + public boolean reseek(KeyValue kv) throws IOException; + } Index: src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (revision 1221129) +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (working copy) @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; @@ -63,10 +63,24 @@ Bytes.toBytes("testRowTwo-2"), Bytes.toBytes("testRowTwo-3") }; + private static final byte [][] ROWS_THREE = { + Bytes.toBytes("testRowThree-0"), Bytes.toBytes("testRowThree-1"), + Bytes.toBytes("testRowThree-2"), Bytes.toBytes("testRowThree-3") + }; + + private static final byte [][] ROWS_FOUR = { + Bytes.toBytes("testRowFour-0"), Bytes.toBytes("testRowFour-1"), + Bytes.toBytes("testRowFour-2"), Bytes.toBytes("testRowFour-3") + }; + private static final byte [][] FAMILIES = { Bytes.toBytes("testFamilyOne"), Bytes.toBytes("testFamilyTwo") }; + private static final byte [][] FAMILIES_1 = { + Bytes.toBytes("testFamilyThree"), Bytes.toBytes("testFamilyFour") + }; + private static final byte [][] QUALIFIERS_ONE = { Bytes.toBytes("testQualifierOne-0"), Bytes.toBytes("testQualifierOne-1"), Bytes.toBytes("testQualifierOne-2"), Bytes.toBytes("testQualifierOne-3") @@ -77,10 +91,24 @@ Bytes.toBytes("testQualifierTwo-2"), Bytes.toBytes("testQualifierTwo-3") }; + private static final byte [][] QUALIFIERS_THREE = { + Bytes.toBytes("testQualifierThree-0"), Bytes.toBytes("testQualifierThree-1"), + Bytes.toBytes("testQualifierThree-2"), Bytes.toBytes("testQualifierThree-3") + }; + + private static final byte [][] QUALIFIERS_FOUR = { + Bytes.toBytes("testQualifierFour-0"), Bytes.toBytes("testQualifierFour-1"), + Bytes.toBytes("testQualifierFour-2"), Bytes.toBytes("testQualifierFour-3") + }; + private static final byte [][] VALUES = { Bytes.toBytes("testValueOne"), Bytes.toBytes("testValueTwo") }; + byte [][] NEW_FAMILIES = { + Bytes.toBytes("f1"), Bytes.toBytes("f2") + }; + private long numRows = ROWS_ONE.length + ROWS_TWO.length; private long colsPerRow = FAMILIES.length * QUALIFIERS_ONE.length; @@ -90,11 +118,16 @@ HTableDescriptor htd = new HTableDescriptor(getName()); htd.addFamily(new HColumnDescriptor(FAMILIES[0])); htd.addFamily(new HColumnDescriptor(FAMILIES[1])); + htd.addFamily(new HColumnDescriptor(FAMILIES_1[0])); + htd.addFamily(new HColumnDescriptor(FAMILIES_1[1])); + htd.addFamily(new HColumnDescriptor(NEW_FAMILIES[0])); + htd.addFamily(new HColumnDescriptor(NEW_FAMILIES[1])); + htd.addFamily(new HColumnDescriptor(FAMILIES_1[1])); HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); this.region = HRegion.createHRegion(info, this.testDir, this.conf, htd); // Insert first half - for(byte [] ROW : ROWS_ONE) { + /* for(byte [] ROW : ROWS_ONE) { Put p = new Put(ROW); p.setWriteToWAL(false); for(byte [] QUALIFIER : QUALIFIERS_ONE) { @@ -159,7 +192,7 @@ d.deleteColumns(FAMILIES[0], QUALIFIER); d.deleteColumns(FAMILIES[1], QUALIFIER); this.region.delete(d, null, false); - } + }*/ numRows -= 2; } @@ -170,6 +203,137 @@ super.tearDown(); } + + public void testRegionScannerReseek() throws Exception { + // create new rows and column family to show how reseek works.. + for (byte[] ROW : ROWS_THREE) { + Put p = new Put(ROW); + p.setWriteToWAL(true); + for (byte[] QUALIFIER : QUALIFIERS_THREE) { + p.add(FAMILIES[0], QUALIFIER, VALUES[0]); + + } + this.region.put(p); + } + for (byte[] ROW : ROWS_FOUR) { + Put p = new Put(ROW); + p.setWriteToWAL(false); + for (byte[] QUALIFIER : QUALIFIERS_FOUR) { + p.add(FAMILIES[1], QUALIFIER, VALUES[1]); + } + this.region.put(p); + } + // Flush + this.region.flushcache(); + + // Insert second half (reverse families) + for (byte[] ROW : ROWS_THREE) { + Put p = new Put(ROW); + p.setWriteToWAL(false); + for (byte[] QUALIFIER : QUALIFIERS_THREE) { + p.add(FAMILIES[1], QUALIFIER, VALUES[0]); + } + this.region.put(p); + } + for (byte[] ROW : ROWS_FOUR) { + Put p = new Put(ROW); + p.setWriteToWAL(false); + for (byte[] QUALIFIER : QUALIFIERS_FOUR) { + p.add(FAMILIES[0], QUALIFIER, VALUES[1]); + } + this.region.put(p); + } + + Scan s = new Scan(); + // set a start row + s.setStartRow(ROWS_FOUR[1]); + RegionScanner scanner = region.getScanner(s); + + KeyValue kv = KeyValue.createFirstOnRow(ROWS_THREE[1]); + // reseek to row three. + scanner.reseek(kv); + List results = new ArrayList(); + + // the results should belong to ROWS_THREE[1] + scanner.next(results); + for (KeyValue keyValue : results) { + assertEquals("The rows with ROWS_TWO as row key should be appearing.", + Bytes.toString(keyValue.getRow()), Bytes.toString(ROWS_THREE[1])); + } + kv = KeyValue.createFirstOnRow(ROWS_ONE[1]); + // again try to reseek to a value before ROWS_THRER[1] + scanner.reseek(kv); + results = new ArrayList(); + // This time no seek would have been done to ROWS_ONE[1] + scanner.next(results); + for (KeyValue keyValue : results) { + assertFalse("Cannot rewind back to a value less than previous reseek.", + Bytes.toString(keyValue.getRow()).contains("testRowOne")); + } + } + + public void testRegionScannerReseekWithDiffKVs() throws Exception { + + byte[][] NEW_ROWS = { Bytes.toBytes("Row-0"), Bytes.toBytes("Row-1"), + Bytes.toBytes("Row-2"), Bytes.toBytes("Row-3") }; + + byte[][] NEW_FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2") }; + + byte[][] NEW_QUALIFIERS_ONE = { Bytes.toBytes("q-0"), Bytes.toBytes("q-1"), + Bytes.toBytes("q-2"), Bytes.toBytes("q-3") }; + + byte[][] NEW_QUALIFIERS_TWO = { Bytes.toBytes("q1-0"), + Bytes.toBytes("q1-1"), Bytes.toBytes("q1-2"), Bytes.toBytes("q1-3") }; + + for (byte[] ROW : NEW_ROWS) { + Put p = new Put(ROW); + p.setWriteToWAL(false); + for (byte[] QUALIFIER : NEW_QUALIFIERS_ONE) { + p.add(NEW_FAMILIES[0], QUALIFIER, VALUES[0]); + } + for (byte[] QUALIFIER : NEW_QUALIFIERS_TWO) { + p.add(NEW_FAMILIES[1], QUALIFIER, VALUES[1]); + } + this.region.put(p); + } + + this.region.flushcache(); + + // create a scanner for scanning from Row-0 + Scan scan = new Scan(); + scan.setStartRow(NEW_ROWS[0]); + + RegionScanner scanner = this.region.getScanner(scan); + // reseek to a kv with row and family mentioned. + scanner.reseek(new KeyValue(NEW_ROWS[2], NEW_FAMILIES[1], null)); + List results = new ArrayList(); + scanner.next(results); + for (KeyValue keyValue : results) { + assertEquals("The row should be Row-2.", + Bytes.toString(keyValue.getRow()), "Row-2"); + assertEquals("The family should be f2.", + Bytes.toString(keyValue.getFamily()), "f2"); + } + + scanner.reseek(new KeyValue(NEW_ROWS[2], NEW_FAMILIES[0], null)); + boolean next = scanner.next(results); + assertFalse( + "The seek has already moved to family f2. So should not be able to seek to f1.", + next); + + // reseek to a position where the family is null. + // here the row seeked will be one will be the next row Row-3 + // as the qualifier here is q1-2 + // the KV taken here is + // Row-2/q1-2/LATEST_TIMESTAMP/Maximum/vlen=0 + scanner.reseek(new KeyValue(NEW_ROWS[2], null, NEW_QUALIFIERS_TWO[2])); + results = new ArrayList(); + scanner.next(results); + for (KeyValue keyValue : results) { + assertEquals(Bytes.toString(keyValue.getRow()), "Row-3"); + } + } + public void testNoFilter() throws Exception { // No filter long expectedRows = this.numRows; @@ -608,7 +772,7 @@ verifyScanNoEarlyOut(s, expectedRows, expectedKeys); // Match all columns in second family - // look only in second group of rows + // look only in second group of rows expectedRows = this.numRows / 2; expectedKeys = this.colsPerRow / 2; f = new FamilyFilter(CompareOp.GREATER, @@ -1345,7 +1509,7 @@ assertFalse("Should not have returned whole value", Bytes.equals(kv.getValue(), kvs[idx].getValue())); if (useLen) { - assertEquals("Value in result is not SIZEOF_INT", + assertEquals("Value in result is not SIZEOF_INT", kv.getValue().length, Bytes.SIZEOF_INT); LOG.info("idx = " + idx + ", len=" + kvs[idx].getValueLength() + ", actual=" + Bytes.toInt(kv.getValue())); @@ -1353,7 +1517,7 @@ kvs[idx].getValueLength(), Bytes.toInt(kv.getValue()) ); LOG.info("good"); } else { - assertEquals("Value in result is not empty", + assertEquals("Value in result is not empty", kv.getValue().length, 0); } idx++;