From 7d53b112cb7178f3ef45dd948b8384cee44acb76 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Wed, 2 Mar 2016 13:56:29 +0800 Subject: [PATCH] HBASE-15378 Scanner can not handle a heartbeat message with no results --- .../apache/hadoop/hbase/client/ClientScanner.java | 27 +++++---- .../regionserver/TestScannerHeartbeatMessages.java | 64 +++++++++++++++++++++- 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 1658e5b..aecbf1a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -500,16 +500,18 @@ public abstract class ClientScanner extends AbstractClientScanner { } } - // Caller of this method just wants a Result. If we see a heartbeat message, it means - // processing of the scan is taking a long time server side. Rather than continue to - // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing - // unnecesary delays to the caller - if (callable.isHeartbeatMessage() && cache.size() > 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("Heartbeat message received and cache contains Results." - + " Breaking out of scan loop"); + if (callable.isHeartbeatMessage()) { + if (cache.size() > 0) { + // Caller of this method just wants a Result. If we see a heartbeat message, it means + // processing of the scan is taking a long time server side. Rather than continue to + // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing + // unnecesary delays to the caller + if (LOG.isTraceEnabled()) { + LOG.trace("Heartbeat message received and cache contains Results." + " Breaking out of scan loop"); + } + break; } - break; + continue; } // We expect that the server won't have more results for us when we exhaust @@ -519,14 +521,15 @@ public abstract class ClientScanner extends AbstractClientScanner { if (null != values && values.length > 0 && callable.hasMoreResultsContext()) { // Only adhere to more server results when we don't have any partialResults // as it keeps the outer loop logic the same. - serverHasMoreResults = callable.getServerHasMoreResults() & partialResults.isEmpty(); + serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty(); } // Values == null means server-side filter has determined we must STOP // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); + } while ((callable != null && callable.isHeartbeatMessage()) + || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) + && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 14c71d2..67f0799 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; @@ -86,7 +90,7 @@ public class TestScannerHeartbeatMessages { */ private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); - private static int NUM_ROWS = 10; + private static int NUM_ROWS = 5; private static byte[] ROW = Bytes.toBytes("testRow"); private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); @@ -197,6 +201,7 @@ public class TestScannerHeartbeatMessages { public void testScannerHeartbeatMessages() throws Exception { testImportanceOfHeartbeats(testHeartbeatBetweenRows()); testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies()); + testImportanceOfHeartbeats(testHeartbeatWithSparseFilter()); } /** @@ -275,6 +280,63 @@ public class TestScannerHeartbeatMessages { }; } + public static class SparseFilter extends FilterBase{ + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + try { + Thread.sleep(SERVER_TIME_LIMIT + 10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? + ReturnCode.INCLUDE : + ReturnCode.SKIP; + } + + public static Filter parseFrom(final byte [] pbBytes){ + return new SparseFilter(); + } + } + + /** + * Test the case that there is a filter which filters most of cells + * @throws Exception + */ + public Callable testHeartbeatWithSparseFilter() throws Exception { + return new Callable() { + @Override + public Void call() throws Exception { + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + scan.setFilter(new SparseFilter()); + ResultScanner scanner = TABLE.getScanner(scan); + int num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(1, num); + scanner.close(); + + scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + scan.setFilter(new SparseFilter()); + scan.setAllowPartialResults(true); + scanner = TABLE.getScanner(scan); + num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); + scanner.close(); + + return null; + } + }; + } + /** * Test the equivalence of a scan versus the same scan executed when heartbeat messages are * necessary -- 2.5.4 (Apple Git-61)