### Eclipse Workspace Patch 1.0 #P apache-trunk Index: hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (revision 1517733) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (working copy) @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -312,6 +313,17 @@ verifyScan(s, expectedRows, expectedKeys); } + @Test + public void testPrefixFilterWithReverseScan() throws Exception { + // Grab rows from group one (half of total) + long expectedRows = this.numRows / 2; + long expectedKeys = this.colsPerRow; + Scan s = new Scan(); + s.setReversed(true); + s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne"))); + verifyScan(s, expectedRows, expectedKeys); + } + @Test public void testPageFilter() throws Exception { @@ -399,6 +410,141 @@ } + @Test + public void testPageFilterWithReverseScan() throws Exception { + // KVs in first 6 rows + KeyValue[] expectedKVs = { + // testRowOne-0 + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-2 + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], 
QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-3 + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]) }; + + // Grab all 6 rows 
+ long expectedRows = 6; + long expectedKeys = this.colsPerRow; + Scan s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 4 rows (6 cols per row) + expectedRows = 4; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 2 rows + expectedRows = 2; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first row + expectedRows = 1; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + } + + public void testWhileMatchFilterWithFilterRowWithReverseScan() + throws Exception { + final int pageSize = 4; + + Scan s = new Scan(); + s.setReversed(true); + WhileMatchFilter filter = new WhileMatchFilter(new PageFilter(pageSize)); + s.setFilter(filter); + + InternalScanner scanner = this.region.getScanner(s); + int scannerCounter = 0; + while (true) { + boolean isMoreResults = scanner.next(new ArrayList()); + scannerCounter++; + + if (scannerCounter >= pageSize) { + Assert.assertTrue( + "The WhileMatchFilter should now filter all remaining", + filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + scanner.close(); + Assert.assertEquals("The page filter returned more rows than expected", + pageSize, scannerCounter); + } + + public void testWhileMatchFilterWithFilterRowKeyWithReverseScan() + throws Exception { + Scan s = new Scan(); + String prefix = "testRowOne"; + WhileMatchFilter filter = new WhileMatchFilter(new PrefixFilter( + Bytes.toBytes(prefix))); + s.setFilter(filter); + s.setReversed(true); + + InternalScanner scanner = this.region.getScanner(s); + while (true) { + ArrayList values = new 
ArrayList(); + boolean isMoreResults = scanner.next(values); + if (!isMoreResults + || !Bytes.toString(values.get(0).getRow()).startsWith(prefix)) { + Assert.assertTrue( + "The WhileMatchFilter should now filter all remaining", + filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + scanner.close(); + } + /** * Tests the the {@link WhileMatchFilter} works in combination with a * {@link Filter} that uses the Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1517733) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -4082,5 +4082,565 @@ assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, Bytes.toString(kv.getValue())); } + + public void testReverseScanner_FromMemStore_SingleCF_Normal() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowC); + scan.setMaxVersions(5); + scan.setReversed(true); + InternalScanner scanner = 
region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_LargerKey() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowD); + List currRow = new ArrayList(); + scan.setReversed(true); + scan.setMaxVersions(5); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + 
hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_FullScan() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + Scan scan = new Scan(); + List currRow = new ArrayList(); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + 
assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_moreRowsMayExistAfter() throws IOException { + // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReversed(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, 
currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReversed(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_smaller_blocksize() throws IOException { + // case to ensure no conflict with HFile index optimization + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration config = new HBaseConfiguration(); + config.setInt("test.block.size", 1); + this.region = initHRegion(tableName, method, config, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + 
region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReversed(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReversed(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1() + throws IOException { + byte[] row0 = Bytes.toBytes("row0"); // 1 kv + byte[] row1 = Bytes.toBytes("row1"); // 2 kv + byte[] row2 = Bytes.toBytes("row2"); // 4 kv + byte[] row3 = Bytes.toBytes("row3"); // 2 kv + byte[] row4 = Bytes.toBytes("row4"); // 5 kv + byte[] row5 = Bytes.toBytes("row5"); // 2 kv + byte[] cf1 = Bytes.toBytes("CF1"); + byte[] cf2 = Bytes.toBytes("CF2"); + byte[] cf3 = Bytes.toBytes("CF3"); + byte[][] families = { cf1, cf2, cf3 }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in 
this test. + conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + // kv naming style: kv(row number) + totalKvCountInThisRow + seq no + KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1, + KeyValue.Type.Put, null); + KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4, + KeyValue.Type.Put, null); + KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4, + KeyValue.Type.Put, null); + KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5, + KeyValue.Type.Put, null); + KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3, + KeyValue.Type.Put, null); + KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, + null); + // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) + Put put = null; + put = new Put(row1); + put.add(kv1_2_1); + region.put(put); + put = new Put(row2); + put.add(kv2_4_1); + region.put(put); + put = new Put(row4); + put.add(kv4_5_4); + put.add(kv4_5_5); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) + put = new Put(row4); + put.add(kv4_5_1); + put.add(kv4_5_3); + 
region.put(put); + put = new Put(row1); + put.add(kv1_2_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_4); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) + put = new Put(row4); + put.add(kv4_5_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_2); + put.add(kv2_4_3); + region.put(put); + put = new Put(row3); + put.add(kv3_2_2); + region.put(put); + region.flushcache(); + // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) + // ( 2 kv) + put = new Put(row0); + put.add(kv0_1_1); + region.put(put); + put = new Put(row3); + put.add(kv3_2_1); + region.put(put); + put = new Put(row5); + put.add(kv5_2_1); + put.add(kv5_2_2); + region.put(put); + // scan range = ["row4", min), skip the max "row5" + Scan scan = new Scan(row4); + scan.setMaxVersions(5); + scan.setBatch(3); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = false; + // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not + // included in scan range + // "row4" takes 2 next() calls since batch=3 + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + // 2. scan out "row3" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + // 3. 
scan out "row2" (4 kvs) + // "row2" takes 2 next() calls since batch=3 + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + // 4. scan out "row1" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertTrue(hasNext); + // 5. scan out "row0" (1 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row0)); + assertFalse(hasNext); + + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2() + throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] row3 = Bytes.toBytes("row3"); + byte[] row4 = Bytes.toBytes("row4"); + byte[] cf1 = Bytes.toBytes("CF1"); + byte[] cf2 = Bytes.toBytes("CF2"); + byte[] cf3 = Bytes.toBytes("CF3"); + byte[] cf4 = Bytes.toBytes("CF4"); + byte[][] families = { cf1, cf2, cf3, cf4 }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. 
+ conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null); + // storefile1 + Put put = new Put(row1); + put.add(kv1); + region.put(put); + region.flushcache(); + // storefile2 + put = new Put(row2); + put.add(kv2); + region.put(put); + region.flushcache(); + // storefile3 + put = new Put(row3); + put.add(kv3); + region.put(put); + region.flushcache(); + // memstore + put = new Put(row4); + put.add(kv4); + region.put(put); + // scan range = ["row4", min) + Scan scan = new Scan(row4); + scan.setReversed(true); + scan.setBatch(10); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertFalse(hasNext); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + + private static HRegion initHRegion(byte[] tableName, String callingMethod, + byte[]... 
families) throws IOException { + return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), + families); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java (revision 0) @@ -0,0 +1,192 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + +/** + * Reversed KeyValueHeap is used for supporting reversed scanning. 
Compared with + * KeyValueHeap, its scanner comparator is a little different (see + * ReversedKVScannerComparator), all seek is backward seek(see + * {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row + * if it is already at the end of one row when calling next(). + */ +@InterfaceAudience.Private +public class ReversedKeyValueHeap extends KeyValueHeap { + + /** + * @param scanners + * @param comparator + * @throws IOException + */ + public ReversedKeyValueHeap(List scanners, + KVComparator comparator) throws IOException { + super(scanners, new ReversedKVScannerComparator(comparator)); + } + + @Override + public boolean seek(KeyValue seekKey) throws IOException { + throw new IllegalStateException( + "seek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean reseek(KeyValue seekKey) throws IOException { + throw new IllegalStateException( + "reseek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean requestSeek(KeyValue key, boolean forward, boolean useBloom) + throws IOException { + throw new IllegalStateException( + "requestSeek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean seekToPreviousRow(KeyValue seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + KeyValue topKey = scanner.peek(); + if (comparator.getComparator().compareRows(topKey.getBuffer(), + topKey.getRowOffset(), topKey.getRowLength(), seekKey.getBuffer(), + seekKey.getRowOffset(), seekKey.getRowLength()) < 0) { + // Row of Top KeyValue is before Seek row. 
+ heap.add(scanner); + current = pollRealKV(); + return current != null; + } + + if (!scanner.seekToPreviousRow(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + + // Heap is returning empty, scanner is done + return false; + } + + @Override + public boolean backwardSeek(KeyValue seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + KeyValue topKey = scanner.peek(); + if ((comparator.getComparator().matchingRows(seekKey, topKey) && comparator + .getComparator().compare(seekKey, topKey) <= 0) + || comparator.getComparator().compareRows(seekKey, topKey) > 0) { + heap.add(scanner); + current = pollRealKV(); + return current != null; + } + if (!scanner.backwardSeek(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + return false; + } + + @Override + public KeyValue next() throws IOException { + if (this.current == null) { + return null; + } + KeyValue kvReturn = this.current.next(); + KeyValue kvNext = this.current.peek(); + if (kvNext == null + || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) { + if (this.current.seekToPreviousRow(kvReturn)) { + this.heap.add(this.current); + } else { + this.current.close(); + } + this.current = pollRealKV(); + } else { + KeyValueScanner topScanner = this.heap.peek(); + if (topScanner != null + && this.comparator.compare(this.current, topScanner) > 0) { + this.heap.add(this.current); + this.current = pollRealKV(); + } + } + return kvReturn; + } + + /** + * In ReversedKVScannerComparator, we compare the row of scanners' peek value + * first, sort bigger one before the smaller one. 
Then compare the KeyValue if + * they have the equal row, sort smaller one before the bigger one + */ + private static class ReversedKVScannerComparator extends + KVScannerComparator { + + /** + * Constructor + * @param kvComparator + */ + public ReversedKVScannerComparator(KVComparator kvComparator) { + super(kvComparator); + } + + @Override + public int compare(KeyValueScanner left, KeyValueScanner right) { + int rowComparison = compareRows(left.peek(), right.peek()); + if (rowComparison != 0) { + return -rowComparison; + } + return super.compare(left, right); + } + + /** + * Compares rows of two KeyValue + * @param left + * @param right + * @return less than 0 if left is smaller, 0 if equal etc.. + */ + public int compareRows(KeyValue left, KeyValue right) { + return super.kvComparator.compareRows(left, right); + } + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException("Not implemented"); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -131,6 +131,8 @@ private final boolean isUserScan; + private final boolean isReversed; + /** * Construct a QueryMatcher for a scan * @param scan @@ -185,6 +187,7 @@ this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); } + this.isReversed = scan.getReversed(); } /** @@ -256,15 +259,24 @@ int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength, bytes, offset, rowLength); - if (ret <= -1) { - return MatchCode.DONE; - } else if (ret >= 1) { - // could optimize this, if necessary? - // Could also be called SEEK_TO_CURRENT_ROW, but this - // should be rare/never happens. 
- return MatchCode.SEEK_NEXT_ROW; + if (!this.isReversed) { + if (ret <= -1) { + return MatchCode.DONE; + } else if (ret >= 1) { + // could optimize this, if necessary? + // Could also be called SEEK_TO_CURRENT_ROW, but this + // should be rare/never happens. + return MatchCode.SEEK_NEXT_ROW; + } + } else { + if (ret <= -1) { + return MatchCode.SEEK_NEXT_ROW; + } else if (ret >= 1) { + return MatchCode.DONE; + } } + // optimize case. if (this.stickyNextRow) return MatchCode.SEEK_NEXT_ROW; @@ -434,6 +446,14 @@ } public boolean moreRowsMayExistAfter(KeyValue kv) { + if (this.isReversed) { + if (rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength(), stopRow, 0, stopRow.length) <= 0) { + return false; + } else { + return true; + } + } if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) && rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(), kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java (revision 0) @@ -0,0 +1,54 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; + +/** + * A "non-reversed & non-lazy" scanner which does not support backward scanning + * and always does a real seek operation. Most scanners are inherited from this + * class. + */ +@InterfaceAudience.Private +public abstract class NonReversedNonLazyKeyValueScanner extends + NonLazyKeyValueScanner { + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + throw new NotImplementedException("backwardSeek must not be called on a " + + "non-reversed scanner"); + } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + throw new NotImplementedException("seekToPreviousRow must not be called on a " + + "non-reversed scanner"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException("seekToLastRow must not be called on a " + + "non-reversed scanner"); + } + +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Scan; import 
org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; +import org.apache.hadoop.hbase.util.Bytes; /** * KeyValueScanner adaptor over the Reader. It also provides hooks into @@ -53,6 +54,9 @@ private KeyValue delayedSeekKV; private boolean enforceMVCC = false; + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; private static final AtomicLong seekCount = new AtomicLong(); @@ -179,6 +183,7 @@ protected boolean skipKVsNewerThanReadpoint() throws IOException { long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); + KeyValue startKV = cur; // We want to ignore all key-values that are newer than our current // readPoint while(enforceMVCC @@ -186,6 +191,12 @@ && (cur.getMvccVersion() > readPoint)) { hfs.next(); cur = hfs.getKeyValue(); + if (this.stopSkippingKVsIfNextRow && cur != null + && Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), startKV.getBuffer(), startKV.getRowOffset(), + startKV.getRowLength()) > 0) { + return false; + } } if (cur == null) { @@ -381,4 +392,76 @@ return reader.passesTimerangeFilter(scan, oldestUnexpiredTS) && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns); } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + try { + try { + KeyValue seekKey = KeyValue.createFirstOnRow(key.getRow()); + seekCount.incrementAndGet(); + if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(), + seekKey.getKeyLength())) { + close(); + return false; + } + KeyValue firstKeyOfPreviousRow = KeyValue.createFirstOnRow(hfs + .getKeyValue().getRow()); + + seekCount.incrementAndGet(); + if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) { + close(); + return false; + } + + cur = hfs.getKeyValue(); + this.stopSkippingKVsIfNextRow = true; + boolean resultOfSkipKVs; + try { + resultOfSkipKVs = 
skipKVsNewerThanReadpoint(); + } finally { + this.stopSkippingKVsIfNextRow = false; + } + if (!resultOfSkipKVs + || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), firstKeyOfPreviousRow.getBuffer(), + firstKeyOfPreviousRow.getRowOffset(), + firstKeyOfPreviousRow.getRowLength()) > 0) { + return seekToPreviousRow(firstKeyOfPreviousRow); + } + + return true; + } finally { + realSeekDone = true; + } + } catch (IOException ioe) { + throw new IOException("Could not seekToPreviousRow " + this + " to key " + + key, ioe); + } + } + + @Override + public boolean seekToLastRow() throws IOException { + byte[] lastRow = reader.getLastRowKey(); + if (lastRow == null) { + return false; + } + KeyValue seekKey = KeyValue.createFirstOnRow(lastRow); + if (seek(seekKey)) { + return true; + } else { + return seekToPreviousRow(seekKey); + } + } + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + seek(key); + if (cur == null + || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), key.getBuffer(), key.getRowOffset(), + key.getRowLength()) > 0) { + return seekToPreviousRow(key); + } + return true; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -1619,7 +1619,9 @@ scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols); } if (scanner == null) { - scanner = new StoreScanner(this, getScanInfo(), scan, targetCols); + scanner = scan.getReversed() ? 
new ReversedStoreScanner(this, + getScanInfo(), scan, targetCols) : new StoreScanner(this, + getScanInfo(), scan, targetCols); } return scanner; } finally { Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (revision 0) @@ -0,0 +1,113 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A reversed client scanner which support backward scanning + */ +public class ReversedClientScanner extends ClientScanner { + // A biggest byte array for this scanner which is used to construct closest + // front row + private byte[] biggest_byte_array; + /** + * Create a new ReversibleClientScanner for the specified table Note that the + * passed {@link Scan}'s start row maybe changed. + * @param conf The {@link Configuration} to use. + * @param scan {@link Scan} to use in this scanner + * @param tableName The table that we wish to scan + * @param connection Connection identifying the cluster + * @throws IOException + */ + public ReversedClientScanner(Configuration conf, Scan scan, + TableName tableName, HConnection connection) throws IOException { + super(conf, scan, tableName, connection); + } + + @Override + protected void initialize(final Configuration conf) throws IOException { + if (Bytes.equals(this.scan.getStartRow(), HConstants.EMPTY_BYTE_ARRAY)) { + throw new IllegalArgumentException( + "StartRow shouldn't be empty in reversed scan"); + } + biggest_byte_array = Scan.createBiggestByteArray(conf.getInt( + "hbase.client.reversedscanner.maxbyte.length", 9)); + super.initialize(conf); + } + + @Override + protected byte[] getNextScannerStartKey() { + byte[] startKey = this.currentRegion.getStartKey(); + if (startKey != null + && !Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)) { + startKey = createClosestRowBefore(startKey); + } + return startKey; + } + + @Override + // returns true if the passed region endKey + protected boolean checkScanStopRow(final byte[] startKey) { + if 
(this.scan.getStopRow().length > 0) { + // there is a stop row, check to see if we are past it. + byte[] stopRow = scan.getStopRow(); + int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0, + startKey.length); + if (cmp >= 0) { + // stopRow >= endKey (stopRow is equals to or larger than endKey) + // This is a stop. + return true; + } + } + return false; // unlikely. + } + + /** + * Create the closest row before the specified row + * @param row + * @return a new byte array which is the closest front row of the specified one + */ + private byte[] createClosestRowBefore(byte[] row) { + if (row == null || row.length == 0) { + throw new RuntimeException("The passed row is empty"); + } + if (row[row.length - 1] == 0) { + return Arrays.copyOf(row, row.length - 1); + } else { + if (biggest_byte_array == null) { + throw new RuntimeException( + "Argument biggest_byte_array is not initialized"); + } + byte[] closestFrontRow = Arrays.copyOf(row, row.length); + closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1); + closestFrontRow = Bytes.add(closestFrontRow, biggest_byte_array); + return closestFrontRow; + } + } + +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java (revision 0) @@ -0,0 +1,689 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import 
org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; +/** + * Test cases against ReversibleKeyValueScanner + */ +@Category(MediumTests.class) +public class TestReversibleScanners { + private static final Log LOG = LogFactory.getLog(TestReversibleScanners.class); + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static byte[] FAMILYNAME = Bytes.toBytes("testCf"); + private static long TS = System.currentTimeMillis(); + private static int MAXMVCC = 7; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static final int ROWSIZE = 200; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + private static byte[] QUAL = Bytes.toBytes("testQual"); + private static final int QUALSIZE = 5; + private static byte[][] QUALS = makeN(QUAL, QUALSIZE); + private static byte[] VALUE = Bytes.toBytes("testValue"); + private static final int VALUESIZE = 3; + private static byte[][] VALUES = makeN(VALUE, VALUESIZE); + + @After + public void tearDown() { + MultiVersionConsistencyControl.setThreadReadPoint(Long.MAX_VALUE); + } + + @Test + public void testReversibleStoreFileScanner() throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), + "regionname"), "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + StoreFile.Writer writer = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + writeStoreFile(writer); + + StoreFile sf = new StoreFile(fs, writer.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + List scanners = StoreFileScanner + 
.getScannersForStoreFiles(Collections.singletonList(sf), false, true, + false); + StoreFileScanner scanner = scanners.get(0); + seekTestOfReversibleKeyValueScanner(scanner); + } + + @Test + public void testReversibleMemstoreScanner() throws IOException { + MemStore memstore = new MemStore(); + writeMemstore(memstore); + List scanners = memstore.getScanners(); + seekTestOfReversibleKeyValueScanner(scanners.get(0)); + } + + @Test + public void testReversibleKeyValueHeap() throws IOException { + // write data to one memstore and two store files + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), + "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + StoreFile.Writer writer1 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + StoreFile.Writer writer2 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + + MemStore memstore = new MemStore(); + writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, + writer2 }); + + StoreFile sf1 = new StoreFile(fs, writer1.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + StoreFile sf2 = new StoreFile(fs, writer2.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + /** + * Test without MVCC + */ + int startRowNum = ROWSIZE / 2; + ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, + sf2, ROWS[startRowNum]); + internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); + + startRowNum = ROWSIZE - 1; + kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + HConstants.EMPTY_START_ROW); + internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); + + /** + * Test with MVCC + */ + for 
(int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + MultiVersionConsistencyControl.setThreadReadPoint(readPoint); + startRowNum = ROWSIZE - 1; + kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + HConstants.EMPTY_START_ROW); + for (int i = startRowNum; i >= 0; i--) { + if (i - 2 < 0) break; + i = i - 2; + kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1])); + Pair nextReadableNum = getNextReadableNumWithBackwardScan( + i, 0, readPoint); + if (nextReadableNum == null) break; + KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + int qualNum = nextReadableNum.getSecond(); + if (qualNum + 1 < QUALSIZE) { + kvHeap.backwardSeek(makeKV(i, qualNum + 1)); + nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, + readPoint); + if (nextReadableNum == null) break; + expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + qualNum = nextReadableNum.getSecond(); + } + + kvHeap.next(); + + if (qualNum + 1 >= QUALSIZE) { + nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, + readPoint); + } else { + nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, + readPoint); + } + if (nextReadableNum == null) break; + expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + } + } + } + + @Test + public void testReversibleStoreScanner() throws IOException { + // write data to one memstore and two store files + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), + "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + 
StoreFile.Writer writer1 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + StoreFile.Writer writer2 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + + MemStore memstore = new MemStore(); + writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, + writer2 }); + + StoreFile sf1 = new StoreFile(fs, writer1.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + StoreFile sf2 = new StoreFile(fs, writer2.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + ScanType scanType = ScanType.USER_SCAN; + ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE, + Long.MAX_VALUE, false, 0, KeyValue.COMPARATOR); + + // Case 1.Test a full reversed scan + Scan scan = new Scan(); + scan.setReversed(true); + StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, + scan, scanType, scanInfo); + verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false); + + // Case 2.Test reversed scan with a specified start row + int startRowNum = ROWSIZE / 2; + byte[] startRow = ROWS[startRowNum]; + scan.setStartRow(startRow); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo); + verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), + startRowNum + 1, false); + + // Case 3.Test reversed scan with a specified start row and specified + // qualifiers + assertTrue(QUALSIZE > 2); + scan.addColumn(FAMILYNAME, QUALS[0]); + scan.addColumn(FAMILYNAME, QUALS[2]); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo); + verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, + false); + + // Case 4.Test reversed scan with mvcc based on case 3 + for (int readPoint = 0; readPoint < MAXMVCC; 
readPoint++) { + LOG.info("Setting read point to " + readPoint); + MultiVersionConsistencyControl.setThreadReadPoint(readPoint); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo); + int expectedRowCount = 0; + int expectedKVCount = 0; + for (int i = startRowNum; i >= 0; i--) { + int kvCount = 0; + if (makeMVCC(i, 0) <= readPoint) { + kvCount++; + } + if (makeMVCC(i, 2) <= readPoint) { + kvCount++; + } + if (kvCount > 0) { + expectedRowCount++; + expectedKVCount += kvCount; + } + } + verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, + false); + } + } + + @Test + public void testReversibleRegionScanner() throws IOException { + byte[] tableName = Bytes.toBytes("testtable"); + byte[] FAMILYNAME2 = Bytes.toBytes("testCf2"); + Configuration conf = HBaseConfiguration.create(); + HRegion region = TestHRegion.initHRegion(tableName, + "testReversibleRegionScanner", conf, FAMILYNAME, FAMILYNAME2); + loadDataToRegion(region, FAMILYNAME2); + + // verify row count with forward scan + Scan scan = new Scan(); + InternalScanner scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true); + + // Case1:Full reversed scan + scan.setReversed(true); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false); + + // Case2:Full reversed scan with one family + scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILYNAME); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false); + + // Case3:Specify qualifiers + One family + byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] }; + for (byte[] specifiedQualifier : specifiedQualifiers) + scan.addColumn(FAMILYNAME, specifiedQualifier); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false); + + // Case4:Specify qualifiers + Two families + for (byte[] specifiedQualifier : specifiedQualifiers) 
+ scan.addColumn(FAMILYNAME2, specifiedQualifier); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false); + + // Case5: Case4 + specify start row + int startRowNum = ROWSIZE * 3 / 4; + scan.setStartRow(ROWS[startRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), + false); + + // Case6: Case4 + specify stop row + int stopRowNum = ROWSIZE / 4; + scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY); + scan.setStopRow(ROWS[stopRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE + - stopRowNum - 1), false); + + // Case7: Case4 + specify start row + specify stop row + scan.setStartRow(ROWS[startRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, + (startRowNum - stopRowNum), false); + + // Case8: Case7 + SingleColumnValueFilter + int valueNum = startRowNum % VALUESIZE; + Filter filter = new SingleColumnValueFilter(FAMILYNAME, + specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]); + scan.setFilter(filter); + scanner = region.getScanner(scan); + int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE + + (stopRowNum / VALUESIZE == valueNum ? 
0 : 1); + verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, + false); + + // Case9: Case7 + PageFilter + int pageSize = 10; + filter = new PageFilter(pageSize); + scan.setFilter(filter); + scanner = region.getScanner(scan); + int expectedRowNum = pageSize; + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + + // Case10: Case7 + FilterList+MUST_PASS_ONE + SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter( + FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]); + SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter( + FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]); + expectedRowNum = 0; + for (int i = startRowNum; i > stopRowNum; i--) { + if (i % VALUESIZE == 0 || i % VALUESIZE == 1) { + expectedRowNum++; + } + } + filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2); + scan.setFilter(filter); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + + // Case10: Case7 + FilterList+MUST_PASS_ALL + filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2); + expectedRowNum = 0; + scan.setFilter(filter); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + } + + private StoreScanner getReversibleStoreScanner(MemStore memstore, + StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType, + ScanInfo scanInfo) throws IOException { + List scanners = getScanners(memstore, sf1, sf2, null, + false); + NavigableSet columns = null; + for (Map.Entry> entry : scan.getFamilyMap() + .entrySet()) { + // Should only one family + columns = entry.getValue(); + } + StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, + scanType, columns, scanners); + return storeScanner; + } + + private void verifyCountAndOrder(InternalScanner scanner, + int expectedKVCount, int expectedRowCount, boolean forward) + throws 
IOException { + List kvList = new ArrayList(); + Result lastResult = null; + int rowCount = 0; + int kvCount = 0; + try { + while (scanner.next(kvList)) { + if (kvList.isEmpty()) continue; + rowCount++; + kvCount += kvList.size(); + if (lastResult != null) { + Result curResult = new Result(kvList); + assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, + forward, + Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0); + } + lastResult = new Result(kvList); + kvList.clear(); + } + } finally { + scanner.close(); + } + if (!kvList.isEmpty()) { + rowCount++; + kvCount += kvList.size(); + kvList.clear(); + } + assertEquals(expectedKVCount, kvCount); + assertEquals(expectedRowCount, rowCount); + } + + private void internalTestSeekAndNextForReversibleKeyValueHeap( + ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException { + // Test next and seek + for (int i = startRowNum; i >= 0; i--) { + if (i % 2 == 1 && i - 2 >= 0) { + i = i - 2; + kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1])); + } + for (int j = 0; j < QUALSIZE; j++) { + if (j % 2 == 1 && (j + 1) < QUALSIZE) { + j = j + 1; + kvHeap.backwardSeek(makeKV(i, j)); + } + assertEquals(makeKV(i, j), kvHeap.peek()); + kvHeap.next(); + } + } + assertEquals(null, kvHeap.peek()); + } + + private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, + StoreFile sf1, StoreFile sf2, byte[] startRow) throws IOException { + List scanners = getScanners(memstore, sf1, sf2, startRow, + true); + ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, + KeyValue.COMPARATOR); + return kvHeap; + } + + private List getScanners(MemStore memstore, StoreFile sf1, + StoreFile sf2, byte[] startRow, boolean doSeek) throws IOException { + List fileScanners = StoreFileScanner + .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, + false); + List memScanners = memstore.getScanners(); + List scanners = new ArrayList( + fileScanners.size() + 1); + 
scanners.addAll(fileScanners); + scanners.addAll(memScanners); + + if (doSeek) { + if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } else { + KeyValue startKey = KeyValue.createFirstOnRow(startRow); + for (KeyValueScanner scanner : scanners) { + scanner.backwardSeek(startKey); + } + } + } + return scanners; + } + + private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) + throws IOException { + /** + * Test without MVCC + */ + // Test seek to last row + assertTrue(scanner.seekToLastRow()); + assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek()); + + // Test backward seek in three cases + // Case1: seek in the same row in backwardSeek + KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2); + assertTrue(scanner.backwardSeek(seekKey)); + assertEquals(seekKey, scanner.peek()); + + // Case2: seek to the previous row in backwardSeek + int seekRowNum = ROWSIZE - 2; + assertTrue(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[seekRowNum]))); + KeyValue expectedKey = makeKV(seekRowNum - 1, 0); + assertEquals(expectedKey, scanner.peek()); + + // Case3: unable to backward seek + assertFalse(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[0]))); + assertEquals(null, scanner.peek()); + + // Test seek to previous row + seekRowNum = ROWSIZE - 4; + assertTrue(scanner.seekToPreviousRow(KeyValue + .createFirstOnRow(ROWS[seekRowNum]))); + expectedKey = makeKV(seekRowNum - 1, 0); + assertEquals(expectedKey, scanner.peek()); + + // Test seek to previous row for the first row + assertFalse(scanner.seekToPreviousRow(makeKV(0, 0))); + assertEquals(null, scanner.peek()); + + /** + * Test with MVCC + */ + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + MultiVersionConsistencyControl.setThreadReadPoint(readPoint); + // Test seek to last row + expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, + 
readPoint); + assertEquals(expectedKey != null, scanner.seekToLastRow()); + assertEquals(expectedKey, scanner.peek()); + + // Test backward seek in two cases + // Case1: seek in the same row in backwardSeek + expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, + QUALSIZE - 2, readPoint); + assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey)); + assertEquals(expectedKey, scanner.peek()); + + // Case2: seek to the previous row in backwardSeek + seekRowNum = ROWSIZE - 3; + seekKey = KeyValue.createLastOnRow(ROWS[seekRowNum]); + expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, + readPoint); + assertEquals(expectedKey != null, scanner.backwardSeek(seekKey)); + assertEquals(expectedKey, scanner.peek()); + + // Test seek to previous row + seekRowNum = ROWSIZE - 4; + expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, + readPoint); + assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValue + .createFirstOnRow(ROWS[seekRowNum]))); + assertEquals(expectedKey, scanner.peek()); + } + } + + private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, + int startQualNum, int readPoint) { + Pair nextReadableNum = getNextReadableNumWithBackwardScan( + startRowNum, startQualNum, readPoint); + if (nextReadableNum == null) + return null; + return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); + } + + private Pair getNextReadableNumWithBackwardScan( + int startRowNum, int startQualNum, int readPoint) { + Pair nextReadableNum = null; + boolean findExpected = false; + for (int i = startRowNum; i >= 0; i--) { + for (int j = (i == startRowNum ? 
startQualNum : 0); j < QUALSIZE; j++) { + if (makeMVCC(i, j) <= readPoint) { + nextReadableNum = new Pair(i, j); + findExpected = true; + break; + } + } + if (findExpected) + break; + } + return nextReadableNum; + } + + private static void loadDataToRegion(HRegion region, byte[] additionalFamily) + throws IOException { + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + for (int j = 0; j < QUALSIZE; j++) { + put.add(makeKV(i, j)); + // put additional family + put.add(makeKV(i, j, additionalFamily)); + } + region.put(put); + if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) { + region.flushcache(); + } + } + } + + private static void writeMemstoreAndStoreFiles(MemStore memstore, + final StoreFile.Writer[] writers) throws IOException { + Random rand = new Random(); + try { + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if (i % 2 == 0) { + memstore.add(makeKV(i, j)); + } else { + writers[(i + j) % writers.length].append(makeKV(i, j)); + } + } + } + } finally { + for (int i = 0; i < writers.length; i++) { + writers[i].close(); + } + } + } + + private static void writeStoreFile(final StoreFile.Writer writer) + throws IOException { + try { + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + writer.append(makeKV(i, j)); + } + } + } finally { + writer.close(); + } + } + + private static void writeMemstore(MemStore memstore) throws IOException { + // Add half of the keyvalues to memstore + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if ((i + j) % 2 == 0) { + memstore.add(makeKV(i, j)); + } + } + } + memstore.snapshot(); + // Add another half of the keyvalues to snapshot + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if ((i + j) % 2 == 1) { + memstore.add(makeKV(i, j)); + } + } + } + } + + private static KeyValue makeKV(int rowNum, int cqNum) { + return makeKV(rowNum, cqNum, FAMILYNAME); + } + + private static KeyValue makeKV(int rowNum, 
int cqNum, byte[] familyName) { + KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, + VALUES[rowNum % VALUESIZE]); + kv.setMvccVersion(makeMVCC(rowNum, cqNum)); + return kv; + } + + private static long makeMVCC(int rowNum, int cqNum) { + return (rowNum + cqNum) % (MAXMVCC + 1); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); + } + return ret; + } +} Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1517733) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -693,8 +693,9 @@ if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } - return new ClientScanner(getConfiguration(), scan, - getName(), this.connection); + return scan.getReversed() ? new ReversedClientScanner(getConfiguration(), + scan, getName(), this.connection) : new ClientScanner( + getConfiguration(), scan, getName(), this.connection); } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (working copy) @@ -26,14 +26,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.NonLazyKeyValueScanner; +import org.apache.hadoop.hbase.regionserver.NonReversedNonLazyKeyValueScanner; /** * Utility scanner that wraps a sortable collection and serves * as a KeyValueScanner. 
*/ @InterfaceAudience.Private -public class CollectionBackedScanner extends NonLazyKeyValueScanner { +public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner { final private Iterable data; final KeyValue.KVComparator comparator; private Iterator iter; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -32,10 +32,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -45,7 +45,7 @@ * into List for a single row. */ @InterfaceAudience.Private -public class StoreScanner extends NonLazyKeyValueScanner +public class StoreScanner extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver { static final Log LOG = LogFactory.getLog(StoreScanner.class); protected Store store; @@ -145,25 +145,10 @@ // Pass columns to try to filter out unnecessary StoreFiles. List scanners = getScannersNoCompaction(); + + seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery + && lazySeekEnabledGlobally, isParallelSeekEnabled); - // Seek all scanners to the start of the Row (or if the exact matching row - // key does not exist, then to the start of the next matching Row). 
- // Always check bloom filter to optimize the top row seek for delete - // family marker. - if (explicitColumnQuery && lazySeekEnabledGlobally) { - for (KeyValueScanner scanner : scanners) { - scanner.requestSeek(matcher.getStartKey(), false, true); - } - } else { - if (!isParallelSeekEnabled) { - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } - } else { - parallelSeek(scanners, matcher.getStartKey()); - } - } - // set storeLimit this.storeLimit = scan.getMaxResultsPerColumnFamily(); @@ -171,7 +156,7 @@ this.storeOffset = scan.getRowOffsetPerColumnFamily(); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.getComparator()); + resetKVHeap(scanners, store.getComparator()); this.store.addChangedReaderObserver(this); } @@ -227,16 +212,10 @@ scanners = selectScannersFrom(scanners); // Seek all scanners to the initial key - if (!isParallelSeekEnabled) { - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } - } else { - parallelSeek(scanners, matcher.getStartKey()); - } + seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.getComparator()); + resetKVHeap(scanners, store.getComparator()); } /** Constructor for testing. 
*/ @@ -258,16 +237,48 @@ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); // Seek all scanners to the initial key - if (!isParallelSeekEnabled) { + seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled); + + // Combine all seeked scanners with a heap + resetKVHeap(scanners, scanInfo.getComparator()); + } + + /** + * Seek the specified scanners with the given key + * @param scanners + * @param seekKey + * @param isLazy true if using lazy seek + * @param isParallelSeek true if using parallel seek + * @throws IOException + */ + protected void seekScanners(List scanners, + KeyValue seekKey, boolean isLazy, boolean isParallelSeek) + throws IOException { + // Seek all scanners to the start of the Row (or if the exact matching row + // key does not exist, then to the start of the next matching Row). + // Always check bloom filter to optimize the top row seek for delete + // family marker. + if (isLazy) { for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); + scanner.requestSeek(seekKey, false, true); } } else { - parallelSeek(scanners, matcher.getStartKey()); + if (!isParallelSeek) { + for (KeyValueScanner scanner : scanners) { + scanner.seek(seekKey); + } + } else { + parallelSeek(scanners, seekKey); + } } - heap = new KeyValueHeap(scanners, scanInfo.getComparator()); } + protected void resetKVHeap(List scanners, + KVComparator comparator) throws IOException { + // Combine all seeked scanners with a heap + heap = new KeyValueHeap(scanners, comparator); + } + /** * Get a filtered list of scanners. Assumes we are not in a compaction. * @return list of scanners to seek @@ -397,9 +408,7 @@ int count = 0; LOOP: while((kv = this.heap.peek()) != null) { ++kvsScanned; - // Check that the heap gives us KVs in an increasing order. 
- assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : - "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; + checkScanOrder(prevKV, kv, comparator); prevKV = kv; ScanQueryMatcher.MatchCode qcode = matcher.match(kv); @@ -420,7 +429,7 @@ if (!matcher.moreRowsMayExistAfter(kv)) { return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); break LOOP; } @@ -435,9 +444,9 @@ if (!matcher.moreRowsMayExistAfter(kv)) { return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { - reseek(matcher.getKeyForNextColumn(kv)); + seekAsDirection(matcher.getKeyForNextColumn(kv)); } else { this.heap.next(); } @@ -461,11 +470,11 @@ return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); break; case SEEK_NEXT_COL: - reseek(matcher.getKeyForNextColumn(kv)); + seekAsDirection(matcher.getKeyForNextColumn(kv)); break; case SKIP: @@ -475,7 +484,7 @@ case SEEK_NEXT_USING_HINT: KeyValue nextKV = matcher.getNextKeyHint(kv); if (nextKV != null) { - reseek(nextKV); + seekAsDirection(nextKV); } else { heap.next(); } @@ -555,16 +564,11 @@ * could have done it now by storing the scan object from the constructor */ List scanners = getScannersNoCompaction(); - if (!isParallelSeekEnabled) { - for (KeyValueScanner scanner : scanners) { - scanner.seek(lastTopKey); - } - } else { - parallelSeek(scanners, lastTopKey); - } + // Seek all scanners to the initial key + seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.getComparator()); + resetKVHeap(scanners, store.getComparator()); // Reset the state of the Query Matcher and set to top row. 
// Only reset and call setRow if the row changes; avoids confusing the @@ -584,6 +588,36 @@ } } + /** + * Check whether scan as expected order + * @param prevKV + * @param kv + * @param comparator + * @throws IOException + */ + protected void checkScanOrder(KeyValue prevKV, KeyValue kv, + KeyValue.KVComparator comparator) throws IOException { + // Check that the heap gives us KVs in an increasing order. + assert prevKV == null || comparator == null + || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV + + " followed by a " + "smaller key " + kv + " in cf " + store; + } + + protected synchronized boolean seekToNextRow(KeyValue kv) throws IOException { + return reseek(matcher.getKeyForNextRow(kv)); + } + + /** + * Do a reseek in a normal StoreScanner(scan forward) + * @param kv + * @return true if scanner has values left, false if end of scanner + * @throws IOException + */ + protected synchronized boolean seekAsDirection(KeyValue kv) + throws IOException { + return reseek(kv); + } + @Override public synchronized boolean reseek(KeyValue kv) throws IOException { //Heap will not be null, if this is called from next() which. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java (revision 0) @@ -0,0 +1,81 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; + +/** + * ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to + * support reversed scanning. + */ +@InterfaceAudience.Private +class ReversedRegionScannerImpl extends RegionScannerImpl { + + /** + * @param scan + * @param additionalScanners + * @param region + * @throws IOException + */ + ReversedRegionScannerImpl(Scan scan, + List additionalScanners, HRegion region) + throws IOException { + region.super(scan, additionalScanners, region); + } + + @Override + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) throws IOException { + this.storeHeap = new ReversedKeyValueHeap(scanners, region.getComparator()); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new ReversedKeyValueHeap(joinedScanners, + region.getComparator()); + } + } + + @Override + protected boolean isStopRow(byte[] currentRow, int offset, short length) { + return currentRow == null + || (super.stopRow != null && region.getComparator().compareRows( + stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan); + } + + @Override + protected boolean nextRow(byte[] currentRow, int offset, short length) + throws IOException { + assert super.joinedContinuationRow == null : "Trying to go to 
next row during joinedHeap read."; + byte row[] = new byte[length]; + System.arraycopy(currentRow, offset, row, 0, length); + this.storeHeap.seekToPreviousRow(KeyValue.createFirstOnRow(row)); + resetFilters(); + // Calling the hook in CP which allows it to do a fast forward + if (this.region.getCoprocessorHost() != null) { + return this.region.getCoprocessorHost().postScannerFilterRow(this, + currentRow); + } + return true; + } + +} \ No newline at end of file Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (revision 1517733) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (working copy) @@ -74,6 +74,8 @@ public KeyValueScanner preStoreScannerOpen(final ObserverContext c, Store store, final Scan scan, final NavigableSet targetCols, KeyValueScanner s) throws IOException { - return new StoreScanner(store, store.getScanInfo(), scan, targetCols); + return scan.getReversed() ? 
new ReversedStoreScanner(store, + store.getScanInfo(), scan, targetCols) : new StoreScanner(store, + store.getScanInfo(), scan, targetCols); } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (revision 1517733) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (working copy) @@ -155,6 +155,10 @@ * @param filter another filter */ public void addFilter(Filter filter) { + if (this.isReversed() != filter.isReversed()) { + throw new IllegalArgumentException( + "Filters in the list must have the same reversed flag"); + } this.filters.add(filter); } @@ -403,6 +407,14 @@ } @Override + public void setReversed(boolean reversed) { + for (Filter filter : filters) { + filter.setReversed(reversed); + } + this.reversed = reversed; + } + + @Override public String toString() { return toString(MAX_LOG_FILTERS); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1517733) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -2682,26 +2682,46 @@ } - private void scanTestNull(HTable ht, byte [] row, byte [] family, - byte [] value) - throws Exception { + private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value) + throws Exception { + scanTestNull(ht, row, family, value, false); + } + private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value, + boolean isReversedScan) throws Exception { + Scan scan = new Scan(); + scan.setReversed(isReversedScan); + if (isReversedScan) { + scan.setStartRow(Scan.createBiggestByteArray(9)); + } scan.addColumn(family, null); Result result = getSingleScanResult(ht, 
scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); + if (isReversedScan) { + scan.setStartRow(Scan.createBiggestByteArray(9)); + } scan.addColumn(family, HConstants.EMPTY_BYTE_ARRAY); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); + if (isReversedScan) { + scan.setStartRow(Scan.createBiggestByteArray(9)); + } scan.addFamily(family); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); + if (isReversedScan) { + scan.setStartRow(Scan.createBiggestByteArray(9)); + } result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); @@ -5177,4 +5197,475 @@ table.close(); TEST_UTIL.deleteTable(TABLE); } + + @Test + public void testSuperSimpleWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testSuperSimpleWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000002")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000004")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000006")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000008")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000001")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000003")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + 
put = new Put(Bytes.toBytes("0-b22222-0000000000000000005")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000007")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000009")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + ht.flushCommits(); + Scan scan = new Scan(Bytes.toBytes("0-b11111-9223372036854775807"), + Bytes.toBytes("0-b11111-0000000000000000000")); + scan.setReversed(true); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertTrue(Bytes.equals(result.getRow(), + Bytes.toBytes("0-b11111-0000000000000000008"))); + scanner.close(); + ht.close(); + } + + @Test + public void testFiltersWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testFiltersWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 10); + byte[][] QUALIFIERS = { Bytes.toBytes("col0--"), + Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), + Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), + Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), + Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), + Bytes.toBytes("col9--") }; + for (int i = 0; i < 10; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReversed(true); + scan.setStartRow(Scan.createBiggestByteArray(9)); + scan.addFamily(FAMILY); + Filter filter = new QualifierFilter(CompareOp.EQUAL, + new RegexStringComparator("col[1-5]")); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int expectedIndex = 5; + for (Result result : scanner) { + assertEquals(result.size(), 1); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[expectedIndex])); + assertTrue(Bytes.equals(result.raw()[0].getQualifier(), + QUALIFIERS[expectedIndex])); + expectedIndex--; + } + assertEquals(expectedIndex, 0); + scanner.close(); + 
ht.close(); + } + + @Test + public void testKeyOnlyFilterWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testKeyOnlyFilterWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 10); + byte[][] QUALIFIERS = { Bytes.toBytes("col0--"), + Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), + Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), + Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), + Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), + Bytes.toBytes("col9--") }; + for (int i = 0; i < 10; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReversed(true); + scan.setStartRow(Scan.createBiggestByteArray(9)); + scan.addFamily(FAMILY); + Filter filter = new KeyOnlyFilter(true); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int count = 0; + for (Result result : ht.getScanner(scan)) { + assertEquals(result.size(), 1); + assertEquals(result.raw()[0].getValueLength(), Bytes.SIZEOF_INT); + assertEquals(Bytes.toInt(result.raw()[0].getValue()), VALUE.length); + count++; + } + assertEquals(count, 10); + scanner.close(); + ht.close(); + } + + /** + * Test simple table and non-existent row cases. 
+ */ + @Test + public void testSimpleMissingWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testSimpleMissingWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 4); + + // Try to get a row on an empty table + Scan scan = new Scan(); + scan.setStartRow(Scan.createBiggestByteArray(9)); + scan.setReversed(true); + Result result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0], ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReversed(true); + scan.setStartRow(Scan.createBiggestByteArray(9)); + scan.addFamily(FAMILY); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReversed(true); + scan.setStartRow(Scan.createBiggestByteArray(9)); + scan.addColumn(FAMILY, QUALIFIER); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Insert a row + + Put put = new Put(ROWS[2]); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + + // Make sure we can scan the row + + scan = new Scan(); + scan.setReversed(true); + scan.setStartRow(Scan.createBiggestByteArray(9)); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[3], ROWS[0]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[2], ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + // Try to scan empty rows around it + // Introduced MemStore#shouldSeekForReverseScan to fix the following + scan = new Scan(ROWS[1]); + scan.setReversed(true); + result = 
getSingleScanResult(ht, scan); + assertNullResult(result); + ht.close(); + } + + @Test + public void testNullWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testNullWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + // Null qualifier (should work) + Put put = new Put(ROW); + put.add(FAMILY, null, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + Delete delete = new Delete(ROW); + delete.deleteColumns(FAMILY, null); + ht.delete(delete); + // Use a new table + byte[] TABLE2 = Bytes.toBytes("testNull2WithReverseScan"); + ht = TEST_UTIL.createTable(TABLE2, FAMILY); + // Empty qualifier, byte[0] instead of null (should work) + put = new Put(ROW); + put.add(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + TEST_UTIL.flush(); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + delete = new Delete(ROW); + delete.deleteColumns(FAMILY, HConstants.EMPTY_BYTE_ARRAY); + ht.delete(delete); + // Null value + put = new Put(ROW); + put.add(FAMILY, QUALIFIER, null); + ht.put(put); + Scan scan = new Scan(); + scan.setStartRow(Scan.createBiggestByteArray(9)); + scan.setReversed(true); + scan.addColumn(FAMILY, QUALIFIER); + Result result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROW, FAMILY, QUALIFIER, null); + ht.close(); + } + + @Test + public void testDeletesWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testDeletesWithReverseScan"); + byte[][] ROWS = makeNAscii(ROW, 6); + byte[][] FAMILIES = makeNAscii(FAMILY, 3); + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, + TEST_UTIL.getConfiguration(), 3); + + Put put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[0], QUALIFIER, ts[1], VALUES[1]); + ht.put(put); + + Delete delete = new Delete(ROW); + delete.deleteFamily(FAMILIES[0], ts[0]); + 
ht.delete(delete); + + Scan scan = new Scan(ROW); + scan.setReversed(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + Result result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1] }, + new byte[][] { VALUES[1] }, 0, 0); + + // Test delete latest version + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); + put.add(FAMILIES[0], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[0], QUALIFIER, ts[3], VALUES[3]); + put.add(FAMILIES[0], null, ts[4], VALUES[4]); + put.add(FAMILIES[0], null, ts[2], VALUES[2]); + put.add(FAMILIES[0], null, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4] + ht.delete(delete); + + scan = new Scan(ROW); + scan.setReversed(true); + scan.addColumn(FAMILIES[0], QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], + ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test for HBASE-1847 + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], null); + ht.delete(delete); + + // Cleanup null qualifier + delete = new Delete(ROW); + delete.deleteColumns(FAMILIES[0], null); + ht.delete(delete); + + // Expected client behavior might be that you can re-put deleted values + // But alas, this is not to be. We can't put them back in either case. 
+ + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000 + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000 + ht.put(put); + + // The Scanner returns the previous values, the expected-naive-unexpected + // behavior + + scan = new Scan(ROW); + scan.setReversed(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], + ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test deleting an entire family from one row but not the other various + // ways + + put = new Put(ROWS[0]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[1]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[2]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROWS[0]); + delete.deleteFamily(FAMILIES[2]); + ht.delete(delete); + + delete = new Delete(ROWS[1]); + delete.deleteColumns(FAMILIES[1], QUALIFIER); + ht.delete(delete); + + delete = new Delete(ROWS[2]); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + delete.deleteColumn(FAMILIES[2], QUALIFIER); + ht.delete(delete); + + scan = new Scan(ROWS[0]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = 
getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, new long[] { ts[0], + ts[1] }, new byte[][] { VALUES[0], VALUES[1] }, 0, 1); + + scan = new Scan(ROWS[1]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + + scan = new Scan(ROWS[2]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, + new long[] { ts[2] }, new byte[][] { VALUES[2] }, 0, 0); + + // Test if we delete the family first in one row (HBASE-1541) + + delete = new Delete(ROWS[3]); + delete.deleteFamily(FAMILIES[1]); + ht.delete(delete); + + put = new Put(ROWS[3]); + put.add(FAMILIES[2], QUALIFIER, VALUES[0]); + ht.put(put); + + put = new Put(ROWS[4]); + put.add(FAMILIES[1], QUALIFIER, VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, VALUES[2]); + ht.put(put); + + scan = new Scan(ROWS[4]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = ht.getScanner(scan); + result = scanner.next(); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[4])); + assertTrue(Bytes.equals(result.raw()[1].getRow(), ROWS[4])); + assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[1])); + assertTrue(Bytes.equals(result.raw()[1].getValue(), VALUES[2])); + result = scanner.next(); + assertTrue("Expected 1 key but received " + result.size(), + result.size() == 1); + 
assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[3])); + assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[0])); + scanner.close(); + ht.close(); + } + + /** + * Tests reversed scan under multi regions + */ + @Test + public void testReversedScanUnderMultiRegions() throws Exception { + // Test Initialization. + byte[] TABLE = Bytes.toBytes("testReversedScanUnderMultiRegions"); + byte[][] splitRows = new byte[][] { Bytes.toBytes("005"), + Bytes.toBytes("008") }; + HTable table = TEST_UTIL.createTable(TABLE, FAMILY, splitRows); + TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); + + assertEquals(splitRows.length + 1, table.getRegionLocations().size()); + int insertNum = 9; + for (int i = 0; i < insertNum; i++) { + Put put = new Put(Bytes.toBytes(String.format("%03d", i))); + put.add(FAMILY, QUALIFIER, VALUE); + table.put(put); + } + + // scan forward + ResultScanner scanner = table.getScanner(new Scan()); + int count = 0; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + + // scan backward + Scan scan = new Scan(); + scan.setReversed(true); + scan.setStartRow(Scan.createBiggestByteArray(9)); + scanner = table.getScanner(scan); + count = 0; + byte[] lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(insertNum, count); + table.close(); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -731,6 +731,10 @@ 
volatile MemStoreLAB allocatorAtCreation; volatile MemStoreLAB snapshotAllocatorAtCreation; + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; + /* Some notes... @@ -770,6 +774,7 @@ private KeyValue getNext(Iterator it) { long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); + KeyValue startKV = theNext; KeyValue v = null; try { while (it.hasNext()) { @@ -777,6 +782,10 @@ if (v.getMvccVersion() <= readPoint) { return v; } + if (stopSkippingKVsIfNextRow && startKV != null + && comparator.compareRows(v, startKV) > 0) { + return null; + } } return null; @@ -955,6 +964,70 @@ long oldestUnexpiredTS) { return shouldSeek(scan, oldestUnexpiredTS); } + + /** + * Seek scanner to the given key first. If it return false(means + * peek()==null) or scanner's peek row is bigger than row of given key, + * seek the scanner to the previous row of given key + */ + @Override + public synchronized boolean backwardSeek(KeyValue key) { + seek(key); + if (peek() == null || comparator.compareRows(peek(), key) > 0) { + return seekToPreviousRow(key); + } + return true; + } + + /** + * Separately get the KeyValue before the specified key from kvset and + * snapshotset, and use the row of higher one as the previous row of + * specified key, then seek to the first KeyValue of previous row + */ + @Override + public synchronized boolean seekToPreviousRow(KeyValue key) { + KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow()); + SortedSet kvHead = kvsetAtCreation.headSet(firstKeyOnRow); + KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last(); + SortedSet snapshotHead = snapshotAtCreation + .headSet(firstKeyOnRow); + KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? 
null : snapshotHead + .last(); + KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow); + if (lastKVBeforeRow == null) { + theNext = null; + return false; + } + KeyValue firstKeyOnPreviousRow = KeyValue + .createFirstOnRow(lastKVBeforeRow.getRow()); + this.stopSkippingKVsIfNextRow = true; + seek(firstKeyOnPreviousRow); + this.stopSkippingKVsIfNextRow = false; + if (peek() == null + || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { + return seekToPreviousRow(lastKVBeforeRow); + } + return true; + } + + @Override + public synchronized boolean seekToLastRow() { + KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation + .last(); + KeyValue second = snapshotAtCreation.isEmpty() ? null + : snapshotAtCreation.last(); + KeyValue higherKv = getHighest(first, second); + if (higherKv == null) { + return false; + } + KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow()); + if (seek(firstKvOnLastRow)) { + return true; + } else { + return seekToPreviousRow(higherKv); + } + + } } public final static long FIXED_OVERHEAD = ClassSize.align( Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1178,6 +1178,13 @@ return size; } + /** + * @return KeyValue Comparator + */ + public KeyValue.KVComparator getComparator() { + return this.comparator; + } + /* * Do preparation for pending compaction. 
* @throws IOException @@ -1727,6 +1734,12 @@ protected RegionScanner instantiateRegionScanner(Scan scan, List additionalScanners) throws IOException { + if (scan.getReversed()) { + if (scan.getFilter() != null) { + scan.getFilter().setReversed(true); + } + return new ReversedRegionScannerImpl(scan, additionalScanners, this); + } return new RegionScannerImpl(scan, additionalScanners, this); } @@ -3390,17 +3403,17 @@ /** * If the joined heap data gathering is interrupted due to scan limits, this will * contain the row for which we are populating the values.*/ - private KeyValue joinedContinuationRow = null; + protected KeyValue joinedContinuationRow = null; // KeyValue indicating that limit is reached when scanning private final KeyValue KV_LIMIT = new KeyValue(); - private final byte [] stopRow; + protected final byte[] stopRow; private Filter filter; private int batch; - private int isScan; + protected int isScan; private boolean filterClosed = false; private long readPt; private long maxResultSize; - private HRegion region; + protected HRegion region; @Override public HRegionInfo getRegionInfo() { @@ -3461,16 +3474,22 @@ joinedScanners.add(scanner); } } - this.storeHeap = new KeyValueHeap(scanners, comparator); - if (!joinedScanners.isEmpty()) { - this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); - } + initializeKVHeap(scanners, joinedScanners, region); } RegionScannerImpl(Scan scan, HRegion region) throws IOException { this(scan, null, region); } + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) + throws IOException { + this.storeHeap = new KeyValueHeap(scanners, region.comparator); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator); + } + } + @Override public long getMaxResultSize() { return maxResultSize; @@ -3744,7 +3763,7 @@ currentRow); } - private boolean isStopRow(byte [] currentRow, int offset, short length) { + protected boolean isStopRow(byte[] 
currentRow, int offset, short length) { return currentRow == null || (stopRow != null && comparator.compareRows(stopRow, 0, stopRow.length, Index: hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (revision 1517733) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (working copy) @@ -55,6 +55,7 @@ @InterfaceAudience.Public @InterfaceStability.Stable public abstract class Filter { + protected boolean reversed; /** * Reset the state of the filter between rows. * @@ -256,4 +257,16 @@ * @throws IOException in case an I/O or an filter specific failure needs to be signaled. */ abstract boolean areSerializedFieldsEqual(Filter other); + + /** + * alter the reversed scan flag + * @param reversed flag + */ + public void setReversed(boolean reversed) { + this.reversed = reversed; + } + + public boolean isReversed() { + return this.reversed; + } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1517733) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -50,24 +50,24 @@ @InterfaceStability.Stable public class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); - private Scan scan; + protected Scan scan; private boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. 
- private HRegionInfo currentRegion = null; - private ScannerCallable callable = null; + protected HRegionInfo currentRegion = null; + protected ScannerCallable callable = null; private final LinkedList cache = new LinkedList(); private final int caching; private long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. private Result lastResult = null; - private ScanMetrics scanMetrics = null; + protected ScanMetrics scanMetrics = null; private final long maxScannerResultSize; private final HConnection connection; private final TableName tableName; private final int scannerTimeout; private boolean scanMetricsPublished = false; - private RpcRetryingCaller caller; + protected RpcRetryingCaller caller; /** * Create a new ClientScanner for the specified table. An HConnection will be @@ -147,8 +147,13 @@ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); } - this.caller = rpcFactory. newCaller(); + this.caller = rpcFactory. newCaller(); + // initialize the scanner + initialize(conf); + } + + protected void initialize(final Configuration conf) throws IOException { // initialize the scanner nextScanner(this.caching, false); } @@ -170,7 +175,7 @@ } // returns true if the passed region endKey - private boolean checkScanStopRow(final byte [] endKey) { + protected boolean checkScanStopRow(final byte [] endKey) { if (this.scan.getStopRow().length > 0) { // there is a stop row, check to see if we are past it. byte [] stopRow = scan.getStopRow(); @@ -194,7 +199,7 @@ * @param nbRows * @param done Server-side says we're done scanning. 
*/ - private boolean nextScanner(int nbRows, final boolean done) + private boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { @@ -208,10 +213,10 @@ // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null) { - byte [] endKey = this.currentRegion.getEndKey(); - if (endKey == null || - Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || - checkScanStopRow(endKey) || + byte [] nextStartKey = getNextScannerStartKey(); + if (nextStartKey == null || + Bytes.equals(nextStartKey, HConstants.EMPTY_BYTE_ARRAY) || + checkScanStopRow(nextStartKey) || done) { close(); if (LOG.isDebugEnabled()) { @@ -219,7 +224,7 @@ } return false; } - localStartKey = endKey; + localStartKey = nextStartKey; if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } @@ -247,6 +252,10 @@ } return true; } + + protected byte[] getNextScannerStartKey() { + return this.currentRegion.getEndKey(); + } protected ScannerCallable getScannerCallable(byte [] localStartKey, int nbRows) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (working copy) @@ -123,4 +123,37 @@ * assumed. 
*/ boolean isFileScanner(); + + // Support for "Reversed Scanner" + /** + * Seek the scanner at or before the row of specified KeyValue, it firstly + * tries to seek the scanner at or after the specified KeyValue, return if + * peek KeyValue of scanner has the same row with specified KeyValue, + * otherwise seek the scanner at the first KeyValue of the row which is the + * previous row of specified KeyValue + * + * @param key seek KeyValue + * @return true if the scanner is at the valid KeyValue, false if such + * KeyValue does not exist + * + */ + public boolean backwardSeek(KeyValue key) throws IOException; + + /** + * Seek the scanner at the first KeyValue of the row which is the previous row + * of specified key + * @param key seek value + * @return true if the scanner at the first valid KeyValue of previous row, + * false if not existing such KeyValue + */ + public boolean seekToPreviousRow(KeyValue key) throws IOException; + + /** + * Seek the scanner at the first KeyValue of last row + * + * @return true if scanner has values left, false if the underlying data is + * empty + * @throws IOException + */ + public boolean seekToLastRow() throws IOException; } Index: hbase-protocol/src/main/protobuf/Client.proto =================================================================== --- hbase-protocol/src/main/protobuf/Client.proto (revision 1517733) +++ hbase-protocol/src/main/protobuf/Client.proto (working copy) @@ -236,6 +236,7 @@ optional uint32 store_limit = 11; optional uint32 store_offset = 12; optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. 
*/ + optional bool reversed = 14 [default = false]; } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java (revision 0) @@ -0,0 +1,135 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.client.Scan; + +/** + * ReversibleStoreScanner extends from StoreScanner, and is used to support + * reversed scanning. + */ +@InterfaceAudience.Private +class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { + + /** + * Opens a scanner across memstore, snapshot, and all StoreFiles. 
Assumes we + * are not in a compaction. + * + * @param store who we scan + * @param scanInfo + * @param scan the spec + * @param columns which columns we are scanning + * @throws IOException + */ + ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + NavigableSet columns) + throws IOException { + super(store, scanInfo, scan, columns); + } + + /** Constructor for testing. */ + ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, + final NavigableSet columns, final List scanners) + throws IOException { + super(scan, scanInfo, scanType, columns, scanners, + HConstants.LATEST_TIMESTAMP); + } + + @Override + protected void resetKVHeap(List scanners, + KVComparator comparator) throws IOException { + // Combine all seeked scanners with a heap + heap = new ReversedKeyValueHeap(scanners, comparator); + } + + @Override + protected void seekScanners(List scanners, + KeyValue seekKey, boolean isLazy, boolean isParallelSeek) + throws IOException { + // Seek all scanners to the start of the Row (or if the exact matching row + // key does not exist, then to the start of the previous matching Row). + if (seekKey.matchingRow(HConstants.EMPTY_START_ROW)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } else { + for (KeyValueScanner scanner : scanners) { + scanner.backwardSeek(seekKey); + } + } + } + + @Override + protected synchronized boolean seekToNextRow(KeyValue kv) throws IOException { + return seekToPreviousRow(kv); + } + + /** + * Do a backwardSeek in a reversed StoreScanner(scan backward) + */ + @Override + protected synchronized boolean seekAsDirection(KeyValue kv) + throws IOException { + return backwardSeek(kv); + } + + @Override + protected void checkScanOrder(KeyValue prevKV, KeyValue kv, + KeyValue.KVComparator comparator) throws IOException { + // Check that the heap gives us KVs in an increasing order for same row and + // decreasing order for different rows. 
+ assert prevKV == null || comparator == null || comparator.compareRows(kv, prevKV) < 0 + || (comparator.matchingRows(kv, prevKV) && comparator.compare(kv, + prevKV) > 0) : "Key " + prevKV + + " followed by a " + "error order key " + kv + " in cf " + store + + " in reversed scan"; + } + + @Override + public synchronized boolean reseek(KeyValue kv) throws IOException { + throw new IllegalStateException( + "reseek cannot be called on ReversedStoreScanner"); + } + + @Override + public synchronized boolean seek(KeyValue key) throws IOException { + throw new IllegalStateException( + "seek cannot be called on ReversedStoreScanner"); + } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + checkReseek(); + return this.heap.seekToPreviousRow(key); + } + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + checkReseek(); + return this.heap.backwardSeek(key); + } +} Index: hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (revision 1517733) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (working copy) @@ -773,6 +773,9 @@ if (scan.getRowOffsetPerColumnFamily() > 0) { scanBuilder.setStoreOffset(scan.getRowOffsetPerColumnFamily()); } + if (scan.getReversed()) { + scanBuilder.setReversed(scan.getReversed()); + } return scanBuilder.build(); } @@ -846,6 +849,9 @@ } } } + if (proto.hasReversed()) { + scan.setReversed(proto.getReversed()); + } return scan; } Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java =================================================================== --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (revision 1517733) +++ 
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (working copy) @@ -14112,6 +14112,16 @@ * */ boolean getLoadColumnFamiliesOnDemand(); + + // optional bool reversed = 14 [default = false]; + /** + * optional bool reversed = 14 [default = false]; + */ + boolean hasReversed(); + /** + * optional bool reversed = 14 [default = false]; + */ + boolean getReversed(); } /** * Protobuf type {@code Scan} @@ -14262,6 +14272,11 @@ loadColumnFamiliesOnDemand_ = input.readBool(); break; } + case 112: { + bitField0_ |= 0x00000800; + reversed_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -14576,6 +14591,22 @@ return loadColumnFamiliesOnDemand_; } + // optional bool reversed = 14 [default = false]; + public static final int REVERSED_FIELD_NUMBER = 14; + private boolean reversed_; + /** + * optional bool reversed = 14 [default = false]; + */ + public boolean hasReversed() { + return ((bitField0_ & 0x00000800) == 0x00000800); + } + /** + * optional bool reversed = 14 [default = false]; + */ + public boolean getReversed() { + return reversed_; + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -14590,6 +14621,7 @@ storeLimit_ = 0; storeOffset_ = 0; loadColumnFamiliesOnDemand_ = false; + reversed_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -14660,6 +14692,9 @@ if (((bitField0_ & 0x00000400) == 0x00000400)) { output.writeBool(13, loadColumnFamiliesOnDemand_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + output.writeBool(14, reversed_); + } getUnknownFields().writeTo(output); } @@ -14721,6 +14756,10 @@ size += com.google.protobuf.CodedOutputStream .computeBoolSize(13, loadColumnFamiliesOnDemand_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(14, reversed_); + } size += 
getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -14803,6 +14842,11 @@ result = result && (getLoadColumnFamiliesOnDemand() == other.getLoadColumnFamiliesOnDemand()); } + result = result && (hasReversed() == other.hasReversed()); + if (hasReversed()) { + result = result && (getReversed() + == other.getReversed()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -14868,6 +14912,10 @@ hash = (37 * hash) + LOAD_COLUMN_FAMILIES_ON_DEMAND_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand()); } + if (hasReversed()) { + hash = (37 * hash) + REVERSED_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getReversed()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -15034,6 +15082,8 @@ bitField0_ = (bitField0_ & ~0x00000800); loadColumnFamiliesOnDemand_ = false; bitField0_ = (bitField0_ & ~0x00001000); + reversed_ = false; + bitField0_ = (bitField0_ & ~0x00002000); return this; } @@ -15132,6 +15182,10 @@ to_bitField0_ |= 0x00000400; } result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_; + if (((from_bitField0_ & 0x00002000) == 0x00002000)) { + to_bitField0_ |= 0x00000800; + } + result.reversed_ = reversed_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -15233,6 +15287,9 @@ if (other.hasLoadColumnFamiliesOnDemand()) { setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand()); } + if (other.hasReversed()) { + setReversed(other.getReversed()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -16311,6 +16368,39 @@ return this; } + // optional bool reversed = 14 [default = false]; + private boolean reversed_ ; + /** + * optional bool reversed = 14 [default = false]; + */ + public boolean hasReversed() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * optional bool reversed = 14 [default = false]; + */ + public boolean getReversed() { + return 
reversed_; + } + /** + * optional bool reversed = 14 [default = false]; + */ + public Builder setReversed(boolean value) { + bitField0_ |= 0x00002000; + reversed_ = value; + onChanged(); + return this; + } + /** + * optional bool reversed = 14 [default = false]; + */ + public Builder clearReversed() { + bitField0_ = (bitField0_ & ~0x00002000); + reversed_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:Scan) } @@ -27538,7 +27628,7 @@ "gion\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation" + "\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(" + "\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006resul" + - "t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\325\002\n\004" + + "t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\356\002\n\004" + "Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattribu" + "te\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003 " + "\001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007.", @@ -27547,46 +27637,46 @@ "\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_r" + "esult_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024\n" + "\014store_offset\030\014 \001(\r\022&\n\036load_column_famil" + - "ies_on_demand\030\r \001(\010\"\236\001\n\013ScanRequest\022 \n\006r" + - "egion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 " + - "\001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number" + - "_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n" + - "\rnext_call_seq\030\006 \001(\004\"p\n\014ScanResponse\022)\n\020", - "result_cell_meta\030\001 \001(\0132\017.ResultCellMeta\022" + - "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + - "\010\022\013\n\003ttl\030\004 \001(\r\"&\n\016ResultCellMeta\022\024\n\014cell" + 
- "s_length\030\001 \003(\r\"\263\001\n\024BulkLoadHFileRequest\022" + - " \n\006region\030\001 \002(\0132\020.RegionSpecifier\0225\n\013fam" + - "ily_path\030\002 \003(\0132 .BulkLoadHFileRequest.Fa" + - "milyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFami" + - "lyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025" + - "BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n" + - "\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014", - "service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022" + - "\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServiceRe" + - "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + - "%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCall\"]" + - "\n\032CoprocessorServiceResponse\022 \n\006region\030\001" + - " \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016." + - "NameBytesPair\"B\n\013MultiAction\022 \n\010mutation" + - "\030\001 \001(\0132\016.MutationProto\022\021\n\003get\030\002 \001(\0132\004.Ge" + - "t\"I\n\014ActionResult\022\026\n\005value\030\001 \001(\0132\007.Resul" + - "t\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"^\n\014", - "MultiRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpe" + - "cifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction\022\016\n\006" + - "atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006result\030" + - "\001 \003(\0132\r.ActionResult2\342\002\n\rClientService\022 " + - "\n\003Get\022\013.GetRequest\032\014.GetResponse\022/\n\010Mult" + - "iGet\022\020.MultiGetRequest\032\021.MultiGetRespons" + - "e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" + - "onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" + - "e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" + - "t\032\026.BulkLoadHFileResponse\022F\n\013ExecService", - "\022\032.CoprocessorServiceRequest\032\033.Coprocess" + - 
"orServiceResponse\022&\n\005Multi\022\r.MultiReques" + - "t\032\016.MultiResponseBB\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\014ClientProtosH\001\210" + - "\001\001\240\001\001" + "ies_on_demand\030\r \001(\010\022\027\n\010reversed\030\016 \001(\010:\005f" + + "alse\"\236\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.R" + + "egionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\ns" + + "canner_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022" + + "\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030", + "\006 \001(\004\"p\n\014ScanResponse\022)\n\020result_cell_met" + + "a\030\001 \001(\0132\017.ResultCellMeta\022\022\n\nscanner_id\030\002" + + " \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"" + + "&\n\016ResultCellMeta\022\024\n\014cells_length\030\001 \003(\r\"" + + "\263\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\013" + + "2\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132" + + " .BulkLoadHFileRequest.FamilyPath\022\026\n\016ass" + + "ign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006famil" + + "y\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRe" + + "sponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorSer", + "viceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 " + + "\002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(" + + "\014\"d\n\031CoprocessorServiceRequest\022 \n\006region" + + "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" + + ".CoprocessorServiceCall\"]\n\032CoprocessorSe" + + "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" + + "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"B" + + "\n\013MultiAction\022 \n\010mutation\030\001 \001(\0132\016.Mutati" + + "onProto\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResu" + 
+ "lt\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texception\030", + "\002 \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n" + + "\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006actio" + + "n\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n" + + "\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionR" + + "esult2\342\002\n\rClientService\022 \n\003Get\022\013.GetRequ" + + "est\032\014.GetResponse\022/\n\010MultiGet\022\020.MultiGet" + + "Request\032\021.MultiGetResponse\022)\n\006Mutate\022\016.M" + + "utateRequest\032\017.MutateResponse\022#\n\004Scan\022\014." + + "ScanRequest\032\r.ScanResponse\022>\n\rBulkLoadHF" + + "ile\022\025.BulkLoadHFileRequest\032\026.BulkLoadHFi", + "leResponse\022F\n\013ExecService\022\032.CoprocessorS" + + "erviceRequest\032\033.CoprocessorServiceRespon" + + "se\022&\n\005Multi\022\r.MultiRequest\032\016.MultiRespon" + + "seBB\n*org.apache.hadoop.hbase.protobuf.g" + + "eneratedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -27676,7 +27766,7 @@ internal_static_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Reversed", }); internal_static_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_ScanRequest_fieldAccessorTable = new Index: 
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (working copy) @@ -536,7 +536,7 @@ ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock); if (reader.getComparator().compare(firstKey.array(), - firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0) + firstKey.arrayOffset(), firstKey.limit(), key, offset, length) >= 0) { long previousBlockOffset = seekToBlock.getPrevBlockOffset(); // The key we are interested in Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1517733) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -110,6 +110,7 @@ private int caching = -1; private long maxResultSize = -1; private boolean cacheBlocks = true; + private boolean reversed = false; private Filter filter = null; private TimeRange tr = new TimeRange(); private Map> familyMap = @@ -532,6 +533,29 @@ } /** + * Set whether this scan is a reversed one + *

+ * This is false by default which means forward(normal) scan. + * + * NOTE: when setting it true for a client scan, you must set the start row, + * else will throw exception. Through {@link #createBiggestByteArray(int)}, + * you could get a big enough byte array as the start row + * + * @param reversed if true, scan will be backward order + */ + public void setReversed(boolean reversed) { + this.reversed = reversed; + } + + /** + * Get whether this scan is a reversed one. + * @return true if backward scan, false if forward(default) scan + */ + public boolean getReversed() { + return reversed; + } + + /** * Set the value indicating whether loading CFs on demand should be allowed (cluster * default is false). On-demand CF loading doesn't load column families until necessary, e.g. * if you filter on one column, the other column family data will be loaded only for the rows @@ -698,4 +722,17 @@ return attr == null ? IsolationLevel.READ_COMMITTED : IsolationLevel.fromBytes(attr); } + + /** + * Create a biggest byte array with the specified max byte count + * @param maxByteCount the length of returned byte array + * @return the created biggest byte array + */ + public static byte[] createBiggestByteArray(int maxByteCount) { + byte[] biggestByteArray = new byte[maxByteCount]; + for (int i = 0; i < biggestByteArray.length; i++) { + biggestByteArray[i] = (byte) 0xff; + } + return biggestByteArray; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -41,9 +41,9 @@ * as an InternalScanner at the Store level, you will get runtime exceptions. 
*/ @InterfaceAudience.Private -public class KeyValueHeap extends NonLazyKeyValueScanner +public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner { - private PriorityQueue<KeyValueScanner> heap = null; + protected PriorityQueue<KeyValueScanner> heap = null; /** * The current sub-scanner, i.e. the one that contains the next key/value @@ -55,9 +55,9 @@ * Bloom filter optimization, which is OK to propagate to StoreScanner. In * order to ensure that, always use {@link #pollRealKV()} to update current. */ - private KeyValueScanner current = null; + protected KeyValueScanner current = null; - private KVScannerComparator comparator; + protected KVScannerComparator comparator; /** * Constructor. This KeyValueHeap will handle closing of passed in @@ -67,7 +67,12 @@ */ public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) throws IOException { - this.comparator = new KVScannerComparator(comparator); + this(scanners, new KVScannerComparator(comparator)); + } + + KeyValueHeap(List<? extends KeyValueScanner> scanners, + KVScannerComparator comparator) throws IOException { + this.comparator = comparator; if (!scanners.isEmpty()) { this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(), this.comparator); @@ -157,8 +162,8 @@ return next(result, -1); } - private static class KVScannerComparator implements Comparator<KeyValueScanner> { - private KVComparator kvComparator; + protected static class KVScannerComparator implements Comparator<KeyValueScanner> { + protected KVComparator kvComparator; /** * Constructor * @param kvComparator @@ -324,7 +329,7 @@ * this scanner heap if (1) it has done a real seek and (2) its KV is the top * among all top KVs (some of which are fake) in the scanner heap. 
*/ - private KeyValueScanner pollRealKV() throws IOException { + protected KeyValueScanner pollRealKV() throws IOException { KeyValueScanner kvScanner = heap.poll(); if (kvScanner == null) { return null; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1517733) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -1386,11 +1386,16 @@ && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { return true; } - KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow()); - KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow()); + KeyValue smallestScanKeyValue = scan.getReversed() ? KeyValue + .createFirstOnRow(scan.getStopRow()) : KeyValue.createFirstOnRow(scan + .getStartRow()); + KeyValue largestScanKeyValue = scan.getReversed() ? KeyValue + .createLastOnRow(scan.getStartRow()) : KeyValue.createLastOnRow(scan + .getStopRow()); boolean nonOverLapping = (getComparator().compare(this.getFirstKey(), - stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) - || getComparator().compare(this.getLastKey(), startKeyValue.getKey()) < 0; + largestScanKeyValue.getKey()) > 0 && !Bytes.equals(scan.getReversed() ? 
scan.getStartRow() + : scan.getStopRow(), HConstants.EMPTY_END_ROW)) + || getComparator().compare(this.getLastKey(), smallestScanKeyValue.getKey()) < 0; return !nonOverLapping; } @@ -1497,6 +1502,10 @@ return reader.getLastKey(); } + public byte[] getLastRowKey() { + return reader.getLastRowKey(); + } + public byte[] midkey() throws IOException { return reader.midkey(); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (revision 1517733) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (working copy) @@ -57,7 +57,7 @@ // if we are passed the prefix, set flag int cmp = Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0, this.prefix.length); - if(cmp > 0) { + if ((!isReversed() && cmp > 0) || (isReversed() && cmp < 0)) { passedPrefix = true; } return cmp != 0; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1517733) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -736,6 +736,47 @@ thread.interruptIfNecessary(); } + /** + * Test that on a major compaction, if all cells are expired or deleted, then we'll end up with no + * product. Make sure scanner over region returns right answer in this case - and that it just + * basically works. + * @throws IOException + */ + public void testMajorCompactingToNoOutputWithReverseScan() throws IOException { + createStoreFile(r); + for (int i = 0; i < compactionThreshold; i++) { + createStoreFile(r); + } + // Now delete everything. 
+ Scan scan = new Scan(); + scan.setReversed(true); + InternalScanner s = r.getScanner(scan); + do { + List<Cell> results = new ArrayList<Cell>(); + boolean result = s.next(results); + assertTrue(!results.isEmpty()); + r.delete(new Delete(results.get(0).getRow())); + if (!result) break; + } while (true); + s.close(); + // Flush + r.flushcache(); + // Major compact. + r.compactStores(true); + scan = new Scan(); + scan.setReversed(true); + s = r.getScanner(scan); + int counter = 0; + do { + List<Cell> results = new ArrayList<Cell>(); + boolean result = s.next(results); + if (!result) break; + counter++; + } while (true); + s.close(); + assertEquals(0, counter); + } + private class StoreMockMaker extends StatefulStoreMockMaker { public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>(); public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();