From bfb4e6fd879852fb5d873809115613ad6e2804d0 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 20 Dec 2018 12:34:34 +0800 Subject: [PATCH] HBASE-21621 Reversed scan does not return expected number of rows --- .../hbase/regionserver/ReversedStoreScanner.java | 5 +-- .../hadoop/hbase/regionserver/StoreScanner.java | 10 ++++- .../hbase/regionserver/TestJoinedScanners.java | 52 ++++++++++++++++++++++ .../hbase/regionserver/TestStoreScanner.java | 8 +++- 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 90e1129..491e6ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -59,10 +59,9 @@ public class ReversedStoreScanner extends StoreScanner implements KeyValueScanne } @Override - protected void resetKVHeap(List scanners, + protected KeyValueHeap newKVHeap(List scanners, CellComparator comparator) throws IOException { - // Combine all seeked scanners with a heap - heap = new ReversedKeyValueHeap(scanners, comparator); + return new ReversedKeyValueHeap(scanners, comparator); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 91ca592..afd31d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -403,10 +403,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } + @VisibleForTesting protected void resetKVHeap(List scanners, CellComparator comparator) throws IOException { // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, comparator); + heap = newKVHeap(scanners, comparator); + } + + protected KeyValueHeap newKVHeap(List scanners, + CellComparator comparator) throws IOException { + return new KeyValueHeap(scanners, comparator); } /** @@ -1037,7 +1043,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); newCurrentScanners.addAll(fileScanners); newCurrentScanners.addAll(memstoreScanners); - newHeap = new KeyValueHeap(newCurrentScanners, comparator); + newHeap = newKVHeap(newCurrentScanners, comparator); } catch (Exception e) { LOG.warn("failed to switch to stream read", e); if (fileScanners != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java index 3aa35e1..736d8fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; @@ -264,4 +265,55 @@ public class TestJoinedScanners { } } } + + @Test + public void testReverseScanWithFlush() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + byte[] family = Bytes.toBytes("cf"); + int BATCH_SIZE = 10; + int ROWS_TO_INSERT = 100; + byte[] LARGE_VALUE = new byte[valueWidth]; + + TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)) + .build(); + + try (Connection conn = TEST_UTIL.getConnection(); + Admin admin = conn.getAdmin()) { + admin.createTable(tableDesc); + + try (Table table = conn.getTable(tableName)) { + List putList = new ArrayList(); + for (long i = 0; i < ROWS_TO_INSERT; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.addColumn(family, col_name, LARGE_VALUE); + putList.add(put); + + if (putList.size() >= BATCH_SIZE) { + table.put(putList); + admin.flush(tableName); + putList.clear(); + } + } + + if (!putList.isEmpty()) { + table.put(putList); + admin.flush(tableName); + putList.clear(); + } + + Scan scan = new Scan(); + scan.setReversed(true); + int count = 0; + + try (ResultScanner results = table.getScanner(scan)) { + for (Result result : results) { + count++; + } + } + Assert.assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, + ROWS_TO_INSERT, count); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index db50a85..687b780 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -170,7 +170,13 @@ public class TestStoreScanner { if (count == null) { count = new AtomicInteger(0); } - heap = new KeyValueHeapWithCount(scanners, comparator, count); + heap = newKVHeap(scanners, comparator); + } + + @Override + protected KeyValueHeap newKVHeap(List scanners, + CellComparator comparator) throws IOException { + return new KeyValueHeapWithCount(scanners, comparator, count); } @Override -- 2.7.4