From f03fe5c61e878562fdf419e53bd78500ec7d01b3 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Wed, 2 Mar 2016 13:10:42 +0800 Subject: [PATCH] HBASE-15378 Scanner can not handle a heartbeat message with no results --- .../apache/hadoop/hbase/client/ClientScanner.java | 9 ++- .../regionserver/TestScannerHeartbeatMessages.java | 64 +++++++++++++++++++++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 1658e5b..d324d37 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -381,7 +381,7 @@ public abstract class ClientScanner extends AbstractClientScanner { Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; - + boolean emptyHeartbeat; // We need to reset it if it's a new callable that was created // with a countdown in nextScanner callable.setCaching(this.caching); @@ -392,6 +392,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // it doesn't tell us otherwise. We rely on the size or count of results boolean serverHasMoreResults = false; do { + emptyHeartbeat = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -512,6 +513,8 @@ public abstract class ClientScanner extends AbstractClientScanner { break; } + emptyHeartbeat = values != null && values.length == 0 && callable.isHeartbeatMessage(); + // We expect that the server won't have more results for us when we exhaust // the size (bytes or count) of the results returned. If the server *does* inform us that // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually @@ -525,8 +528,8 @@ public abstract class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); + } while (emptyHeartbeat || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) + && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 14c71d2..67f0799 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; @@ -86,7 +90,7 @@ public class TestScannerHeartbeatMessages { */ private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); - private static int NUM_ROWS = 10; + private static int NUM_ROWS = 5; private static byte[] ROW = Bytes.toBytes("testRow"); private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); @@ -197,6 +201,7 @@ public class TestScannerHeartbeatMessages { public void testScannerHeartbeatMessages() throws Exception { testImportanceOfHeartbeats(testHeartbeatBetweenRows()); testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies()); + testImportanceOfHeartbeats(testHeartbeatWithSparseFilter()); } /** @@ -275,6 +280,63 @@ public class TestScannerHeartbeatMessages { }; } + public static class SparseFilter extends FilterBase{ + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + try { + Thread.sleep(SERVER_TIME_LIMIT + 10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? + ReturnCode.INCLUDE : + ReturnCode.SKIP; + } + + public static Filter parseFrom(final byte [] pbBytes){ + return new SparseFilter(); + } + } + + /** + * Test the case that there is a filter which filters most of cells + * @throws Exception + */ + public Callable testHeartbeatWithSparseFilter() throws Exception { + return new Callable() { + @Override + public Void call() throws Exception { + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + scan.setFilter(new SparseFilter()); + ResultScanner scanner = TABLE.getScanner(scan); + int num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(1, num); + scanner.close(); + + scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + scan.setFilter(new SparseFilter()); + scan.setAllowPartialResults(true); + scanner = TABLE.getScanner(scan); + num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); + scanner.close(); + + return null; + } + }; + } + /** * Test the equivalence of a scan versus the same scan executed when heartbeat messages are * necessary -- 2.5.4 (Apple Git-61)