From 330c999c4294d9963d0adbe95f013724211bd7f7 Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Tue, 10 Mar 2015 14:24:07 -0700 Subject: [PATCH] HBASE-13090 Progress heartbeats for long running scanners --- .../apache/hadoop/hbase/client/ClientScanner.java | 45 +- .../hadoop/hbase/client/ScannerCallable.java | 19 + .../hbase/client/ScannerCallableWithReplicas.java | 10 + .../hadoop/hbase/protobuf/RequestConverter.java | 3 + .../hbase/protobuf/generated/ClientProtos.java | 347 +++++++++++--- hbase-protocol/src/main/protobuf/Client.proto | 7 + .../hbase/client/ClientSideRegionScanner.java | 7 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 230 ++++++---- .../hadoop/hbase/regionserver/InternalScanner.java | 147 +++--- .../hadoop/hbase/regionserver/KeyValueHeap.java | 45 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 76 +++- .../hadoop/hbase/regionserver/RegionScanner.java | 49 +- .../hbase/regionserver/ScannerLimitUtil.java | 64 +++ .../hadoop/hbase/regionserver/StoreScanner.java | 68 +-- .../hbase/TestPartialResultsFromClientSide.java | 3 +- .../coprocessor/TestCoprocessorInterface.java | 27 +- .../coprocessor/TestRegionObserverInterface.java | 14 +- .../hbase/regionserver/TestAtomicOperation.java | 2 +- .../hadoop/hbase/regionserver/TestHRegion.java | 2 +- .../regionserver/TestScannerHeartbeatMessages.java | 500 +++++++++++++++++++++ .../hbase/regionserver/TestStripeCompactor.java | 10 +- .../compactions/TestStripeCompactionPolicy.java | 9 +- 22 files changed, 1363 insertions(+), 321 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerLimitUtil.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index bfbe718..586f989 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -363,12 +363,17 @@ public class ClientScanner extends AbstractClientScanner { // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. boolean retryAfterOutOfOrderException = true; + // This flag is set to indicate that the server only returned an RPC response to avoid + // a timeout (i.e. it did not return a response because the region/table was exhausted). + // If a heartbeat message is returned, the region on the server should NOT be changed + boolean heartbeatMessage = false; do { try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. values = call(callable, caller, scannerTimeout); + heartbeatMessage = callable.isHeartbeatMessage(); // When the replica switch happens, we need to do certain operations // again. The callable will openScanner with the right startkey @@ -377,6 +382,8 @@ public class ClientScanner extends AbstractClientScanner { // happens for the cases where we see exceptions. Since only openScanner // would have happened, values would be null if (values == null && callable.switchedToADifferentReplica()) { + // Any accumualted partials are no longer valid. + clearPartialResults(); this.currentRegion = callable.getHRegionInfo(); continue; } @@ -455,7 +462,7 @@ public class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. 
- List resultsToAddToCache = getResultsToAddToCache(values); + List resultsToAddToCache = getResultsToAddToCache(values, heartbeatMessage); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { cache.add(rs); @@ -467,12 +474,26 @@ public class ClientScanner extends AbstractClientScanner { this.lastResult = rs; } } + + // Caller of this method just wants a Result. If we see a heartbeat message, it means + // processing of the scan is taking a long time server side. Rather than continue to + // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing + // unnecesary delays to the caller + if (heartbeatMessage && cache.size() > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Heartbeat message received and cache contains Results." + + " Breaking out of scan loop"); + } + break; + } + // Values == null means server-side filter has determined we must STOP // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (remainingResultSize > 0 && countdown > 0 - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); + } while (remainingResultSize > 0 && countdown > 0 && + (!partialResults.isEmpty() || heartbeatMessage || possiblyNextScanner(countdown, + values == null))); } if (cache.size() > 0) { @@ -495,10 +516,16 @@ public class ClientScanner extends AbstractClientScanner { * not contain errors. We return a list of results that should be added to the cache. 
In general, * this list will contain all NON-partial results from the input array (unless the client has * specified that they are okay with receiving partial results) + * @param resultsFromServer The array of {@link Result}s returned from the server + * @param heartbeatMessage Flag indicating whether or not the response received from the server + * represented a complete response, or a heartbeat message that was sent to keep the + * client-server connection alive * @return the list of results that should be added to the cache. * @throws IOException */ - protected List getResultsToAddToCache(Result[] resultsFromServer) throws IOException { + protected List + getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage) + throws IOException { int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; List resultsToAddToCache = new ArrayList(resultSize); @@ -516,10 +543,14 @@ public class ClientScanner extends AbstractClientScanner { return resultsToAddToCache; } - // If no results were returned it indicates that we have the all the partial results necessary - // to construct the complete result. 
+ // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive if (resultsFromServer == null || resultsFromServer.length == 0) { - if (!partialResults.isEmpty()) { + // If this response was an empty heartbeat message, then we have no exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partialResults.isEmpty() && !heartbeatMessage) { resultsToAddToCache.add(Result.createCompleteResult(partialResults)); clearPartialResults(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 6d5bb9e..f3a25eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -76,6 +76,12 @@ public class ScannerCallable extends RegionServerCallable { private int logCutOffLatency = 1000; private static String myAddress; protected final int id; + + /** + * Saves whether or not the most recent response from the server was a heartbeat message. 
+ * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} + */ + protected boolean heartbeatMessage = false; static { try { myAddress = DNS.getDefaultHost("default", "default"); @@ -192,6 +198,8 @@ public class ScannerCallable extends RegionServerCallable { } else { Result [] rrs = null; ScanRequest request = null; + // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server + heartbeatMessage = false; try { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); @@ -212,6 +220,7 @@ public class ScannerCallable extends RegionServerCallable { // See HBASE-5974 nextCallSeq++; long timestamp = System.currentTimeMillis(); + heartbeatMessage = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response); @@ -281,6 +290,16 @@ public class ScannerCallable extends RegionServerCallable { return null; } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations. 
+ */ + boolean isHeartbeatMessage() { + return heartbeatMessage; + } + private void incRPCcallsMetrics() { if (this.scanMetrics == null) { return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 16e6752..ebd31ae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -257,6 +257,16 @@ class ScannerCallableWithReplicas implements RetryingCallable { return replicaSwitched.get(); } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations. 
+ */ + public boolean isHeartbeatMessage() { + return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); + } + private int addCallsForCurrentReplica( ResultBoundedCompletionService> cs, RegionLocations rl) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 02ad908..56b835a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -487,6 +487,7 @@ public final class RequestConverter { builder.setRegion(region); builder.setScan(ProtobufUtil.toScan(scan)); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } @@ -505,6 +506,7 @@ public final class RequestConverter { builder.setCloseScanner(closeScanner); builder.setScannerId(scannerId); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } @@ -525,6 +527,7 @@ public final class RequestConverter { builder.setScannerId(scannerId); builder.setNextCallSeq(nextCallSeq); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeats(true); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 5bcba26..1424a52 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -16433,6 +16433,16 @@ public final class ClientProtos { * optional bool client_handles_partials = 7; */ boolean getClientHandlesPartials(); + + // optional bool 
client_handles_heartbeats = 8; + /** + * optional bool client_handles_heartbeats = 8; + */ + boolean hasClientHandlesHeartbeats(); + /** + * optional bool client_handles_heartbeats = 8; + */ + boolean getClientHandlesHeartbeats(); } /** * Protobuf type {@code ScanRequest} @@ -16549,6 +16559,11 @@ public final class ClientProtos { clientHandlesPartials_ = input.readBool(); break; } + case 64: { + bitField0_ |= 0x00000080; + clientHandlesHeartbeats_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -16713,6 +16728,22 @@ public final class ClientProtos { return clientHandlesPartials_; } + // optional bool client_handles_heartbeats = 8; + public static final int CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER = 8; + private boolean clientHandlesHeartbeats_; + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean hasClientHandlesHeartbeats() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean getClientHandlesHeartbeats() { + return clientHandlesHeartbeats_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -16721,6 +16752,7 @@ public final class ClientProtos { closeScanner_ = false; nextCallSeq_ = 0L; clientHandlesPartials_ = false; + clientHandlesHeartbeats_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16767,6 +16799,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeBool(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, clientHandlesHeartbeats_); + } getUnknownFields().writeTo(output); } @@ -16804,6 +16839,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream 
.computeBoolSize(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, clientHandlesHeartbeats_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16862,6 +16901,11 @@ public final class ClientProtos { result = result && (getClientHandlesPartials() == other.getClientHandlesPartials()); } + result = result && (hasClientHandlesHeartbeats() == other.hasClientHandlesHeartbeats()); + if (hasClientHandlesHeartbeats()) { + result = result && (getClientHandlesHeartbeats() + == other.getClientHandlesHeartbeats()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -16903,6 +16947,10 @@ public final class ClientProtos { hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getClientHandlesPartials()); } + if (hasClientHandlesHeartbeats()) { + hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17049,6 +17097,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000020); clientHandlesPartials_ = false; bitField0_ = (bitField0_ & ~0x00000040); + clientHandlesHeartbeats_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -17113,6 +17163,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000040; } result.clientHandlesPartials_ = clientHandlesPartials_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -17150,6 +17204,9 @@ public final class ClientProtos { if (other.hasClientHandlesPartials()) { setClientHandlesPartials(other.getClientHandlesPartials()); } + if 
(other.hasClientHandlesHeartbeats()) { + setClientHandlesHeartbeats(other.getClientHandlesHeartbeats()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17588,6 +17645,39 @@ public final class ClientProtos { return this; } + // optional bool client_handles_heartbeats = 8; + private boolean clientHandlesHeartbeats_ ; + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean hasClientHandlesHeartbeats() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public boolean getClientHandlesHeartbeats() { + return clientHandlesHeartbeats_; + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public Builder setClientHandlesHeartbeats(boolean value) { + bitField0_ |= 0x00000080; + clientHandlesHeartbeats_ = value; + onChanged(); + return this; + } + /** + * optional bool client_handles_heartbeats = 8; + */ + public Builder clearClientHandlesHeartbeats() { + bitField0_ = (bitField0_ & ~0x00000080); + clientHandlesHeartbeats_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -17784,6 +17874,30 @@ public final class ClientProtos { * */ boolean getPartialFlagPerResult(int index); + + // optional bool heartbeat_message = 8; + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean hasHeartbeatMessage(); + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean getHeartbeatMessage(); } /** * Protobuf type {@code ScanResponse} @@ -17912,6 +18026,11 @@ public final class ClientProtos { input.popLimit(limit); break; } + case 64: { + bitField0_ |= 0x00000010; + heartbeatMessage_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -18197,6 +18316,36 @@ public final class ClientProtos { return partialFlagPerResult_.get(index); } + // optional bool heartbeat_message = 8; + public static final int HEARTBEAT_MESSAGE_FIELD_NUMBER = 8; + private boolean heartbeatMessage_; + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + private void initFields() { cellsPerResult_ = java.util.Collections.emptyList(); scannerId_ = 0L; @@ -18205,6 +18354,7 @@ public final class ClientProtos { results_ = java.util.Collections.emptyList(); stale_ = false; partialFlagPerResult_ = java.util.Collections.emptyList(); + heartbeatMessage_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -18239,6 +18389,9 @@ public final class ClientProtos { for (int i = 0; i < partialFlagPerResult_.size(); i++) { output.writeBool(7, partialFlagPerResult_.get(i)); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBool(8, heartbeatMessage_); + } getUnknownFields().writeTo(output); } @@ -18283,6 +18436,10 @@ public final class ClientProtos { size += dataSize; size += 1 * getPartialFlagPerResultList().size(); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, heartbeatMessage_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18332,6 +18489,11 @@ public final class ClientProtos { } result = result && getPartialFlagPerResultList() .equals(other.getPartialFlagPerResultList()); + result = result && (hasHeartbeatMessage() == other.hasHeartbeatMessage()); + if (hasHeartbeatMessage()) { + result = result && (getHeartbeatMessage() + == other.getHeartbeatMessage()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18373,6 +18535,10 @@ public final class ClientProtos { hash = (37 * hash) + PARTIAL_FLAG_PER_RESULT_FIELD_NUMBER; hash = (53 * hash) + getPartialFlagPerResultList().hashCode(); } + if (hasHeartbeatMessage()) { + hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getHeartbeatMessage()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ 
-18507,6 +18673,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000020); partialFlagPerResult_ = java.util.Collections.emptyList(); bitField0_ = (bitField0_ & ~0x00000040); + heartbeatMessage_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -18570,6 +18738,10 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000040); } result.partialFlagPerResult_ = partialFlagPerResult_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000010; + } + result.heartbeatMessage_ = heartbeatMessage_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18644,6 +18816,9 @@ public final class ClientProtos { } onChanged(); } + if (other.hasHeartbeatMessage()) { + setHeartbeatMessage(other.getHeartbeatMessage()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -19423,6 +19598,67 @@ public final class ClientProtos { return this; } + // optional bool heartbeat_message = 8; + private boolean heartbeatMessage_ ; + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder setHeartbeatMessage(boolean value) { + bitField0_ |= 0x00000080; + heartbeatMessage_ = value; + onChanged(); + return this; + } + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder clearHeartbeatMessage() { + bitField0_ = (bitField0_ & ~0x00000080); + heartbeatMessage_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanResponse) } @@ -32552,62 +32788,63 @@ public final class ClientProtos { "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" + "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" + - "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", + "\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" + "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" + "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" + - "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\251\001\n\014S" + - "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" + - "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" + - "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" + - "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" + - "\030\007 \003(\010\"\263\001\n\024BulkLoadHFileRequest\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\0225\n\013family_path", - "\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyPath" + - "\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016" + - "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" + - "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026Coproce" + - "ssorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_" + - "name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reque" + - "st\030\004 \002(\014\"9\n\030CoprocessorServiceResult\022\035\n\005" + 
- "value\030\001 \001(\0132\016.NameBytesPair\"d\n\031Coprocess" + - "orServiceRequest\022 \n\006region\030\001 \002(\0132\020.Regio" + - "nSpecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorSe", - "rviceCall\"]\n\032CoprocessorServiceResponse\022" + - " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005val" + - "ue\030\002 \002(\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005in" + - "dex\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationPr" + - "oto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004" + - " \001(\0132\027.CoprocessorServiceCall\"Y\n\014RegionA" + - "ction\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + - "\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action" + - "\"D\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(" + - "\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021Resul", - "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + - "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + - "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + - "essorServiceResult\022#\n\tloadStats\030\005 \001(\0132\020." + - "RegionLoadStats\"f\n\022RegionActionResult\022-\n" + - "\021resultOrException\030\001 \003(\0132\022.ResultOrExcep" + - "tion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" + - "f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r." 
+ - "RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcond" + - "ition\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse", - "\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionAct" + - "ionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consiste" + - "ncy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClient" + - "Service\022 \n\003Get\022\013.GetRequest\032\014.GetRespons" + - "e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" + - "onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" + - "e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" + - "t\032\026.BulkLoadHFileResponse\022F\n\013ExecService" + - "\022\032.CoprocessorServiceRequest\032\033.Coprocess" + - "orServiceResponse\022R\n\027ExecRegionServerSer", - "vice\022\032.CoprocessorServiceRequest\032\033.Copro" + - "cessorServiceResponse\022&\n\005Multi\022\r.MultiRe" + - "quest\032\016.MultiResponseBB\n*org.apache.hado" + - "op.hbase.protobuf.generatedB\014ClientProto" + - "sH\001\210\001\001\240\001\001" + "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" + + "ient_handles_heartbeats\030\010 \001(\010\"\304\001\n\014ScanRe" + + "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" + + "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" + + "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" + + "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" + + "\010\022\031\n\021heartbeat_message\030\010 \001(\010\"\263\001\n\024BulkLoa", + "dHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionSp" + + "ecifier\0225\n\013family_path\030\002 \003(\0132 .BulkLoadH" + + "FileRequest.FamilyPath\022\026\n\016assign_seq_num" + + "\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004" + + "path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006l" + + "oaded\030\001 
\002(\010\"a\n\026CoprocessorServiceCall\022\013\n" + + "\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013meth" + + "od_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030Coproc" + + "essorServiceResult\022\035\n\005value\030\001 \001(\0132\016.Name" + + "BytesPair\"d\n\031CoprocessorServiceRequest\022 ", + "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call" + + "\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032Copro" + + "cessorServiceResponse\022 \n\006region\030\001 \002(\0132\020." + + "RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameByt" + + "esPair\"{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutat" + + "ion\030\002 \001(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004" + + ".Get\022-\n\014service_call\030\004 \001(\0132\027.Coprocessor" + + "ServiceCall\"Y\n\014RegionAction\022 \n\006region\030\001 " + + "\002(\0132\020.RegionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n" + + "\006action\030\003 \003(\0132\007.Action\"D\n\017RegionLoadStat", + "s\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupa" + + "ncy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005in" + + "dex\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\tex" + + "ception\030\003 \001(\0132\016.NameBytesPair\0221\n\016service" + + "_result\030\004 \001(\0132\031.CoprocessorServiceResult" + + "\022#\n\tloadStats\030\005 \001(\0132\020.RegionLoadStats\"f\n" + + "\022RegionActionResult\022-\n\021resultOrException" + + "\030\001 \003(\0132\022.ResultOrException\022!\n\texception\030" + + "\002 \001(\0132\016.NameBytesPair\"f\n\014MultiRequest\022#\n" + + "\014regionAction\030\001 \003(\0132\r.RegionAction\022\022\n\nno", + "nceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Cond" + + "ition\"S\n\rMultiResponse\022/\n\022regionActionRe" + + "sult\030\001 
\003(\0132\023.RegionActionResult\022\021\n\tproce" + + "ssed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n" + + "\010TIMELINE\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.G" + + "etRequest\032\014.GetResponse\022)\n\006Mutate\022\016.Muta" + + "teRequest\032\017.MutateResponse\022#\n\004Scan\022\014.Sca" + + "nRequest\032\r.ScanResponse\022>\n\rBulkLoadHFile" + + "\022\025.BulkLoadHFileRequest\032\026.BulkLoadHFileR" + + "esponse\022F\n\013ExecService\022\032.CoprocessorServ", + "iceRequest\032\033.CoprocessorServiceResponse\022" + + "R\n\027ExecRegionServerService\022\032.Coprocessor" + + "ServiceRequest\032\033.CoprocessorServiceRespo" + + "nse\022&\n\005Multi\022\r.MultiRequest\032\016.MultiRespo" + + "nseBB\n*org.apache.hadoop.hbase.protobuf." + + "generatedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -32703,13 +32940,13 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", }); internal_static_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_ScanResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", 
"PartialFlagPerResult", "HeartbeatMessage", }); internal_static_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_BulkLoadHFileRequest_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 5142e53..dfa5655 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -275,6 +275,7 @@ message ScanRequest { optional bool close_scanner = 5; optional uint64 next_call_seq = 6; optional bool client_handles_partials = 7; + optional bool client_handles_heartbeats = 8; } /** @@ -308,6 +309,12 @@ message ScanResponse { // has false, false, true in it, then we know that on the client side, we need to // make another RPC request since the last result was only a partial. repeated bool partial_flag_per_result = 7; + + // This field is filled in if the server is sending back a heartbeat message. + // Heartbeat messages are sent back to the client to prevent the scanner from + // timing out. Seeing a heartbeat message communicates to the Client that the + // server would have continued to scan had the time limit not been reached. 
+ optional bool heartbeat_message = 8; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index a80a07e..95efb6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -72,10 +72,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner { public Result next() throws IOException { values.clear(); - // negative values indicate no limits - final long remainingResultSize = -1; - final int batchLimit = -1; - scanner.nextRaw(values, batchLimit, remainingResultSize); + scanner.nextRaw(values); if (values.isEmpty()) { //we are done return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3b1f267..9c2d453 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5484,18 +5484,23 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver { // @Override public NextState next(List outResults) throws IOException { - // apply the batching limit by default return next(outResults, batch); } @Override - public NextState next(List outResults, int limit) throws IOException { - return next(outResults, limit, -1); + public NextState next(List outResults, int batchLimit) throws IOException { + return next(outResults, batchLimit, -1); } @Override - public synchronized NextState next(List outResults, int limit, long remainingResultSize) + public NextState next(List outResults, int batchLimit, long sizeLimit) throws IOException { + return next(outResults, batchLimit, sizeLimit, -1); + } + + @Override + public synchronized NextState next(List outResults, int batchLimit, long sizeLimit, + long timeLimit) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. Could be caused by a very slow scanner " + @@ -5504,7 +5509,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // startRegionOperation(Operation.SCAN); readRequestsCount.increment(); try { - return nextRaw(outResults, limit, remainingResultSize); + return nextRaw(outResults, batchLimit, sizeLimit, timeLimit); } finally { closeRegionOperation(Operation.SCAN); } @@ -5516,13 +5521,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } @Override - public NextState nextRaw(List outResults, int limit) + public NextState nextRaw(List outResults, int batchLimit) throws IOException { + return nextRaw(outResults, batchLimit, -1); + } + + @Override + public NextState nextRaw(List outResults, int batchLimit, long sizeLimit) throws IOException { - return nextRaw(outResults, limit, -1); + return nextRaw(outResults, batchLimit, sizeLimit, -1); } @Override - public NextState nextRaw(List outResults, int batchLimit, long remainingResultSize) + public NextState nextRaw(List outResults, int batchLimit, 
long sizeLimit, long timeLimit) throws IOException { if (storeHeap == null) { // scanner is closed @@ -5532,10 +5542,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (outResults.isEmpty()) { // Usually outResults is empty. This is true when next is called // to handle scan or get operation. - state = nextInternal(outResults, batchLimit, remainingResultSize); + state = nextInternal(outResults, batchLimit, sizeLimit, timeLimit); } else { List tmpList = new ArrayList(); - state = nextInternal(tmpList, batchLimit, remainingResultSize); + state = nextInternal(tmpList, batchLimit, sizeLimit, timeLimit); outResults.addAll(tmpList); } // Invalid states should never be returned. Receiving an invalid state means that we have @@ -5544,10 +5554,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // throw new IOException("Invalid state returned from nextInternal. state:" + state); } - // If the size limit was reached it means a partial Result is being returned. 
Returning a - // partial Result means that we should not reset the filters; filters should only be reset in - // between rows - if (!state.sizeLimitReached()) resetFilters(); + // Filters should only be reset between rows; Partial means we have not changed rows + if (!state.partialFormed()) resetFilters(); if (isFilterDoneInternal()) { state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize()); @@ -5556,17 +5564,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } /** + * @param results + * @param batchLimit + * @param sizeLimit + * @param timeLimit * @return the state the joinedHeap returned on the call to * {@link KeyValueHeap#next(List, int, long)} + * @throws IOException */ - private NextState populateFromJoinedHeap(List results, int limit, long resultSize) - throws IOException { + private NextState populateFromJoinedHeap(List results, int batchLimit, long sizeLimit, + long timeLimit) throws IOException { assert joinedContinuationRow != null; NextState state = - populateResult(results, this.joinedHeap, limit, resultSize, - joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(), - joinedContinuationRow.getRowLength()); - if (state != null && !state.batchLimitReached() && !state.sizeLimitReached()) { + populateResult(results, this.joinedHeap, batchLimit, sizeLimit, timeLimit, + joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(), + joinedContinuationRow.getRowLength()); + if (state != null && !state.limitReached()) { // We are done with this row, reset the continuation. joinedContinuationRow = null; } @@ -5580,38 +5593,55 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is * reached, or remainingResultSize (if not -1) is reaced * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call. 
- * @param remainingResultSize The remaining space within our result size limit. A negative value - * indicate no limit - * @param batchLimit Max amount of KVs to place in result list, -1 means no limit. + * @param batchLimit Limit on the number of cells to return + * @param sizeLimit Limit on the size of the list of cells being returned + * @param timeLimit Limit on the execution time * @param currentRow Byte array with key we are fetching. * @param offset offset for currentRow * @param length length for currentRow * @return state of last call to {@link KeyValueHeap#next()} */ private NextState populateResult(List results, KeyValueHeap heap, int batchLimit, - long remainingResultSize, byte[] currentRow, int offset, short length) throws IOException { + long sizeLimit, long timeLimit, byte[] currentRow, int offset, short length) + throws IOException { Cell nextKv; boolean moreCellsInRow = false; long accumulatedResultSize = 0; List tmpResults = new ArrayList(); do { - int remainingBatchLimit = batchLimit - results.size(); - NextState heapState = - heap.next(tmpResults, remainingBatchLimit, remainingResultSize - accumulatedResultSize); + int heapBatchLimit = batchLimit - results.size(); + long heapSizeLimit = sizeLimit - accumulatedResultSize; + NextState heapState = heap.next(tmpResults, heapBatchLimit, heapSizeLimit, timeLimit); + + // Update the accumulated result size results.addAll(tmpResults); accumulatedResultSize += calculateResultSize(tmpResults, heapState); tmpResults.clear(); - if (batchLimit > 0 && results.size() == batchLimit) { + nextKv = heap.peek(); + moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); + + // Check the batch limit + if (ScannerLimitUtil.checkBatchLimit(batchLimit, results.size())) { return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, accumulatedResultSize); } - nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); - boolean sizeLimitReached = - remainingResultSize > 0 
&& accumulatedResultSize >= remainingResultSize; - if (moreCellsInRow && sizeLimitReached) { - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize); + // Check the size limit + if (ScannerLimitUtil.checkSizeLimit(sizeLimit, accumulatedResultSize)) { + // Use moreCellsInRow to determine whether or not we are returning a partial Result. If + // there are more cells in the row and we have reached a limit, a partial result is being + // formed + return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize, + moreCellsInRow); + } + + // Check the time limit + if (ScannerLimitUtil.checkTimeLimit(timeLimit, System.currentTimeMillis())) { + // Use moreCellsInRow to determine whether or not we are returning a partial Result. If + // there are more cells in the row and we have reached a limit, a partial result is being + // formed + return NextState.makeState(NextState.State.TIME_LIMIT_REACHED, accumulatedResultSize, + moreCellsInRow); } } while (moreCellsInRow); @@ -5673,13 +5703,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.filter != null && this.filter.filterAllRemaining(); } - private NextState nextInternal(List results, int batchLimit, long remainingResultSize) - throws IOException { + private NextState nextInternal(List results, int batchLimit, long sizeLimit, + long timeLimit) throws IOException { if (!results.isEmpty()) { throw new IllegalArgumentException("First parameter should be an empty list"); } - // Estimate of the size (heap size) of the results returned from this method - long resultSize = 0; RpcCallContext rpcCall = RpcServer.getCurrentCall(); // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. 
@@ -5687,6 +5715,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row, // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow). while (true) { + // Estimate of the size (heap size) of the results returned from this method. + long resultSize = 0; + if (rpcCall != null) { // If a user specifies a too-restrictive or too-slow scanner, the // client might time out and disconnect while the server side @@ -5717,15 +5748,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow(); // If filter#hasFilterRow is true, partial results are not allowed since allowing them - // would prevent the filters from being evaluated. Thus, if it is true, change the - // remainingResultSize to -1 so that the entire row's worth of cells are fetched. - if (hasFilterRow && remainingResultSize > 0) { - remainingResultSize = -1; + // would prevent the filters from being evaluated. Thus, if it is true, disable any limits + // that could potentially create partial results + if (hasFilterRow) { if (LOG.isTraceEnabled()) { - LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + - " formed. The remainingResultSize of: " + remainingResultSize + " will not " + - " be considered when fetching the cells for this row."); + LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + + " formed. Disabling limits that may create partials"); } + sizeLimit = -1; + timeLimit = -1; } NextState joinedHeapState; @@ -5749,36 +5780,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // continue; } + // Ok, we are good, let's try to get some results from the main heap. 
NextState storeHeapState = - populateResult(results, this.storeHeap, batchLimit, remainingResultSize, currentRow, + populateResult(results, this.storeHeap, batchLimit, sizeLimit, timeLimit, currentRow, offset, length); + + // Update the local limit to reflect the accumulated result size thus far resultSize += calculateResultSize(results, storeHeapState); + // Invalid states should never be returned. If one is seen, throw exception // since we have no way of telling how we should proceed if (!NextState.isValidState(storeHeapState)) { - throw new IOException("NextState returned from call storeHeap was invalid"); + throw new IOException("NextState returned from call storeHeap was invalid. " + + "storeheapState:" + storeHeapState); } - // Ok, we are good, let's try to get some results from the main heap. - if (storeHeapState.batchLimitReached()) { + if (storeHeapState.limitReached()) { if (hasFilterRow) { throw new IncompatibleFilterException( - "Filter whose hasFilterRow() returns true is incompatible with scan with limit!"); + "Filter whose hasFilterRow() returns true is incompatible with scans that have " + + "to stop mid-row because a limit is reached. storeHeapState: " + + storeHeapState); } - // We hit the batch limit. - return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, resultSize); - } else if (storeHeapState.sizeLimitReached()) { - if (hasFilterRow) { - // We try to guard against this case above when remainingResultSize is set to -1 if - // hasFilterRow is true. In the even that the guard doesn't work, an exception must be - // thrown - throw new IncompatibleFilterException( - "Filter whose hasFilterRows() returns true is incompatible with scans that" - + " return partial results"); - } - // We hit the size limit. 
- return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); + return NextState.makeState(storeHeapState.getState(), resultSize, + storeHeapState.partialFormed()); } + Cell nextKv = this.storeHeap.peek(); stopRow = nextKv == null || isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength()); @@ -5808,35 +5835,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // These values are not needed for filter to work, so we postpone their // fetch to (possibly) reduce amount of data loads from disk. if (this.joinedHeap != null) { - Cell nextJoinedKv = joinedHeap.peek(); - // If joinedHeap is pointing to some other row, try to seek to a correct one. - boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, - currentRow, offset, length)) - || (this.joinedHeap.requestSeek( - KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true) - && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(), - currentRow, offset, length)); + boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length); if (mayHaveData) { joinedContinuationRow = current; + long joinedHeapSizeLimit = sizeLimit - resultSize; joinedHeapState = - populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); + populateFromJoinedHeap(results, batchLimit, joinedHeapSizeLimit, + timeLimit); + if (!NextState.isValidState(joinedHeapState)) { + throw new IOException("NextState returned from call to joinedHeap was invalid. " + + "joinedHeapState:" + joinedHeapState); + } + + // Update the local limit to reflect the accumulated result size resultSize += - joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? - joinedHeapState.getResultSize() : 0; - if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) { - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); + joinedHeapState.hasResultSizeEstimate() ? 
joinedHeapState.getResultSize() : 0; + + // Check the state to see if limits were reached. + if (joinedHeapState.limitReached()) { + return NextState.makeState(joinedHeapState.getState(), resultSize, + joinedHeapState.partialFormed()); } } } } else { // Populating from the joined heap was stopped by limits, populate some more. + long joinedHeapSizeLimit = sizeLimit - resultSize; joinedHeapState = - populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); + populateFromJoinedHeap(results, batchLimit, joinedHeapSizeLimit, timeLimit); + if (!NextState.isValidState(joinedHeapState)) { + throw new IOException("NextState returned from call to joinedHeap was invalid. " + + "joinedHeapState:" + joinedHeapState); + } + + // Update the local limit to reflect the accumulated result size resultSize += - joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? - joinedHeapState.getResultSize() : 0; - if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) { - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); + joinedHeapState.hasResultSizeEstimate() ? joinedHeapState.getResultSize() : 0; + + // Check the state to see if limits were reached. + if (joinedHeapState.limitReached()) { + return NextState.makeState(joinedHeapState.getState(), resultSize, + joinedHeapState.partialFormed()); } } // We may have just called populateFromJoinedMap and hit the limits. If that is @@ -5857,6 +5896,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // We are done. Return the result. 
if (stopRow) { return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); + } else if (ScannerLimitUtil.checkTimeLimit(timeLimit, System.currentTimeMillis())) { + return NextState.makeState(NextState.State.TIME_LIMIT_REACHED, resultSize); } else { return NextState.makeState(NextState.State.MORE_VALUES, resultSize); } @@ -5864,6 +5905,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } /** + * @param currentRow + * @param offset + * @param length + * @return true when the joined heap may have data for the current row + * @throws IOException + */ + private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length) + throws IOException { + Cell nextJoinedKv = joinedHeap.peek(); + boolean matchCurrentRow = + nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length); + boolean matchAfterSeek = false; + + // If the next value in the joined heap does not match the current row, try to seek to the + // correct row + if (!matchCurrentRow) { + Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length); + boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true); + matchAfterSeek = + seekSuccessful && joinedHeap.peek() != null + && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length); + } + + return matchCurrentRow || matchAfterSeek; + } + + /** * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines * both filterRow & filterRow(List kvs) functions. 
While 0.94 code or older, it may * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index ea5a75f..c97a8b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -43,10 +43,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public interface InternalScanner extends Closeable { /** * This class encapsulates all the meaningful state information that we would like the know about - * after a call to {@link InternalScanner#next(List)}. While this is not an enum, a restriction on - * the possible states is implied through the exposed {@link #makeState(State)} method. + * after a call to {@link InternalScanner#next(List)}. While this is not an enum, a restriction + * on the possible states is implied through the exposed {@link #makeState(State)} method. */ - public static class NextState { + public static final class NextState { /** * The possible states we want to restrict ourselves to. 
This enum is not sufficient to * encapsulate all of the state information since some of the fields of the state must be @@ -56,7 +56,8 @@ public interface InternalScanner extends Closeable { MORE_VALUES(true), NO_MORE_VALUES(false), SIZE_LIMIT_REACHED(true), - BATCH_LIMIT_REACHED(true); + BATCH_LIMIT_REACHED(true), + TIME_LIMIT_REACHED(true); private boolean moreValues; @@ -76,18 +77,24 @@ public interface InternalScanner extends Closeable { /** * state variables */ - private final State state; - private long resultSize; + private State state = DEFAULT_STATE; + private long resultSize = DEFAULT_RESULT_SIZE; + private boolean partialFormed = DEFAULT_PARTIAL_FORMED; /** - * Value to use for resultSize when the size has not been calculated. Must be a negative number - * so that {@link NextState#hasResultSizeEstimate()} returns false. + * Defaults for state variables */ private static final long DEFAULT_RESULT_SIZE = -1; + private static final State DEFAULT_STATE = State.NO_MORE_VALUES; + private static final boolean DEFAULT_PARTIAL_FORMED = false; - private NextState(State state, long resultSize) { + /** + * Private constructor. All instantiation should be performed through {@link #makeState(State)} + */ + private NextState(State state, long resultSize, boolean partialFormed) { this.state = state; this.resultSize = resultSize; + this.partialFormed = partialFormed; } /** @@ -108,79 +115,73 @@ public interface InternalScanner extends Closeable { * the result size by using the cached value retrievable via {@link #getResultSize()} */ public static NextState makeState(final State state, long resultSize) { + return makeState(state, resultSize, DEFAULT_PARTIAL_FORMED); + } + + /** + * @param state + * @param resultSize + * @param partialFormed + * @return An instance of {@link NextState} where the size of the values returned from the call + * to {@link InternalScanner#next(List)} is known. 
The caller can avoid recalculating + * the result size by using the cached value retrievable via {@link #getResultSize()} + */ + public static NextState makeState(final State state, long resultSize, boolean partialFormed) { switch (state) { case MORE_VALUES: - return createMoreValuesState(resultSize); case NO_MORE_VALUES: - return createNoMoreValuesState(resultSize); case BATCH_LIMIT_REACHED: - return createBatchLimitReachedState(resultSize); case SIZE_LIMIT_REACHED: - return createSizeLimitReachedState(resultSize); + case TIME_LIMIT_REACHED: + return new NextState(state, resultSize, partialFormed); default: // If the state is not recognized, default to no more value state - return createNoMoreValuesState(resultSize); + return new NextState(State.NO_MORE_VALUES, resultSize, partialFormed); } } /** - * Convenience method for creating a state that indicates that more values can be scanned - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} + * @return true when the scanner has more values to be scanned following the values returned by + * the call to {@link InternalScanner#next(List)} */ - private static NextState createMoreValuesState(long resultSize) { - return new NextState(State.MORE_VALUES, resultSize); + public boolean hasMoreValues() { + return this.state.hasMoreValues(); } /** - * Convenience method for creating a state that indicates that no more values can be scanned. 
- * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} + * @return true when the scanner had to stop scanning because it reached the batch limit */ - private static NextState createNoMoreValuesState(long resultSize) { - return new NextState(State.NO_MORE_VALUES, resultSize); + public boolean batchLimitReached() { + return this.state == State.BATCH_LIMIT_REACHED; } /** - * Convenience method for creating a state that indicates that the scan stopped because the - * batch limit was exceeded - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} + * @return true when the scanner had to stop scanning because it reached the size limit */ - private static NextState createBatchLimitReachedState(long resultSize) { - return new NextState(State.BATCH_LIMIT_REACHED, resultSize); + public boolean sizeLimitReached() { + return this.state == State.SIZE_LIMIT_REACHED; } - + /** - * Convenience method for creating a state that indicates that the scan stopped due to the size - * limit - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} + * @return true when the scanner had to stop scanning because the time limit was exceeded */ - private static NextState createSizeLimitReachedState(long resultSize) { - return new NextState(State.SIZE_LIMIT_REACHED, resultSize); + public boolean timeLimitReached() { + return this.state == State.TIME_LIMIT_REACHED; } /** - * @return true when the scanner has more values to be scanned following the values returned by - * the call to {@link InternalScanner#next(List)} + * @return true when the scanner had to stop scanning because one of the specified limits has + * been reached */ - public boolean hasMoreValues() { - return this.state.hasMoreValues(); + public boolean limitReached() { + return batchLimitReached() || 
sizeLimitReached() || timeLimitReached(); } /** - * @return true when the scanner had to stop scanning because it reached the batch limit + * @return The underlying {@link State} of this {@link NextState} instance */ - public boolean batchLimitReached() { - return this.state == State.BATCH_LIMIT_REACHED; - } - - /** - * @return true when the scanner had to stop scanning because it reached the size limit - */ - public boolean sizeLimitReached() { - return this.state == State.SIZE_LIMIT_REACHED; + public State getState() { + return state; } /** @@ -193,6 +194,14 @@ public interface InternalScanner extends Closeable { } /** + * @return true when the scanner had to stop scanning in the middle of a row and thus a partial + * Result was formed. + */ + public boolean partialFormed() { + return this.partialFormed; + } + + /** * @return true when an estimate for the size of the values returned by * {@link InternalScanner#next(List)} was provided. If false, it is the responsibility * of the caller to calculate the result size @@ -201,11 +210,6 @@ public interface InternalScanner extends Closeable { return resultSize >= 0; } - @Override - public String toString() { - return "State: " + state + " resultSize: " + resultSize; - } - /** * Helper method to centralize all checks as to whether or not the state is valid. * @param state @@ -222,6 +226,12 @@ public interface InternalScanner extends Closeable { public static boolean hasMoreValues(NextState state) { return state != null && state.hasMoreValues(); } + + @Override + public String toString() { + return "NextState: State: " + getState() + " resultSize: " + resultSize + " partialFormed: " + + partialFormed; + } } /** @@ -236,24 +246,37 @@ public interface InternalScanner extends Closeable { /** * Grab the next row's worth of values with a limit on the number of values to return. 
* @param result return output array - * @param limit limit on row count to get + * @param batchLimit limit on cell count to get * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this * one, false if scanner is done * @throws IOException e */ - NextState next(List result, int limit) throws IOException; + NextState next(List result, int batchLimit) throws IOException; /** - * Grab the next row's worth of values with a limit on the number of values to return as well as a + * Grab the next row's worth of values with a limit on the number of values to return and a * restriction on the size of the list of values that are returned. * @param result return output array - * @param limit limit on row count to get - * @param remainingResultSize limit on the size of the result being returned + * @param batchLimit limit on cell count to get + * @param sizeLimit limit on the size of the result being returned + * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this + * one, false if scanner is done + * @throws IOException e + */ + NextState next(List result, int batchLimit, long sizeLimit) throws IOException; + + /** + * Grab the next row's worth of values while respecting the limits (batch, size, time) + * @param result return output array + * @param batchLimit limit on cell count to get + * @param sizeLimit limit on the size of the result being returned + * @param timeLimit future timestamp that execution of this method must finish by * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this * one, false if scanner is done * @throws IOException e */ - NextState next(List result, int limit, long remainingResultSize) throws IOException; + NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) + throws IOException; /** * Closes the scanner and releases any resources it has allocated diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index beb23cf..b47c65d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -128,27 +128,39 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * This can ONLY be called when you are using Scanners that implement InternalScanner as well as * KeyValueScanner (a {@link StoreScanner}). * @param result - * @param limit - * @return state where NextState#hasMoreValues() is true if more keys exist after this - * one, false if scanner is done + * @return state where NextState#hasMoreValues() is true if more keys exist after this one, false + * if scanner is done */ - public NextState next(List result, int limit) throws IOException { - return next(result, limit, -1); + @Override + public NextState next(List result) throws IOException { + return next(result, -1); + } + + @Override + public NextState next(List result, int batchLimit) throws IOException { + return next(result, batchLimit, -1); } - public NextState next(List result, int limit, long remainingResultSize) throws IOException { + @Override + public NextState next(List result, int batchLimit, long sizeLimit) throws IOException { + return next(result, batchLimit, sizeLimit, -1); + } + + @Override + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { if (this.current == null) { return NextState.makeState(NextState.State.NO_MORE_VALUES); } InternalScanner currentAsInternal = (InternalScanner)this.current; - NextState state = currentAsInternal.next(result, limit, remainingResultSize); + NextState state = currentAsInternal.next(result, batchLimit, sizeLimit, timeLimit); // Invalid states should never be returned. 
Receiving an invalid state means that we have // no clue how to proceed. Throw an exception. if (!NextState.isValidState(state)) { throw new IOException("Invalid state returned from InternalScanner#next"); } boolean mayContainMoreRows = NextState.hasMoreValues(state); - Cell pee = this.current.peek(); + Cell peek = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no * further rows to be fetched. So, we can close a scanner if it returns @@ -156,7 +168,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations. */ - if (pee == null || !mayContainMoreRows) { + if (peek == null || !mayContainMoreRows) { this.current.close(); } else { this.heap.add(this.current); @@ -168,21 +180,6 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return state; } - /** - * Gets the next row of keys from the top-most scanner. - *

- * This method takes care of updating the heap. - *

- * This can ONLY be called when you are using Scanners that implement InternalScanner as well as - * KeyValueScanner (a {@link StoreScanner}). - * @param result - * @return state where NextState#hasMoreValues() is true if more keys exist after this - * one, false if scanner is done - */ - public NextState next(List result) throws IOException { - return next(result, -1); - } - protected static class KVScannerComparator implements Comparator { protected KVComparator kvComparator; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3ce1b57..5348723 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2151,6 +2151,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean stale = (region.getRegionInfo().getReplicaId() != 0); boolean clientHandlesPartials = request.hasClientHandlesPartials() && request.getClientHandlesPartials(); + boolean clientHandlesHeartbeats = + request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); // On the server side we must ensure that the correct ordering of partial results is // returned to the client to allow them to properly reconstruct the partial results. @@ -2158,51 +2160,79 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // correct ordering of partial results and so we prevent partial results from being // formed. 
boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; - boolean enforceMaxResultSizeAtCellLevel = + boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; - while (i < rows) { - // Stop collecting results if we have exceeded maxResultSize - if (currentScanResultSize >= maxResultSize) { - break; - } + // Heartbeat messages occur when the processing of the ScanRequest is exceeds a + // certain time threshold on the server. When the time threshold is exceeded, the + // server stops the scan and sends back whatever Results it has accumulated within + // that time period (may be empty). Since heartbeat messages have the potential to + // create partial Results, we must only generate heartbeat messages when the client + // can handle both heartbeats AND partials + boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; + + // Default value of timeLimit is negative to indicate no timeLimit should be + // enforced. + long timeLimit = -1; + // If the scannerLeaseTimeoutPeriod is positive (which it always should be) then we + // can use that to define our timeLimit else we have no context for how long the + // timeLimit should be and thus do not enforce timeLimits + if (allowHeartbeatMessages && scannerLeaseTimeoutPeriod > 0) { + timeLimit = System.currentTimeMillis() + scannerLeaseTimeoutPeriod / 2; + } - // A negative remainingResultSize communicates that there is no limit on the size - // of the results. - final long remainingResultSize = - enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize - : -1; + while (i < rows) { + // A negative value means no limit + final long sizeLimit = + allowPartialResults ? 
maxResultSize - currentScanResultSize : -1; - // Collect values to be returned here + // Collect values to be returned here and assert that state is valid NextState state = - scanner.nextRaw(values, scanner.getBatch(), remainingResultSize); - // Invalid states should never be returned. If one is seen, throw exception - // to stop the scan -- We have no way of telling how we should proceed + scanner.nextRaw(values, scanner.getBatch(), sizeLimit, timeLimit); if (!NextState.isValidState(state)) { throw new IOException("NextState returned from call to nextRaw was invalid"); } + + long incrementalResultSize = 0; if (!values.isEmpty()) { // The state should always contain an estimate of the result size because that // estimate must be used to decide when partial results are formed. boolean skipResultSizeCalculation = state.hasResultSizeEstimate(); - if (skipResultSizeCalculation) currentScanResultSize += state.getResultSize(); + if (skipResultSizeCalculation) incrementalResultSize += state.getResultSize(); for (Cell cell : values) { totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); // If the calculation can't be skipped, then do it now. if (!skipResultSizeCalculation) { - currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell); + incrementalResultSize += CellUtil.estimatedHeapSizeOf(cell); } } - // The size limit was reached. This means there are more cells remaining in - // the row but we had to stop because we exceeded our max result size. 
This - // indicates that we are returning a partial result - final boolean partial = state != null && state.sizeLimitReached(); - results.add(Result.create(values, null, stale, partial)); + currentScanResultSize += incrementalResultSize; + results.add(Result.create(values, null, stale, state.partialFormed())); i++; } - if (!NextState.hasMoreValues(state)) { + + boolean sizeLimitReached = + ScannerLimitUtil.checkSizeLimit(maxResultSize, currentScanResultSize); + boolean timeLimitReached = + ScannerLimitUtil.checkTimeLimit(timeLimit, System.currentTimeMillis()); + boolean limitReached = sizeLimitReached || timeLimitReached; + boolean moreValues = NextState.hasMoreValues(state); + + if (limitReached || !moreValues) { + if (LOG.isTraceEnabled()) { + LOG.trace("Done scanning. limitReached: " + limitReached + " moreValues: " + + moreValues); + } + // We only want to mark a ScanResponse as a heartbeat message in the event that + // there are more values to be read server side. If there aren't more values, + // marking it as a heartbeat is wasteful because the client will need to issue + // another ScanRequest only to realize that they already have all the values + if (moreValues) { + // Heartbeat messages occur when the time limit has been reached + builder.setHeartbeatMessage(timeLimitReached); + } break; } values.clear(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index 26f9aef..eb83ba7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -74,11 +74,10 @@ public interface RegionScanner extends InternalScanner { int getBatch(); /** - * Grab the next row's worth of values with the default limit on the number of values to return. 
- * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. - * Caller must set the thread's readpoint, start and close a region operation, an synchronize on - * the scanner object. Caller should maintain and update metrics. See - * {@link #nextRaw(List, int, long)} + * Grab the next row's worth of values without any limits on the scanner. This is a special + * internal method to be called from coprocessor hooks to avoid expensive setup. Caller must set + * the thread's readpoint, start and close a region operation, an synchronize on the scanner + * object. Caller should maintain and update metrics. See {@link #nextRaw(List, int, long, long)} * @param result return output array * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. @@ -87,24 +86,39 @@ public interface RegionScanner extends InternalScanner { NextState nextRaw(List result) throws IOException; /** - * Grab the next row's worth of values with the default limit on the number of values to return. + * Grab the next row's worth of values with a limit on the batch size. This is a special internal + * method to be called from coprocessor hooks to avoid expensive setup. Caller must set the + * thread's readpoint, start and close a region operation, an synchronize on the scanner object. + * Caller should maintain and update metrics. See {@link #nextRaw(List, int, long, long)} + * @param result return output array + * @param batchLimit limit on cell count + * @return a state where NextState#hasMoreValues() is true when more rows exist, false when + * scanner is done. + * @throws IOException e + */ + NextState nextRaw(List result, int batchLimit) throws IOException; + + /** + * Grab the next row's worth of values with limits on the batch size and result size (heap size). * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. 
* Caller must set the thread's readpoint, start and close a region operation, an synchronize on * the scanner object. Caller should maintain and update metrics. See - * {@link #nextRaw(List, int, long)} + * {@link #nextRaw(List, int, long, long)} * @param result return output array - * @param limit limit on row count to get + * @param batchLimit limit on cell count + * @param sizeLimit the restriction on the size of the results to return. Negative values indicate + * no limit * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. * @throws IOException e */ - NextState nextRaw(List result, int limit) throws IOException; + NextState nextRaw(List result, int batchLimit, long sizeLimit) throws IOException; /** - * Grab the next row's worth of values with a limit on the number of values to return as well as a - * limit on the heap size of those values. This is a special internal method to be called from - * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and - * close a region operation, an synchronize on the scanner object. Example:

+   * Grab the next row's worth of values with different limits (e.g. batch, size, time). This is a
+   * special internal method to be called from coprocessor hooks to avoid expensive setup. Caller
+   * must set the thread's readpoint, start and close a region operation, and synchronize on the
+   * scanner object. Example: 
    * HRegion region = ...;
    * RegionScanner scanner = ...
    * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
@@ -120,13 +134,14 @@ public interface RegionScanner extends InternalScanner {
    * }
    * 
* @param result return output array - * @param limit limit on row count to get - * @param remainingResultSize the space remaining within the restriction on the result size. - * Negative values indicate no limit + * @param batchLimit limit on cell count to get + * @param sizeLimit the restriction on the size of the results to return. Negative values indicate + * no limit + * @param timeLimit future timestamp that execution of this method must finish by * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. * @throws IOException e */ - NextState nextRaw(List result, int limit, final long remainingResultSize) + NextState nextRaw(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerLimitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerLimitUtil.java new file mode 100644 index 0000000..2b19d38 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerLimitUtil.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Utility for checking the limits that can be specified during calls to + * {@link InternalScanner#next(java.util.List, int, long, long)}. + */ +@InterfaceAudience.Private +public final class ScannerLimitUtil { + /** + * @param batchLimit + * @param batch + * @return true when the batch limit has been reached, false otherwise + */ + public static boolean checkBatchLimit(int batchLimit, int batch) { + return checkLongLimit(batchLimit, batch); + } + + /** + * @param sizeLimit + * @param size + * @return true when the size limit is reached, false otherwise + */ + public static boolean checkSizeLimit(long sizeLimit, long size) { + return checkLongLimit(sizeLimit, size); + } + + /** + * @param timeLimit + * @param time + * @return true when the time limit has been reached, false otherwise + */ + public static boolean checkTimeLimit(long timeLimit, long time) { + return checkLongLimit(timeLimit, time); + } + + /** + * @param limit The limit being enforced. If negative, the limit is regarded as invalid + * @param value The value to check against the limit + * @return true when the limit has been reached, false otherwise. 
The limit is reached when the + * limit is positive and the value is greater than or equal to the limit + */ + private static boolean checkLongLimit(long limit, long value) { + return limit > 0 && value >= limit; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7ce4e0b..21e0769 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -442,29 +442,34 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } - /** - * Get the next row of values from this Store. - * @param outResult - * @param limit - * @return true if there are more rows, false if scanner is done - */ @Override - public NextState next(List outResult, int limit) throws IOException { - // -1 means no limit - return next(outResult, limit, -1); + public NextState next(List outResult) throws IOException { + return next(outResult, -1); + } + + @Override + public NextState next(List outResult, int batchLimit) throws IOException { + return next(outResult, batchLimit, -1); + } + + @Override + public NextState next(List outResult, int batchLimit, long sizeLimit) throws IOException { + return next(outResult, batchLimit, sizeLimit, -1); } /** * Get the next row of values from this Store. 
* @param outResult - * @param limit - * @param remainingResultSize + * @param batchLimit + * @param sizeLimit + * @param timeLimit * @return true if there are more rows, false if scanner is done */ @Override - public NextState next(List outResult, int limit, long remainingResultSize) + public NextState next(List outResult, int batchLimit, long sizeLimit, long timeLimit) throws IOException { lock.lock(); + try { if (checkReseek()) { return NextState.makeState(NextState.State.MORE_VALUES, 0); @@ -489,14 +494,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int offset = peeked.getRowOffset(); short length = peeked.getRowLength(); - // If limit < 0 and remainingResultSize < 0 we can skip the row comparison because we know - // the row has changed. Else it is possible we are still traversing the same row so we - // must perform the row comparison. - if ((limit < 0 && remainingResultSize < 0) || matcher.row == null - || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, - matcher.rowLength)) { - this.countPerRow = 0; - matcher.setRow(row, offset, length); + // If limits have not been specified (i.e. batchLimit, sizeLimit, and timeLimit are not set) + // we can skip the row comparison because we know the row has changed. Else it is possible we + // are still traversing the same row so we must perform the row comparison. 
+ boolean noLimitsSpecified = batchLimit < 0 && sizeLimit < 0 && timeLimit < 0; + if (noLimitsSpecified || matcher.row == null || + !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) { + this.countPerRow = 0; + matcher.setRow(row, offset, length); } Cell cell; @@ -510,6 +515,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner long totalHeapSize = 0; LOOP: while((cell = this.heap.peek()) != null) { + // Check the time limit + if (ScannerLimitUtil.checkTimeLimit(timeLimit, System.currentTimeMillis())) { + if (LOG.isTraceEnabled()) { + LOG.trace("Time limit: reached in StoreScanner"); + } + return NextState.makeState(NextState.State.TIME_LIMIT_REACHED, totalHeapSize); + } + if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. checkScanOrder(prevCell, cell, comparator); prevCell = cell; @@ -562,10 +575,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.heap.next(); } - if (limit > 0 && (count == limit)) { + if (ScannerLimitUtil.checkBatchLimit(batchLimit, count)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Batch limit: reached in StoreScanner"); + } break LOOP; } - if (remainingResultSize > 0 && (totalHeapSize >= remainingResultSize)) { + if (ScannerLimitUtil.checkSizeLimit(sizeLimit, totalHeapSize)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Size limit: reached in StoreScanner"); + } break LOOP; } continue; @@ -655,11 +674,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return qcode; } - @Override - public NextState next(List outResult) throws IOException { - return next(outResult, -1); - } - // Implementation of ChangedReadersObserver @Override public void updateReaders() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index e7c3813..32c29ad 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -217,7 +217,8 @@ public class TestPartialResultsFromClientSide { count++; } - assertTrue(scanner2.next() == null); + r2 = scanner2.next(); + assertTrue("r2: " + r2 + " Should be null", r2 == null); scanner1.close(); scanner2.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 75fe93d..f37bcc9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -90,14 +90,19 @@ public class TestCoprocessorInterface { } @Override - public NextState next(List result, int limit) throws IOException { - return delegate.next(result, limit); + public NextState next(List results, int batchLimit) throws IOException { + return delegate.next(results, batchLimit); } @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List results, int batchLimit, long sizeLimit) throws IOException { + return delegate.next(results, batchLimit, sizeLimit); + } + + @Override + public NextState next(List results, int batchLimit, long sizeLimit, long timeLimit) throws IOException { - return delegate.next(result, limit, remainingResultSize); + return delegate.next(results, batchLimit, sizeLimit, timeLimit); } @Override @@ -107,14 +112,19 @@ public class TestCoprocessorInterface { } @Override - public NextState nextRaw(List result, int limit) throws IOException { - return delegate.nextRaw(result, limit); + public NextState nextRaw(List result, int batchLimit) throws IOException { + return delegate.nextRaw(result, batchLimit); } @Override - public NextState 
nextRaw(List result, int limit, long remainingResultSize) + public NextState nextRaw(List result, int batchLimit, long sizeLimit) throws IOException { + return delegate.nextRaw(result, batchLimit, sizeLimit); + } + + @Override + public NextState nextRaw(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException { - return delegate.nextRaw(result, limit, remainingResultSize); + return delegate.nextRaw(result, batchLimit, sizeLimit, timeLimit); } @Override @@ -151,7 +161,6 @@ public class TestCoprocessorInterface { public int getBatch() { return delegate.getBatch(); } - } public static class CoprocessorImpl extends BaseRegionObserver { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index a4963ae..cd28752 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -438,18 +438,24 @@ public class TestRegionObserverInterface { } @Override - public NextState next(List results, int limit) throws IOException { - return next(results, limit, -1); + public NextState next(List results, int batchLimit) throws IOException { + return next(results, batchLimit, -1); } @Override - public NextState next(List results, int limit, long remainingResultSize) + public NextState next(List results, int batchLimit, long sizeLimit) + throws IOException { + return next(results, batchLimit, sizeLimit, -1); + } + + @Override + public NextState next(List results, int batchLimit, long sizeLimit, long timeLimit) throws IOException { List internalResults = new ArrayList(); boolean hasMore; NextState state; do { - state = scanner.next(internalResults, limit, remainingResultSize); + state = scanner.next(internalResults, batchLimit, sizeLimit, timeLimit); hasMore = state != 
null && state.hasMoreValues(); if (!internalResults.isEmpty()) { long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 9a2c23b..a7c3ed5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -60,11 +60,11 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.Rule; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ca5135d..5b413bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -131,8 +131,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; 
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java new file mode 100644 index 0000000..17a0806 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -0,0 +1,500 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Here we test to make sure that scans return the 
expected Results when the server is sending the + * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent + * the scanner on the client side from timing out). A heartbeat message is sent from the server to + * the client when the server has exceeded the time limit during the processing of the scan. When + * the time limit is reached, the server will return to the Client whatever Results it has + * accumulated (potentially empty). + */ +@Category(MediumTests.class) +public class TestScannerHeartbeatMessages { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Table TABLE = null; + + /** + * Table configuration + */ + private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); + + private static int NUM_ROWS = 10; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + private static int NUM_FAMILIES = 3; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + private static int NUM_QUALIFIERS = 3; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + private static int VALUE_SIZE = 128; + private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + // Time, in milliseconds, that the client will wait for a response from the server before timing + // out. This value is used server side to determine when it is necessary to send a heartbeat + // message to the client + private static int CLIENT_TIMEOUT = 500; + + // The server limits itself to running for half of the CLIENT_TIMEOUT value. 
+ private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2; + + // By default, at most one row's worth of cells will be retrieved before the time limit is reached + private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT; + // By default, at most cells for two column families are retrieved before the time limit is + // reached + private static int DEFAULT_CF_SLEEP_TIME = SERVER_TIME_LIMIT / 2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) HeartbeatRPCServices.LOG).getLogger().setLevel(Level.ALL); + Configuration conf = TEST_UTIL.getConfiguration(); + + conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); + conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); + TEST_UTIL.startMiniCluster(1); + + TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + static Table createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + Table ht = TEST_UTIL.createTable(name, families); + List puts = createPuts(rows, families, qualifiers, cellValue); + ht.put(puts); + + return ht; + } + + /** + * Make puts to put the input value into each combination of row, family, and qualifier + * @param rows + * @param families + * @param qualifiers + * @param value + * @return + * @throws IOException + */ + static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + Put put; + ArrayList puts = new ArrayList<>(); + + for (int row = 0; row < rows.length; row++) { + put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < 
qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + + return puts; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setupBeforeTest() throws Exception { + disableSleeping(); + } + + @After + public void teardownAfterTest() throws Exception { + disableSleeping(); + } + + /** + * Test a variety of scan configurations to ensure that they return the expected Results when + * heartbeat messages are necessary. These tests are accumulated under one test case to ensure + * that they don't run in parallel. If the tests ran in parallel, they may conflict with each + * other due to changing static variables + */ + @Test + public void testScannerHeartbeatMessages() throws Exception { + testHeartbeatBetweenColumnFamilies(); + testHeartbeatBetweenRows(); + } + + /** + * Test the case that the time limit for the scan is reached after each full row of cells is + * fetched. + * @throws Exception + */ + public void testHeartbeatBetweenRows() throws Exception { + // Configure the scan so that it can read the entire table in a single RPC. We want to test + // the case where a scan stops on the server side due to a time limit + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + + testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1); + } + + /** + * Test the case that the time limit for scans is reached in between column families + * @throws Exception + */ + public void testHeartbeatBetweenColumnFamilies() throws Exception { + // Configure the scan so that it can read the entire table in a single RPC. 
We want to test + // the case where a scan stops on the server side due to a time limit + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + + testEquivalenceOfScanWithHeartbeats(scan, -1, DEFAULT_CF_SLEEP_TIME, false); + testEquivalenceOfScanWithHeartbeats(scan, -1, DEFAULT_CF_SLEEP_TIME, true); + } + + /** + * Test the equivalence of a scan versus the same scan executed when heartbeat messages are + * necessary + * @param scan The scan configuration being tested + * @param rowSleepTime The time to sleep between fetches of row cells + * @param cfSleepTime The time to sleep between fetches of column family cells + * @throws Exception + */ + public void testEquivalenceOfScanWithHeartbeats(Scan scan, int rowSleepTime, int cfSleepTime) + throws Exception { + // by default, cf sleeps occur after cf cell fetches + testEquivalenceOfScanWithHeartbeats(scan, rowSleepTime, cfSleepTime, false); + } + + /** + * Test the equivalence of a scan versus the same scan executed when heartbeat messages are + * necessary + * @param scan The scan configuration being tested + * @param rowSleepTime The time to sleep between fetches of row cells + * @param cfSleepTime The time to sleep between fetches of column family cells + * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for + * that column family are fetched + * @throws Exception + */ + public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, + int cfSleepTime, boolean sleepBeforeCf) throws Exception { + final ResultScanner scanner = TABLE.getScanner(scan); + final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan); + + Result r1 = null; + Result r2 = null; + + while ((r1 = scanner.next()) != null) { + // Enforce the specified sleep conditions during calls to the heartbeat scanner + configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf); + r2 = scannerWithHeartbeats.next(); + disableSleeping(); + + 
assertTrue(r2 != null); + try { + Result.compareResults(r1, r2); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + assertTrue(scannerWithHeartbeats.next() == null); + scanner.close(); + scannerWithHeartbeats.close(); + } + + /** + * Helper method for setting the time to sleep between rows and column families. If a sleep time + * is negative then that sleep will be disabled + * @param rowSleepTime + * @param cfSleepTime + */ + private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { + HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; + HeartbeatHRegion.rowSleepTime = rowSleepTime; + + HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0; + HeartbeatHRegion.columnFamilySleepTime = cfSleepTime; + HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf; + } + + /** + * Disable the sleeping mechanism server side. + */ + private static void disableSleeping() { + HeartbeatHRegion.sleepBetweenRows = false; + HeartbeatHRegion.sleepBetweenColumnFamilies = false; + } + + /** + * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of + * {@link RSRpcServices} to allow us to toggle support for heartbeat messages + */ + private static class HeartbeatHRegionServer extends HRegionServer { + public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm) + throws IOException { + super(conf, csm); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new HeartbeatRPCServices(this); + } + } + + /** + * Custom RSRpcServices instance that allows heartbeat support to be toggled + */ + private static class HeartbeatRPCServices extends RSRpcServices { + private static boolean heartbeatsEnabled = true; + + public HeartbeatRPCServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public 
ScanResponse scan(RpcController controller, ScanRequest request) + throws ServiceException { + ScanRequest.Builder builder = ScanRequest.newBuilder(request); + builder.setClientHandlesHeartbeats(heartbeatsEnabled); + return super.scan(controller, builder.build()); + } + } + + /** + * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times + * between fetches of row Results and/or column family cells. Useful for emulating an instance + * where the server is taking a long time to process a client's scan request + */ + private static class HeartbeatHRegion extends HRegion { + // Row sleeps occur AFTER each row worth of cells is retrieved. + private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; + private static boolean sleepBetweenRows = false; + + // The sleep for column families can be initiated before or after we fetch the cells for the + // column family. If the sleep occurs BEFORE then the time limits will be reached inside + // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time + // limit will be reached inside RegionScanner after all the cells for a column family have been + // retrieved. 
+ private static boolean sleepBeforeColumnFamily = false; + private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; + private static boolean sleepBetweenColumnFamilies = false; + + public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, + HTableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + private static void columnFamilySleep() { + if (HeartbeatHRegion.sleepBetweenColumnFamilies) { + try { + Thread.sleep(HeartbeatHRegion.columnFamilySleepTime); + } catch (InterruptedException e) { + } + } + } + + private static void rowSleep() { + try { + if (HeartbeatHRegion.sleepBetweenRows) { + Thread.sleep(HeartbeatHRegion.rowSleepTime); + } + } catch (InterruptedException e) { + } + } + + // Instantiate the custom heartbeat region scanners + @Override + protected RegionScanner instantiateRegionScanner(Scan scan, + List additionalScanners) throws IOException { + if (scan.isReversed()) { + if (scan.getFilter() != null) { + scan.getFilter().setReversed(true); + } + return new HeartbeatReversedRegionScanner(scan, additionalScanners, this); + } + return new HeartbeatRegionScanner(scan, additionalScanners, this); + } + } + + /** + * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results + * and/or column family cells + */ + private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl { + HeartbeatReversedRegionScanner(Scan scan, List additionalScanners, + HRegion region) throws IOException { + super(scan, additionalScanners, region); + } + + @Override + public NextState nextRaw(List outResults, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + NextState state = 
super.nextRaw(outResults, batchLimit, sizeLimit, timeLimit); + HeartbeatHRegion.rowSleep(); + return state; + } + + @Override + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) throws IOException { + this.storeHeap = new HeartbeatReversedKVHeap(scanners, region.getComparator()); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, region.getComparator()); + } + } + } + + /** + * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or + * column family cells + */ + private static class HeartbeatRegionScanner extends RegionScannerImpl { + HeartbeatRegionScanner(Scan scan, List additionalScanners, HRegion region) + throws IOException { + region.super(scan, additionalScanners, region); + } + + @Override + public NextState nextRaw(List outResults, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + NextState state = super.nextRaw(outResults, batchLimit, sizeLimit, timeLimit); + HeartbeatHRegion.rowSleep(); + return state; + } + + @Override + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) throws IOException { + this.storeHeap = new HeartbeatKVHeap(scanners, region.getComparator()); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getComparator()); + } + } + } + + /** + * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family + * cells. 
Useful for testing + */ + private static final class HeartbeatKVHeap extends KeyValueHeap { + public HeartbeatKVHeap(List scanners, KVComparator comparator) + throws IOException { + super(scanners, comparator); + } + + HeartbeatKVHeap(List scanners, KVScannerComparator comparator) + throws IOException { + super(scanners, comparator); + } + + @Override + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + NextState state = super.next(result, batchLimit, sizeLimit, timeLimit); + if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + return state; + } + } + + /** + * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family + * cells. + */ + private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { + public HeartbeatReversedKVHeap(List scanners, + KVComparator comparator) throws IOException { + super(scanners, comparator); + } + + @Override + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + NextState state = super.next(result, batchLimit, sizeLimit, timeLimit); + if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); + return state; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index 06bbd54..89107bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -246,13 +246,19 @@ public class TestStripeCompactor { return NextState.makeState(NextState.State.NO_MORE_VALUES); } 
} + + @Override + public NextState next(List result, int batchLimit) throws IOException { + return next(result); + } + @Override - public NextState next(List result, int limit) throws IOException { + public NextState next(List result, int batchLimit, long sizeLimit) throws IOException { return next(result); } @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException { return next(result); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 3294f6d..3c74b55 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -786,12 +786,17 @@ public class TestStripeCompactionPolicy { } @Override - public NextState next(List result, int limit) throws IOException { + public NextState next(List result, int batchLimit) throws IOException { return next(result); } @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, int batchLimit, long sizeLimit) throws IOException { + return next(result); + } + + @Override + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException { return next(result); } -- 1.9.3 (Apple Git-50)