Index: src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (revision 1517092) +++ src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (working copy) @@ -69,6 +69,8 @@ public static int NUM_COLS_TO_CHECK = 50; + public static Random RANDOM = new Random(); + // when run as main private Configuration conf; @@ -193,16 +195,21 @@ HTable table; AtomicLong numScans = new AtomicLong(); AtomicLong numRowsScanned = new AtomicLong(); + boolean isReverseScan; - public AtomicScanReader(TestContext ctx, - byte targetFamilies[][]) throws IOException { + public AtomicScanReader(TestContext ctx, byte targetFamilies[][], boolean isReverseScan) + throws IOException { super(ctx); this.targetFamilies = targetFamilies; table = new HTable(ctx.getConf(), TABLE_NAME); + this.isReverseScan = isReverseScan; } public void doAnAction() throws Exception { Scan s = new Scan(); + if (isReverseScan) { + s.setReverse(true); + } for (byte[] family : targetFamilies) { s.addFamily(family); } @@ -301,7 +308,7 @@ List scanners = Lists.newArrayList(); for (int i = 0; i < numScanners; i++) { - AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES); + AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, RANDOM.nextBoolean()); scanners.add(scanner); ctx.addThread(scanner); } Index: src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (revision 1517092) +++ src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (working copy) @@ -122,6 +122,7 @@ private boolean flushCommits = true; private boolean writeToWAL = true; private int presplitRegions = 0; + private static boolean reverse = false; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); /** @@ -849,6 +850,9 @@ Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(new WhileMatchFilter(new PageFilter(120))); + if (reverse) { + scan.setReverse(true); + } ResultScanner s = this.table.getScanner(scan); //int count = 0; for (Result rr = null; (rr = s.next()) != null;) { @@ -874,7 +878,12 @@ @Override void testRow(final int i) throws IOException { Pair startAndStopRow = getStartAndStopRow(); - Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); + Scan scan; + if (reverse) { + scan = new Scan(startAndStopRow.getSecond(), startAndStopRow.getFirst()); + } else { + scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); + } scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); ResultScanner s = this.table.getScanner(scan); int count = 0; @@ -1010,7 +1019,13 @@ @Override void testRow(final int i) throws IOException { if (this.testScanner == null) { - Scan scan = new Scan(format(this.startRow)); + Scan scan; + if (reverse) { + scan = new Scan(format(this.startRow + this.perClientRunRows - 1)); + scan.setReverse(true); + } else { + scan = new Scan(format(this.startRow)); + } scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); this.testScanner = table.getScanner(scan); } @@ -1078,6 +1093,9 @@ Scan scan = new Scan(); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(filter); + if (reverse) { + scan.setReverse(true); + } return scan; } } @@ -1222,6 +1240,7 @@ System.err.println(" flushCommits Used to determine if the test should flush the 
table. Default: false"); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); System.err.println(" presplit Create presplit table. Recommended for accurate perf analysis (see guide). Default: disabled"); + System.err.println(" reverse Set scan direction to backward"); System.err.println(); System.err.println("Command:"); for (CmdDescriptor command : commands.values()) { @@ -1304,6 +1323,12 @@ continue; } + final String reverseStr = "--reverse"; + if (cmd.startsWith(reverseStr)) { + reverse = true; + continue; + } + Class cmdClass = determineCommandClass(cmd); if (cmdClass != null) { getArgs(i + 1, args); Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1517092) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -47,7 +47,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.ArrayUtils; @@ -111,10 +110,10 @@ public class TestFromClientSide { final Log LOG = LogFactory.getLog(getClass()); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte [] ROW = Bytes.toBytes("testRow"); - private static byte [] FAMILY = Bytes.toBytes("testFamily"); - private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); - private static byte [] VALUE = Bytes.toBytes("testValue"); + private static final byte [] ROW = Bytes.toBytes("testRow"); + private static final byte [] FAMILY = Bytes.toBytes("testFamily"); + private static final byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte [] VALUE = Bytes.toBytes("testValue"); protected static int SLAVES = 3; /** @@ -679,6 +678,50 @@ } @Test + public void testSuperSimpleWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testSuperSimpleWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000002")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000004")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000006")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000008")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000001")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000003")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000005")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000007")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000009")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + ht.flushCommits(); + Scan scan = new Scan(Bytes.toBytes("0-b11111-9223372036854775807"), + Bytes.toBytes("0-b11111-0000000000000000000")); + scan.setReverse(true); + ResultScanner scanner 
= ht.getScanner(scan); + Result result = scanner.next(); + assertTrue(Bytes.equals(result.getRow(), Bytes.toBytes("0-b11111-0000000000000000008"))); + scanner.close(); + } + + @Test public void testMaxKeyValueSize() throws Exception { byte [] TABLE = Bytes.toBytes("testMaxKeyValueSize"); Configuration conf = TEST_UTIL.getConfiguration(); @@ -737,6 +780,43 @@ } @Test + public void testFiltersWithReverseScan() throws Exception { + byte [] TABLE = Bytes.toBytes("testFiltersWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte [][] ROWS = makeN(ROW, 10); + byte [][] QUALIFIERS = { + Bytes.toBytes("col0--"), Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), Bytes.toBytes("col9--") + }; + for(int i=0;i<10;i++) { + Put put = new Put(ROWS[i]); + put.setWriteToWAL(false); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReverse(true); + scan.addFamily(FAMILY); + Filter filter = new QualifierFilter(CompareOp.EQUAL, + new RegexStringComparator("col[1-5]")); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int expectedIndex = 5; + for(Result result : scanner) { + assertEquals(result.size(), 1); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[expectedIndex])); + assertTrue(Bytes.equals(result.raw()[0].getQualifier(), + QUALIFIERS[expectedIndex])); + expectedIndex--; + } + assertEquals(expectedIndex, 0); + scanner.close(); + } + + @Test public void testKeyOnlyFilter() throws Exception { byte [] TABLE = Bytes.toBytes("testKeyOnlyFilter"); HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); @@ -770,6 +850,41 @@ scanner.close(); } + @Test + public void testKeyOnlyFilterWithReverseScan() throws Exception { + byte [] TABLE = Bytes.toBytes("testKeyOnlyFilterWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte [][] ROWS = makeN(ROW, 10); + byte [][] QUALIFIERS = { + Bytes.toBytes("col0--"), Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), Bytes.toBytes("col9--") + }; + for(int i=0;i<10;i++) { + Put put = new Put(ROWS[i]); + put.setWriteToWAL(false); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReverse(true); + scan.addFamily(FAMILY); + Filter filter = new KeyOnlyFilter(true); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int count = 0; + for(Result result : ht.getScanner(scan)) { + assertEquals(result.size(), 1); + assertEquals(result.raw()[0].getValueLength(), Bytes.SIZEOF_INT); + assertEquals(Bytes.toInt(result.raw()[0].getValue()), VALUE.length); + count++; + } + assertEquals(count, 10); + scanner.close(); + } + /** * Test simple table and non-existent row cases. */ @@ -881,6 +996,75 @@ } /** + * Test simple table and non-existent row cases. 
+ */ + @Test + public void testSimpleMissingWithReverseScan() throws Exception { + byte [] TABLE = Bytes.toBytes("testSimpleMissingWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte [][] ROWS = makeN(ROW, 4); + + // Try to get a row on an empty table + Scan scan = new Scan(); + scan.setReverse(true); + Result result = getSingleScanResult(ht, scan); + assertNullResult(result); + + + scan = new Scan(ROWS[0]); + scan.setReverse(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0],ROWS[1]); + scan.setReverse(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReverse(true); + scan.addFamily(FAMILY); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReverse(true); + scan.addColumn(FAMILY, QUALIFIER); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Insert a row + + Put put = new Put(ROWS[2]); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + + // Make sure we can scan the row + + scan = new Scan(); + scan.setReverse(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[3],ROWS[0]); + scan.setReverse(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[2],ROWS[1]); + scan.setReverse(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + // Try to scan empty rows around it + // Introduced MemStore#shouldSeekForReverseScan to fix the following + scan = new Scan(ROWS[1]); + scan.setReverse(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + } + + /** * Test basic puts, gets, scans, and deletes for a single row * in a multiple family table. 
*/ @@ -1294,6 +1478,42 @@ } @Test + public void testNullWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testNullWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + // Null qualifier (should work) + Put put = new Put(ROW); + put.add(FAMILY, null, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + Delete delete = new Delete(ROW); + delete.deleteColumns(FAMILY, null); + ht.delete(delete); + // Use a new table + byte[] TABLE2 = Bytes.toBytes("testNull2WithReverseScan"); + ht = TEST_UTIL.createTable(TABLE2, FAMILY); + // Empty qualifier, byte[0] instead of null (should work) + put = new Put(ROW); + put.add(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + TEST_UTIL.flush(); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + delete = new Delete(ROW); + delete.deleteColumns(FAMILY, HConstants.EMPTY_BYTE_ARRAY); + ht.delete(delete); + // Null value + put = new Put(ROW); + put.add(FAMILY, QUALIFIER, null); + ht.put(put); + Scan scan = new Scan(); + scan.setReverse(true); + scan.addColumn(FAMILY, QUALIFIER); + Result result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROW, FAMILY, QUALIFIER, null); + } + + @Test public void testVersions() throws Exception { byte [] TABLE = Bytes.toBytes("testVersions"); @@ -2003,6 +2223,180 @@ } } + @Test + public void testDeletesWithReverseScan() throws Exception { + byte [] TABLE = Bytes.toBytes("testDeletesWithReverseScan"); + byte [][] ROWS = makeNAscii(ROW, 6); + byte [][] FAMILIES = makeNAscii(FAMILY, 3); + byte [][] VALUES = makeN(VALUE, 5); + long [] ts = {1000, 2000, 3000, 4000, 5000}; + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + + Put put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[0], QUALIFIER, ts[1], VALUES[1]); + ht.put(put); + + Delete delete = new Delete(ROW); + delete.deleteFamily(FAMILIES[0], ts[0]); + ht.delete(delete); + + Scan scan = new Scan(ROW); + scan.setReverse(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + Result result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1] }, + new byte[][] { VALUES[1] }, 0, 0); + + // Test delete latest version + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); + put.add(FAMILIES[0], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[0], QUALIFIER, ts[3], VALUES[3]); + put.add(FAMILIES[0], null, ts[4], VALUES[4]); + put.add(FAMILIES[0], null, ts[2], VALUES[2]); + put.add(FAMILIES[0], null, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4] + ht.delete(delete); + + scan = new Scan(ROW); + scan.setReverse(true); + scan.addColumn(FAMILIES[0], QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], ts[2], ts[3] }, + new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test for HBASE-1847 + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], null); + ht.delete(delete); + + // Cleanup null qualifier + delete = new Delete(ROW); + delete.deleteColumns(FAMILIES[0], null); + ht.delete(delete); + + // Expected client behavior might be that you can re-put deleted values + // But alas, this is not to be. We can't put them back in either case. 
+ + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000 + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000 + ht.put(put); + + // The Scanner returns the previous values, the expected-naive-unexpected behavior + + scan = new Scan(ROW); + scan.setReverse(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], ts[2], ts[3] }, + new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test deleting an entire family from one row but not the other various ways + + put = new Put(ROWS[0]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[1]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[2]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROWS[0]); + delete.deleteFamily(FAMILIES[2]); + ht.delete(delete); + + delete = new Delete(ROWS[1]); + delete.deleteColumns(FAMILIES[1], QUALIFIER); + ht.delete(delete); + + delete = new Delete(ROWS[2]); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + delete.deleteColumn(FAMILIES[2], QUALIFIER); + ht.delete(delete); + + scan = new Scan(ROWS[0]); + scan.setReverse(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), result.size() == 2); + assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, new long[] { ts[0], ts[1] }, + new byte[][] { VALUES[0], VALUES[1] }, 0, 1); + + scan = new Scan(ROWS[1]); + scan.setReverse(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), result.size() == 2); + + scan = new Scan(ROWS[2]); + scan.setReverse(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, new long[] { ts[2] }, + new byte[][] { VALUES[2] }, 0, 0); + + // Test if we delete the family first in one row (HBASE-1541) + + delete = new Delete(ROWS[3]); + delete.deleteFamily(FAMILIES[1]); + ht.delete(delete); + + put = new Put(ROWS[3]); + put.add(FAMILIES[2], QUALIFIER, VALUES[0]); + ht.put(put); + + put = new Put(ROWS[4]); + put.add(FAMILIES[1], QUALIFIER, VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, VALUES[2]); + ht.put(put); + + scan = new Scan(ROWS[4]); + scan.setReverse(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = ht.getScanner(scan); + result = scanner.next(); + assertTrue("Expected 2 keys but received " + result.size(), result.size() == 2); + 
assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[4])); + assertTrue(Bytes.equals(result.raw()[1].getRow(), ROWS[4])); + assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[1])); + assertTrue(Bytes.equals(result.raw()[1].getValue(), VALUES[2])); + result = scanner.next(); + assertTrue("Expected 1 key but received " + result.size(), result.size() == 1); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[3])); + assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[0])); + scanner.close(); + } /* * Baseline "scalability" test. * @@ -2508,23 +2902,31 @@ private void scanTestNull(HTable ht, byte [] row, byte [] family, byte [] value) throws Exception { + scanTestNull(ht, row, family, value, false); + } + private void scanTestNull(HTable ht, byte [] row, byte [] family, byte[] value, + boolean isReverseScan) throws Exception { Scan scan = new Scan(); + scan.setReverse(isReverseScan); scan.addColumn(family, null); Result result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReverse(isReverseScan); scan.addColumn(family, HConstants.EMPTY_BYTE_ARRAY); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReverse(isReverseScan); scan.addFamily(family); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReverse(isReverseScan); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); @@ -4621,7 +5023,7 @@ */ @Test public void testCacheOnWriteEvictOnClose() throws Exception { - byte [] tableName = Bytes.toBytes("testCOWEOCfromClient"); + byte [] tableName = Bytes.toBytes("testCacheOnWriteEvictOnClose"); byte [] data = Bytes.toBytes("data"); HTable table = TEST_UTIL.createTable(tableName, new byte [][] {FAMILY}); // get the block cache and region Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1517092) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -121,7 +121,7 @@ private final int MAX_VERSIONS = 2; // Test names - protected final byte[] tableName = Bytes.toBytes("testtable");; + protected final byte[] tableName = Bytes.toBytes("testtable"); protected final byte[] qual1 = Bytes.toBytes("qual1"); protected final byte[] qual2 = Bytes.toBytes("qual2"); protected final byte[] qual3 = Bytes.toBytes("qual3"); @@ -155,6 +155,531 @@ // /tmp/testtable ////////////////////////////////////////////////////////////////////////////// + public void testReverseScanner_FromMemStore_SingleCF_Normal() throws IOException { + byte [] rowC = Bytes.toBytes("rowC"); + byte [] rowA = Bytes.toBytes("rowA"); + byte [] rowB = Bytes.toBytes("rowB"); + byte [] cf = Bytes.toBytes("CF"); + byte [][] families = {cf}; + byte [] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts+1, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 
+ Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowC); + scan.setMaxVersions(5); + scan.setReverse(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_LargerKey() throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowD); + List currRow = new ArrayList(); + scan.setReverse(true); + scan.setMaxVersions(5); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_FullScan() throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put 
= new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + Scan scan = new Scan(); + List currRow = new ArrayList(); + scan.setReverse(true); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_moreRowsMayExistAfter() throws IOException { + //case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReverse(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReverse(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_smaller_blocksize() throws IOException { + //case to ensure no conflict with HFile index optimization + byte[] rowA = 
Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration config = new HBaseConfiguration(); + config.setInt("test.block.size", 1); + this.region = initHRegion(tableName, method, config, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReverse(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReverse(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1() throws IOException { + byte [] row0 = Bytes.toBytes("row0"); // 1 kv + byte [] row1 = Bytes.toBytes("row1"); // 2 kv + byte [] row2 = Bytes.toBytes("row2"); // 4 kv + byte [] row3 = Bytes.toBytes("row3"); // 2 kv + byte [] row4 = Bytes.toBytes("row4"); // 5 kv + byte [] row5 = Bytes.toBytes("row5"); // 2 kv + byte [] cf1 = Bytes.toBytes("CF1"); + byte [] cf2 = Bytes.toBytes("CF2"); + byte [] cf3 = Bytes.toBytes("CF3"); + byte [][] families = {cf1, cf2, cf3}; + byte [] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. 
+ conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + //kv naming style: kv(row number) + totalKvCountInThisRow + seq no + KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts+1, KeyValue.Type.Put, null); + KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts+4, KeyValue.Type.Put, null); + KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts+4, KeyValue.Type.Put, null); + KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts+5, KeyValue.Type.Put, null); + KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts+3, KeyValue.Type.Put, null); + KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, null); + // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) + Put put = null; + put = new Put(row1); + put.add(kv1_2_1); + region.put(put); + put = new Put(row2); + put.add(kv2_4_1); + region.put(put); + put = new Put(row4); + put.add(kv4_5_4); + put.add(kv4_5_5); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) + put = new Put(row4); + put.add(kv4_5_1); + put.add(kv4_5_3); + region.put(put); + put = new Put(row1); + put.add(kv1_2_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_4); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) + put = new Put(row4); + put.add(kv4_5_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_2); + put.add(kv2_4_3); + region.put(put); + put = new Put(row3); + put.add(kv3_2_2); + region.put(put); + region.flushcache(); + // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) ( 2 kv) + put = new Put(row0); + put.add(kv0_1_1); + region.put(put); + put = new Put(row3); + put.add(kv3_2_1); + region.put(put); + put = new Put(row5); + put.add(kv5_2_1); + put.add(kv5_2_2); + region.put(put); + // scan range = ["row4", min), skip the max "row5" + Scan scan = new Scan(row4); + scan.setMaxVersions(5); + scan.setBatch(3); + scan.setReverse(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = false; + // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not included in scan range + // "row4" takes 2 next() calls since batch=3 + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + // 2. 
scan out "row3" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + // 3. scan out "row2" (4 kvs) + // "row2" takes 2 next() calls since batch=3 + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + // 4. scan out "row1" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertTrue(hasNext); + // 5. scan out "row0" (1 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row0)); + assertFalse(hasNext); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2() throws IOException { + byte [] row1 = Bytes.toBytes("row1"); + byte [] row2 = Bytes.toBytes("row2"); + byte [] row3 = Bytes.toBytes("row3"); + byte [] row4 = Bytes.toBytes("row4"); + byte [] cf1 = Bytes.toBytes("CF1"); + byte [] cf2 = Bytes.toBytes("CF2"); + byte [] cf3 = Bytes.toBytes("CF3"); + byte [] cf4 = Bytes.toBytes("CF4"); + byte [][] families = {cf1, cf2, cf3, cf4}; + byte [] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. + conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null); + // storefile1 + Put put = new Put(row1); + put.add(kv1); + region.put(put); + region.flushcache(); + // storefile2 + put = new Put(row2); + put.add(kv2); + region.put(put); + region.flushcache(); + // storefile3 + put = new Put(row3); + put.add(kv3); + region.put(put); + region.flushcache(); + // memstore + put = new Put(row4); + put.add(kv4); + region.put(put); + // scan range = ["row4", min) + Scan scan = new Scan(row4); + scan.setReverse(true); + scan.setBatch(10); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertFalse(hasNext); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + public void testCompactionAffectedByScanners() throws Exception { String 
method = "testCompactionAffectedByScanners"; byte[] tableName = Bytes.toBytes(method); @@ -4287,13 +4812,20 @@ * @throws IOException * @return A region on which you must call {@link HRegion#closeHRegion(HRegion)} when done. */ + private static HRegion initHRegion (byte [] tableName, String callingMethod, + byte[] ... families) + throws IOException { + return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families); + } + private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); htd.setReadOnly(isReadOnly); for(byte [] family : families) { - htd.addFamily(new HColumnDescriptor(family)); + int blockSize = conf.getInt("test.block.size", 65536); + htd.addFamily(new HColumnDescriptor(family).setBlocksize(blockSize)); } HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false); Path path = new Path(DIR + callingMethod); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1517092) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -154,6 +154,45 @@ } /** + * Test that on a major compaction, if all cells are expired or deleted, then + * we'll end up with no product. Make sure scanner over region returns + * right answer in this case - and that it just basically works. + * @throws IOException + */ + public void testMajorCompactingToNoOutputWithReverseScan() throws IOException { + createStoreFile(r); + for (int i = 0; i < compactionThreshold; i++) { + createStoreFile(r); + } + // Now delete everything. + Scan scan = new Scan(); + scan.setReverse(true); + InternalScanner s = r.getScanner(scan); + do { + List results = new ArrayList(); + boolean result = s.next(results); + r.delete(new Delete(results.get(0).getRow()), null, false); + if (!result) break; + } while(true); + s.close(); + // Flush + r.flushcache(); + // Major compact. + r.compactStores(true); + scan = new Scan(); + scan.setReverse(true); + s = r.getScanner(scan); + int counter = 0; + do { + List results = new ArrayList(); + boolean result = s.next(results); + if (!result) break; + counter++; + } while(true); + assertEquals(0, counter); + } + + /** * Run compaction and flushing memstore * Assert deletes get cleaned up. 
* @throws Exception Index: src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (revision 1517092) +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (working copy) @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import junit.framework.Assert; @@ -298,6 +299,16 @@ verifyScan(s, expectedRows, expectedKeys); } + public void testPrefixFilterWithReverseScan() throws Exception { + // Grab rows from group one (half of total) + long expectedRows = this.numRows / 2; + long expectedKeys = this.colsPerRow; + Scan s = new Scan(); + s.setReverse(true); + s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne"))); + verifyScan(s, expectedRows, expectedKeys); + } + public void testPageFilter() throws Exception { // KVs in first 6 rows @@ -384,6 +395,86 @@ } + public void testPageFilterWithReverseScan() throws Exception { + // KVs in first 6 rows + KeyValue [] expectedKVs = { + // testRowOne-0 + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-2 + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-3 + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new 
KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]) + }; + + // Grab all 6 rows + long expectedRows = 6; + long expectedKeys = this.colsPerRow; + Scan s = new Scan(); + s.setReverse(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 4 rows (6 cols per row) + expectedRows = 4; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReverse(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 2 rows + expectedRows = 2; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReverse(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first row + expectedRows = 1; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReverse(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + } + /** * Tests the the {@link WhileMatchFilter} works in combination with a * {@link Filter} that uses the @@ -416,6 +507,30 @@ Assert.assertEquals("The page filter returned more rows than expected", pageSize, scannerCounter); } + public void testWhileMatchFilterWithFilterRowWithReverseScan() throws Exception { + final int pageSize = 4; + + Scan s = new Scan(); + s.setReverse(true); + WhileMatchFilter filter = new WhileMatchFilter(new PageFilter(pageSize)); + s.setFilter(filter); + + InternalScanner scanner = this.region.getScanner(s); + int scannerCounter = 0; + while (true) { + boolean isMoreResults = scanner.next(new ArrayList()); + scannerCounter++; + + if (scannerCounter >= pageSize) { + Assert.assertTrue("The WhileMatchFilter should now filter all remaining", filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + Assert.assertEquals("The page filter returned more rows than expected", pageSize, scannerCounter); + } + /** * Tests the the {@link WhileMatchFilter} works in combination with a * {@link Filter} that uses the @@ -444,6 +559,27 @@ } } + public void testWhileMatchFilterWithFilterRowKeyWithReverseScan() throws Exception { + Scan s = new Scan(); + String prefix = "testRowOne"; + WhileMatchFilter filter = new WhileMatchFilter(new PrefixFilter(Bytes.toBytes(prefix))); + s.setFilter(filter); + s.setReverse(true); + + InternalScanner scanner = this.region.getScanner(s); + while (true) { + ArrayList values = new ArrayList(); + boolean isMoreResults = scanner.next(values); + if (!isMoreResults || !Bytes.toString(values.get(0).getRow()).startsWith(prefix)) { + Assert.assertTrue("The WhileMatchFilter should now filter all remaining", + filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + } + /** * Tests the the {@link WhileMatchFilter} works in combination with a * {@link Filter} that uses the Index: src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (working copy) @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.util; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -126,4 +127,34 @@ 
public void close() { // do nothing } + + @Override + public boolean seekBeforeRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public byte[] getMaxRow() { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekBefore(KeyValue kv) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public byte[] getLastRow() { + throw new UnsupportedOperationException("Not implemented"); + } } Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -81,6 +81,7 @@ */ public class Scan extends OperationWithAttributes implements Writable { private static final String RAW_ATTR = "_raw_"; + private static final String REVERSE_ATTR = "_rev_"; private static final String ONDEMAND_ATTR = "_ondemand_"; private static final String ISOLATION_LEVEL = "_isolationlevel_"; @@ -335,6 +336,9 @@ */ public Scan setFilter(Filter filter) { this.filter = filter; + if (filter != null) { + this.filter.setReverse(getReverse() && !isGetScan()); + } return this; } @@ -463,6 +467,29 @@ } /** + * Set whether this scan is a reverse one + *
<p>
+ * This is false by default which means forward(normal) scan. + * + * @param reverse if true, scan will be reverse order + */ + public void setReverse(boolean reverse) { + setAttribute(REVERSE_ATTR, Bytes.toBytes(reverse)); + if (this.filter != null) { + this.filter.setReverse(reverse && !isGetScan()); + } + } + + /** + * Get whether this scan is a reverse one. + * @return true if reverse scan, false if forward(default) scan + */ + public boolean getReverse() { + byte[] attr = getAttribute(REVERSE_ATTR); + return attr != null ? Bytes.toBoolean(attr) : false; + } + + /** * Set the value indicating whether loading CFs on demand should be allowed (cluster * default is false). On-demand CF loading doesn't load column families until necessary, e.g. * if you filter on one column, the other column family data will be loaded only for the rows Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -125,6 +125,8 @@ private final boolean isUserScan; + private final boolean isReverse; + /** * Construct a QueryMatcher for a scan * @param scan @@ -148,6 +150,7 @@ this.earliestPutTs = earliestPutTs; this.maxReadPointToTrackVersions = readPointToUse; this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes(); + this.isReverse = scan.getReverse(); /* how to deal with deletes */ this.isUserScan = scanType == ScanType.USER_SCAN; @@ -379,15 +382,25 @@ } public boolean moreRowsMayExistAfter(KeyValue kv) { - if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) && - rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(), - kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) { + if (isReverse) { + if (!Bytes.equals(stopRow, HConstants.EMPTY_END_ROW) + && rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), + stopRow, 0, stopRow.length) <= 0) { + // KV <= STOPROW + // then NO there is nothing left. + return false; + } + return true; + } + // forward scan + if (!Bytes.equals(stopRow, HConstants.EMPTY_END_ROW) + && rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), stopRow, + 0, stopRow.length) >= 0) { // KV >= STOPROW // then NO there is nothing left. return false; - } else { - return true; } + return true; } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.util.Bytes; /** * Implements a heap merge across any number of KeyValueScanners. @@ -58,6 +61,9 @@ private KVScannerComparator comparator; + private boolean reverse = false; + private List tmpScanners = null; + /** * Constructor. This KeyValueHeap will handle closing of passed in * KeyValueScanners. 
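The Scan.java hunk above adds the client-facing setReverse(boolean)/getReverse() pair; the KeyValueHeap hunks below are the server-side consumer of that flag. As a reading aid only (not part of the diff), here is a minimal client-side sketch of how the new flag is used; the table name and row keys are hypothetical, and it assumes the 0.94-era HTable client API that the tests above already rely on:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ReverseScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "mytable"); // hypothetical table name
    try {
      // For a reverse scan the start row is the larger key and the stop row
      // the smaller one, mirroring testSimpleMissingWithReverseScan above.
      Scan scan = new Scan(Bytes.toBytes("row9"), Bytes.toBytes("row0"));
      scan.setReverse(true); // flag added by this patch
      ResultScanner scanner = table.getScanner(scan);
      try {
        for (Result r : scanner) {
          // Rows come back in descending row-key order: row9, row8, ...
          System.out.println(Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}

Everything except setReverse(true) is the ordinary forward-scan code path, which is why most of the test additions in this patch are reverse-order copies of existing forward-scan tests.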
@@ -66,12 +72,19 @@ */ public KeyValueHeap(List scanners, KVComparator comparator) throws IOException { + this(scanners, comparator, false); + } + + public KeyValueHeap(List scanners, + KVComparator comparator, boolean reverse) throws IOException { this.comparator = new KVScannerComparator(comparator); + this.reverse = reverse; + this.tmpScanners = new ArrayList(scanners.size()); if (!scanners.isEmpty()) { this.heap = new PriorityQueue(scanners.size(), this.comparator); for (KeyValueScanner scanner : scanners) { - if (scanner.peek() != null) { + if (scanner.peek() != null || reverse) { this.heap.add(scanner); } else { scanner.close(); @@ -88,21 +101,25 @@ return this.current.peek(); } - public KeyValue next() throws IOException { - if(this.current == null) { + public KeyValue next() throws IOException { + if (this.current == null) { return null; } KeyValue kvReturn = this.current.next(); KeyValue kvNext = this.current.peek(); - if (kvNext == null) { - this.current.close(); + if (reverse) { + this.heap.add(this.current); this.current = pollRealKV(); } else { - KeyValueScanner topScanner = this.heap.peek(); - if (topScanner == null || - this.comparator.compare(kvNext, topScanner.peek()) >= 0) { - this.heap.add(this.current); + if (kvNext == null) { + this.current.close(); this.current = pollRealKV(); + } else { + KeyValueScanner topScanner = this.heap.peek(); + if (topScanner == null || this.comparator.compare(kvNext, topScanner.peek()) >= 0) { + this.heap.add(this.current); + this.current = pollRealKV(); + } } } return kvReturn; @@ -149,7 +166,7 @@ * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations. */ - if (pee == null || !mayContainMoreRows) { + if ((!reverse && pee == null) || !mayContainMoreRows) { this.current.close(); } else { this.heap.add(this.current); @@ -186,22 +203,22 @@ public KVScannerComparator(KVComparator kvComparator) { this.kvComparator = kvComparator; } + public int compare(KeyValueScanner left, KeyValueScanner right) { int comparison = compare(left.peek(), right.peek()); if (comparison != 0) { return comparison; + } + // Since both the keys are exactly the same, we break the tie in favor + // of the key which came latest. + long leftSequenceID = left.getSequenceID(); + long rightSequenceID = right.getSequenceID(); + if (leftSequenceID > rightSequenceID) { + return -1; + } else if (leftSequenceID < rightSequenceID) { + return 1; } else { - // Since both the keys are exactly the same, we break the tie in favor - // of the key which came latest. - long leftSequenceID = left.getSequenceID(); - long rightSequenceID = right.getSequenceID(); - if (leftSequenceID > rightSequenceID) { - return -1; - } else if (leftSequenceID < rightSequenceID) { - return 1; - } else { - return 0; - } + return 0; } } /** @@ -211,6 +228,15 @@ * @return less than 0 if left is smaller, 0 if equal etc.. 
*/ public int compare(KeyValue left, KeyValue right) { + // for reverse scan : scanner.peek() can be null + if (left == null && right == null) { + return 0; + } else if (left == null) { + return 1; + } else if (right == null) { + return -1; + } + return this.kvComparator.compare(left, right); } /** @@ -301,6 +327,11 @@ KeyValueScanner scanner; while ((scanner = heap.poll()) != null) { KeyValue topKey = scanner.peek(); + if (reverse && topKey == null) { + heap.add(scanner); + current = pollRealKV(); + return current != null; + } if (comparator.getComparator().compare(seekKey, topKey) <= 0) { // Top KeyValue is at-or-after Seek KeyValue. We only know that all // scanners are at or after seekKey (because fake keys of @@ -322,7 +353,7 @@ scanner, seekKey, forward); } - if (!seekResult) { + if (!seekResult && !reverse) { scanner.close(); } else { heap.add(scanner); @@ -404,4 +435,175 @@ KeyValueScanner getCurrentForTesting() { return current; } + + // used by RegionScannerImpl.storeHeap + public boolean seekToPrevRow(byte[] row) throws IOException { + if (this.heap == null) { + return false; + } + // 1. all storeScanner.seekBeforeRow(row); + if (this.current != null) { + this.heap.add(this.current); + this.current = null; + } + + // remove scanners without valid rows within the given scan key-range + tmpScanners.clear(); + for (KeyValueScanner scanner : this.heap) { + if (!scanner.seekBeforeRow(row)) + tmpScanners.add(scanner); + } + for (KeyValueScanner scanner : tmpScanners) + this.heap.remove(scanner); + + // 2. get prev-row: the max-row of all storeScanners; + byte[] max = HConstants.EMPTY_BYTE_ARRAY; + for (KeyValueScanner scanner : this.heap) { + byte[] r = scanner.getMaxRow(); + if (Bytes.compareTo(r, max) > 0) + max = r; + } + + // 3. all storeScanner.seek(prev-row); + tmpScanners.clear(); + while (!this.heap.isEmpty()) { + KeyValueScanner scanner = this.heap.poll(); + scanner.seekRow(max); + tmpScanners.add(scanner); + } + for (KeyValueScanner scanner : tmpScanners) { + this.heap.add(scanner); + } + + this.current = pollRealKV(); + return (this.current != null); + } + + // used by StoreScanner.heap + @Override + public boolean seekBefore(KeyValue kv) throws IOException { + if (this.heap == null) { + return false; + } + if (this.current != null) { + this.heap.add(this.current); + this.current = null; + } + + // remove scanners without valid rows within the given scan key-range + boolean existPrevRow = false; + tmpScanners.clear(); + for (KeyValueScanner scanner : this.heap) { + if (scanner.seekBefore(kv)) { + existPrevRow = true; + } else { + tmpScanners.add(scanner); + } + } + for (KeyValueScanner scanner : tmpScanners) { + this.heap.remove(scanner); + } + + return existPrevRow; + } + + // used by StoreScanner.heap + @Override + public byte[] getMaxRow() { + if (this.current != null) { + this.heap.add(this.current); + this.current = null; + } + byte[] max = HConstants.EMPTY_BYTE_ARRAY; + for (KeyValueScanner scanner : this.heap) { + if (scanner.peek() != null) { + byte[] row = scanner.peek().getRow(); + if (Bytes.compareTo(row, max) > 0) { + max = row; + } + } + } + return max; + } + + @Override + public byte[] getLastRow() { + if (this.heap == null) { + return null; + } + if (this.current != null) { + this.heap.add(this.current); + this.current = null; + } + byte[] max = HConstants.EMPTY_BYTE_ARRAY; + for (KeyValueScanner scanner : this.heap) { + if (scanner.getLastRow() != null) { + byte[] row = scanner.getLastRow(); + if (row != null && Bytes.compareTo(row, max) > 0) { + max = row; 
+        }
+      }
+    }
+    return max;
+  }
+
+  // used by StoreScanner.heap; we do not re-use "seek(kv)" here because this
+  // seek needs to go backward, while "seek(kv)" assumes seeking is always
+  // forward
+  public boolean seekTo(KeyValue kv) throws IOException {
+    if (this.heap == null) {
+      return false;
+    }
+    if (this.current != null) {
+      this.heap.add(this.current);
+      this.current = null;
+    }
+
+    tmpScanners.clear();
+    while (!this.heap.isEmpty()) {
+      KeyValueScanner scanner = this.heap.poll();
+      scanner.seek(kv);
+      tmpScanners.add(scanner);
+    }
+    for (KeyValueScanner scanner : tmpScanners) {
+      this.heap.add(scanner);
+    }
+
+    this.current = pollRealKV();
+    return (this.current != null);
+  }
+
+  @Override
+  public boolean seekRow(byte[] row) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public boolean seekBeforeRow(byte[] row) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    if (this.heap == null) {
+      return false;
+    }
+    if (this.current != null) {
+      this.heap.add(this.current);
+      this.current = null;
+    }
+
+    byte[] max = getLastRow();
+    tmpScanners.clear();
+    while (!this.heap.isEmpty()) {
+      KeyValueScanner scanner = this.heap.poll();
+      scanner.seekRow(max);
+      tmpScanners.add(scanner);
+    }
+    for (KeyValueScanner scanner : tmpScanners) {
+      this.heap.add(scanner);
+    }
+    this.current = pollRealKV();
+    return (this.current != null);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(revision 1517092)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(working copy)
@@ -72,6 +72,7 @@
 
   // if heap == null and lastTop != null, you need to reseek given the key below
   private KeyValue lastTop = null;
+  private final boolean isReversed;
 
   /** An internal constructor. */
   private StoreScanner(Store store, boolean cacheBlocks, Scan scan,
@@ -85,6 +86,7 @@
     this.columns = columns;
     oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
     this.minVersions = minVersions;
+    this.isReversed = scan.getReverse() && !scan.isGetScan();
 
     // We look up row-column Bloom filters for multi-column queries as part of
     // the seek operation. However, we also look the row-column Bloom filter
@@ -133,7 +135,7 @@
     }
 
     // Combine all seeked scanners with a heap
-    heap = new KeyValueHeap(scanners, store.comparator);
+    heap = new KeyValueHeap(scanners, store.comparator, isReversed);
 
     this.store.addChangedReaderObserver(this);
   }
@@ -192,7 +194,7 @@
     for (KeyValueScanner scanner : scanners) {
       scanner.seek(matcher.getStartKey());
     }
-    heap = new KeyValueHeap(scanners, scanInfo.getComparator());
+    heap = new KeyValueHeap(scanners, scanInfo.getComparator(), isReversed);
   }
 
   /**
@@ -294,7 +296,7 @@
 
       List scanners = getScannersNoCompaction();
 
-      heap = new KeyValueHeap(scanners, store.comparator);
+      heap = new KeyValueHeap(scanners, store.comparator, isReversed);
     }
 
     return this.heap.seek(key);
@@ -374,6 +376,7 @@
 
         if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
           if (!matcher.moreRowsMayExistAfter(kv)) {
+            close();
            return false;
           }
           reseek(matcher.getKeyForNextRow(kv));
@@ -401,6 +404,7 @@
         // This is just a relatively simple end of scan fix, to short-cut end
         // us if there is an endKey in the scan.
if (!matcher.moreRowsMayExistAfter(kv)) { + close(); return false; } @@ -439,6 +443,11 @@ return true; } + if (isReversed) { + //corner case for reverse scan: when the start key >= last key, count will be zero and + //maybe still more rows need to be read, let's return true here + return true; + } // No more keys close(); return false; @@ -515,7 +524,7 @@ } // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.comparator); + heap = new KeyValueHeap(scanners, store.comparator, isReversed); // Reset the state of the Query Matcher and set to top row. // Only reset and call setRow if the row changes; avoids confusing the @@ -568,5 +577,58 @@ static void enableLazySeekGlobally(boolean enable) { lazySeekEnabledGlobally = enable; } + + @Override + public boolean seekBeforeRow(byte[] row) throws IOException { + KeyValue firstKV = KeyValue.createFirstDeleteFamilyOnRow(row, store.getFamily().getName()); + if (this.heap == null) { + List scanners = getScannersNoCompaction(); + for (KeyValueScanner scanner : scanners) { + scanner.seek(firstKV); + } + heap = new KeyValueHeap(scanners, store.comparator, scan.getReverse() && !scan.isGetScan()); + } + return this.heap.seekBefore(firstKV); + } + + @Override + public byte[] getMaxRow() { + return this.heap.getMaxRow(); + } + + public byte[] getMaxRow(List scanners) { + byte[] max = HConstants.EMPTY_BYTE_ARRAY; + for (KeyValueScanner scanner : scanners) { + if (scanner.peek() != null) { + byte[] row = scanner.peek().getRow(); + if (Bytes.compareTo(row, max) > 0) { + max = row; + } + } + } + return max; + } + + @Override + public byte[] getLastRow() { + return this.heap.getLastRow(); + } + + @Override + public boolean seekRow(byte[] row) throws IOException { + KeyValue firstKV = KeyValue.createFirstDeleteFamilyOnRow(row, + store.getFamily().getName()); + return this.heap.seekTo(firstKV); + } + + @Override + public boolean seekBefore(KeyValue kv) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -372,4 +372,54 @@ return reader.passesTimerangeFilter(scan, oldestUnexpiredTS) && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns); } + + @Override + public boolean seekBefore(KeyValue kv) throws IOException { + seekCount.incrementAndGet(); + + try { + try { + if(!hfs.seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength())) { + close(); + return false; + } + + cur = hfs.getKeyValue(); + //return skipKVsNewerThanReadpoint(); + //CAUTION!!! see http://phabricator.n.miliao.com/T293 for more detail + //in short, the original intention of seekBefore() is moving to a smaller kv, but + //skipKVsNewerThanReadpoint could break this assumption under special cases. 
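+        //(presumably: skipKVsNewerThanReadpoint() walks the scanner forward past KVs whose
+        //memstoreTS is newer than the read point, so calling it here could leave this scanner
+        //at or after the requested key instead of strictly before it)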
+        return true;
+      } finally {
+        realSeekDone = true;
+      }
+    } catch (IOException ioe) {
+      throw new IOException("Could not seek " + this + " before kv " + kv, ioe);
+    }
+  }
+
+  @Override
+  public boolean seekBeforeRow(byte[] row) throws IOException {
+    throw new IOException("Not implemented");
+  }
+
+  @Override
+  public byte[] getMaxRow() {
+    return reader.getLastRowKey();
+  }
+
+  @Override
+  public byte[] getLastRow() {
+    return reader.getLastRowKey();
+  }
+
+  @Override
+  public boolean seekRow(byte[] row) throws IOException {
+    throw new IOException("Not implemented");
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    throw new IOException("Not implemented");
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1517092)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -3764,6 +3764,7 @@
     private Filter filter;
     private int batch;
     private int isScan;
+    private boolean reverse = false;
     private boolean filterClosed = false;
     private long readPt;
     private HRegion region;
@@ -3778,6 +3779,10 @@
       this.region = region;
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
+      this.reverse = (!scan.isGetScan()) && scan.getReverse();
+      if (this.reverse && scan.getFilter() != null) {
+        scan.getFilter().setReverse(true);
+      }
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
         this.stopRow = null;
       } else {
@@ -3820,7 +3825,18 @@
           joinedScanners.add(scanner);
         }
       }
-      this.storeHeap = new KeyValueHeap(scanners, comparator);
+      this.storeHeap = new KeyValueHeap(scanners, comparator, this.reverse);
+      if (this.reverse
+          && (storeHeap.peek() == null || Bytes.equals(scan.getStartRow(),
+            HConstants.EMPTY_START_ROW))) {
+        this.storeHeap.seekToLastRow();
+      }
+      if (this.reverse && !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) {
+        KeyValue kv = storeHeap.peek();
+        if (kv != null && Bytes.compareTo(kv.getRow(), scan.getStartRow()) > 0) {
+          this.storeHeap.seekToPrevRow(kv.getRow());
+        }
+      }
       if (!joinedScanners.isEmpty()) {
         this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
       }
@@ -4007,8 +4023,16 @@
           // Techically, if we hit limits before on this row, we don't need this call.
           if (filterRowKey(currentRow, offset, length)) {
             results.clear();
-            boolean moreRows = nextRow(currentRow, offset, length);
-            if (!moreRows) return false;
+            if (reverse) {
+              if (isFilterDone()) {
+                return false;
+              }
+              this.storeHeap.seekToPrevRow(current.getRow());
+              resetFilters();
+            } else {
+              boolean moreRows = nextRow(currentRow, offset, length);
+              if (!moreRows) return false;
+            }
             continue;
           }
 
@@ -4020,8 +4044,22 @@
               throw new IncompatibleFilterException(
                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
             }
+            // if the current row has already changed, seek to the 'next' (previous) row immediately;
+            // we can't just return here because we can't use the natural 'next' row the way a
+            // forward scan does
+            if (reverse && !Bytes.equals(current.getRow(), peekRow())) {
+              this.storeHeap.seekToPrevRow(current.getRow());
+            }
             return true; // We hit the limit.
           }
+          // the current row is done; for a reverse scan we need to seek to the 'next'
+          // (previous) row explicitly since we can't use the natural 'next' row the way a
+          // forward scan does
+          if (reverse) {
+            this.storeHeap.seekToPrevRow(current.getRow());
+            resetFilters();
+            nextKv = peekKeyValue();
+          }
           stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(),
               nextKv.getRowLength());
           // save that the row was empty before filters applied to it.
@@ -4035,9 +4073,12 @@
 
           if (isEmptyRow || filterRow()) {
             results.clear();
-            boolean moreRows = nextRow(currentRow, offset, length);
-            if (!moreRows) return false;
-
+            if (reverse) {
+              resetFilters();
+            } else {
+              boolean moreRows = nextRow(currentRow, offset, length);
+              if (!moreRows) return false;
+            }
             // This row was totally filtered out, if this is NOT the last row,
             // we should continue on. Otherwise, nothing else to do.
             if (!stopRow) continue;
@@ -4109,13 +4150,27 @@
       return true;
     }
 
-    private boolean isStopRow(byte [] currentRow, int offset, short length) {
-      return currentRow == null ||
-          (stopRow != null &&
-              comparator.compareRows(stopRow, 0, stopRow.length,
-                  currentRow, offset, length) <= isScan);
+    private byte[] peekRow() {
+      KeyValue kv = this.storeHeap.peek();
+      return kv == null ? null : kv.getRow();
     }
+    private KeyValue peekKeyValue() {
+      KeyValue kv = this.storeHeap.peek();
+      return kv;
+    }
+
+    private boolean isStopRow(byte[] currentRow, int offset, short length) {
+      if (this.reverse) {
+        return currentRow == null
+            || (stopRow != null && comparator.compareRows(stopRow, 0, stopRow.length, currentRow,
+              offset, length) >= isScan);
+      }
+      return currentRow == null
+          || (stopRow != null && comparator.compareRows(stopRow, 0, stopRow.length, currentRow,
+            offset, length) <= isScan);
+    }
+
    @Override
     public synchronized void close() {
       if (storeHeap != null) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 1517092)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -1661,11 +1661,14 @@
         && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
       return true;
     }
-    KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow());
-    KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow());
+    KeyValue smallestScanKeyValue = scan.getReverse() ? KeyValue.createFirstOnRow(scan
+        .getStopRow()) : KeyValue.createFirstOnRow(scan.getStartRow());
+    KeyValue largestScanKeyValue = scan.getReverse() ? KeyValue.createLastOnRow(scan
+        .getStartRow()) : KeyValue.createLastOnRow(scan.getStopRow());
     boolean nonOverLapping = (getComparator().compare(this.getFirstKey(),
-        stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
-        || getComparator().compare(this.getLastKey(), startKeyValue.getKey()) < 0;
+        largestScanKeyValue.getKey()) > 0 && !Bytes.equals(scan.getReverse() ? scan.getStartRow()
+        : scan.getStopRow(), HConstants.EMPTY_END_ROW))
+        || getComparator().compare(this.getLastKey(), smallestScanKeyValue.getKey()) < 0;
     return !nonOverLapping;
   }
@@ -1770,6 +1773,10 @@
       return reader.getLastKey();
     }
 
+    public byte[] getLastRowKey() {
+      return reader.getLastRowKey();
+    }
+
     public byte[] midkey() throws IOException {
       return reader.midkey();
     }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java	(revision 1517092)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java	(working copy)
@@ -120,5 +120,45 @@
    * @return true if this is a file scanner. Otherwise a memory scanner is
    *         assumed.
    */
-  public boolean isFileScanner();
+  public boolean isFileScanner();
+
+  /**
+   * Seek to the KeyValue just before the given row.
+   * @return true if positioned at a valid KeyValue, false if no such KeyValue exists.
+   * Used by RegionScannerImpl.storeHeap (implemented by StoreScanner).
+   */
+  public boolean seekBeforeRow(byte[] row) throws IOException;
+
+  /**
+   * Get the max row among all underlying scanners.
+   * @return the max row
+   */
+  public byte[] getMaxRow();
+
+  /**
+   * Get the last row.
+   * @return the last row
+   */
+  public byte[] getLastRow();
+
+  /**
+   * Seek to the first KeyValue of the given row.
+   * @return true if positioned at a valid KeyValue, false if no such KeyValue exists.
+   */
+  public boolean seekRow(byte[] row) throws IOException;
+
+  /**
+   * Seek to the KeyValue just before the given KeyValue.
+   * @return true if positioned at a valid KeyValue, false if no such KeyValue exists.
+   */
+  public boolean seekBefore(KeyValue kv) throws IOException;
+
+  /**
+   * Seek the scanner to the first kv of the row containing the last key; used when the
+   * seek key is greater than the last key or equal to EMPTY_BYTE_ARRAY.
+   * CAUTION: it's an experimental internal method used by reverse scan ONLY.
+   * @return true if positioned at a valid KeyValue, false if the underlying data is empty
+   * @throws IOException
+   */
+  public boolean seekToLastRow() throws IOException;
 }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(revision 1517092)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(working copy)
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.rmi.UnexpectedException;
@@ -680,13 +681,49 @@
    * @return False if the key definitely does not exist in this Memstore
    */
   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
-    return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
-        snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
+    return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) || snapshotTimeRangeTracker
+        .includesTimeRange(scan.getTimeRange()))
         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
-            snapshotTimeRangeTracker.getMaximumTimestamp()) >=
-            oldestUnexpiredTS);
+            snapshotTimeRangeTracker.getMaximumTimestamp()) >= oldestUnexpiredTS)
+        && shouldSeekForReverseScan(scan);
   }
+  /**
+   * Check if this memstore may contain the required keys for reverse scan.
+ * see TestFromClientSide#testSimpleMissingWithReverseScan + * @param scan + * @return False if the key definitely does not exist in this Memstore + */ + private boolean shouldSeekForReverseScan(Scan scan) { + if (!scan.getReverse() || scan.isGetScan()) { + return true; + } + + if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) { + return true; + } + + KeyValue largestScanKeyValue = KeyValue.createLastOnRow(scan.getStartRow()); + KeyValue smallestMemStoreKeyValue = getSmallestKeyValue(); + if (smallestMemStoreKeyValue == null) { + LOG.warn("smallestMemStoreKeyValue is null, BUGON?"); + return false; + } + return comparator.compare(smallestMemStoreKeyValue, largestScanKeyValue) <= 0; + } + + public KeyValue getSmallestKeyValue() { + KeyValue first = kvset.isEmpty() ? null : kvset.first(); + KeyValue second = snapshot.isEmpty() ? null : snapshot.first(); + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + return comparator.compare(first, second) < 0 ? first : second; + } + return first != null ? first : second; + } + public TimeRangeTracker getSnapshotTimeRangeTracker() { return this.snapshotTimeRangeTracker; } @@ -924,6 +961,61 @@ long oldestUnexpiredTS) { return shouldSeek(scan, oldestUnexpiredTS); } + + /** + * Set the scanner just before the seek key. + * @param kv seek KeyValue + * @return false if the key is null or if there is no data + */ + @Override + public synchronized boolean seekBefore(KeyValue kv) { + if (kv == null) { + close(); + return false; + } + + // kvset and snapshot will never be null. + // if headSet can't find anything, SortedSet is empty (not null). + kvsetIt = ((KeyValueSkipListSet)kvsetAtCreation.headSet(kv)).descendingIterator(); + snapshotIt = ((KeyValueSkipListSet)snapshotAtCreation.headSet(kv)).descendingIterator(); + kvsetItRow = null; + snapshotItRow = null; + + kvsetNextRow = getNext(kvsetIt); + snapshotNextRow = getNext(snapshotIt); + + // locate at the KeyValue with the higher row before given kv + theNext = getHighest(kvsetNextRow, snapshotNextRow); + return (theNext != null); + } + + @Override + public boolean seekBeforeRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public byte[] getMaxRow() { + KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation.last(); + KeyValue second = snapshotAtCreation.isEmpty() ? null : snapshotAtCreation.last(); + KeyValue higherKv = getHighest(first, second); + return higherKv == null ? 
null : higherKv.getRow(); + } + + @Override + public byte[] getLastRow() { + return getMaxRow(); + } + + @Override + public boolean seekRow(byte[] row) { + return seek(KeyValue.createFirstOnRow(row)); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } } public final static long FIXED_OVERHEAD = ClassSize.align( Index: src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (working copy) @@ -60,7 +60,7 @@ // if we are passed the prefix, set flag int cmp = Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0, this.prefix.length); - if(cmp > 0) { + if ((!isReverse() && cmp > 0) || (isReverse() && cmp < 0)) { passedPrefix = true; } return cmp != 0; Index: src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (working copy) @@ -34,6 +34,16 @@ */ public abstract class FilterBase implements Filter { + protected boolean reverse; + + public void setReverse(boolean reverse) { + this.reverse = reverse; + } + + public boolean isReverse() { + return this.reverse; + } + /** * Filters that are purely stateless and do nothing in their reset() methods can inherit * this null/empty implementation. Index: src/main/java/org/apache/hadoop/hbase/filter/FilterList.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (working copy) @@ -68,6 +68,12 @@ /** Reference KeyValue used by {@link #transform(KeyValue)} for validation purpose. */ private KeyValue referenceKV = null; + public void setReverse(boolean reverse) { + for (Filter filter : filters) { + filter.setReverse(reverse); + } + } + /** * When filtering a given KeyValue in {@link #filterKeyValue(KeyValue)}, * this stores the transformed KeyValue to be returned by {@link #transform(KeyValue)}. Index: src/main/java/org/apache/hadoop/hbase/filter/Filter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/Filter.java (revision 1517092) +++ src/main/java/org/apache/hadoop/hbase/filter/Filter.java (working copy) @@ -52,6 +52,12 @@ */ public interface Filter extends Writable { /** + * alter the reverseScan flag + * @param reverse flag + */ + public void setReverse(boolean reverse); + + /** * Reset the state of the filter between rows. */ public void reset();
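
Illustrative usage (not part of the diff above): a minimal client-side sketch of the reverse scan attribute this patch introduces. The table name "t1", column family "f", qualifier "q" and start row "row-500" are made-up examples, and the snippet assumes a client built with this patch applied.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReverseScanExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t1");   // hypothetical table name
        try {
          // Start at "row-500" and walk toward smaller row keys.
          Scan scan = new Scan(Bytes.toBytes("row-500"));
          scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"));   // hypothetical column
          scan.setReverse(true);   // the attribute added by this patch
          ResultScanner scanner = table.getScanner(scan);
          try {
            for (Result r : scanner) {
              System.out.println(Bytes.toString(r.getRow()));
            }
          } finally {
            scanner.close();
          }
        } finally {
          table.close();
        }
      }
    }

Based on the HRegion and ScanQueryMatcher changes above, the start row of a reverse scan is the largest row returned and the scan proceeds toward smaller row keys, with an optional stop row acting as the lower bound.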