Index: src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (revision 87702) +++ src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (working copy) @@ -132,6 +132,7 @@ private boolean flushCommits = true; private boolean writeToWAL = true; private int presplitRegions = 0; + private static boolean reverse = false; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); /** @@ -554,7 +555,7 @@ * @param cmd Command to run. * @throws IOException */ - private void doMultipleClients(final Class cmd) throws IOException { + private void doMultipleClients(final Class cmd) { final List threads = new ArrayList(this.N); final long[] timings = new long[this.N]; final int perClientRows = R/N; @@ -1060,6 +1061,9 @@ Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(new WhileMatchFilter(new PageFilter(120))); + if (reverse) { + scan.setReverse(true); + } ResultScanner s = this.table.getScanner(scan); //int count = 0; for (Result rr = null; (rr = s.next()) != null;) { @@ -1085,7 +1089,13 @@ @Override void testRow(final int i) throws IOException { Pair startAndStopRow = getStartAndStopRow(); - Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); + Scan scan; + if (reverse) { + scan = new Scan(startAndStopRow.getSecond(), startAndStopRow.getFirst()); + scan.setReverse(true); + } else { + scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); + } scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); ResultScanner s = this.table.getScanner(scan); int count = 0; @@ -1253,13 +1263,18 @@ @Override void testRow(final int i) throws IOException { if (this.testScanner == null) { - Scan scan = new Scan(format(this.startRow)); + Scan scan; + if (reverse) { + scan = new Scan(format(this.startRow + this.perClientRunRows - 1)); + scan.setReverse(true); + } else { + scan = new Scan(format(this.startRow)); + } scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); this.testScanner = table.getScanner(scan); } testScanner.next(); } - } static class AsyncScanTest extends AsyncTest { @@ -1395,6 +1410,9 @@ Scan scan = new Scan(); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(filter); + if (reverse) { + scan.setReverse(true); + } return scan; } } @@ -1539,6 +1557,7 @@ System.err.println(" flushCommits Used to determine if the test should flush the table. Default: false"); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); System.err.println(" presplit Create presplit table. Recommended for accurate perf analysis (see guide). 
Default: disabled"); + System.err.println(" reverse Set scan direction to backward"); System.err.println(); System.err.println("Command:"); for (CmdDescriptor command : commands.values()) { @@ -1621,6 +1640,12 @@ continue; } + final String reverseStr = "--reverse"; + if (cmd.startsWith(reverseStr)) { + this.reverse = true; + continue; + } + Class cmdClass = determineCommandClass(cmd); if (cmdClass != null) { getArgs(i + 1, args); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 87702) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -56,12 +56,10 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; @@ -71,8 +69,6 @@ import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.io.hfile.Compression; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -119,7 +115,7 @@ private final int MAX_VERSIONS = 2; // Test names - protected final byte[] tableName = Bytes.toBytes("testtable");; + protected final byte[] tableName = Bytes.toBytes("testtable"); protected final byte[] qual1 = Bytes.toBytes("qual1"); protected final byte[] qual2 = Bytes.toBytes("qual2"); protected final byte[] qual3 = Bytes.toBytes("qual3"); @@ -153,6 +149,458 @@ // /tmp/testtable ////////////////////////////////////////////////////////////////////////////// + public void testReverseScanner_FromMemStore_SingleCF_Normal() throws IOException { + byte [] rowC = Bytes.toBytes("rowC"); + byte [] rowA = Bytes.toBytes("rowA"); + byte [] rowB = Bytes.toBytes("rowB"); + byte [] cf = Bytes.toBytes("CF"); + byte [][] families = {cf}; + byte [] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts+1, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowC); + scan.setMaxVersions(5); + scan.setReverse(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + 
assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_LargerKey() throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + Scan scan = new Scan(rowD); + List currRow = new ArrayList(); + scan.setReverse(true); + scan.setMaxVersions(5); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_FullScan() throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + Scan scan = new Scan(); + List currRow = new ArrayList(); + scan.setReverse(true); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + 
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_moreRowsMayExistAfter() throws IOException { + //case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReverse(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReverse(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_smaller_blocksize() throws IOException { + //case to ensure no conflict with HFile index optimization + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration config = new HBaseConfiguration(); + config.setInt("test.block..size", 1); + this.region = initHRegion(tableName, 
method, config, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReverse(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReverse(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs() throws IOException { + byte [] row0 = Bytes.toBytes("row0"); // 1 kv + byte [] row1 = Bytes.toBytes("row1"); // 2 kv + byte [] row2 = Bytes.toBytes("row2"); // 4 kv + byte [] row3 = Bytes.toBytes("row3"); // 2 kv + byte [] row4 = Bytes.toBytes("row4"); // 5 kv + byte [] row5 = Bytes.toBytes("row5"); // 2 kv + byte [] cf1 = Bytes.toBytes("CF1"); + byte [] cf2 = Bytes.toBytes("CF2"); + byte [] cf3 = Bytes.toBytes("CF3"); + byte [][] families = {cf1, cf2, cf3}; + byte [] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. 
+ conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + //kv naming style: kv(row number) + totalKvCountInThisRow + seq no + KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts+1, KeyValue.Type.Put, null); + KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts+4, KeyValue.Type.Put, null); + KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts+4, KeyValue.Type.Put, null); + KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts+5, KeyValue.Type.Put, null); + KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts+3, KeyValue.Type.Put, null); + KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, null); + // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) + Put put = null; + put = new Put(row1); + put.add(kv1_2_1); + region.put(put); + put = new Put(row2); + put.add(kv2_4_1); + region.put(put); + put = new Put(row4); + put.add(kv4_5_4); + put.add(kv4_5_5); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) + put = new Put(row4); + put.add(kv4_5_1); + put.add(kv4_5_3); + region.put(put); + put = new Put(row1); + put.add(kv1_2_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_4); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) + put = new Put(row4); + put.add(kv4_5_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_2); + put.add(kv2_4_3); + region.put(put); + put = new Put(row3); + put.add(kv3_2_2); + region.put(put); + region.flushcache(); + // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) ( 2 kv) + put = new Put(row0); + put.add(kv0_1_1); + region.put(put); + put = new Put(row3); + put.add(kv3_2_1); + region.put(put); + put = new Put(row5); + put.add(kv5_2_1); + put.add(kv5_2_2); + region.put(put); + // scan range = ["row4", min), skip the max "row5" + Scan scan = new Scan(row4); + scan.setMaxVersions(5); + scan.setBatch(3); + scan.setReverse(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = false; + // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not included in scan range + // "row4" takes 2 next() calls since batch=3 + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + // 2. 
scan out "row3" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + // 3. scan out "row2" (4 kvs) + // "row2" takes 2 next() calls since batch=3 + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + // 4. scan out "row1" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertTrue(hasNext); + // 5. scan out "row0" (1 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row0)); + assertFalse(hasNext); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + public void testCompactionAffectedByScanners() throws Exception { String method = "testCompactionAffectedByScanners"; byte[] tableName = Bytes.toBytes(method); @@ -4067,7 +4515,8 @@ throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); for(byte [] family : families) { - htd.addFamily(new HColumnDescriptor(family)); + int blockSize = conf.getInt("test.block..size", 65536); + htd.addFamily(new HColumnDescriptor(family).setBlocksize(blockSize)); } HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false); Path path = new Path(DIR + callingMethod); Index: src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (working copy) @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.util; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -126,4 +127,29 @@ public void close() { // do nothing } + + @Override + public boolean seekBeforeRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public byte[] getMaxRow() { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekBefore(KeyValue kv) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } } Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -101,6 +101,7 @@ */ private int caching = -1; private boolean cacheBlocks = true; + private boolean reverse = false; // forward scan (not reverse) by default private Filter filter = null; private TimeRange tr = new TimeRange(); private Map> familyMap = @@ -150,6 +151,7 @@ batch = scan.getBatch(); caching = scan.getCaching(); cacheBlocks = scan.getCacheBlocks(); + reverse = scan.getReverse(); 
     filter = scan.getFilter(); // clone?
     TimeRange ctr = scan.getTimeRange();
     tr = new TimeRange(ctr.getMin(), ctr.getMax());
@@ -179,6 +181,7 @@
     this.stopRow = get.getRow();
     this.filter = get.getFilter();
     this.cacheBlocks = get.getCacheBlocks();
+    this.reverse = false; // disable reverse for 'get' scan
     this.maxVersions = get.getMaxVersions();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
@@ -457,6 +460,25 @@
   }

   /**
+   * Set whether this scan is a reverse one
+   *
+ * This is false by default which means forward(normal) scan. + * + * @param reverse if true, scan will be reverse order + */ + public void setReverse(boolean reverse) { + this.reverse = reverse; + } + + /** + * Get whether this scan is a reverse one. + * @return true if reverse scan, false if forward(default) scan + */ + public boolean getReverse() { + return reverse; + } + + /** * Compile the table and column family (i.e. schema) information * into a String. Useful for parsing and aggregation by debugging, * logging, and administration tools. @@ -501,6 +523,7 @@ map.put("batch", this.batch); map.put("caching", this.caching); map.put("cacheBlocks", this.cacheBlocks); + map.put("reverse", this.reverse); List timeRange = new ArrayList(); timeRange.add(this.tr.getMin()); timeRange.add(this.tr.getMax()); @@ -563,6 +586,7 @@ this.batch = in.readInt(); this.caching = in.readInt(); this.cacheBlocks = in.readBoolean(); + this.reverse = in.readBoolean(); if(in.readBoolean()) { this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in))); this.filter.readFields(in); @@ -597,6 +621,7 @@ out.writeInt(this.batch); out.writeInt(this.caching); out.writeBoolean(this.cacheBlocks); + out.writeBoolean(this.reverse); if(this.filter == null) { out.writeBoolean(false); } else { Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.util.Bytes; /** * Implements a heap merge across any number of KeyValueScanners. @@ -58,6 +61,9 @@ private KVScannerComparator comparator; + private boolean reverse = false; + List tmpScanners = null; + /** * Constructor. This KeyValueHeap will handle closing of passed in * KeyValueScanners. 
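For reference, a minimal client-side sketch of how the reverse flag added to Scan above is meant to be driven, mirroring the ScanRangeTest change in PerformanceEvaluation where the start row of a reverse scan is the larger key and the stop row the smaller one. The table name, row keys and Configuration handling are illustrative assumptions, not part of the patch:

    // Hypothetical table and rows; setReverse(true) is the API added by this patch.
    HTable table = new HTable(conf, "testtable");
    Scan scan = new Scan(Bytes.toBytes("row-0999"), Bytes.toBytes("row-0000"));
    scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C"));
    scan.setReverse(true); // default is false, i.e. a normal forward scan
    ResultScanner rs = table.getScanner(scan);
    try {
      for (Result r : rs) {
        // Rows come back in descending row-key order: row-0999, row-0998, ...
        System.out.println(Bytes.toString(r.getRow()));
      }
    } finally {
      rs.close();
      table.close();
    }

The flag is also carried through the Scan copy constructor and the Writable format (write()/readFields()) in the hunks above, so it survives the client-to-regionserver round trip.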
@@ -66,12 +72,19 @@ */ public KeyValueHeap(List scanners, KVComparator comparator) throws IOException { + this(scanners, comparator, false); + } + + public KeyValueHeap(List scanners, + KVComparator comparator, boolean reverse) throws IOException { this.comparator = new KVScannerComparator(comparator); + this.reverse = reverse; + this.tmpScanners = new ArrayList(scanners.size()); if (!scanners.isEmpty()) { this.heap = new PriorityQueue(scanners.size(), this.comparator); for (KeyValueScanner scanner : scanners) { - if (scanner.peek() != null) { + if (scanner.peek() != null || reverse) { this.heap.add(scanner); } else { scanner.close(); @@ -94,15 +107,20 @@ } KeyValue kvReturn = this.current.next(); KeyValue kvNext = this.current.peek(); - if (kvNext == null) { - this.current.close(); + if (reverse) { + this.heap.add(this.current); this.current = pollRealKV(); } else { - KeyValueScanner topScanner = this.heap.peek(); - if (topScanner == null || - this.comparator.compare(kvNext, topScanner.peek()) >= 0) { - this.heap.add(this.current); + if (kvNext == null) { + this.current.close(); this.current = pollRealKV(); + } else { + KeyValueScanner topScanner = this.heap.peek(); + if (topScanner == null || + this.comparator.compare(kvNext, topScanner.peek()) >= 0) { + this.heap.add(this.current); + this.current = pollRealKV(); + } } } return kvReturn; @@ -149,7 +167,7 @@ * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations. */ - if (pee == null || !mayContainMoreRows) { + if (!reverse && (pee == null || !mayContainMoreRows)) { this.current.close(); } else { this.heap.add(this.current); @@ -186,24 +204,25 @@ public KVScannerComparator(KVComparator kvComparator) { this.kvComparator = kvComparator; } + public int compare(KeyValueScanner left, KeyValueScanner right) { int comparison = compare(left.peek(), right.peek()); if (comparison != 0) { return comparison; + } + // Since both the keys are exactly the same, we break the tie in favor + // of the key which came latest. + long leftSequenceID = left.getSequenceID(); + long rightSequenceID = right.getSequenceID(); + if (leftSequenceID > rightSequenceID) { + return -1; + } else if (leftSequenceID < rightSequenceID) { + return 1; } else { - // Since both the keys are exactly the same, we break the tie in favor - // of the key which came latest. - long leftSequenceID = left.getSequenceID(); - long rightSequenceID = right.getSequenceID(); - if (leftSequenceID > rightSequenceID) { - return -1; - } else if (leftSequenceID < rightSequenceID) { - return 1; - } else { - return 0; - } + return 0; } } + /** * Compares two KeyValue * @param left @@ -211,7 +230,12 @@ * @return less than 0 if left is smaller, 0 if equal etc.. */ public int compare(KeyValue left, KeyValue right) { - return this.kvComparator.compare(left, right); + // for reverse scan : scanner.peek() can be null + if (left == null && right == null) return 0; + else if(left == null) return 1; + else if(right == null) return -1; + + return this.kvComparator.compare(left, right); } /** * @return KVComparator @@ -301,6 +325,11 @@ KeyValueScanner scanner; while ((scanner = heap.poll()) != null) { KeyValue topKey = scanner.peek(); + if (reverse && topKey == null) { + heap.add(scanner); + current = pollRealKV(); + return current != null; + } if (comparator.getComparator().compare(seekKey, topKey) <= 0) { // Top KeyValue is at-or-after Seek KeyValue. 
We only know that all // scanners are at or after seekKey (because fake keys of @@ -317,11 +346,10 @@ if (isLazy) { seekResult = scanner.requestSeek(seekKey, forward, useBloom); } else { - seekResult = NonLazyKeyValueScanner.doRealSeek( - scanner, seekKey, forward); + seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, forward); } - if (!seekResult) { + if (!seekResult && !reverse) { scanner.close(); } else { heap.add(scanner); @@ -403,4 +431,123 @@ KeyValueScanner getCurrentForTesting() { return current; } + + // used by RegionScannerImpl.storeHeap + public boolean seekToPrevRow(byte[] row) throws IOException { + // 1. all storeScanner.seekBeforeRow(row); + if (this.current != null) { + this.heap.add(this.current); + this.current = null; + } + + // remove scanners without valid rows within the given scan key-range + tmpScanners.clear(); + for (KeyValueScanner scanner : this.heap) { + if (!scanner.seekBeforeRow(row)) + tmpScanners.add(scanner); + } + for (KeyValueScanner scanner : tmpScanners) + this.heap.remove(scanner); + + // 2. get prev-row: the max-row of all storeScanners; + byte[] max = HConstants.EMPTY_BYTE_ARRAY; + for (KeyValueScanner scanner : this.heap) { + byte[] r = scanner.getMaxRow(); + if (Bytes.compareTo(r, max) > 0) + max = r; + } + + // 3. all storeScanner.seek(prev-row); + tmpScanners.clear(); + while (!this.heap.isEmpty()) { + KeyValueScanner scanner = this.heap.poll(); + scanner.seekRow(max); + tmpScanners.add(scanner); + } + for (KeyValueScanner scanner : tmpScanners) { + this.heap.add(scanner); + } + + this.current = pollRealKV(); + return (this.current != null); + } + + // used by StoreScanner.heap + @Override + public boolean seekBefore(KeyValue kv) throws IOException { + if (this.current != null) { + this.heap.add(this.current); + this.current = null; + } + + // remove scanners without valid rows within the given scan key-range + boolean existPrevRow = false; + tmpScanners.clear(); + for (KeyValueScanner scanner : this.heap) { + if (scanner.seekBefore(kv)) { + existPrevRow = true; + } else { + tmpScanners.add(scanner); + } + } + for (KeyValueScanner scanner : tmpScanners) { + this.heap.remove(scanner); + } + + return existPrevRow; + } + + // used by StoreScanner.heap + @Override + public byte[] getMaxRow() { + byte[] max = HConstants.EMPTY_BYTE_ARRAY; + for (KeyValueScanner scanner : this.heap) { + if (scanner.peek() != null) { + byte[] row = scanner.peek().getRow(); + if (Bytes.compareTo(row, max) > 0) { + max = row; + } + } + } + return max; + } + + // used by StoreScanner.heap : why not re-use "seek(kv)" is because + // this seek need to seek backward, but "seek(kv)" assume seeking is + // always forward + public boolean seekTo(KeyValue kv) throws IOException { + if (this.current != null) { + this.heap.add(this.current); + this.current = null; + } + + tmpScanners.clear(); + while (!this.heap.isEmpty()) { + KeyValueScanner scanner = this.heap.poll(); + scanner.seek(kv); + tmpScanners.add(scanner); + } + for (KeyValueScanner scanner : tmpScanners) { + this.heap.add(scanner); + } + + this.current = pollRealKV(); + return (this.current != null); + } + + @Override + public boolean seekRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekBeforeRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + + @Override + public boolean seekToLastRow() throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } } Index: 
src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -149,7 +149,6 @@ this.isReseekable = true; cur = hfs.getKeyValue(); - return skipKVsNewerThanReadpoint(); } finally { realSeekDone = true; @@ -376,4 +375,46 @@ && reader.passesTimerangeFilter(scan, oldestUnexpiredTS) && reader.passesBloomFilter(scan, columns); } + + @Override + public boolean seekBefore(KeyValue kv) throws IOException { + seekCount.incrementAndGet(); + + try { + try { + if(!hfs.seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength())) { + close(); + return false; + } + + this.isReseekable = true; + cur = hfs.getKeyValue(); + return skipKVsNewerThanReadpoint(); + } finally { + realSeekDone = true; + } + } catch (IOException ioe) { + throw new IOException("Could not seek " + this + " before kv " + kv, ioe); + } + } + + @Override + public boolean seekBeforeRow(byte[] row) throws IOException { + throw new IOException("Not implemented"); + } + + @Override + public byte[] getMaxRow() { + return reader.getLastRowKey(); + } + + @Override + public boolean seekRow(byte[] row) throws IOException { + throw new IOException("Not implemented"); + } + + @Override + public boolean seekToLastRow() throws IOException { + return seek(KeyValue.createFirstOnRow(getMaxRow())); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -86,6 +86,7 @@ // if heap == null and lastTop != null, you need to reseek given the key below private KeyValue lastTop = null; + private final boolean isReversed; /** An internal constructor. */ private StoreScanner(Store store, boolean cacheBlocks, Scan scan, @@ -99,7 +100,7 @@ this.columns = columns; oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl; this.minVersions = minVersions; - + this.isReversed = scan.getReverse() && !scan.isGetScan(); // We look up row-column Bloom filters for multi-column queries as part of // the seek operation. 
However, we also look the row-column Bloom filter // for multi-row (non-"get") scans because this is not done in @@ -154,7 +155,7 @@ scanner.requestSeek(matcher.getStartKey(), false, true); } } else { - if (!isParallelSeekEnabled) { + if (!isParallelSeekEnabled || scan.getReverse()) { for (KeyValueScanner scanner : scanners) { scanner.seek(matcher.getStartKey()); } @@ -163,8 +164,19 @@ } } + //handle reverse scan case: TestHRegion#testReverseScanner_FromMemStore_SingleCF_FullScan + if (scan.getReverse() && Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.comparator); + heap = new KeyValueHeap(scanners, store.comparator, isReversed); + if (heap.peek() == null && scan.getReverse()) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } this.store.addChangedReaderObserver(this); } @@ -224,14 +236,21 @@ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); // Seek all scanners to the initial key - if (!isParallelSeekEnabled) { + if (!isParallelSeekEnabled || isReversed) { for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); + if (scan.getReverse() && Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) { + scanner.seekToLastRow(); + } else { + boolean ret = scanner.seek(matcher.getStartKey()); + if (!ret && scan.getReverse()) { + scanner.seekToLastRow(); + } + } } } else { parallelSeek(scanners, matcher.getStartKey()); } - heap = new KeyValueHeap(scanners, scanInfo.getComparator()); + heap = new KeyValueHeap(scanners, scanInfo.getComparator(), isReversed); } /** @@ -330,12 +349,9 @@ @Override public synchronized boolean seek(KeyValue key) throws IOException { if (this.heap == null) { - List scanners = getScannersNoCompaction(); - - heap = new KeyValueHeap(scanners, store.comparator); + heap = new KeyValueHeap(scanners, store.comparator, isReversed); } - return this.heap.seek(key); } @@ -359,7 +375,6 @@ @Override public synchronized boolean next(List outResult, int limit, String metric) throws IOException { - if (checkReseek()) { return true; } @@ -409,7 +424,7 @@ count++; if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (!matcher.moreRowsMayExistAfter(kv)) { + if (!matcher.moreRowsMayExistAfter(kv) && !isReversed) { return false; } reseek(matcher.getKeyForNextRow(kv)); @@ -436,7 +451,7 @@ case SEEK_NEXT_ROW: // This is just a relatively simple end of scan fix, to short-cut end // us if there is an endKey in the scan. - if (!matcher.moreRowsMayExistAfter(kv)) { + if (!matcher.moreRowsMayExistAfter(kv) && !isReversed) { return false; } @@ -546,16 +561,19 @@ * could have done it now by storing the scan object from the constructor */ List scanners = getScannersNoCompaction(); - if (!isParallelSeekEnabled) { + if (!isParallelSeekEnabled || isReversed) { for (KeyValueScanner scanner : scanners) { - scanner.seek(lastTopKey); + boolean ret = scanner.seek(lastTopKey); + if (!ret && scan.getReverse()) { + scanner.seekToLastRow(); + } } } else { parallelSeek(scanners, lastTopKey); } // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.comparator); + heap = new KeyValueHeap(scanners, store.comparator, isReversed); // Reset the state of the Query Matcher and set to top row. 
// Only reset and call setRow if the row changes; avoids confusing the @@ -578,9 +596,8 @@ checkReseek(); if (explicitColumnQuery && lazySeekEnabledGlobally) { return heap.requestSeek(kv, true, useRowColBloom); - } else { - return heap.reseek(kv); } + return heap.reseek(kv); } @Override @@ -657,5 +674,54 @@ static void enableLazySeekGlobally(boolean enable) { lazySeekEnabledGlobally = enable; } + + @Override + public boolean seekBeforeRow(byte[] row) throws IOException { + KeyValue firstKV = KeyValue.createFirstDeleteFamilyOnRow(row, store.getFamily().getName()); + if (this.heap == null) { + List scanners = getScannersNoCompaction(); + for (KeyValueScanner scanner : scanners) { + scanner.seek(firstKV); + } + heap = new KeyValueHeap(scanners, store.comparator, scan.getReverse() && !scan.isGetScan()); + } + return this.heap.seekBefore(firstKV); + } + + @Override + public byte[] getMaxRow() { + return this.heap.getMaxRow(); + } + + public byte[] getMaxRow(List scanners) { + byte[] max = HConstants.EMPTY_BYTE_ARRAY; + for (KeyValueScanner scanner : scanners) { + if (scanner.peek() != null) { + byte[] row = scanner.peek().getRow(); + if (Bytes.compareTo(row, max) > 0) { + max = row; + } + } + } + return max; + } + + @Override + public boolean seekRow(byte[] row) throws IOException { + KeyValue firstKV = KeyValue.createFirstDeleteFamilyOnRow(row, + store.getFamily().getName()); + + return this.heap.seekTo(firstKV); + } + + @Override + public boolean seekBefore(KeyValue kv) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3360,6 +3360,7 @@ private List results = new ArrayList(); private int batch; private int isScan; + private boolean reverse = false; private boolean filterClosed = false; private long readPt; @@ -3371,6 +3372,7 @@ this.filter = scan.getFilter(); this.batch = scan.getBatch(); + this.reverse = (!scan.isGetScan()) && scan.getReverse(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { this.stopRow = null; } else { @@ -3405,7 +3407,7 @@ KeyValueScanner scanner = store.getScanner(scan, entry.getValue()); scanners.add(scanner); } - this.storeHeap = new KeyValueHeap(scanners, comparator); + this.storeHeap = new KeyValueHeap(scanners, comparator, this.reverse); } RegionScannerImpl(Scan scan) throws IOException { @@ -3497,23 +3499,42 @@ if (filter != null && filter.filterRow()) { results.clear(); } - return false; } else if (filterRowKey(currentRow)) { - nextRow(currentRow); + if (reverse) { + this.storeHeap.seekToPrevRow(currentRow); + resetFilters(); + } else { + nextRow(currentRow); + } } else { byte [] nextRow; do { - this.storeHeap.next(results, limit - results.size(), metric); + this.storeHeap.next(results, limit - results.size(), metric, reverse); if (limit > 0 && results.size() == limit) { if (this.filter != null && filter.hasFilterRow()) { throw new IncompatibleFilterException( "Filter with filterRow(List) incompatible with scan with limit!"); } + // if current row already change, seek to 'next' (previous) row immediately + // we can't just return here because we can't use the natually 'next' row like + // 
forward scan does + if (reverse && !Bytes.equals(currentRow, peekRow())) { + this.storeHeap.seekToPrevRow(currentRow); + } return true; // we are expecting more yes, but also limited to how many we can return. } } while (Bytes.equals(currentRow, nextRow = peekRow())); + // current row is done, if reverse we needs to seek to the 'next' + // row explicitly since we can't use the natually 'next' row like + // forward scan does + if (reverse) { + this.storeHeap.seekToPrevRow(currentRow); + resetFilters(); + nextRow = peekRow(); + } + final boolean stopRow = isStopRow(nextRow); // now that we have an entire row, lets process with a filters: @@ -3529,7 +3550,11 @@ // the reasons for calling this method are: // 1. reset the filters. // 2. provide a hook to fast forward the row (used by subclasses) - nextRow(currentRow); + if (reverse) { + resetFilters(); + } else { + nextRow(currentRow); + } // This row was totally filtered out, if this is NOT the last row, // we should continue on. @@ -3563,11 +3588,15 @@ return kv == null ? null : kv.getRow(); } - private boolean isStopRow(byte [] currentRow) { - return currentRow == null || - (stopRow != null && - comparator.compareRows(stopRow, 0, stopRow.length, - currentRow, 0, currentRow.length) <= isScan); + private boolean isStopRow(byte[] currentRow) { + if (this.reverse) { + return currentRow == null + || (stopRow != null && comparator.compareRows(stopRow, 0, stopRow.length, currentRow, + 0, currentRow.length) >= isScan); + } + return currentRow == null + || (stopRow != null && comparator.compareRows(stopRow, 0, stopRow.length, currentRow, 0, + currentRow.length) <= isScan); } @Override Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -1548,12 +1548,14 @@ && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { return true; } - if (getComparator() instanceof MetaKeyComparator) { - return true; - } - boolean nonOverLapping = (Bytes.compareTo(this.getFirstRowKey(), scan.getStopRow()) > 0 - && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) - || Bytes.compareTo(this.getLastRowKey(), scan.getStartRow()) < 0; + KeyValue smallestScanKeyValue = scan.getReverse() ? KeyValue.createFirstOnRow(scan + .getStopRow()) : KeyValue.createFirstOnRow(scan.getStartRow()); + KeyValue largestScanKeyValue = scan.getReverse() ? KeyValue.createLastOnRow(scan + .getStartRow()) : KeyValue.createLastOnRow(scan.getStopRow()); + boolean nonOverLapping = (getComparator().compare(this.getFirstKey(), + largestScanKeyValue.getKey()) > 0 && !Bytes.equals(scan.getReverse() ? scan.getStartRow() + : scan.getStopRow(), HConstants.EMPTY_END_ROW)) + || getComparator().compare(this.getLastKey(), smallestScanKeyValue.getKey()) < 0; return !nonOverLapping; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (working copy) @@ -121,4 +121,41 @@ * assumed. */ public boolean isFileScanner(); + + /** + * seek to KeyValue which is just before the given row + * @return true if seek to a valid KeyValue, false if not existing such kv. 
+ * used by RegionScannerImpl.storeHeap(implemented by StoreScanner) + */ + public boolean seekBeforeRow(byte[] row) throws IOException; + + /** + * get the max row among all underlying scanners + * @return the max row + * used by RegionScannerImpl.storeHeap(implemented by StoreScanner) + */ + public byte[] getMaxRow(); + + /** + * seek to the first KeyValue of the given row + * @return true if seek to a valid KeyValue, false if not existing such kv. + * used by RegionScannerImpl.storeHeap(implemented by StoreScanner) + */ + public boolean seekRow(byte[] row) throws IOException; + + /** + * seek to KeyValue which is just before the given KeyValue + * @return true if seek to a valid KeyValue, false if not existing such kv. + * used by StoreScanner.heap(implemented by MemstoreScanner/StoreFileScanner) + */ + public boolean seekBefore(KeyValue kv) throws IOException; + + /** + * Seek the scanner to the first kv which has a same row with lastKey, if + * key > lastKey or equals to EMPTY_BYTE_ARRAY + * CAUTION: it's an experimental internal method used by reverse scan ONLY + * @return true if seek to a valid KeyValue, false if the underlying data is empty + * @throws IOException + */ + public boolean seekToLastRow() throws IOException; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 87702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; @@ -894,6 +895,64 @@ long oldestUnexpiredTS) { return shouldSeek(scan, oldestUnexpiredTS); } + + /** + * Set the scanner just before the seek key. + * @param kv seek KeyValue + * @return false if the key is null or if there is no data + */ + @Override + public synchronized boolean seekBefore(KeyValue kv) { + if (kv == null) { + close(); + return false; + } + + // kvset and snapshot will never be null. + // if headSet can't find anything, SortedSet is empty (not null). + kvsetIt = ((KeyValueSkipListSet)kvsetAtCreation.headSet(kv)).descendingIterator(); + snapshotIt = ((KeyValueSkipListSet)snapshotAtCreation.headSet(kv)).descendingIterator(); + kvsetItRow = null; + snapshotItRow = null; + + kvsetNextRow = getNext(kvsetIt); + snapshotNextRow = getNext(snapshotIt); + + // locate at the KeyValue with the higher row before given kv + theNext = getHighest(kvsetNextRow, snapshotNextRow); + return (theNext != null); + } + + @Override + public boolean seekBeforeRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public byte[] getMaxRow() { + KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation.last(); + KeyValue second = snapshotAtCreation.isEmpty() ? null : snapshotAtCreation.last(); + KeyValue higherKv = getHighest(first, second); + return higherKv == null ? null : higherKv.getRow(); + } + + @Override + public boolean seekRow(byte[] row) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean seekToLastRow() throws IOException { + KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation.last(); + KeyValue second = snapshotAtCreation.isEmpty() ? 
null : snapshotAtCreation.last(); + KeyValue higherKv = getHighest(first, second); + if (higherKv == null) { + close(); + return false; + } + KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow()); + return seek(firstKvOnLastRow); + } } public final static long FIXED_OVERHEAD = ClassSize.align(