### Eclipse Workspace Patch 1.0 #P apache-trunk Index: hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (revision 1542632) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (working copy) @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -314,6 +315,16 @@ verifyScan(s, expectedRows, expectedKeys); } + public void testPrefixFilterWithReverseScan() throws Exception { + // Grab rows from group one (half of total) + long expectedRows = this.numRows / 2; + long expectedKeys = this.colsPerRow; + Scan s = new Scan(); + s.setReversed(true); + s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne"))); + verifyScan(s, expectedRows, expectedKeys); + } + @Test public void testPageFilter() throws Exception { @@ -401,6 +412,140 @@ } + public void testPageFilterWithReverseScan() throws Exception { + // KVs in first 6 rows + KeyValue[] expectedKVs = { + // testRowOne-0 + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-2 + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], 
QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-3 + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]) }; + + // Grab all 6 rows 
+ long expectedRows = 6; + long expectedKeys = this.colsPerRow; + Scan s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 4 rows (6 cols per row) + expectedRows = 4; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 2 rows + expectedRows = 2; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first row + expectedRows = 1; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + } + + public void testWhileMatchFilterWithFilterRowWithReverseScan() + throws Exception { + final int pageSize = 4; + + Scan s = new Scan(); + s.setReversed(true); + WhileMatchFilter filter = new WhileMatchFilter(new PageFilter(pageSize)); + s.setFilter(filter); + + InternalScanner scanner = this.region.getScanner(s); + int scannerCounter = 0; + while (true) { + boolean isMoreResults = scanner.next(new ArrayList()); + scannerCounter++; + + if (scannerCounter >= pageSize) { + Assert.assertTrue( + "The WhileMatchFilter should now filter all remaining", + filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + scanner.close(); + Assert.assertEquals("The page filter returned more rows than expected", + pageSize, scannerCounter); + } + + public void testWhileMatchFilterWithFilterRowKeyWithReverseScan() + throws Exception { + Scan s = new Scan(); + String prefix = "testRowOne"; + WhileMatchFilter filter = new WhileMatchFilter(new PrefixFilter( + Bytes.toBytes(prefix))); + s.setFilter(filter); + s.setReversed(true); + + InternalScanner scanner = this.region.getScanner(s); + while (true) { + ArrayList values = new 
ArrayList(); + boolean isMoreResults = scanner.next(values); + if (!isMoreResults + || !Bytes.toString(values.get(0).getRow()).startsWith(prefix)) { + Assert.assertTrue( + "The WhileMatchFilter should now filter all remaining", + filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + scanner.close(); + } + /** * Tests the the {@link WhileMatchFilter} works in combination with a * {@link Filter} that uses the Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -62,11 +62,11 @@ public static final Log LOG = LogFactory.getLog(ScannerCallable.class); private long scannerId = -1L; - private boolean instantiated = false; + protected boolean instantiated = false; private boolean closed = false; private Scan scan; private int caching = 1; - private ScanMetrics scanMetrics; + protected ScanMetrics scanMetrics; private boolean logScannerActivity = false; private int logCutOffLatency = 1000; private static String myAddress; @@ -79,7 +79,7 @@ } // indicate if it is a remote server call - private boolean isRegionServerRemote = true; + protected boolean isRegionServerRemote = true; private long nextCallSeq = 0; /** @@ -135,7 +135,7 @@ * compare the local machine hostname with region server's hostname * to decide if hbase client connects to a remote region server */ - private void checkIfRegionServerIsRemote() { + protected void checkIfRegionServerIsRemote() { if (getLocation().getHostname().equalsIgnoreCase(myAddress)) { isRegionServerRemote = false; } else { Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1542632) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -4011,4 +4012,563 @@ assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv))); } + + public void testReverseScanner_FromMemStore_SingleCF_Normal() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowC); + scan.setMaxVersions(5); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); 
+ hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_LargerKey() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowD); + List currRow = new ArrayList(); + scan.setReversed(true); + scan.setMaxVersions(5); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + 
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_FullScan() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + Scan scan = new Scan(); + List currRow = new ArrayList(); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_moreRowsMayExistAfter() throws IOException { + // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" 
endless loop + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReversed(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + 
assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReversed(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_smaller_blocksize() throws IOException { + // case to ensure no conflict with HFile index optimization + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration config = new HBaseConfiguration(); + config.setInt("test.block.size", 1); + this.region = initHRegion(tableName, method, config, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, 
rowA); + scan.addColumn(families[0], col1); + scan.setReversed(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReversed(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1() + throws IOException { + byte[] row0 = Bytes.toBytes("row0"); // 1 kv + byte[] row1 = Bytes.toBytes("row1"); // 2 kv + byte[] row2 = Bytes.toBytes("row2"); // 4 kv + byte[] row3 = Bytes.toBytes("row3"); // 2 kv + byte[] row4 = Bytes.toBytes("row4"); // 5 kv + byte[] row5 = Bytes.toBytes("row5"); // 2 kv + byte[] cf1 = Bytes.toBytes("CF1"); + byte[] cf2 = Bytes.toBytes("CF2"); + byte[] cf3 = Bytes.toBytes("CF3"); + byte[][] families = { cf1, cf2, cf3 }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. 
+ conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + // kv naming style: kv(row number) totalKvCountInThisRow seq no + KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1, + KeyValue.Type.Put, null); + KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4, + KeyValue.Type.Put, null); + KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4, + KeyValue.Type.Put, null); + KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5, + KeyValue.Type.Put, null); + KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3, + KeyValue.Type.Put, null); + KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, + null); + // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) + Put put = null; + put = new Put(row1); + put.add(kv1_2_1); + region.put(put); + put = new Put(row2); + put.add(kv2_4_1); + region.put(put); + put = new Put(row4); + put.add(kv4_5_4); + put.add(kv4_5_5); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) + put = new Put(row4); + put.add(kv4_5_1); + put.add(kv4_5_3); + region.put(put); + put = 
new Put(row1); + put.add(kv1_2_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_4); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) + put = new Put(row4); + put.add(kv4_5_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_2); + put.add(kv2_4_3); + region.put(put); + put = new Put(row3); + put.add(kv3_2_2); + region.put(put); + region.flushcache(); + // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) + // ( 2 kv) + put = new Put(row0); + put.add(kv0_1_1); + region.put(put); + put = new Put(row3); + put.add(kv3_2_1); + region.put(put); + put = new Put(row5); + put.add(kv5_2_1); + put.add(kv5_2_2); + region.put(put); + // scan range = ["row4", min), skip the max "row5" + Scan scan = new Scan(row4); + scan.setMaxVersions(5); + scan.setBatch(3); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = false; + // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not + // included in scan range + // "row4" takes 2 next() calls since batch=3 + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + // 2. scan out "row3" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + // 3. 
scan out "row2" (4 kvs) + // "row2" takes 2 next() calls since batch=3 + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + // 4. scan out "row1" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertTrue(hasNext); + // 5. scan out "row0" (1 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row0)); + assertFalse(hasNext); + + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2() + throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] row3 = Bytes.toBytes("row3"); + byte[] row4 = Bytes.toBytes("row4"); + byte[] cf1 = Bytes.toBytes("CF1"); + byte[] cf2 = Bytes.toBytes("CF2"); + byte[] cf3 = Bytes.toBytes("CF3"); + byte[] cf4 = Bytes.toBytes("CF4"); + byte[][] families = { cf1, cf2, cf3, cf4 }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. 
+ conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null); + // storefile1 + Put put = new Put(row1); + put.add(kv1); + region.put(put); + region.flushcache(); + // storefile2 + put = new Put(row2); + put.add(kv2); + region.put(put); + region.flushcache(); + // storefile3 + put = new Put(row3); + put.add(kv3); + region.put(put); + region.flushcache(); + // memstore + put = new Put(row4); + put.add(kv4); + region.put(put); + // scan range = ["row4", min) + Scan scan = new Scan(row4); + scan.setReversed(true); + scan.setBatch(10); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertFalse(hasNext); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + private static HRegion initHRegion(byte[] tableName, String callingMethod, + byte[]... 
families) throws IOException { + return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), + families); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java (revision 0) @@ -0,0 +1,192 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + +/** + * ReversedKeyValueHeap is used for supporting reversed scanning. 
Compared with + * KeyValueHeap, its scanner comparator is a little different (see + * ReversedKVScannerComparator), all seek is backward seek(see + * {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row + * if it is already at the end of one row when calling next(). + */ +@InterfaceAudience.Private +public class ReversedKeyValueHeap extends KeyValueHeap { + + /** + * @param scanners + * @param comparator + * @throws IOException + */ + public ReversedKeyValueHeap(List scanners, + KVComparator comparator) throws IOException { + super(scanners, new ReversedKVScannerComparator(comparator)); + } + + @Override + public boolean seek(KeyValue seekKey) throws IOException { + throw new IllegalStateException( + "seek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean reseek(KeyValue seekKey) throws IOException { + throw new IllegalStateException( + "reseek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean requestSeek(KeyValue key, boolean forward, boolean useBloom) + throws IOException { + throw new IllegalStateException( + "requestSeek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean seekToPreviousRow(KeyValue seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + KeyValue topKey = scanner.peek(); + if (comparator.getComparator().compareRows(topKey.getBuffer(), + topKey.getRowOffset(), topKey.getRowLength(), seekKey.getBuffer(), + seekKey.getRowOffset(), seekKey.getRowLength()) < 0) { + // Row of Top KeyValue is before Seek row. 
+ heap.add(scanner); + current = pollRealKV(); + return current != null; + } + + if (!scanner.seekToPreviousRow(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + + // Heap is returning empty, scanner is done + return false; + } + + @Override + public boolean backwardSeek(KeyValue seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + KeyValue topKey = scanner.peek(); + if ((comparator.getComparator().matchingRows(seekKey, topKey) && comparator + .getComparator().compare(seekKey, topKey) <= 0) + || comparator.getComparator().compareRows(seekKey, topKey) > 0) { + heap.add(scanner); + current = pollRealKV(); + return current != null; + } + if (!scanner.backwardSeek(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + return false; + } + + @Override + public KeyValue next() throws IOException { + if (this.current == null) { + return null; + } + KeyValue kvReturn = this.current.next(); + KeyValue kvNext = this.current.peek(); + if (kvNext == null + || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) { + if (this.current.seekToPreviousRow(kvReturn)) { + this.heap.add(this.current); + } else { + this.current.close(); + } + this.current = pollRealKV(); + } else { + KeyValueScanner topScanner = this.heap.peek(); + if (topScanner != null + && this.comparator.compare(this.current, topScanner) > 0) { + this.heap.add(this.current); + this.current = pollRealKV(); + } + } + return kvReturn; + } + + /** + * In ReversedKVScannerComparator, we compare the row of scanners' peek values + * first, sort bigger one before the smaller one. 
Then compare the KeyValue if + * they have the equal row, sort smaller one before the bigger one + */ + private static class ReversedKVScannerComparator extends + KVScannerComparator { + + /** + * Constructor + * @param kvComparator + */ + public ReversedKVScannerComparator(KVComparator kvComparator) { + super(kvComparator); + } + + @Override + public int compare(KeyValueScanner left, KeyValueScanner right) { + int rowComparison = compareRows(left.peek(), right.peek()); + if (rowComparison != 0) { + return -rowComparison; + } + return super.compare(left, right); + } + + /** + * Compares rows of two KeyValue + * @param left + * @param right + * @return less than 0 if left is smaller, 0 if equal etc.. + */ + public int compareRows(KeyValue left, KeyValue right) { + return super.kvComparator.compareRows(left, right); + } + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException("Not implemented"); + } +} Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java (revision 0) @@ -0,0 +1,140 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A reversed ScannerCallable which supports backward scanning. + */ +@InterfaceAudience.Public +public class ReversedScannerCallable extends ScannerCallable { + /** + * The start row for locating regions. 
+ In reversed scanner, may locate the + * regions for a range of keys when doing + * {@link ReversedClientScanner#nextScanner(int, boolean)} + */ + protected final byte[] locateStartRow; + + /** + * + * @param connection + * @param tableName + * @param scan + * @param scanMetrics + * @param locateStartRow The start row for locating regions + */ + public ReversedScannerCallable(HConnection connection, TableName tableName, + Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) { + super(connection, tableName, scan, scanMetrics); + this.locateStartRow = locateStartRow; + } + + /** + * @param reload force reload of server location + * @throws IOException + */ + @Override + public void prepare(boolean reload) throws IOException { + if (!instantiated || reload) { + if (locateStartRow == null) { + // Just locate the region with the row + this.location = connection.getRegionLocation(tableName, row, reload); + if (this.location == null) { + throw new IOException("Failed to find location, tableName=" + + tableName + ", row=" + Bytes.toString(row) + ", reload=" + + reload); + } + } else { + // Need to locate the regions with the range, and the target location is + // the last one which is the previous region of last region scanner + List<HRegionLocation> locatedRegions = locateRegionsInRange( + locateStartRow, row, reload); + if (locatedRegions.isEmpty()) { + throw new DoNotRetryIOException( + "Does .META. exist hole? Couldn't get regions for the range from " + + Bytes.toStringBinary(locateStartRow) + " to " + + Bytes.toStringBinary(row)); + } + this.location = locatedRegions.get(locatedRegions.size() - 1); + } + setStub(getConnection().getClient(getLocation().getServerName())); + checkIfRegionServerIsRemote(); + instantiated = true; + } + + // check how often we retry. + // HConnectionManager will call instantiateServer with reload==true + // if and only if for retries. 
+ if (reload && this.scanMetrics != null) { + this.scanMetrics.countOfRPCRetries.incrementAndGet(); + if (isRegionServerRemote) { + this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet(); + } + } + } + + /** + * Get the corresponding regions for an arbitrary range of keys. + * @param tableName + * @param startKey Starting row in range, inclusive + * @param endKey Ending row in range, exclusive + * @param reload force reload of server location + * @return A list of HRegionLocation corresponding to the regions that contain + * the specified range + * @throws IOException + */ + private List<HRegionLocation> locateRegionsInRange(byte[] startKey, + byte[] endKey, boolean reload) throws IOException { + final boolean endKeyIsEndOfTable = Bytes.equals(endKey, + HConstants.EMPTY_END_ROW); + if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { + throw new IllegalArgumentException("Invalid range: " + + Bytes.toStringBinary(startKey) + " > " + + Bytes.toStringBinary(endKey)); + } + List<HRegionLocation> regionList = new ArrayList<HRegionLocation>(); + byte[] currentKey = startKey; + do { + HRegionLocation regionLocation = connection.getRegionLocation(tableName, + currentKey, reload); + if (regionLocation.getRegionInfo().containsRow(currentKey)) { + regionList.add(regionLocation); + } else { + throw new DoNotRetryIOException("Does .META. exist hole? 
Locating row " + + Bytes.toStringBinary(currentKey) + " returns incorrect region " + + regionLocation.getRegionInfo()); + } + currentKey = regionLocation.getRegionInfo().getEndKey(); + } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) + && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)); + return regionList; + } + +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -132,6 +132,8 @@ private final boolean isUserScan; + private final boolean isReversed; + /** * Construct a QueryMatcher for a scan * @param scan @@ -186,6 +188,7 @@ this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); } + this.isReversed = scan.isReversed(); } /** @@ -258,15 +261,24 @@ int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength, bytes, offset, rowLength); - if (ret <= -1) { - return MatchCode.DONE; - } else if (ret >= 1) { - // could optimize this, if necessary? - // Could also be called SEEK_TO_CURRENT_ROW, but this - // should be rare/never happens. - return MatchCode.SEEK_NEXT_ROW; + if (!this.isReversed) { + if (ret <= -1) { + return MatchCode.DONE; + } else if (ret >= 1) { + // could optimize this, if necessary? + // Could also be called SEEK_TO_CURRENT_ROW, but this + // should be rare/never happens. + return MatchCode.SEEK_NEXT_ROW; + } + } else { + if (ret <= -1) { + return MatchCode.SEEK_NEXT_ROW; + } else if (ret >= 1) { + return MatchCode.DONE; + } } + // optimize case. 
if (this.stickyNextRow) return MatchCode.SEEK_NEXT_ROW; @@ -454,6 +466,14 @@ } public boolean moreRowsMayExistAfter(KeyValue kv) { + if (this.isReversed) { + if (rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength(), stopRow, 0, stopRow.length) <= 0) { + return false; + } else { + return true; + } + } if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) && rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(), kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java (revision 0) @@ -0,0 +1,54 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; + +/** + * A "non-reversed & non-lazy" scanner which does not support backward scanning + * and always does a real seek operation. Most scanners are inherited from this + * class. + */ +@InterfaceAudience.Private +public abstract class NonReversedNonLazyKeyValueScanner extends + NonLazyKeyValueScanner { + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + throw new NotImplementedException("backwardSeek must not be called on a " + + "non-reversed scanner"); + } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + throw new NotImplementedException("seekToPreviousRow must not be called on a " + + "non-reversed scanner"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException("seekToLastRow must not be called on a " + + "non-reversed scanner"); + } + +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; +import org.apache.hadoop.hbase.util.Bytes; /** * KeyValueScanner adaptor over the Reader. It also provides hooks into @@ -54,6 +55,9 @@ private boolean enforceMVCC = false; private boolean hasMVCCInfo = false; + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. 
+ Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; private static AtomicLong seekCount; @@ -186,11 +190,18 @@ protected boolean skipKVsNewerThanReadpoint() throws IOException { // We want to ignore all key-values that are newer than our current // readPoint + KeyValue startKV = cur; while(enforceMVCC && cur != null && (cur.getMvccVersion() > readPt)) { hfs.next(); cur = hfs.getKeyValue(); + if (this.stopSkippingKVsIfNextRow && cur != null + && Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), startKV.getBuffer(), startKV.getRowOffset(), + startKV.getRowLength()) > 0) { + return false; + } } if (cur == null) { @@ -389,4 +400,76 @@ return reader.passesTimerangeFilter(scan, oldestUnexpiredTS) && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns); } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + try { + try { + KeyValue seekKey = KeyValue.createFirstOnRow(key.getRow()); + if (seekCount != null) seekCount.incrementAndGet(); + if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(), + seekKey.getKeyLength())) { + close(); + return false; + } + KeyValue firstKeyOfPreviousRow = KeyValue.createFirstOnRow(hfs + .getKeyValue().getRow()); + + if (seekCount != null) seekCount.incrementAndGet(); + if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) { + close(); + return false; + } + + cur = hfs.getKeyValue(); + this.stopSkippingKVsIfNextRow = true; + boolean resultOfSkipKVs; + try { + resultOfSkipKVs = skipKVsNewerThanReadpoint(); + } finally { + this.stopSkippingKVsIfNextRow = false; + } + if (!resultOfSkipKVs + || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), firstKeyOfPreviousRow.getBuffer(), + firstKeyOfPreviousRow.getRowOffset(), + firstKeyOfPreviousRow.getRowLength()) > 0) { + return seekToPreviousRow(firstKeyOfPreviousRow); + } + + return true; + } finally { + realSeekDone = true; + } + } catch (IOException ioe) { + throw new 
IOException("Could not seekToPreviousRow " + this + " to key " + + key, ioe); + } + } + + @Override + public boolean seekToLastRow() throws IOException { + byte[] lastRow = reader.getLastRowKey(); + if (lastRow == null) { + return false; + } + KeyValue seekKey = KeyValue.createFirstOnRow(lastRow); + if (seek(seekKey)) { + return true; + } else { + return seekToPreviousRow(seekKey); + } + } + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + seek(key); + if (cur == null + || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), key.getBuffer(), key.getRowOffset(), + key.getRowLength()) > 0) { + return seekToPreviousRow(key); + } + return true; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -1688,7 +1688,9 @@ scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols); } if (scanner == null) { - scanner = new StoreScanner(this, getScanInfo(), scan, targetCols, readPt); + scanner = scan.isReversed() ? 
new ReversedStoreScanner(this, + getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, + getScanInfo(), scan, targetCols, readPt); } return scanner; } finally { Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (revision 0) @@ -0,0 +1,169 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A reversed client scanner which support backward scanning + */ +@InterfaceAudience.Public +public class ReversedClientScanner extends ClientScanner { + private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); + // A byte array in which all elements are the max byte, and it is used to + // construct closest front row + static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); + /** + * Create a new ReversibleClientScanner for the specified table Note that the + * passed {@link Scan}'s start row maybe changed. + * @param conf The {@link Configuration} to use. 
+ * @param scan {@link Scan} to use in this scanner + * @param tableName The table that we wish to scan + * @param connection Connection identifying the cluster + * @throws IOException + */ + public ReversedClientScanner(Configuration conf, Scan scan, + TableName tableName, HConnection connection) throws IOException { + super(conf, scan, tableName, connection); + } + + @Override + protected boolean nextScanner(int nbRows, final boolean done) + throws IOException { + // Close the previous scanner if it's open + if (this.callable != null) { + this.callable.setClose(); + this.caller.callWithRetries(callable); + this.callable = null; + } + + // Where to start the next scanner + byte[] localStartKey; + boolean locateTheClosestFrontRow = true; + // if we're at start of table, close and return false to stop iterating + if (this.currentRegion != null) { + byte[] startKey = this.currentRegion.getStartKey(); + if (startKey == null + || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(startKey) || done) { + close(); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished " + this.currentRegion); + } + return false; + } + localStartKey = startKey; + if (LOG.isDebugEnabled()) { + LOG.debug("Finished " + this.currentRegion); + } + } else { + localStartKey = this.scan.getStartRow(); + if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { + locateTheClosestFrontRow = false; + } + } + + if (LOG.isDebugEnabled() && this.currentRegion != null) { + // Only worth logging if NOT first region in scan. + LOG.debug("Advancing internal scanner to startKey at '" + + Bytes.toStringBinary(localStartKey) + "'"); + } + try { + // In reversed scan, we want to locate the previous region through current + // region's start key. 
In order to get that previous region, first we + // create a closest row before the start key of current region, then + // locate all the regions from the created closest row to start key of + // current region, thus the last one of located regions should be the + // previous region of current region. The related logic of locating + // regions is implemented in ReversedScannerCallable + byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey) + : null; + callable = getScannerCallable(localStartKey, nbRows, locateStartRow); + // Open a scanner on the region server starting at the + // beginning of the region + this.caller.callWithRetries(callable); + this.currentRegion = callable.getHRegionInfo(); + if (this.scanMetrics != null) { + this.scanMetrics.countOfRegions.incrementAndGet(); + } + } catch (IOException e) { + close(); + throw e; + } + return true; + } + + protected ScannerCallable getScannerCallable(byte[] localStartKey, + int nbRows, byte[] locateStartRow) { + scan.setStartRow(localStartKey); + ScannerCallable s = new ReversedScannerCallable(getConnection(), + getTable(), scan, this.scanMetrics, locateStartRow); + s.setCaching(nbRows); + return s; + } + + @Override + // returns true if stopRow >= passed region startKey + protected boolean checkScanStopRow(final byte[] startKey) { + if (this.scan.getStopRow().length > 0) { + // there is a stop row, check to see if we are past it. + byte[] stopRow = scan.getStopRow(); + int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0, + startKey.length); + if (cmp >= 0) { + // stopRow >= startKey (stopRow is equals to or larger than endKey) + // This is a stop. + return true; + } + } + return false; // unlikely. 
+ } + + /** + * Create the closest row before the specified row + * @param row + * @return a new byte array which is the closest front row of the specified one + */ + private byte[] createClosestRowBefore(byte[] row) { + if (row == null) { + throw new IllegalArgumentException("The passed row is empty"); + } + if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) { + return MAX_BYTE_ARRAY; + } + if (row[row.length - 1] == 0) { + return Arrays.copyOf(row, row.length - 1); + } else { + byte[] closestFrontRow = Arrays.copyOf(row, row.length); + closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1); + closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY); + return closestFrontRow; + } + } + +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java (revision 0) @@ -0,0 +1,703 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; +/** + * Test cases against ReversibleKeyValueScanner + */ +@Category(MediumTests.class) +public class TestReversibleScanners { + private static final Log LOG = LogFactory.getLog(TestReversibleScanners.class); + 
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static byte[] FAMILYNAME = Bytes.toBytes("testCf"); + private static long TS = System.currentTimeMillis(); + private static int MAXMVCC = 7; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static final int ROWSIZE = 200; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + private static byte[] QUAL = Bytes.toBytes("testQual"); + private static final int QUALSIZE = 5; + private static byte[][] QUALS = makeN(QUAL, QUALSIZE); + private static byte[] VALUE = Bytes.toBytes("testValue"); + private static final int VALUESIZE = 3; + private static byte[][] VALUES = makeN(VALUE, VALUESIZE); + + @Test + public void testReversibleStoreFileScanner() throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), + "regionname"), "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + HFileContextBuilder hcBuilder = new HFileContextBuilder(); + hcBuilder.withBlockSize(2 * 1024); + HFileContext hFileContext = hcBuilder.build(); + StoreFile.Writer writer = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( + hfilePath).withFileContext(hFileContext).build(); + writeStoreFile(writer); + + StoreFile sf = new StoreFile(fs, writer.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE); + + List scanners = StoreFileScanner + .getScannersForStoreFiles(Collections.singletonList(sf), false, true, + false, Long.MAX_VALUE); + StoreFileScanner scanner = scanners.get(0); + seekTestOfReversibleKeyValueScanner(scanner); + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + scanners = StoreFileScanner.getScannersForStoreFiles( + Collections.singletonList(sf), false, true, false, readPoint); + seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), 
readPoint); + } + } + + @Test + public void testReversibleMemstoreScanner() throws IOException { + MemStore memstore = new MemStore(); + writeMemstore(memstore); + List scanners = memstore.getScanners(Long.MAX_VALUE); + seekTestOfReversibleKeyValueScanner(scanners.get(0)); + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + scanners = memstore.getScanners(readPoint); + seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint); + } + + } + + @Test + public void testReversibleKeyValueHeap() throws IOException { + // write data to one memstore and two store files + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), + "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + HFileContextBuilder hcBuilder = new HFileContextBuilder(); + hcBuilder.withBlockSize(2 * 1024); + HFileContext hFileContext = hcBuilder.build(); + StoreFile.Writer writer1 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( + hfilePath).withFileContext(hFileContext).build(); + StoreFile.Writer writer2 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( + hfilePath).withFileContext(hFileContext).build(); + + MemStore memstore = new MemStore(); + writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, + writer2 }); + + StoreFile sf1 = new StoreFile(fs, writer1.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE); + + StoreFile sf2 = new StoreFile(fs, writer2.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE); + /** + * Test without MVCC + */ + int startRowNum = ROWSIZE / 2; + ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + ROWS[startRowNum], MAXMVCC); + internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); + + 
startRowNum = ROWSIZE - 1; + kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + HConstants.EMPTY_START_ROW, MAXMVCC); + internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); + + /** + * Test with MVCC + */ + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + startRowNum = ROWSIZE - 1; + kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + HConstants.EMPTY_START_ROW, readPoint); + for (int i = startRowNum; i >= 0; i--) { + if (i - 2 < 0) break; + i = i - 2; + kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1])); + Pair nextReadableNum = getNextReadableNumWithBackwardScan( + i, 0, readPoint); + if (nextReadableNum == null) break; + KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + int qualNum = nextReadableNum.getSecond(); + if (qualNum + 1 < QUALSIZE) { + kvHeap.backwardSeek(makeKV(i, qualNum + 1)); + nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, + readPoint); + if (nextReadableNum == null) break; + expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + qualNum = nextReadableNum.getSecond(); + } + + kvHeap.next(); + + if (qualNum + 1 >= QUALSIZE) { + nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, + readPoint); + } else { + nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, + readPoint); + } + if (nextReadableNum == null) break; + expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + } + } + } + + @Test + public void testReversibleStoreScanner() throws IOException { + // write data to one memstore and two store files + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new 
Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), + "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + HFileContextBuilder hcBuilder = new HFileContextBuilder(); + hcBuilder.withBlockSize(2 * 1024); + HFileContext hFileContext = hcBuilder.build(); + StoreFile.Writer writer1 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( + hfilePath).withFileContext(hFileContext).build(); + StoreFile.Writer writer2 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( + hfilePath).withFileContext(hFileContext).build(); + + MemStore memstore = new MemStore(); + writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, + writer2 }); + + StoreFile sf1 = new StoreFile(fs, writer1.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE); + + StoreFile sf2 = new StoreFile(fs, writer2.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE); + + ScanType scanType = ScanType.USER_SCAN; + ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE, + Long.MAX_VALUE, false, 0, KeyValue.COMPARATOR); + + // Case 1.Test a full reversed scan + Scan scan = new Scan(); + scan.setReversed(true); + StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, + scan, scanType, scanInfo, MAXMVCC); + verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false); + + // Case 2.Test reversed scan with a specified start row + int startRowNum = ROWSIZE / 2; + byte[] startRow = ROWS[startRowNum]; + scan.setStartRow(startRow); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo, MAXMVCC); + verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), + startRowNum + 1, false); + + // Case 3.Test reversed scan with a specified start row and specified + // qualifiers + assertTrue(QUALSIZE > 2); + scan.addColumn(FAMILYNAME, QUALS[0]); + 
scan.addColumn(FAMILYNAME, QUALS[2]); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo, MAXMVCC); + verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, + false); + + // Case 4.Test reversed scan with mvcc based on case 3 + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo, readPoint); + int expectedRowCount = 0; + int expectedKVCount = 0; + for (int i = startRowNum; i >= 0; i--) { + int kvCount = 0; + if (makeMVCC(i, 0) <= readPoint) { + kvCount++; + } + if (makeMVCC(i, 2) <= readPoint) { + kvCount++; + } + if (kvCount > 0) { + expectedRowCount++; + expectedKVCount += kvCount; + } + } + verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, + false); + } + } + + @Test + public void testReversibleRegionScanner() throws IOException { + byte[] tableName = Bytes.toBytes("testtable"); + byte[] FAMILYNAME2 = Bytes.toBytes("testCf2"); + Configuration conf = HBaseConfiguration.create(); + HRegion region = TEST_UTIL.createLocalHRegion(tableName, null, null, + "testReversibleRegionScanner", conf, false, Durability.SYNC_WAL, null, + FAMILYNAME, FAMILYNAME2); + loadDataToRegion(region, FAMILYNAME2); + + // verify row count with forward scan + Scan scan = new Scan(); + InternalScanner scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true); + + // Case1:Full reversed scan + scan.setReversed(true); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false); + + // Case2:Full reversed scan with one family + scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILYNAME); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false); + + // Case3:Specify qualifiers + One family + byte[][] specifiedQualifiers = 
{ QUALS[1], QUALS[2] }; + for (byte[] specifiedQualifier : specifiedQualifiers) + scan.addColumn(FAMILYNAME, specifiedQualifier); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false); + + // Case4:Specify qualifiers + Two families + for (byte[] specifiedQualifier : specifiedQualifiers) + scan.addColumn(FAMILYNAME2, specifiedQualifier); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false); + + // Case5: Case4 + specify start row + int startRowNum = ROWSIZE * 3 / 4; + scan.setStartRow(ROWS[startRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), + false); + + // Case6: Case4 + specify stop row + int stopRowNum = ROWSIZE / 4; + scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY); + scan.setStopRow(ROWS[stopRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE + - stopRowNum - 1), false); + + // Case7: Case4 + specify start row + specify stop row + scan.setStartRow(ROWS[startRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, + (startRowNum - stopRowNum), false); + + // Case8: Case7 + SingleColumnValueFilter + int valueNum = startRowNum % VALUESIZE; + Filter filter = new SingleColumnValueFilter(FAMILYNAME, + specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]); + scan.setFilter(filter); + scanner = region.getScanner(scan); + int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE + + (stopRowNum / VALUESIZE == valueNum ? 
0 : 1); + verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, + false); + + // Case9: Case7 + PageFilter + int pageSize = 10; + filter = new PageFilter(pageSize); + scan.setFilter(filter); + scanner = region.getScanner(scan); + int expectedRowNum = pageSize; + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + + // Case10: Case7 + FilterList+MUST_PASS_ONE + SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter( + FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]); + SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter( + FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]); + expectedRowNum = 0; + for (int i = startRowNum; i > stopRowNum; i--) { + if (i % VALUESIZE == 0 || i % VALUESIZE == 1) { + expectedRowNum++; + } + } + filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2); + scan.setFilter(filter); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + + // Case10: Case7 + FilterList+MUST_PASS_ALL + filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2); + expectedRowNum = 0; + scan.setFilter(filter); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + } + + private StoreScanner getReversibleStoreScanner(MemStore memstore, + StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType, + ScanInfo scanInfo, int readPoint) throws IOException { + List scanners = getScanners(memstore, sf1, sf2, null, + false, readPoint); + NavigableSet columns = null; + for (Map.Entry> entry : scan.getFamilyMap() + .entrySet()) { + // Should only one family + columns = entry.getValue(); + } + StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, + scanType, columns, scanners); + return storeScanner; + } + + private void verifyCountAndOrder(InternalScanner scanner, + int expectedKVCount, int expectedRowCount, 
boolean forward) + throws IOException { + List kvList = new ArrayList(); + Result lastResult = null; + int rowCount = 0; + int kvCount = 0; + try { + while (scanner.next(kvList)) { + if (kvList.isEmpty()) continue; + rowCount++; + kvCount += kvList.size(); + if (lastResult != null) { + Result curResult = Result.create(kvList); + assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, + forward, + Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0); + } + lastResult = Result.create(kvList); + kvList.clear(); + } + } finally { + scanner.close(); + } + if (!kvList.isEmpty()) { + rowCount++; + kvCount += kvList.size(); + kvList.clear(); + } + assertEquals(expectedKVCount, kvCount); + assertEquals(expectedRowCount, rowCount); + } + + private void internalTestSeekAndNextForReversibleKeyValueHeap( + ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException { + // Test next and seek + for (int i = startRowNum; i >= 0; i--) { + if (i % 2 == 1 && i - 2 >= 0) { + i = i - 2; + kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1])); + } + for (int j = 0; j < QUALSIZE; j++) { + if (j % 2 == 1 && (j + 1) < QUALSIZE) { + j = j + 1; + kvHeap.backwardSeek(makeKV(i, j)); + } + assertEquals(makeKV(i, j), kvHeap.peek()); + kvHeap.next(); + } + } + assertEquals(null, kvHeap.peek()); + } + + private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, + StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint) + throws IOException { + List scanners = getScanners(memstore, sf1, sf2, startRow, + true, readPoint); + ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, + KeyValue.COMPARATOR); + return kvHeap; + } + + private List getScanners(MemStore memstore, StoreFile sf1, + StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint) + throws IOException { + List fileScanners = StoreFileScanner + .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, + false, readPoint); + List memScanners = 
memstore.getScanners(readPoint); + List scanners = new ArrayList( + fileScanners.size() + 1); + scanners.addAll(fileScanners); + scanners.addAll(memScanners); + + if (doSeek) { + if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } else { + KeyValue startKey = KeyValue.createFirstOnRow(startRow); + for (KeyValueScanner scanner : scanners) { + scanner.backwardSeek(startKey); + } + } + } + return scanners; + } + + private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) + throws IOException { + /** + * Test without MVCC + */ + // Test seek to last row + assertTrue(scanner.seekToLastRow()); + assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek()); + + // Test backward seek in three cases + // Case1: seek in the same row in backwardSeek + KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2); + assertTrue(scanner.backwardSeek(seekKey)); + assertEquals(seekKey, scanner.peek()); + + // Case2: seek to the previous row in backwardSeek + int seekRowNum = ROWSIZE - 2; + assertTrue(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[seekRowNum]))); + KeyValue expectedKey = makeKV(seekRowNum - 1, 0); + assertEquals(expectedKey, scanner.peek()); + + // Case3: unable to backward seek + assertFalse(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[0]))); + assertEquals(null, scanner.peek()); + + // Test seek to previous row + seekRowNum = ROWSIZE - 4; + assertTrue(scanner.seekToPreviousRow(KeyValue + .createFirstOnRow(ROWS[seekRowNum]))); + expectedKey = makeKV(seekRowNum - 1, 0); + assertEquals(expectedKey, scanner.peek()); + + // Test seek to previous row for the first row + assertFalse(scanner.seekToPreviousRow(makeKV(0, 0))); + assertEquals(null, scanner.peek()); + + } + + private void seekTestOfReversibleKeyValueScannerWithMVCC( + KeyValueScanner scanner, int readPoint) throws IOException { + /** + * Test with MVCC + */ + // Test seek to last row + KeyValue expectedKey = 
getNextReadableKeyValueWithBackwardScan( + ROWSIZE - 1, 0, readPoint); + assertEquals(expectedKey != null, scanner.seekToLastRow()); + assertEquals(expectedKey, scanner.peek()); + + // Test backward seek in two cases + // Case1: seek in the same row in backwardSeek + expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, + QUALSIZE - 2, readPoint); + assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey)); + assertEquals(expectedKey, scanner.peek()); + + // Case2: seek to the previous row in backwardSeek + int seekRowNum = ROWSIZE - 3; + KeyValue seekKey = KeyValue.createLastOnRow(ROWS[seekRowNum]); + expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, + readPoint); + assertEquals(expectedKey != null, scanner.backwardSeek(seekKey)); + assertEquals(expectedKey, scanner.peek()); + + // Test seek to previous row + seekRowNum = ROWSIZE - 4; + expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, + readPoint); + assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValue + .createFirstOnRow(ROWS[seekRowNum]))); + assertEquals(expectedKey, scanner.peek()); + } + + private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, + int startQualNum, int readPoint) { + Pair nextReadableNum = getNextReadableNumWithBackwardScan( + startRowNum, startQualNum, readPoint); + if (nextReadableNum == null) + return null; + return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); + } + + private Pair getNextReadableNumWithBackwardScan( + int startRowNum, int startQualNum, int readPoint) { + Pair nextReadableNum = null; + boolean findExpected = false; + for (int i = startRowNum; i >= 0; i--) { + for (int j = (i == startRowNum ? 
startQualNum : 0); j < QUALSIZE; j++) { + if (makeMVCC(i, j) <= readPoint) { + nextReadableNum = new Pair(i, j); + findExpected = true; + break; + } + } + if (findExpected) + break; + } + return nextReadableNum; + } + + private static void loadDataToRegion(HRegion region, byte[] additionalFamily) + throws IOException { + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + for (int j = 0; j < QUALSIZE; j++) { + put.add(makeKV(i, j)); + // put additional family + put.add(makeKV(i, j, additionalFamily)); + } + region.put(put); + if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) { + region.flushcache(); + } + } + } + + private static void writeMemstoreAndStoreFiles(MemStore memstore, + final StoreFile.Writer[] writers) throws IOException { + Random rand = new Random(); + try { + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if (i % 2 == 0) { + memstore.add(makeKV(i, j)); + } else { + writers[(i + j) % writers.length].append(makeKV(i, j)); + } + } + } + } finally { + for (int i = 0; i < writers.length; i++) { + writers[i].close(); + } + } + } + + private static void writeStoreFile(final StoreFile.Writer writer) + throws IOException { + try { + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + writer.append(makeKV(i, j)); + } + } + } finally { + writer.close(); + } + } + + private static void writeMemstore(MemStore memstore) throws IOException { + // Add half of the keyvalues to memstore + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if ((i + j) % 2 == 0) { + memstore.add(makeKV(i, j)); + } + } + } + memstore.snapshot(); + // Add another half of the keyvalues to snapshot + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if ((i + j) % 2 == 1) { + memstore.add(makeKV(i, j)); + } + } + } + } + + private static KeyValue makeKV(int rowNum, int cqNum) { + return makeKV(rowNum, cqNum, FAMILYNAME); + } + + private static KeyValue makeKV(int rowNum, 
int cqNum, byte[] familyName) { + KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, + VALUES[rowNum % VALUESIZE]); + kv.setMvccVersion(makeMVCC(rowNum, cqNum)); + return kv; + } + + private static long makeMVCC(int rowNum, int cqNum) { + return (rowNum + cqNum) % (MAXMVCC + 1); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); + } + return ret; + } +} Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -713,9 +713,12 @@ if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } - if (scan.isSmall()) { + if (scan.isSmall() && !scan.isReversed()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection); + } else if (scan.isReversed()) { + return new ReversedClientScanner(getConfiguration(), scan, getName(), + this.connection); } return new ClientScanner(getConfiguration(), scan, getName(), this.connection); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (working copy) @@ -26,14 +26,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.NonLazyKeyValueScanner; +import org.apache.hadoop.hbase.regionserver.NonReversedNonLazyKeyValueScanner; /** * Utility scanner that wraps a sortable collection and 
serves * as a KeyValueScanner. */ @InterfaceAudience.Private -public class CollectionBackedScanner extends NonLazyKeyValueScanner { +public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner { final private Iterable data; final KeyValue.KVComparator comparator; private Iterator iter; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Scan; @@ -49,7 +50,7 @@ * into List for a single row. */ @InterfaceAudience.Private -public class StoreScanner extends NonLazyKeyValueScanner +public class StoreScanner extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver { static final Log LOG = LogFactory.getLog(StoreScanner.class); protected Store store; @@ -174,19 +175,8 @@ // family marker. 
InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); - if (explicitColumnQuery && lazySeekEnabledGlobally) { - for (KeyValueScanner scanner : scanners) { - scanner.requestSeek(matcher.getStartKey(), false, true); - } - } else { - if (!isParallelSeekEnabled) { - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } - } else { - parallelSeek(scanners, matcher.getStartKey()); - } - } + seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery + && lazySeekEnabledGlobally, isParallelSeekEnabled); // set storeLimit this.storeLimit = scan.getMaxResultsPerColumnFamily(); @@ -195,7 +185,7 @@ this.storeOffset = scan.getRowOffsetPerColumnFamily(); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.getComparator()); + resetKVHeap(scanners, store.getComparator()); InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { StoreScannerCompactionRace.AFTER_SEEK.ordinal()}); } @@ -255,16 +245,10 @@ InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); // Seek all scanners to the initial key - if (!isParallelSeekEnabled) { - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } - } else { - parallelSeek(scanners, matcher.getStartKey()); - } + seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.getComparator()); + resetKVHeap(scanners, store.getComparator()); InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { StoreScannerCompactionRace.AFTER_SEEK.ordinal()}); } @@ -303,14 +287,8 @@ this.store.addChangedReaderObserver(this); } // Seek all scanners to the initial key - if (!isParallelSeekEnabled) { - for (KeyValueScanner scanner 
: scanners) { - scanner.seek(matcher.getStartKey()); - } - } else { - parallelSeek(scanners, matcher.getStartKey()); - } - heap = new KeyValueHeap(scanners, scanInfo.getComparator()); + seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled); + resetKVHeap(scanners, scanInfo.getComparator()); } /** @@ -325,6 +303,42 @@ } /** + * Seek the specified scanners with the given key + * @param scanners + * @param seekKey + * @param isLazy true if using lazy seek + * @param isParallelSeek true if using parallel seek + * @throws IOException + */ + protected void seekScanners(List scanners, + KeyValue seekKey, boolean isLazy, boolean isParallelSeek) + throws IOException { + // Seek all scanners to the start of the Row (or if the exact matching row + // key does not exist, then to the start of the next matching Row). + // Always check bloom filter to optimize the top row seek for delete + // family marker. + if (isLazy) { + for (KeyValueScanner scanner : scanners) { + scanner.requestSeek(seekKey, false, true); + } + } else { + if (!isParallelSeek) { + for (KeyValueScanner scanner : scanners) { + scanner.seek(seekKey); + } + } else { + parallelSeek(scanners, seekKey); + } + } + } + + protected void resetKVHeap(List scanners, + KVComparator comparator) throws IOException { + // Combine all seeked scanners with a heap + heap = new KeyValueHeap(scanners, comparator); + } + + /** * Filters the given list of scanners using Bloom filter, time range, and * TTL. */ @@ -442,9 +456,7 @@ int count = 0; LOOP: while((kv = this.heap.peek()) != null) { if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap. - // Check that the heap gives us KVs in an increasing order. 
- assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : - "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; + checkScanOrder(prevKV, kv, comparator); prevKV = kv; ScanQueryMatcher.MatchCode qcode = matcher.match(kv); @@ -466,7 +478,7 @@ if (!matcher.moreRowsMayExistAfter(kv)) { return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); break LOOP; } @@ -481,9 +493,9 @@ if (!matcher.moreRowsMayExistAfter(kv)) { return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { - reseek(matcher.getKeyForNextColumn(kv)); + seekAsDirection(matcher.getKeyForNextColumn(kv)); } else { this.heap.next(); } @@ -507,11 +519,11 @@ return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); break; case SEEK_NEXT_COL: - reseek(matcher.getKeyForNextColumn(kv)); + seekAsDirection(matcher.getKeyForNextColumn(kv)); break; case SKIP: @@ -522,7 +534,7 @@ // TODO convert resee to Cell? KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv)); if (nextKV != null) { - reseek(nextKV); + seekAsDirection(nextKV); } else { heap.next(); } @@ -602,16 +614,11 @@ * could have done it now by storing the scan object from the constructor */ List scanners = getScannersNoCompaction(); - if (!isParallelSeekEnabled) { - for (KeyValueScanner scanner : scanners) { - scanner.seek(lastTopKey); - } - } else { - parallelSeek(scanners, lastTopKey); - } + // Seek all scanners to the initial key + seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.getComparator()); + resetKVHeap(scanners, store.getComparator()); // Reset the state of the Query Matcher and set to top row. 
// Only reset and call setRow if the row changes; avoids confusing the @@ -631,6 +638,36 @@ } } + /** + * Check whether scan as expected order + * @param prevKV + * @param kv + * @param comparator + * @throws IOException + */ + protected void checkScanOrder(KeyValue prevKV, KeyValue kv, + KeyValue.KVComparator comparator) throws IOException { + // Check that the heap gives us KVs in an increasing order. + assert prevKV == null || comparator == null + || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV + + " followed by a " + "smaller key " + kv + " in cf " + store; + } + + protected synchronized boolean seekToNextRow(KeyValue kv) throws IOException { + return reseek(matcher.getKeyForNextRow(kv)); + } + + /** + * Do a reseek in a normal StoreScanner(scan forward) + * @param kv + * @return true if scanner has values left, false if end of scanner + * @throws IOException + */ + protected synchronized boolean seekAsDirection(KeyValue kv) + throws IOException { + return reseek(kv); + } + @Override public synchronized boolean reseek(KeyValue kv) throws IOException { //Heap will not be null, if this is called from next() which. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java (revision 0) @@ -0,0 +1,81 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; + +/** + * ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to + * support reversed scanning. + */ +@InterfaceAudience.Private +class ReversedRegionScannerImpl extends RegionScannerImpl { + + /** + * @param scan + * @param additionalScanners + * @param region + * @throws IOException + */ + ReversedRegionScannerImpl(Scan scan, + List additionalScanners, HRegion region) + throws IOException { + region.super(scan, additionalScanners, region); + } + + @Override + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) throws IOException { + this.storeHeap = new ReversedKeyValueHeap(scanners, region.getComparator()); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new ReversedKeyValueHeap(joinedScanners, + region.getComparator()); + } + } + + @Override + protected boolean isStopRow(byte[] currentRow, int offset, short length) { + return currentRow == null + || (super.stopRow != null && region.getComparator().compareRows( + stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan); + } + + @Override + protected boolean nextRow(byte[] currentRow, int offset, short length) + throws IOException { + assert super.joinedContinuationRow == null : "Trying to go to 
next row during joinedHeap read."; + byte row[] = new byte[length]; + System.arraycopy(currentRow, offset, row, 0, length); + this.storeHeap.seekToPreviousRow(KeyValue.createFirstOnRow(row)); + resetFilters(); + // Calling the hook in CP which allows it to do a fast forward + if (this.region.getCoprocessorHost() != null) { + return this.region.getCoprocessorHost().postScannerFilterRow(this, + currentRow); + } + return true; + } + +} \ No newline at end of file Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (revision 1542632) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (working copy) @@ -75,7 +75,10 @@ Store store, final Scan scan, final NavigableSet targetCols, KeyValueScanner s) throws IOException { HRegion r = c.getEnvironment().getRegion(); - return new StoreScanner(store, store.getScanInfo(), scan, targetCols, - r.getReadpoint(scan.getIsolationLevel())); + return scan.isReversed() ? 
new ReversedStoreScanner(store, + store.getScanInfo(), scan, targetCols, r.getReadpoint(scan + .getIsolationLevel())) : new StoreScanner(store, + store.getScanInfo(), scan, targetCols, r.getReadpoint(scan + .getIsolationLevel())); } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (working copy) @@ -159,6 +159,11 @@ * @param filter another filter */ public void addFilter(Filter filter) { + if (this.isReversed() != filter.isReversed()) { + throw new IllegalArgumentException( + "Filters in the list must have the same reversed flag, this.reversed=" + + this.isReversed()); + } this.filters.add(filter); } @@ -464,6 +469,14 @@ } @Override + public void setReversed(boolean reversed) { + for (Filter filter : filters) { + filter.setReversed(reversed); + } + this.reversed = reversed; + } + + @Override public String toString() { return toString(MAX_LOG_FILTERS); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1542632) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -2680,26 +2680,34 @@ } - private void scanTestNull(HTable ht, byte [] row, byte [] family, - byte [] value) - throws Exception { + private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value) + throws Exception { + scanTestNull(ht, row, family, value, false); + } + private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value, + boolean isReversedScan) throws Exception { + Scan scan = new Scan(); + scan.setReversed(isReversedScan); scan.addColumn(family, 
null); Result result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); scan.addColumn(family, HConstants.EMPTY_BYTE_ARRAY); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); scan.addFamily(family); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); @@ -5271,4 +5279,473 @@ assertEquals(insertNum, count); } + + @Test + public void testSuperSimpleWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testSuperSimpleWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000002")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000004")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000006")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000008")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000001")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000003")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000005")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000007")); + put.add(FAMILY, 
QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000009")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + ht.flushCommits(); + Scan scan = new Scan(Bytes.toBytes("0-b11111-9223372036854775807"), + Bytes.toBytes("0-b11111-0000000000000000000")); + scan.setReversed(true); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertTrue(Bytes.equals(result.getRow(), + Bytes.toBytes("0-b11111-0000000000000000008"))); + scanner.close(); + ht.close(); + } + + @Test + public void testFiltersWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testFiltersWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 10); + byte[][] QUALIFIERS = { Bytes.toBytes("col0--"), + Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), + Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), + Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), + Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), + Bytes.toBytes("col9--") }; + for (int i = 0; i < 10; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILY); + Filter filter = new QualifierFilter(CompareOp.EQUAL, + new RegexStringComparator("col[1-5]")); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int expectedIndex = 5; + for (Result result : scanner) { + assertEquals(result.size(), 1); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[expectedIndex])); + assertTrue(Bytes.equals(result.raw()[0].getQualifier(), + QUALIFIERS[expectedIndex])); + expectedIndex--; + } + assertEquals(expectedIndex, 0); + scanner.close(); + ht.close(); + } + + @Test + public void testKeyOnlyFilterWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testKeyOnlyFilterWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 10); 
+ byte[][] QUALIFIERS = { Bytes.toBytes("col0--"), + Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), + Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), + Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), + Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), + Bytes.toBytes("col9--") }; + for (int i = 0; i < 10; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILY); + Filter filter = new KeyOnlyFilter(true); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int count = 0; + for (Result result : ht.getScanner(scan)) { + assertEquals(result.size(), 1); + assertEquals(result.raw()[0].getValueLength(), Bytes.SIZEOF_INT); + assertEquals(Bytes.toInt(result.raw()[0].getValue()), VALUE.length); + count++; + } + assertEquals(count, 10); + scanner.close(); + ht.close(); + } + + /** + * Test simple table and non-existent row cases. + */ + @Test + public void testSimpleMissingWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testSimpleMissingWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 4); + + // Try to get a row on an empty table + Scan scan = new Scan(); + scan.setReversed(true); + Result result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0], ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILY); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReversed(true); + scan.addColumn(FAMILY, QUALIFIER); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Insert a row + + Put put = new 
Put(ROWS[2]); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + + // Make sure we can scan the row + scan = new Scan(); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[3], ROWS[0]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[2], ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + // Try to scan empty rows around it + // Introduced MemStore#shouldSeekForReverseScan to fix the following + scan = new Scan(ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + ht.close(); + } + + @Test + public void testNullWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testNullWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + // Null qualifier (should work) + Put put = new Put(ROW); + put.add(FAMILY, null, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + Delete delete = new Delete(ROW); + delete.deleteColumns(FAMILY, null); + ht.delete(delete); + // Use a new table + byte[] TABLE2 = Bytes.toBytes("testNull2WithReverseScan"); + ht = TEST_UTIL.createTable(TABLE2, FAMILY); + // Empty qualifier, byte[0] instead of null (should work) + put = new Put(ROW); + put.add(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + TEST_UTIL.flush(); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + delete = new Delete(ROW); + delete.deleteColumns(FAMILY, HConstants.EMPTY_BYTE_ARRAY); + ht.delete(delete); + // Null value + put = new Put(ROW); + put.add(FAMILY, QUALIFIER, null); + ht.put(put); + Scan scan = new Scan(); + scan.setReversed(true); + scan.addColumn(FAMILY, QUALIFIER); + Result result = 
getSingleScanResult(ht, scan); + assertSingleResult(result, ROW, FAMILY, QUALIFIER, null); + ht.close(); + } + + @Test + public void testDeletesWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testDeletesWithReverseScan"); + byte[][] ROWS = makeNAscii(ROW, 6); + byte[][] FAMILIES = makeNAscii(FAMILY, 3); + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, + TEST_UTIL.getConfiguration(), 3); + + Put put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[0], QUALIFIER, ts[1], VALUES[1]); + ht.put(put); + + Delete delete = new Delete(ROW); + delete.deleteFamily(FAMILIES[0], ts[0]); + ht.delete(delete); + + Scan scan = new Scan(ROW); + scan.setReversed(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + Result result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1] }, + new byte[][] { VALUES[1] }, 0, 0); + + // Test delete latest version + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); + put.add(FAMILIES[0], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[0], QUALIFIER, ts[3], VALUES[3]); + put.add(FAMILIES[0], null, ts[4], VALUES[4]); + put.add(FAMILIES[0], null, ts[2], VALUES[2]); + put.add(FAMILIES[0], null, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4] + ht.delete(delete); + + scan = new Scan(ROW); + scan.setReversed(true); + scan.addColumn(FAMILIES[0], QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], + ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test for HBASE-1847 + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], null); + ht.delete(delete); + + // Cleanup null qualifier + 
delete = new Delete(ROW); + delete.deleteColumns(FAMILIES[0], null); + ht.delete(delete); + + // Expected client behavior might be that you can re-put deleted values + // But alas, this is not to be. We can't put them back in either case. + + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000 + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000 + ht.put(put); + + // The Scanner returns the previous values, the expected-naive-unexpected + // behavior + + scan = new Scan(ROW); + scan.setReversed(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], + ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test deleting an entire family from one row but not the other various + // ways + + put = new Put(ROWS[0]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[1]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[2]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROWS[0]); + delete.deleteFamily(FAMILIES[2]); + ht.delete(delete); + + delete = new Delete(ROWS[1]); + delete.deleteColumns(FAMILIES[1], QUALIFIER); + ht.delete(delete); + + delete = new Delete(ROWS[2]); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + 
delete.deleteColumn(FAMILIES[2], QUALIFIER); + ht.delete(delete); + + scan = new Scan(ROWS[0]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, new long[] { ts[0], + ts[1] }, new byte[][] { VALUES[0], VALUES[1] }, 0, 1); + + scan = new Scan(ROWS[1]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + + scan = new Scan(ROWS[2]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, + new long[] { ts[2] }, new byte[][] { VALUES[2] }, 0, 0); + + // Test if we delete the family first in one row (HBASE-1541) + + delete = new Delete(ROWS[3]); + delete.deleteFamily(FAMILIES[1]); + ht.delete(delete); + + put = new Put(ROWS[3]); + put.add(FAMILIES[2], QUALIFIER, VALUES[0]); + ht.put(put); + + put = new Put(ROWS[4]); + put.add(FAMILIES[1], QUALIFIER, VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, VALUES[2]); + ht.put(put); + + scan = new Scan(ROWS[4]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = ht.getScanner(scan); + result = scanner.next(); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[4])); + assertTrue(Bytes.equals(result.raw()[1].getRow(), ROWS[4])); + 
assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[1])); + assertTrue(Bytes.equals(result.raw()[1].getValue(), VALUES[2])); + result = scanner.next(); + assertTrue("Expected 1 key but received " + result.size(), + result.size() == 1); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[3])); + assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[0])); + scanner.close(); + ht.close(); + } + + /** + * Tests reversed scan under multi regions + */ + @Test + public void testReversedScanUnderMultiRegions() throws Exception { + // Test Initialization. + byte[] TABLE = Bytes.toBytes("testReversedScanUnderMultiRegions"); + byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY; + byte[][] splitRows = new byte[][] { Bytes.toBytes("005"), + Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)), + Bytes.toBytes("006"), + Bytes.add(Bytes.toBytes("006"), Bytes.multiple(maxByteArray, 8)), + Bytes.toBytes("007"), + Bytes.add(Bytes.toBytes("007"), Bytes.multiple(maxByteArray, 4)), + Bytes.toBytes("008"), Bytes.multiple(maxByteArray, 2) }; + HTable table = TEST_UTIL.createTable(TABLE, FAMILY, splitRows); + TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); + + assertEquals(splitRows.length + 1, table.getRegionLocations().size()); + // Insert one row each region + int insertNum = splitRows.length; + for (int i = 0; i < insertNum; i++) { + Put put = new Put(splitRows[i]); + put.add(FAMILY, QUALIFIER, VALUE); + table.put(put); + } + + // scan forward + ResultScanner scanner = table.getScanner(new Scan()); + int count = 0; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + + // scan backward + Scan scan = new Scan(); + scan.setReversed(true); + scanner = table.getScanner(scan); + count = 0; + byte[] lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + 
Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(insertNum, count); + table.close(); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -682,6 +682,10 @@ volatile MemStoreLAB allocatorAtCreation; volatile MemStoreLAB snapshotAllocatorAtCreation; + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; + private long readPoint; /* @@ -722,6 +726,7 @@ } private KeyValue getNext(Iterator it) { + KeyValue startKV = theNext; KeyValue v = null; try { while (it.hasNext()) { @@ -729,6 +734,10 @@ if (v.getMvccVersion() <= this.readPoint) { return v; } + if (stopSkippingKVsIfNextRow && startKV != null + && comparator.compareRows(v, startKV) > 0) { + return null; + } } return null; @@ -907,6 +916,70 @@ long oldestUnexpiredTS) { return shouldSeek(scan, oldestUnexpiredTS); } + + /** + * Seek scanner to the given key first. 
If it returns false(means + * peek()==null) or scanner's peek row is bigger than row of given key, seek + * the scanner to the previous row of given key + */ + @Override + public synchronized boolean backwardSeek(KeyValue key) { + seek(key); + if (peek() == null || comparator.compareRows(peek(), key) > 0) { + return seekToPreviousRow(key); + } + return true; + } + + /** + * Separately get the KeyValue before the specified key from kvset and + * snapshotset, and use the row of higher one as the previous row of + * specified key, then seek to the first KeyValue of previous row + */ + @Override + public synchronized boolean seekToPreviousRow(KeyValue key) { + KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow()); + SortedSet kvHead = kvsetAtCreation.headSet(firstKeyOnRow); + KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last(); + SortedSet snapshotHead = snapshotAtCreation + .headSet(firstKeyOnRow); + KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead + .last(); + KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow); + if (lastKVBeforeRow == null) { + theNext = null; + return false; + } + KeyValue firstKeyOnPreviousRow = KeyValue + .createFirstOnRow(lastKVBeforeRow.getRow()); + this.stopSkippingKVsIfNextRow = true; + seek(firstKeyOnPreviousRow); + this.stopSkippingKVsIfNextRow = false; + if (peek() == null + || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { + return seekToPreviousRow(lastKVBeforeRow); + } + return true; + } + + @Override + public synchronized boolean seekToLastRow() { + KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation + .last(); + KeyValue second = snapshotAtCreation.isEmpty() ? 
null + : snapshotAtCreation.last(); + KeyValue higherKv = getHighest(first, second); + if (higherKv == null) { + return false; + } + KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow()); + if (seek(firstKvOnLastRow)) { + return true; + } else { + return seekToPreviousRow(higherKv); + } + + } } public final static long FIXED_OVERHEAD = ClassSize.align( Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1205,6 +1205,13 @@ return size; } + /** + * @return KeyValue Comparator + */ + public KeyValue.KVComparator getComparator() { + return this.comparator; + } + /* * Do preparation for pending compaction. * @throws IOException @@ -1754,6 +1761,12 @@ protected RegionScanner instantiateRegionScanner(Scan scan, List additionalScanners) throws IOException { + if (scan.isReversed()) { + if (scan.getFilter() != null) { + scan.getFilter().setReversed(true); + } + return new ReversedRegionScannerImpl(scan, additionalScanners, this); + } return new RegionScannerImpl(scan, additionalScanners, this); } @@ -3491,17 +3504,17 @@ /** * If the joined heap data gathering is interrupted due to scan limits, this will * contain the row for which we are populating the values.*/ - private KeyValue joinedContinuationRow = null; + protected KeyValue joinedContinuationRow = null; // KeyValue indicating that limit is reached when scanning private final KeyValue KV_LIMIT = new KeyValue(); - private final byte [] stopRow; + protected final byte[] stopRow; private Filter filter; private int batch; - private int isScan; + protected int isScan; private boolean filterClosed = false; private long readPt; private long maxResultSize; - private HRegion region; + protected HRegion region; 
@Override public HRegionInfo getRegionInfo() { @@ -3556,16 +3569,22 @@ joinedScanners.add(scanner); } } - this.storeHeap = new KeyValueHeap(scanners, comparator); - if (!joinedScanners.isEmpty()) { - this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); - } + initializeKVHeap(scanners, joinedScanners, region); } RegionScannerImpl(Scan scan, HRegion region) throws IOException { this(scan, null, region); } + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) + throws IOException { + this.storeHeap = new KeyValueHeap(scanners, region.comparator); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator); + } + } + @Override public long getMaxResultSize() { return maxResultSize; @@ -3837,7 +3856,7 @@ currentRow); } - private boolean isStopRow(byte [] currentRow, int offset, short length) { + protected boolean isStopRow(byte[] currentRow, int offset, short length) { return currentRow == null || (stopRow != null && comparator.compareRows(stopRow, 0, stopRow.length, Index: hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (working copy) @@ -56,6 +56,7 @@ @InterfaceAudience.Public @InterfaceStability.Stable public abstract class Filter { + protected boolean reversed; /** * Reset the state of the filter between rows. * @@ -277,4 +278,16 @@ * @throws IOException in case an I/O or an filter specific failure needs to be signaled. 
*/ abstract boolean areSerializedFieldsEqual(Filter other); + + /** + * alter the reversed scan flag + * @param reversed flag + */ + public void setReversed(boolean reversed) { + this.reversed = reversed; + } + + public boolean isReversed() { + return this.reversed; + } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -56,7 +56,7 @@ // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. protected HRegionInfo currentRegion = null; - private ScannerCallable callable = null; + protected ScannerCallable callable = null; protected final LinkedList cache = new LinkedList(); protected final int caching; protected long lastNext; @@ -226,7 +226,7 @@ * @param nbRows * @param done Server-side says we're done scanning. */ - private boolean nextScanner(int nbRows, final boolean done) + protected boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (working copy) @@ -123,4 +123,37 @@ * assumed. 
*/ boolean isFileScanner(); + + // Support for "Reversed Scanner" + /** + * Seek the scanner at or before the row of specified KeyValue, it firstly + * tries to seek the scanner at or after the specified KeyValue, return if + * peek KeyValue of scanner has the same row with specified KeyValue, + * otherwise seek the scanner at the first KeyValue of the row which is the + * previous row of specified KeyValue + * + * @param key seek KeyValue + * @return true if the scanner is at the valid KeyValue, false if such + * KeyValue does not exist + * + */ + public boolean backwardSeek(KeyValue key) throws IOException; + + /** + * Seek the scanner at the first KeyValue of the row which is the previous row + * of specified key + * @param key seek value + * @return true if the scanner at the first valid KeyValue of previous row, + * false if not existing such KeyValue + */ + public boolean seekToPreviousRow(KeyValue key) throws IOException; + + /** + * Seek the scanner at the first KeyValue of last row + * + * @return true if scanner has values left, false if the underlying data is + * empty + * @throws IOException + */ + public boolean seekToLastRow() throws IOException; } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java (working copy) @@ -43,10 +43,10 @@ public abstract class RegionServerCallable implements RetryingCallable { // Public because used outside of this package over in ipc. 
static final Log LOG = LogFactory.getLog(RegionServerCallable.class); - private final HConnection connection; - private final TableName tableName; - private final byte [] row; - private HRegionLocation location; + protected final HConnection connection; + protected final TableName tableName; + protected final byte[] row; + protected HRegionLocation location; private ClientService.BlockingInterface stub; protected final static int MIN_WAIT_DEAD_SERVER = 10000; Index: hbase-protocol/src/main/protobuf/Client.proto =================================================================== --- hbase-protocol/src/main/protobuf/Client.proto (revision 1542632) +++ hbase-protocol/src/main/protobuf/Client.proto (working copy) @@ -218,6 +218,7 @@ optional uint32 store_offset = 12; optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */ optional bool small = 14; + optional bool reversed = 15 [default = false]; } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java (revision 0) @@ -0,0 +1,135 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.client.Scan; + +/** + * ReversedStoreScanner extends from StoreScanner, and is used to support + * reversed scanning. + */ +@InterfaceAudience.Private +class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { + + /** + * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we + * are not in a compaction. + * + * @param store who we scan + * @param scanInfo + * @param scan the spec + * @param columns which columns we are scanning + * @throws IOException + */ + ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + NavigableSet columns, long readPt) + throws IOException { + super(store, scanInfo, scan, columns, readPt); + } + + /** Constructor for testing. 
*/ + ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, + final NavigableSet columns, final List scanners) + throws IOException { + super(scan, scanInfo, scanType, columns, scanners, + HConstants.LATEST_TIMESTAMP); + } + + @Override + protected void resetKVHeap(List scanners, + KVComparator comparator) throws IOException { + // Combine all seeked scanners with a heap + heap = new ReversedKeyValueHeap(scanners, comparator); + } + + @Override + protected void seekScanners(List scanners, + KeyValue seekKey, boolean isLazy, boolean isParallelSeek) + throws IOException { + // Seek all scanners to the start of the Row (or if the exact matching row + // key does not exist, then to the start of the previous matching Row). + if (seekKey.matchingRow(HConstants.EMPTY_START_ROW)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } else { + for (KeyValueScanner scanner : scanners) { + scanner.backwardSeek(seekKey); + } + } + } + + @Override + protected synchronized boolean seekToNextRow(KeyValue kv) throws IOException { + return seekToPreviousRow(kv); + } + + /** + * Do a backwardSeek in a reversed StoreScanner(scan backward) + */ + @Override + protected synchronized boolean seekAsDirection(KeyValue kv) + throws IOException { + return backwardSeek(kv); + } + + @Override + protected void checkScanOrder(KeyValue prevKV, KeyValue kv, + KeyValue.KVComparator comparator) throws IOException { + // Check that the heap gives us KVs in an increasing order for same row and + // decreasing order for different rows. 
+ assert prevKV == null || comparator == null || comparator.compareRows(kv, prevKV) < 0 + || (comparator.matchingRows(kv, prevKV) && comparator.compare(kv, + prevKV) > 0) : "Key " + prevKV + + " followed by a " + "error order key " + kv + " in cf " + store + + " in reversed scan"; + } + + @Override + public synchronized boolean reseek(KeyValue kv) throws IOException { + throw new IllegalStateException( + "reseek cannot be called on ReversedStoreScanner"); + } + + @Override + public synchronized boolean seek(KeyValue key) throws IOException { + throw new IllegalStateException( + "seek cannot be called on ReversedStoreScanner"); + } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + checkReseek(); + return this.heap.seekToPreviousRow(key); + } + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + checkReseek(); + return this.heap.backwardSeek(key); + } +} Index: hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (working copy) @@ -839,6 +839,9 @@ if (scan.getRowOffsetPerColumnFamily() > 0) { scanBuilder.setStoreOffset(scan.getRowOffsetPerColumnFamily()); } + if (scan.isReversed()) { + scanBuilder.setReversed(scan.isReversed()); + } return scanBuilder.build(); } @@ -915,6 +918,9 @@ } } } + if (proto.hasReversed()) { + scan.setReversed(proto.getReversed()); + } return scan; } Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java =================================================================== --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (revision 1542632) +++ 
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (working copy) @@ -12261,6 +12261,16 @@ * optional bool small = 14; */ boolean getSmall(); + + // optional bool reversed = 15 [default = false]; + /** + * optional bool reversed = 15 [default = false]; + */ + boolean hasReversed(); + /** + * optional bool reversed = 15 [default = false]; + */ + boolean getReversed(); } /** * Protobuf type {@code Scan} @@ -12416,6 +12426,11 @@ small_ = input.readBool(); break; } + case 120: { + bitField0_ |= 0x00001000; + reversed_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -12746,6 +12761,22 @@ return small_; } + // optional bool reversed = 15 [default = false]; + public static final int REVERSED_FIELD_NUMBER = 15; + private boolean reversed_; + /** + * optional bool reversed = 15 [default = false]; + */ + public boolean hasReversed() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + /** + * optional bool reversed = 15 [default = false]; + */ + public boolean getReversed() { + return reversed_; + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -12761,6 +12792,7 @@ storeOffset_ = 0; loadColumnFamiliesOnDemand_ = false; small_ = false; + reversed_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -12834,6 +12866,9 @@ if (((bitField0_ & 0x00000800) == 0x00000800)) { output.writeBool(14, small_); } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + output.writeBool(15, reversed_); + } getUnknownFields().writeTo(output); } @@ -12899,6 +12934,10 @@ size += com.google.protobuf.CodedOutputStream .computeBoolSize(14, small_); } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(15, reversed_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ 
-12986,6 +13025,11 @@ result = result && (getSmall() == other.getSmall()); } + result = result && (hasReversed() == other.hasReversed()); + if (hasReversed()) { + result = result && (getReversed() + == other.getReversed()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -13055,6 +13099,10 @@ hash = (37 * hash) + SMALL_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getSmall()); } + if (hasReversed()) { + hash = (37 * hash) + REVERSED_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getReversed()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -13223,6 +13271,8 @@ bitField0_ = (bitField0_ & ~0x00001000); small_ = false; bitField0_ = (bitField0_ & ~0x00002000); + reversed_ = false; + bitField0_ = (bitField0_ & ~0x00004000); return this; } @@ -13325,6 +13375,10 @@ to_bitField0_ |= 0x00000800; } result.small_ = small_; + if (((from_bitField0_ & 0x00004000) == 0x00004000)) { + to_bitField0_ |= 0x00001000; + } + result.reversed_ = reversed_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -13429,6 +13483,9 @@ if (other.hasSmall()) { setSmall(other.getSmall()); } + if (other.hasReversed()) { + setReversed(other.getReversed()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -14540,6 +14597,39 @@ return this; } + // optional bool reversed = 15 [default = false]; + private boolean reversed_ ; + /** + * optional bool reversed = 15 [default = false]; + */ + public boolean hasReversed() { + return ((bitField0_ & 0x00004000) == 0x00004000); + } + /** + * optional bool reversed = 15 [default = false]; + */ + public boolean getReversed() { + return reversed_; + } + /** + * optional bool reversed = 15 [default = false]; + */ + public Builder setReversed(boolean value) { + bitField0_ |= 0x00004000; + reversed_ = value; + onChanged(); + return this; + } + /** + * optional bool reversed = 15 [default = false]; + */ + public Builder clearReversed() 
{ + bitField0_ = (bitField0_ & ~0x00004000); + reversed_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:Scan) } @@ -27925,7 +28015,7 @@ "pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" + "to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" + "ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" + - "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\344\002\n" + + "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\375\002\n" + "\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" + "ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003" + " \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" + @@ -27934,50 +28024,50 @@ "\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_", "result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024" + "\n\014store_offset\030\014 \001(\r\022&\n\036load_column_fami" + - "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\"\236\001\n\013" + - "ScanRequest\022 \n\006region\030\001 \001(\0132\020.RegionSpec" + - "ifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscanner_id" + - "\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_" + - "scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\"y\n\014" + - "ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022" + - "\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010" + - "\022\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\"", - "\263\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\013" + - "2\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132" + - " .BulkLoadHFileRequest.FamilyPath\022\026\n\016ass" + - "ign_seq_num\030\003 
\001(\010\032*\n\nFamilyPath\022\016\n\006famil" + - "y\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRe" + - "sponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorSer" + - "viceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 " + - "\002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(" + - "\014\"d\n\031CoprocessorServiceRequest\022 \n\006region" + - "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027", - ".CoprocessorServiceCall\"]\n\032CoprocessorSe" + - "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" + - "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"L" + - "\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" + - "\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\"Y\n\014" + - "RegionAction\022 \n\006region\030\001 \002(\0132\020.RegionSpe" + - "cifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007" + - ".Action\"^\n\021ResultOrException\022\r\n\005index\030\001 " + - "\001(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texceptio" + - "n\030\003 \001(\0132\016.NameBytesPair\"f\n\022RegionActionR", - "esult\022-\n\021resultOrException\030\001 \003(\0132\022.Resul" + - "tOrException\022!\n\texception\030\002 \001(\0132\016.NameBy" + - "tesPair\"G\n\014MultiRequest\022#\n\014regionAction\030" + - "\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004" + - "\"@\n\rMultiResponse\022/\n\022regionActionResult\030" + - "\001 \003(\0132\023.RegionActionResult2\261\002\n\rClientSer" + - "vice\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022)" + - "\n\006Mutate\022\016.MutateRequest\032\017.MutateRespons" + - "e\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResponse\022>" + - "\n\rBulkLoadHFile\022\025.BulkLoadHFileRequest\032\026", - ".BulkLoadHFileResponse\022F\n\013ExecService\022\032." 
+ - "CoprocessorServiceRequest\032\033.CoprocessorS" + - "erviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016" + - ".MultiResponseBB\n*org.apache.hadoop.hbas" + - "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" + - "\001\001" + "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + + "eversed\030\017 \001(\010:\005false\"\236\001\n\013ScanRequest\022 \n\006" + + "region\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002" + + " \001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016numbe" + + "r_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025" + + "\n\rnext_call_seq\030\006 \001(\004\"y\n\014ScanResponse\022\030\n" + + "\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 " + + "\001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030", + "\n\007results\030\005 \003(\0132\007.Result\"\263\001\n\024BulkLoadHFi" + + "leRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" + + "ier\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFile" + + "Request.FamilyPath\022\026\n\016assign_seq_num\030\003 \001" + + "(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path" + + "\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loade" + + "d\030\001 \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row" + + "\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_n" + + "ame\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Coprocesso" + + "rServiceRequest\022 \n\006region\030\001 \002(\0132\020.Region", + "Specifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorSer" + + "viceCall\"]\n\032CoprocessorServiceResponse\022 " + + "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005valu" + + "e\030\002 \002(\0132\016.NameBytesPair\"L\n\006Action\022\r\n\005ind" + + "ex\030\001 \001(\r\022 \n\010mutation\030\002 
\001(\0132\016.MutationPro" + + "to\022\021\n\003get\030\003 \001(\0132\004.Get\"Y\n\014RegionAction\022 \n" + + "\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" + + "c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"^\n\021Resu" + + "ltOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002" + + " \001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameB", + "ytesPair\"f\n\022RegionActionResult\022-\n\021result" + + "OrException\030\001 \003(\0132\022.ResultOrException\022!\n" + + "\texception\030\002 \001(\0132\016.NameBytesPair\"G\n\014Mult" + + "iRequest\022#\n\014regionAction\030\001 \003(\0132\r.RegionA" + + "ction\022\022\n\nnonceGroup\030\002 \001(\004\"@\n\rMultiRespon" + + "se\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionA" + + "ctionResult2\261\002\n\rClientService\022 \n\003Get\022\013.G" + + "etRequest\032\014.GetResponse\022)\n\006Mutate\022\016.Muta" + + "teRequest\032\017.MutateResponse\022#\n\004Scan\022\014.Sca" + + "nRequest\032\r.ScanResponse\022>\n\rBulkLoadHFile", + "\022\025.BulkLoadHFileRequest\032\026.BulkLoadHFileR" + + "esponse\022F\n\013ExecService\022\032.CoprocessorServ" + + "iceRequest\032\033.CoprocessorServiceResponse\022" + + "&\n\005Multi\022\r.MultiRequest\032\016.MultiResponseB" + + "B\n*org.apache.hadoop.hbase.protobuf.gene" + + "ratedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -28055,7 +28145,7 @@ internal_static_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", }); + new 
java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", }); internal_static_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(10); internal_static_ScanRequest_fieldAccessorTable = new Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -110,6 +110,7 @@ private int caching = -1; private long maxResultSize = -1; private boolean cacheBlocks = true; + private boolean reversed = false; private Filter filter = null; private TimeRange tr = new TimeRange(); private Map> familyMap = @@ -555,6 +556,25 @@ } /** + * Set whether this scan is a reversed one + *

+ * This is false by default which means forward(normal) scan. + * + * @param reversed if true, scan will be backward order + */ + public void setReversed(boolean reversed) { + this.reversed = reversed; + } + + /** + * Get whether this scan is a reversed one. + * @return true if backward scan, false if forward(default) scan + */ + public boolean isReversed() { + return reversed; + } + + /** * Set the value indicating whether loading CFs on demand should be allowed (cluster * default is false). On-demand CF loading doesn't load column families until necessary, e.g. * if you filter on one column, the other column family data will be loaded only for the rows Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (revision 1542632) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (working copy) @@ -1951,4 +1951,36 @@ RNG.nextBytes(buf); System.arraycopy(buf, 0, b, offset, length); } + + /** + * Create a max byte array with the specified max byte count + * @param maxByteCount the length of returned byte array + * @return the created max byte array + */ + public static byte[] createMaxByteArray(int maxByteCount) { + byte[] maxByteArray = new byte[maxByteCount]; + for (int i = 0; i < maxByteArray.length; i++) { + maxByteArray[i] = (byte) 0xff; + } + return maxByteArray; + } + + /** + * Create a byte array which is multiple given bytes + * @param srcBytes + * @param multiNum + * @return byte array + */ + public static byte[] multiple(byte[] srcBytes, int multiNum) { + if (multiNum <= 0) { + return new byte[0]; + } + byte[] result = new byte[srcBytes.length * multiNum]; + for (int i = 0; i < multiNum; i++) { + System.arraycopy(srcBytes, 0, result, i * srcBytes.length, + srcBytes.length); + } + return result; + + } } Index: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -42,9 +42,9 @@ * as an InternalScanner at the Store level, you will get runtime exceptions. */ @InterfaceAudience.Private -public class KeyValueHeap extends NonLazyKeyValueScanner +public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner { - private PriorityQueue heap = null; + protected PriorityQueue heap = null; /** * The current sub-scanner, i.e. the one that contains the next key/value @@ -56,9 +56,9 @@ * Bloom filter optimization, which is OK to propagate to StoreScanner. In * order to ensure that, always use {@link #pollRealKV()} to update current. */ - private KeyValueScanner current = null; + protected KeyValueScanner current = null; - private KVScannerComparator comparator; + protected KVScannerComparator comparator; /** * Constructor. This KeyValueHeap will handle closing of passed in @@ -68,7 +68,18 @@ */ public KeyValueHeap(List scanners, KVComparator comparator) throws IOException { - this.comparator = new KVScannerComparator(comparator); + this(scanners, new KVScannerComparator(comparator)); + } + + /** + * Constructor. 
+ * @param scanners + * @param comparator + * @throws IOException + */ + KeyValueHeap(List scanners, + KVScannerComparator comparator) throws IOException { + this.comparator = comparator; if (!scanners.isEmpty()) { this.heap = new PriorityQueue(scanners.size(), this.comparator); @@ -158,8 +169,8 @@ return next(result, -1); } - private static class KVScannerComparator implements Comparator { - private KVComparator kvComparator; + protected static class KVScannerComparator implements Comparator { + protected KVComparator kvComparator; /** * Constructor * @param kvComparator @@ -325,7 +336,7 @@ * this scanner heap if (1) it has done a real seek and (2) its KV is the top * among all top KVs (some of which are fake) in the scanner heap. */ - private KeyValueScanner pollRealKV() throws IOException { + protected KeyValueScanner pollRealKV() throws IOException { KeyValueScanner kvScanner = heap.poll(); if (kvScanner == null) { return null; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1542632) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -1314,11 +1314,18 @@ && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { return true; } - KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow()); - KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow()); - boolean nonOverLapping = (getComparator().compareFlatKey(this.getFirstKey(), - stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) - || getComparator().compareFlatKey(this.getLastKey(), startKeyValue.getKey()) < 0; + KeyValue smallestScanKeyValue = scan.isReversed() ? 
KeyValue + .createFirstOnRow(scan.getStopRow()) : KeyValue.createFirstOnRow(scan + .getStartRow()); + KeyValue largestScanKeyValue = scan.isReversed() ? KeyValue + .createLastOnRow(scan.getStartRow()) : KeyValue.createLastOnRow(scan + .getStopRow()); + boolean nonOverLapping = (getComparator().compareFlatKey( + this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes + .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(), + HConstants.EMPTY_END_ROW)) + || getComparator().compareFlatKey(this.getLastKey(), + smallestScanKeyValue.getKey()) < 0; return !nonOverLapping; } @@ -1425,6 +1432,10 @@ return reader.getLastKey(); } + public byte[] getLastRowKey() { + return reader.getLastRowKey(); + } + public byte[] midkey() throws IOException { return reader.midkey(); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (revision 1542632) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (working copy) @@ -59,7 +59,7 @@ // if we are passed the prefix, set flag int cmp = Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0, this.prefix.length); - if(cmp > 0) { + if ((!isReversed() && cmp > 0) || (isReversed() && cmp < 0)) { passedPrefix = true; } filterRow = (cmp != 0); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1542632) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -775,6 +775,47 @@ thread.interruptIfNecessary(); } + /** + * Test that on a major compaction, if all cells are expired or deleted, then we'll end up with no + * product. 
Make sure scanner over region returns right answer in this case - and that it just + * basically works. + * @throws IOException + */ + public void testMajorCompactingToNoOutputWithReverseScan() throws IOException { + createStoreFile(r); + for (int i = 0; i < compactionThreshold; i++) { + createStoreFile(r); + } + // Now delete everything. + Scan scan = new Scan(); + scan.setReversed(true); + InternalScanner s = r.getScanner(scan); + do { + List results = new ArrayList(); + boolean result = s.next(results); + assertTrue(!results.isEmpty()); + r.delete(new Delete(results.get(0).getRow())); + if (!result) break; + } while (true); + s.close(); + // Flush + r.flushcache(); + // Major compact. + r.compactStores(true); + scan = new Scan(); + scan.setReversed(true); + s = r.getScanner(scan); + int counter = 0; + do { + List results = new ArrayList(); + boolean result = s.next(results); + if (!result) break; + counter++; + } while (true); + s.close(); + assertEquals(0, counter); + } + private class StoreMockMaker extends StatefulStoreMockMaker { public ArrayList compacting = new ArrayList(); public ArrayList notCompacting = new ArrayList();