From 9416e6026f7124edab5fe7c40f3c1b03ab785168 Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Tue, 10 Mar 2015 14:24:07 -0700 Subject: [PATCH] HBASE-13090:Progress heartbeats for long running scanners --- .../apache/hadoop/hbase/client/ClientScanner.java | 16 +- .../hadoop/hbase/client/ScannerCallable.java | 19 ++ .../hbase/client/ScannerCallableWithReplicas.java | 10 + .../hadoop/hbase/protobuf/RequestConverter.java | 3 + .../hbase/protobuf/generated/ClientProtos.java | 347 ++++++++++++++++--- hbase-protocol/src/main/protobuf/Client.proto | 7 + .../apache/hadoop/hbase/regionserver/HRegion.java | 129 ++++--- .../hadoop/hbase/regionserver/InternalScanner.java | 125 ++++--- .../hadoop/hbase/regionserver/KeyValueHeap.java | 47 ++- .../hadoop/hbase/regionserver/RSRpcServices.java | 56 ++- .../hadoop/hbase/regionserver/RegionScanner.java | 37 +- .../hadoop/hbase/regionserver/StoreScanner.java | 61 +++- .../coprocessor/TestCoprocessorInterface.java | 21 +- .../coprocessor/TestRegionObserverInterface.java | 14 +- .../regionserver/TestScannerHeartbeatMessages.java | 379 +++++++++++++++++++++ .../hbase/regionserver/TestStripeCompactor.java | 10 +- .../compactions/TestStripeCompactionPolicy.java | 10 +- 17 files changed, 1084 insertions(+), 207 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 9993974..7c4224f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -357,12 +357,18 @@ public class ClientScanner extends AbstractClientScanner { // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. boolean retryAfterOutOfOrderException = true; + + // This flag is set to indicate that the server only returned an RPC response to avoid + // a timeout (i.e. it did not return a response because the region/table was exhausted). + // If a heartbeat message is returned, the region on the server should NOT be changed + boolean isHeartbeatMessage = false; do { try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. values = call(callable, caller, scannerTimeout); + isHeartbeatMessage = callable.isHeartbeatMessage(); // When the replica switch happens, we need to do certain operations // again. The callable will openScanner with the right startkey @@ -446,6 +452,11 @@ public class ClientScanner extends AbstractClientScanner { this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext); } lastNext = currentTime; + + // If we received a heartbeat message from the server that didn't contain any Results, + // simply continue so that we can make another RPC to continue the scan server side + if (isHeartbeatMessage && (values == null || values.length == 0)) continue; + // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. 
@@ -465,8 +476,9 @@ public class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (remainingResultSize > 0 && countdown > 0 - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); + } while (remainingResultSize > 0 && countdown > 0 + && (!partialResults.isEmpty() || isHeartbeatMessage || possiblyNextScanner(countdown, + values == null))); } if (cache.size() > 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 6d5bb9e..f3a25eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -76,6 +76,12 @@ public class ScannerCallable extends RegionServerCallable { private int logCutOffLatency = 1000; private static String myAddress; protected final int id; + + /** + * Saves whether or not the most recent response from the server was a heartbeat message. + * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} + */ + protected boolean heartbeatMessage = false; static { try { myAddress = DNS.getDefaultHost("default", "default"); @@ -192,6 +198,8 @@ public class ScannerCallable extends RegionServerCallable { } else { Result [] rrs = null; ScanRequest request = null; + // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server + heartbeatMessage = false; try { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); @@ -212,6 +220,7 @@ public class ScannerCallable extends RegionServerCallable { // See HBASE-5974 nextCallSeq++; long timestamp = System.currentTimeMillis(); + heartbeatMessage = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response); @@ -281,6 +290,16 @@ public class ScannerCallable extends RegionServerCallable { return null; } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations. + */ + boolean isHeartbeatMessage() { + return heartbeatMessage; + } + private void incRPCcallsMetrics() { if (this.scanMetrics == null) { return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 16e6752..ebd31ae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -257,6 +257,16 @@ class ScannerCallableWithReplicas implements RetryingCallable { return replicaSwitched.get(); } + /** + * @return true when the most recent RPC response indicated that the response was a heartbeat + * message. 
Heartbeat messages are sent back from the server when the processing of the + * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid + * timeouts during long running scan operations. + */ + public boolean isHeartbeatMessage() { + return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); + } + private int addCallsForCurrentReplica( ResultBoundedCompletionService> cs, RegionLocations rl) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 02ad908..39996b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -487,6 +487,7 @@ public final class RequestConverter { builder.setRegion(region); builder.setScan(ProtobufUtil.toScan(scan)); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeatMessages(true); return builder.build(); } @@ -505,6 +506,7 @@ public final class RequestConverter { builder.setCloseScanner(closeScanner); builder.setScannerId(scannerId); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeatMessages(true); return builder.build(); } @@ -525,6 +527,7 @@ public final class RequestConverter { builder.setScannerId(scannerId); builder.setNextCallSeq(nextCallSeq); builder.setClientHandlesPartials(true); + builder.setClientHandlesHeartbeatMessages(true); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 5bcba26..32e15ce 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -16433,6 +16433,16 @@ public final class ClientProtos { * optional bool client_handles_partials = 7; */ boolean getClientHandlesPartials(); + + // optional bool client_handles_heartbeat_messages = 8; + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + boolean hasClientHandlesHeartbeatMessages(); + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + boolean getClientHandlesHeartbeatMessages(); } /** * Protobuf type {@code ScanRequest} @@ -16549,6 +16559,11 @@ public final class ClientProtos { clientHandlesPartials_ = input.readBool(); break; } + case 64: { + bitField0_ |= 0x00000080; + clientHandlesHeartbeatMessages_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -16713,6 +16728,22 @@ public final class ClientProtos { return clientHandlesPartials_; } + // optional bool client_handles_heartbeat_messages = 8; + public static final int CLIENT_HANDLES_HEARTBEAT_MESSAGES_FIELD_NUMBER = 8; + private boolean clientHandlesHeartbeatMessages_; + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + public boolean hasClientHandlesHeartbeatMessages() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + public boolean getClientHandlesHeartbeatMessages() { + return clientHandlesHeartbeatMessages_; + } + private void initFields() { region_ = 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -16721,6 +16752,7 @@ public final class ClientProtos { closeScanner_ = false; nextCallSeq_ = 0L; clientHandlesPartials_ = false; + clientHandlesHeartbeatMessages_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16767,6 +16799,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeBool(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, clientHandlesHeartbeatMessages_); + } getUnknownFields().writeTo(output); } @@ -16804,6 +16839,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(7, clientHandlesPartials_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, clientHandlesHeartbeatMessages_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16862,6 +16901,11 @@ public final class ClientProtos { result = result && (getClientHandlesPartials() == other.getClientHandlesPartials()); } + result = result && (hasClientHandlesHeartbeatMessages() == other.hasClientHandlesHeartbeatMessages()); + if (hasClientHandlesHeartbeatMessages()) { + result = result && (getClientHandlesHeartbeatMessages() + == other.getClientHandlesHeartbeatMessages()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -16903,6 +16947,10 @@ public final class ClientProtos { hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getClientHandlesPartials()); } + if (hasClientHandlesHeartbeatMessages()) { + hash = (37 * hash) + CLIENT_HANDLES_HEARTBEAT_MESSAGES_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeatMessages()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17049,6 +17097,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000020); clientHandlesPartials_ = false; bitField0_ = (bitField0_ & ~0x00000040); + clientHandlesHeartbeatMessages_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -17113,6 +17163,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000040; } result.clientHandlesPartials_ = clientHandlesPartials_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.clientHandlesHeartbeatMessages_ = clientHandlesHeartbeatMessages_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -17150,6 +17204,9 @@ public final class ClientProtos { if (other.hasClientHandlesPartials()) { setClientHandlesPartials(other.getClientHandlesPartials()); } + if (other.hasClientHandlesHeartbeatMessages()) { + setClientHandlesHeartbeatMessages(other.getClientHandlesHeartbeatMessages()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17588,6 +17645,39 @@ public final class ClientProtos { return this; } + // optional bool client_handles_heartbeat_messages = 8; + private boolean clientHandlesHeartbeatMessages_ ; + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + public boolean hasClientHandlesHeartbeatMessages() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + public 
boolean getClientHandlesHeartbeatMessages() { + return clientHandlesHeartbeatMessages_; + } + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + public Builder setClientHandlesHeartbeatMessages(boolean value) { + bitField0_ |= 0x00000080; + clientHandlesHeartbeatMessages_ = value; + onChanged(); + return this; + } + /** + * optional bool client_handles_heartbeat_messages = 8; + */ + public Builder clearClientHandlesHeartbeatMessages() { + bitField0_ = (bitField0_ & ~0x00000080); + clientHandlesHeartbeatMessages_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -17784,6 +17874,30 @@ public final class ClientProtos { * */ boolean getPartialFlagPerResult(int index); + + // optional bool heartbeat_message = 8; + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean hasHeartbeatMessage(); + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + boolean getHeartbeatMessage(); } /** * Protobuf type {@code ScanResponse} @@ -17912,6 +18026,11 @@ public final class ClientProtos { input.popLimit(limit); break; } + case 64: { + bitField0_ |= 0x00000010; + heartbeatMessage_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -18197,6 +18316,36 @@ public final class ClientProtos { return partialFlagPerResult_.get(index); } + // optional bool heartbeat_message = 8; + public static final int HEARTBEAT_MESSAGE_FIELD_NUMBER = 8; + private boolean heartbeatMessage_; + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bool heartbeat_message = 8; + * + *
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the client that the
+     * server would have continued to scan had the time limit not been reached.
+     * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + private void initFields() { cellsPerResult_ = java.util.Collections.emptyList(); scannerId_ = 0L; @@ -18205,6 +18354,7 @@ public final class ClientProtos { results_ = java.util.Collections.emptyList(); stale_ = false; partialFlagPerResult_ = java.util.Collections.emptyList(); + heartbeatMessage_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -18239,6 +18389,9 @@ public final class ClientProtos { for (int i = 0; i < partialFlagPerResult_.size(); i++) { output.writeBool(7, partialFlagPerResult_.get(i)); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBool(8, heartbeatMessage_); + } getUnknownFields().writeTo(output); } @@ -18283,6 +18436,10 @@ public final class ClientProtos { size += dataSize; size += 1 * getPartialFlagPerResultList().size(); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, heartbeatMessage_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18332,6 +18489,11 @@ public final class ClientProtos { } result = result && getPartialFlagPerResultList() .equals(other.getPartialFlagPerResultList()); + result = result && (hasHeartbeatMessage() == other.hasHeartbeatMessage()); + if (hasHeartbeatMessage()) { + result = result && (getHeartbeatMessage() + == other.getHeartbeatMessage()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18373,6 +18535,10 @@ public final class ClientProtos { hash = (37 * hash) + PARTIAL_FLAG_PER_RESULT_FIELD_NUMBER; hash = (53 * hash) + getPartialFlagPerResultList().hashCode(); } + if (hasHeartbeatMessage()) { + hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getHeartbeatMessage()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -18507,6 +18673,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000020); partialFlagPerResult_ = java.util.Collections.emptyList(); bitField0_ = (bitField0_ & ~0x00000040); + heartbeatMessage_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -18570,6 +18738,10 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000040); } result.partialFlagPerResult_ = partialFlagPerResult_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000010; + } + result.heartbeatMessage_ = heartbeatMessage_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18644,6 +18816,9 @@ public final class ClientProtos { } onChanged(); } + if (other.hasHeartbeatMessage()) { + setHeartbeatMessage(other.getHeartbeatMessage()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -19423,6 +19598,67 @@ public final class ClientProtos { return this; } + // optional bool heartbeat_message = 8; + private boolean heartbeatMessage_ ; + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean hasHeartbeatMessage() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public boolean getHeartbeatMessage() { + return heartbeatMessage_; + } + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder setHeartbeatMessage(boolean value) { + bitField0_ |= 0x00000080; + heartbeatMessage_ = value; + onChanged(); + return this; + } + /** + * optional bool heartbeat_message = 8; + * + *
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the client that the
+       * server would have continued to scan had the time limit not been reached.
+       * 
+ */ + public Builder clearHeartbeatMessage() { + bitField0_ = (bitField0_ & ~0x00000080); + heartbeatMessage_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanResponse) } @@ -32552,62 +32788,63 @@ public final class ClientProtos { "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" + "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" + - "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", + "\"\352\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" + "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" + "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" + - "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\251\001\n\014S" + - "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" + - "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" + - "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" + - "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" + - "\030\007 \003(\010\"\263\001\n\024BulkLoadHFileRequest\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\0225\n\013family_path", - "\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyPath" + - "\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016" + - "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" + - "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026Coproce" + - "ssorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_" + - "name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reque" + - "st\030\004 \002(\014\"9\n\030CoprocessorServiceResult\022\035\n\005" + - "value\030\001 \001(\0132\016.NameBytesPair\"d\n\031Coprocess" + - "orServiceRequest\022 \n\006region\030\001 \002(\0132\020.Regio" + - "nSpecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorSe", - "rviceCall\"]\n\032CoprocessorServiceResponse\022" + - " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005val" + - "ue\030\002 \002(\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005in" + - "dex\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationPr" + - "oto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004" + - " \001(\0132\027.CoprocessorServiceCall\"Y\n\014RegionA" + - "ction\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + - "\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action" + - "\"D\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(" + - "\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021Resul", - "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + - "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + - "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + - "essorServiceResult\022#\n\tloadStats\030\005 \001(\0132\020." + - "RegionLoadStats\"f\n\022RegionActionResult\022-\n" + - "\021resultOrException\030\001 \003(\0132\022.ResultOrExcep" + - "tion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" + - "f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r." 
+ - "RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcond" + - "ition\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse", - "\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionAct" + - "ionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consiste" + - "ncy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClient" + - "Service\022 \n\003Get\022\013.GetRequest\032\014.GetRespons" + - "e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" + - "onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" + - "e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" + - "t\032\026.BulkLoadHFileResponse\022F\n\013ExecService" + - "\022\032.CoprocessorServiceRequest\032\033.Coprocess" + - "orServiceResponse\022R\n\027ExecRegionServerSer", - "vice\022\032.CoprocessorServiceRequest\032\033.Copro" + - "cessorServiceResponse\022&\n\005Multi\022\r.MultiRe" + - "quest\032\016.MultiResponseBB\n*org.apache.hado" + - "op.hbase.protobuf.generatedB\014ClientProto" + - "sH\001\210\001\001\240\001\001" + "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022)\n!cl" + + "ient_handles_heartbeat_messages\030\010 \001(\010\"\304\001" + + "\n\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r" + + "\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001" + + "(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Resul" + + "t\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_res" + + "ult\030\007 \003(\010\022\031\n\021heartbeat_message\030\010 \001(\010\"\263\001\n", + "\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020." + + "RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 .B" + + "ulkLoadHFileRequest.FamilyPath\022\026\n\016assign" + + "_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001" + + " \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespo" + + "nse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServic" + + "eCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t" + + "\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9" + + "\n\030CoprocessorServiceResult\022\035\n\005value\030\001 \001(" + + "\0132\016.NameBytesPair\"d\n\031CoprocessorServiceR", + "equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" + + "\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCall\"" + + "]\n\032CoprocessorServiceResponse\022 \n\006region\030" + + "\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016" + + ".NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(\r\022" + + " \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003get" + + "\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Cop" + + "rocessorServiceCall\"Y\n\014RegionAction\022 \n\006r" + + "egion\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030" + + "\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Region", + "LoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rhe" + + "apOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcepti" + + "on\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Resu" + + "lt\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\0221\n" + + "\016service_result\030\004 \001(\0132\031.CoprocessorServi" + + "ceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLoad" + + 
"Stats\"f\n\022RegionActionResult\022-\n\021resultOrE" + + "xception\030\001 \003(\0132\022.ResultOrException\022!\n\tex" + + "ception\030\002 \001(\0132\016.NameBytesPair\"f\n\014MultiRe" + + "quest\022#\n\014regionAction\030\001 \003(\0132\r.RegionActi", + "on\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 \001(" + + "\0132\n.Condition\"S\n\rMultiResponse\022/\n\022region" + + "ActionResult\030\001 \003(\0132\023.RegionActionResult\022" + + "\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006STR" + + "ONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022 \n" + + "\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mutat" + + "e\022\016.MutateRequest\032\017.MutateResponse\022#\n\004Sc" + + "an\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBulkL" + + "oadHFile\022\025.BulkLoadHFileRequest\032\026.BulkLo" + + "adHFileResponse\022F\n\013ExecService\022\032.Coproce", + "ssorServiceRequest\032\033.CoprocessorServiceR" + + "esponse\022R\n\027ExecRegionServerService\022\032.Cop" + + "rocessorServiceRequest\032\033.CoprocessorServ" + + "iceResponse\022&\n\005Multi\022\r.MultiRequest\032\016.Mu" + + "ltiResponseBB\n*org.apache.hadoop.hbase.p" + + "rotobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -32703,13 +32940,13 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeatMessages", }); internal_static_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_ScanResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "HeartbeatMessage", }); internal_static_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_BulkLoadHFileRequest_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 5142e53..161f2b1 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -275,6 +275,7 @@ message ScanRequest { optional bool close_scanner = 5; optional uint64 next_call_seq = 6; optional bool client_handles_partials = 7; + optional bool client_handles_heartbeat_messages = 8; } /** @@ -308,6 +309,12 @@ message ScanResponse { // has false, false, true in it, then we know that on the client side, we need to // make another RPC request since the last result was only a partial. repeated bool partial_flag_per_result = 7; + + // This field is filled in if the server is sending back a heartbeat message. + // Heartbeat messages are sent back to the client to prevent the scanner from + // timing out. 
Seeing a heartbeat message communicates to the Client that the + // server would have continued to scan had the time limit not been reached. + optional bool heartbeat_message = 8; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 71b5646..06dd39b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5489,12 +5489,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } @Override - public NextState next(List outResults, int limit) throws IOException { - return next(outResults, limit, -1); + public NextState next(List outResults, int batchLimit) throws IOException { + // -1 to indicate no size limit + return next(outResults, batchLimit, -1); } @Override - public synchronized NextState next(List outResults, int limit, long remainingResultSize) + public NextState next(List outResults, int batchLimit, long sizeLimit) throws IOException { + // -1 to indicate no time limit + return next(outResults, batchLimit, sizeLimit, -1); + } + + @Override + public synchronized NextState next(List outResults, int batchLimit, long sizeLimit, + long timeLimit) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + @@ -5504,7 +5512,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // startRegionOperation(Operation.SCAN); readRequestsCount.increment(); try { - return nextRaw(outResults, limit, remainingResultSize); + return nextRaw(outResults, batchLimit, sizeLimit, timeLimit); } finally { closeRegionOperation(Operation.SCAN); } @@ -5522,7 +5530,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } @Override - public NextState nextRaw(List outResults, int batchLimit, long remainingResultSize) + public NextState nextRaw(List outResults, int batchLimit, long sizeLimit) + throws IOException { + return nextRaw(outResults, batchLimit, sizeLimit, -1); + } + + @Override + public NextState nextRaw(List outResults, int batchLimit, long sizeLimit, long timeLimit) throws IOException { if (storeHeap == null) { // scanner is closed @@ -5532,10 +5546,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (outResults.isEmpty()) { // Usually outResults is empty. This is true when next is called // to handle scan or get operation. 
- state = nextInternal(outResults, batchLimit, remainingResultSize); + state = nextInternal(outResults, batchLimit, sizeLimit, timeLimit); } else { List tmpList = new ArrayList(); - state = nextInternal(tmpList, batchLimit, remainingResultSize); + state = nextInternal(tmpList, batchLimit, sizeLimit, timeLimit); outResults.addAll(tmpList); } // State should never be null, this is a precautionary measure @@ -5555,14 +5569,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @return the state the joinedHeap returned on the call to * {@link KeyValueHeap#next(List, int, long)} */ - private NextState populateFromJoinedHeap(List results, int limit, long resultSize) - throws IOException { + private NextState populateFromJoinedHeap(List results, int batchLimit, long sizeLimit, + long timeLimit) throws IOException { assert joinedContinuationRow != null; NextState state = - populateResult(results, this.joinedHeap, limit, resultSize, + populateResult(results, this.joinedHeap, batchLimit, sizeLimit, timeLimit, joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength()); - if (state != null && !state.batchLimitReached() && !state.sizeLimitReached()) { + if (state != null && !state.batchLimitReached() && !state.sizeLimitReached() + && !state.timeLimitReached()) { // We are done with this row, reset the continuation. joinedContinuationRow = null; } @@ -5576,38 +5591,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is * reached, or remainingResultSize (if not -1) is reaced * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call. - * @param remainingResultSize The remaining space within our result size limit. A negative value - * indicate no limit * @param batchLimit Max amount of KVs to place in result list, -1 means no limit. + * @param sizeLimit limit on the size (in memory) of the results. A negative value indicate no + * limit + * @param timeLimit a future timestamp that the execution of this method must finish by. * @param currentRow Byte array with key we are fetching. 
* @param offset offset for currentRow * @param length length for currentRow * @return state of last call to {@link KeyValueHeap#next()} */ private NextState populateResult(List results, KeyValueHeap heap, int batchLimit, - long remainingResultSize, byte[] currentRow, int offset, short length) throws IOException { + long sizeLimit, long timeLimit, byte[] currentRow, int offset, short length) + throws IOException { Cell nextKv; boolean moreCellsInRow = false; long accumulatedResultSize = 0; List tmpResults = new ArrayList(); do { - int remainingBatchLimit = batchLimit - results.size(); - NextState heapState = - heap.next(tmpResults, remainingBatchLimit, remainingResultSize - accumulatedResultSize); + // Calculate the limits for this iteration + int heapBatchLimit = batchLimit - results.size(); + long heapSizeLimit = sizeLimit - accumulatedResultSize; + + NextState heapState = heap.next(tmpResults, heapBatchLimit, heapSizeLimit, timeLimit); + postHeapNext(tmpResults); + + // Update the accumulated result size results.addAll(tmpResults); accumulatedResultSize += calculateResultSize(tmpResults, heapState); tmpResults.clear(); + nextKv = heap.peek(); + moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); + + // Check the batch limit if (batchLimit > 0 && results.size() == batchLimit) { return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, accumulatedResultSize); } - nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); - boolean sizeLimitReached = - remainingResultSize > 0 && accumulatedResultSize >= remainingResultSize; - if (moreCellsInRow && sizeLimitReached) { - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize); + // Check the size limit + boolean sizeLimitReached = sizeLimit > 0 && accumulatedResultSize >= sizeLimit; + if (sizeLimitReached) { + // More cells in row indicates whether or not we were cut short in the scan and had to + // return a partial Result. + return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize, + moreCellsInRow); + } + + // Check the time limit + boolean timeLimitReached = timeLimitReached(timeLimit); + if (timeLimitReached) { + // More cells in row indicates whether or not we were cut short in the scan and had to + // return a partial Result. + return NextState.makeState(NextState.State.TIME_LIMIT_REACHED, accumulatedResultSize, + moreCellsInRow); } } while (moreCellsInRow); @@ -5619,6 +5655,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } /** + * A method exposed for testing that allows us to view the cells returned from heap.next inside + * {@link #populateResult(List, KeyValueHeap, int, long, long, byte[], int, short)} + */ + @VisibleForTesting + void postHeapNext(List results) { + // Do nothing... used for testing + } + + /** * Based on the nextKv in the heap, and the current row, decide whether or not there are more * cells to be read in the heap. If the row of the nextKv in the heap matches the current row * then there are more cells to be read in the row. 
@@ -5669,8 +5714,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.filter != null && this.filter.filterAllRemaining(); } - private NextState nextInternal(List results, int batchLimit, long remainingResultSize) - throws IOException { + private NextState nextInternal(List results, int batchLimit, long sizeLimit, + long timeLimit) throws IOException { if (!results.isEmpty()) { throw new IllegalArgumentException("First parameter should be an empty list"); } @@ -5715,13 +5760,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // If filter#hasFilterRow is true, partial results are not allowed since allowing them // would prevent the filters from being evaluated. Thus, if it is true, change the // remainingResultSize to -1 so that the entire row's worth of cells are fetched. - if (hasFilterRow && remainingResultSize > 0) { - remainingResultSize = -1; + if (hasFilterRow) { if (LOG.isTraceEnabled()) { - LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + - " formed. The remainingResultSize of: " + remainingResultSize + " will not " + - " be considered when fetching the cells for this row."); + LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + + " formed. Size and time limits will be ignored while fetching individual cells"); } + sizeLimit = -1; + timeLimit = -1; } NextState joinedHeapState; @@ -5746,7 +5791,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } NextState storeHeapState = - populateResult(results, this.storeHeap, batchLimit, remainingResultSize, currentRow, + populateResult(results, this.storeHeap, batchLimit, sizeLimit, timeLimit, currentRow, offset, length); resultSize += calculateResultSize(results, storeHeapState); // Invalid states should never be returned. If one is seen, throw exception @@ -5759,21 +5804,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (storeHeapState.batchLimitReached()) { if (hasFilterRow) { throw new IncompatibleFilterException( - "Filter whose hasFilterRow() returns true is incompatible with scan with limit!"); + "Filter whose hasFilterRow() returns true is incompatible with scans that have " + + "a batch limit!"); } // We hit the batch limit. return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, resultSize); - } else if (storeHeapState.sizeLimitReached()) { + } else if (storeHeapState.partialFormed()) { if (hasFilterRow) { - // We try to guard against this case above when remainingResultSize is set to -1 if - // hasFilterRow is true. In the even that the guard doesn't work, an exception must be - // thrown throw new IncompatibleFilterException( "Filter whose hasFilterRows() returns true is incompatible with scans that" + " return partial results"); } - // We hit the size limit. - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); + + return NextState.makeState(storeHeapState.getState(), resultSize, true); } Cell nextKv = this.storeHeap.peek(); stopRow = nextKv == null || @@ -5815,7 +5858,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (mayHaveData) { joinedContinuationRow = current; joinedHeapState = - populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); + populateFromJoinedHeap(results, batchLimit, sizeLimit - resultSize, timeLimit); resultSize += joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? 
joinedHeapState.getResultSize() : 0; @@ -5827,7 +5870,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } else { // Populating from the joined heap was stopped by limits, populate some more. joinedHeapState = - populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); + populateFromJoinedHeap(results, batchLimit, sizeLimit - resultSize, timeLimit); resultSize += joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? joinedHeapState.getResultSize() : 0; @@ -5853,12 +5896,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // We are done. Return the result. if (stopRow) { return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); + } else if (timeLimitReached(timeLimit)) { + return NextState.makeState(NextState.State.TIME_LIMIT_REACHED, resultSize); } else { return NextState.makeState(NextState.State.MORE_VALUES, resultSize); } } } + private boolean timeLimitReached(long timeLimit) { + return timeLimit > 0 && System.currentTimeMillis() >= timeLimit; + } + /** * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines * both filterRow & filterRow(List kvs) functions. While 0.94 code or older, it may diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index e68dc75..aa0015f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -56,7 +56,8 @@ public interface InternalScanner extends Closeable { MORE_VALUES(true), NO_MORE_VALUES(false), SIZE_LIMIT_REACHED(true), - BATCH_LIMIT_REACHED(true); + BATCH_LIMIT_REACHED(true), + TIME_LIMIT_REACHED(true); private boolean moreValues; @@ -76,18 +77,24 @@ public interface InternalScanner extends Closeable { /** * state variables */ - private final State state; - private long resultSize; + private State state = DEFAULT_STATE; + private long resultSize = DEFAULT_RESULT_SIZE; + private boolean partialFormed = DEFAULT_PARTIAL_FORMED; /** - * Value to use for resultSize when the size has not been calculated. Must be a negative number - * so that {@link NextState#hasResultSizeEstimate()} returns false. + * Defaults for state variables */ private static final long DEFAULT_RESULT_SIZE = -1; + private static final State DEFAULT_STATE = State.NO_MORE_VALUES; + private static final boolean DEFAULT_PARTIAL_FORMED = false; - private NextState(State state, long resultSize) { + /** + * Private constructor. All instantiation should be performed through {@link #makeState(State)} + */ + private NextState(State state, long resultSize, boolean partialFormed) { this.state = state; this.resultSize = resultSize; + this.partialFormed = partialFormed; } /** @@ -108,60 +115,32 @@ public interface InternalScanner extends Closeable { * the result size by using the cached value retrievable via {@link #getResultSize()} */ public static NextState makeState(final State state, long resultSize) { + return makeState(state, resultSize, DEFAULT_PARTIAL_FORMED); + } + + /** + * @param state + * @param resultSize + * @param partialFormed + * @return An instance of {@link NextState} where the size of the values returned from the call + * to {@link InternalScanner#next(List)} is known. 
The caller can avoid recalculating + * the result size by using the cached value retrievable via {@link #getResultSize()} + */ + public static NextState makeState(final State state, long resultSize, boolean partialFormed) { switch (state) { case MORE_VALUES: - return createMoreValuesState(resultSize); case NO_MORE_VALUES: - return createNoMoreValuesState(resultSize); case BATCH_LIMIT_REACHED: - return createBatchLimitReachedState(resultSize); case SIZE_LIMIT_REACHED: - return createSizeLimitReachedState(resultSize); + case TIME_LIMIT_REACHED: + return new NextState(state, resultSize, partialFormed); default: // If the state is not recognized, default to no more value state - return createNoMoreValuesState(resultSize); + return new NextState(State.NO_MORE_VALUES, resultSize, partialFormed); } } /** - * Convenience method for creating a state that indicates that more values can be scanned - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createMoreValuesState(long resultSize) { - return new NextState(State.MORE_VALUES, resultSize); - } - - /** - * Convenience method for creating a state that indicates that no more values can be scanned. - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createNoMoreValuesState(long resultSize) { - return new NextState(State.NO_MORE_VALUES, resultSize); - } - - /** - * Convenience method for creating a state that indicates that the scan stopped because the - * batch limit was exceeded - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createBatchLimitReachedState(long resultSize) { - return new NextState(State.BATCH_LIMIT_REACHED, resultSize); - } - - /** - * Convenience method for creating a state that indicates that the scan stopped due to the size - * limit - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createSizeLimitReachedState(long resultSize) { - return new NextState(State.SIZE_LIMIT_REACHED, resultSize); - } - - /** * @return true when the scanner has more values to be scanned following the values returned by * the call to {@link InternalScanner#next(List)} */ @@ -182,6 +161,20 @@ public interface InternalScanner extends Closeable { public boolean sizeLimitReached() { return this.state == State.SIZE_LIMIT_REACHED; } + + /** + * @return true when the scanner had to stop scanning because the time limit was exceeded + */ + public boolean timeLimitReached() { + return this.state == State.TIME_LIMIT_REACHED; + } + + /** + * @return The underlying {@link State} of this {@link NextState} instance + */ + public State getState() { + return state; + } /** * @return The size (heap size) of the values that were returned from the call to @@ -193,6 +186,14 @@ public interface InternalScanner extends Closeable { } /** + * @return true when the scanner had to stop scanning in the middle of a row and thus a partial + * Result was formed. + */ + public boolean partialFormed() { + return this.partialFormed; + } + + /** * @return true when an estimate for the size of the values returned by * {@link InternalScanner#next(List)} was provided. 
If false, it is the responsibility * of the caller to calculate the result size @@ -231,24 +232,40 @@ public interface InternalScanner extends Closeable { /** * Grab the next row's worth of values with a limit on the number of values to return. * @param result return output array - * @param limit limit on row count to get + * @param batchLimit limit on the number of cells to fetch * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this * one, false if scanner is done * @throws IOException e */ - NextState next(List result, int limit) throws IOException; + NextState next(List result, int batchLimit) throws IOException; /** * Grab the next row's worth of values with a limit on the number of values to return as well as a * restriction on the size of the list of values that are returned. * @param result return output array - * @param limit limit on row count to get - * @param remainingResultSize limit on the size of the result being returned + * @param batchLimit limit on row count to get + * @param sizeLimit limit on the size of the result being returned * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this * one, false if scanner is done * @throws IOException e */ - NextState next(List result, int limit, long remainingResultSize) throws IOException; + NextState next(List result, int batchLimit, long sizeLimit) throws IOException; + + /** + * Grab the next row's values with a limit on the number of values to return as well as a + * restriction on the size of the list of values that are returned. + * @param result return output array + * @param batchLimit limit on the number of cells to fetch + * @param sizeLimit limit of the size of the result being returned + * @param timeLimit a timestamp in the future that the execution of this method must finish by. + * When the timeLimit is exceeded, the scanner will return whatever values it has + * accumulated (potentially empty). Negative timeLimits will indicate that no limit + * should be enforced + * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this + * one, false if scanner is done + * @throws IOException + */ + NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException; /** * Closes the scanner and releases any resources it has allocated diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index beb23cf..8ebc8f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -128,27 +128,60 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * This can ONLY be called when you are using Scanners that implement InternalScanner as well as * KeyValueScanner (a {@link StoreScanner}). * @param result - * @param limit + * @param batchLimit * @return state where NextState#hasMoreValues() is true if more keys exist after this * one, false if scanner is done */ - public NextState next(List result, int limit) throws IOException { - return next(result, limit, -1); + public NextState next(List result, int batchLimit) throws IOException { + return next(result, batchLimit, -1); } - public NextState next(List result, int limit, long remainingResultSize) throws IOException { + /** + * Gets the next row of keys from the top-most scanner. + *

+ * This method takes care of updating the heap. + *

+ * This can ONLY be called when you are using Scanners that implement InternalScanner as well as + * KeyValueScanner (a {@link StoreScanner}). + * @param result + * @param batchLimit + * @param sizeLimit + * @return state where NextState#hasMoreValues() is true if more keys exist after this one, false + * if scanner is done + */ + public NextState next(List result, int batchLimit, long sizeLimit) throws IOException { + // -1 to indicate no timeLimit + return next(result, batchLimit, sizeLimit, -1); + } + + /** + * Gets the next row of keys from the top-most scanner. + *

+ * This method takes care of updating the heap. + *

+ * This can ONLY be called when you are using Scanners that implement InternalScanner as well as + * KeyValueScanner (a {@link StoreScanner}). + * @param result + * @param batchLimit + * @param sizeLimit + * @param timeLimit + * @return state where NextState#hasMoreValues() is true if more keys exist after this one, false + * if scanner is done + */ + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { if (this.current == null) { return NextState.makeState(NextState.State.NO_MORE_VALUES); } InternalScanner currentAsInternal = (InternalScanner)this.current; - NextState state = currentAsInternal.next(result, limit, remainingResultSize); + NextState state = currentAsInternal.next(result, batchLimit, sizeLimit, timeLimit); // Invalid states should never be returned. Receiving an invalid state means that we have // no clue how to proceed. Throw an exception. if (!NextState.isValidState(state)) { throw new IOException("Invalid state returned from InternalScanner#next"); } boolean mayContainMoreRows = NextState.hasMoreValues(state); - Cell pee = this.current.peek(); + Cell peek = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no * further rows to be fetched. So, we can close a scanner if it returns @@ -156,7 +189,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations. */ - if (pee == null || !mayContainMoreRows) { + if (peek == null || !mayContainMoreRows) { this.current.close(); } else { this.heap.add(this.current); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3ce1b57..b7da877 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2151,6 +2151,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean stale = (region.getRegionInfo().getReplicaId() != 0); boolean clientHandlesPartials = request.hasClientHandlesPartials() && request.getClientHandlesPartials(); + boolean clientHandlesHeartbeats = + request.hasClientHandlesHeartbeatMessages() + && request.getClientHandlesHeartbeatMessages(); // On the server side we must ensure that the correct ordering of partial results is // returned to the client to allow them to properly reconstruct the partial results. @@ -2158,29 +2161,51 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // correct ordering of partial results and so we prevent partial results from being // formed. boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; - boolean enforceMaxResultSizeAtCellLevel = + boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; + // Heartbeat messages occur when the processing of the ScanRequest is exceeds a + // certain time threshold on the server. When the time threshold is exceeded, the + // server stops the scan and sends back whatever Results it has accumulated within + // that time period (may be empty). 
Since heartbeat messages have the potential to + // create partial Results, we must only generate heartbeat messages when the client + // can handle both heartbeats AND partials + boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; + + // Default value of timeLimit is negative to indicate no timeLimit should be + // enforced. + long timeLimit = -1; + // If the scannerLeaseTimeoutPeriod is positive (which it always should be) then we + // can use that to define our timeLimit else we have no context for how long the + // timeLimit should be and thus do not enforce timeLimits + if (allowHeartbeatMessages && scannerLeaseTimeoutPeriod > 0) { + timeLimit = System.currentTimeMillis() + scannerLeaseTimeoutPeriod / 2; + } + while (i < rows) { // Stop collecting results if we have exceeded maxResultSize if (currentScanResultSize >= maxResultSize) { + if (LOG.isTraceEnabled()) { + LOG.trace("The scan maxResultSize: " + maxResultSize + " has been reached. " + + "The current scan result size is: " + currentScanResultSize); + } break; } - // A negative remainingResultSize communicates that there is no limit on the size + // A negative sizeLimit communicates that there is no limit on the size // of the results. - final long remainingResultSize = - enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize - : -1; + final long sizeLimit = + allowPartialResults ? maxResultSize - currentScanResultSize : -1; // Collect values to be returned here NextState state = - scanner.nextRaw(values, scanner.getBatch(), remainingResultSize); + scanner.nextRaw(values, scanner.getBatch(), sizeLimit, timeLimit); // Invalid states should never be returned. If one is seen, throw exception // to stop the scan -- We have no way of telling how we should proceed if (!NextState.isValidState(state)) { throw new IOException("NextState returned from call to nextRaw was invalid"); } + if (!values.isEmpty()) { // The state should always contain an estimate of the result size because that // estimate must be used to decide when partial results are formed. @@ -2195,16 +2220,25 @@ public class RSRpcServices implements HBaseRPCErrorHandler, currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell); } } - // The size limit was reached. This means there are more cells remaining in - // the row but we had to stop because we exceeded our max result size. This - // indicates that we are returning a partial result - final boolean partial = state != null && state.sizeLimitReached(); - results.add(Result.create(values, null, stale, partial)); + results.add(Result.create(values, null, stale, state.partialFormed())); i++; } + // The time limit has been reached. Mark the ScanResponse as a heartbeat message + // and break from the loop so that we can return the Results accumulated thus far + // to the client + if (timeLimit > 0 && System.currentTimeMillis() >= timeLimit) { + if (LOG.isTraceEnabled()) { + LOG.trace("The scan timeLimit: " + timeLimit + " has been reached." 
+ + " Returning heartbeat message to client."); + } + builder.setHeartbeatMessage(true); + break; + } + if (!NextState.hasMoreValues(state)) { break; } + values.clear(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index 26f9aef..fe2067a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -78,7 +78,7 @@ public interface RegionScanner extends InternalScanner { * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. * Caller must set the thread's readpoint, start and close a region operation, an synchronize on * the scanner object. Caller should maintain and update metrics. See - * {@link #nextRaw(List, int, long)} + * {@link #nextRaw(List, int, long, long)} * @param result return output array * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. * @throws IOException e */ @@ -91,14 +91,31 @@ public interface RegionScanner extends InternalScanner { * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. * Caller must set the thread's readpoint, start and close a region operation, an synchronize on * the scanner object. Caller should maintain and update metrics. See - * {@link #nextRaw(List, int, long)} + * {@link #nextRaw(List, int, long, long)} * @param result return output array - * @param limit limit on row count to get + * @param batchLimit limit on cell count to get * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. * @throws IOException e */ - NextState nextRaw(List result, int limit) throws IOException; + NextState nextRaw(List result, int batchLimit) throws IOException; + + /** + * Grab the next row's worth of values with a limit on the number of values to return and a restriction on the size of results. + * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. + * Caller must set the thread's readpoint, start and close a region operation, and synchronize on + * the scanner object. Caller should maintain and update metrics. See + * {@link #nextRaw(List, int, long, long)} + * @param result return output array + * @param batchLimit limit on cell count to get + * @param sizeLimit limit on the size (in memory) of the results. Negative values indicate no + * limit + * @return a state where NextState#hasMoreValues() is true when more rows exist, false when + * scanner is done. + * @throws IOException e + */ + NextState nextRaw(List result, int batchLimit, final long sizeLimit) + throws IOException; /** * Grab the next row's worth of values with a limit on the number of values to return as well as a @@ -120,13 +137,17 @@ public interface RegionScanner extends InternalScanner { * } * * @param result return output array - * @param limit limit on row count to get - * @param remainingResultSize the space remaining within the restriction on the result size. - * Negative values indicate no limit + * @param batchLimit limit on cell count to get + * @param sizeLimit limit on the size (in memory) of the results. Negative values indicate no + * limit + * @param timeLimit a timestamp in the future that the execution of this method must finish by.
+ * When the timeLimit is exceeded, the scanner will return whatever values it has + * accumulated (potentially empty). Negative timeLimits indicate that no limit should be + * enforced * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. * @throws IOException e */ - NextState nextRaw(List result, int limit, final long remainingResultSize) + NextState nextRaw(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7ce4e0b..b119829 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -445,24 +445,38 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner /** * Get the next row of values from this Store. * @param outResult - * @param limit + * @param batchLimit * @return true if there are more rows, false if scanner is done */ @Override - public NextState next(List outResult, int limit) throws IOException { - // -1 means no limit - return next(outResult, limit, -1); + public NextState next(List outResult, int batchLimit) throws IOException { + // -1 means no sizeLimit will be enforced + return next(outResult, batchLimit, -1); } /** * Get the next row of values from this Store. * @param outResult - * @param limit - * @param remainingResultSize + * @param batchLimit + * @param sizeLimit * @return true if there are more rows, false if scanner is done */ @Override - public NextState next(List outResult, int limit, long remainingResultSize) + public NextState next(List outResult, int batchLimit, long sizeLimit) throws IOException { + // -1 means no timeLimit will be enforced + return next(outResult, batchLimit, sizeLimit, -1); + } + + /** + * Get the next row of values from this Store. + * @param outResult + * @param batchLimit + * @param sizeLimit + * @param timeLimit + * @return true if there are more rows, false if scanner is done + */ + @Override + public NextState next(List outResult, int batchLimit, long sizeLimit, long timeLimit) throws IOException { lock.lock(); try { @@ -489,14 +503,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int offset = peeked.getRowOffset(); short length = peeked.getRowLength(); - // If limit < 0 and remainingResultSize < 0 we can skip the row comparison because we know - // the row has changed. Else it is possible we are still traversing the same row so we - // must perform the row comparison. - if ((limit < 0 && remainingResultSize < 0) || matcher.row == null - || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, - matcher.rowLength)) { - this.countPerRow = 0; - matcher.setRow(row, offset, length); + // If limits have not been specified (i.e. batchLimit, sizeLimit, and timeLimit are not set) + // we can skip the row comparison because we know the row has changed. Else it is possible we + // are still traversing the same row so we must perform the row comparison. 
+ if ((batchLimit < 0 && sizeLimit < 0 && timeLimit < 0) || matcher.row == null + || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) { + this.countPerRow = 0; + matcher.setRow(row, offset, length); } Cell cell; @@ -510,6 +523,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner long totalHeapSize = 0; LOOP: while((cell = this.heap.peek()) != null) { + // Check the time limit + if (timeLimit > 0 && System.currentTimeMillis() >= timeLimit) { + if (LOG.isTraceEnabled()) { + LOG.trace("Time limit: " + timeLimit + " reached in StoreScanner"); + } + return NextState.makeState(NextState.State.TIME_LIMIT_REACHED, totalHeapSize); + } + if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. checkScanOrder(prevCell, cell, comparator); prevCell = cell; @@ -562,10 +583,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.heap.next(); } - if (limit > 0 && (count == limit)) { + if (batchLimit > 0 && (count == batchLimit)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Batch limit: " + batchLimit + " reached in StoreScanner"); + } break LOOP; } - if (remainingResultSize > 0 && (totalHeapSize >= remainingResultSize)) { + if (sizeLimit > 0 && (totalHeapSize >= sizeLimit)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Size limit: " + sizeLimit + " reached in StoreScanner"); + } break LOOP; } continue; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 75fe93d..fd1f8a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -90,14 +90,20 @@ public class TestCoprocessorInterface { } @Override - public NextState next(List result, int limit) throws IOException { - return delegate.next(result, limit); + public NextState next(List result, int batchLimit) throws IOException { + return delegate.next(result, batchLimit); } @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, int batchLimit, long sizeLimit) throws IOException { - return delegate.next(result, limit, remainingResultSize); + return delegate.next(result, batchLimit, sizeLimit); + } + + @Override + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + return delegate.next(result, batchLimit, sizeLimit, timeLimit); } @Override @@ -118,6 +124,12 @@ public class TestCoprocessorInterface { } @Override + public NextState nextRaw(List result, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + return delegate.nextRaw(result, batchLimit, sizeLimit, timeLimit); + } + + @Override public void close() throws IOException { delegate.close(); } @@ -151,7 +163,6 @@ public class TestCoprocessorInterface { public int getBatch() { return delegate.getBatch(); } - } public static class CoprocessorImpl extends BaseRegionObserver { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index a4963ae..cd28752 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -438,18 +438,24 @@ public class TestRegionObserverInterface { } @Override - public NextState next(List results, int limit) throws IOException { - return next(results, limit, -1); + public NextState next(List results, int batchLimit) throws IOException { + return next(results, batchLimit, -1); } @Override - public NextState next(List results, int limit, long remainingResultSize) + public NextState next(List results, int batchLimit, long sizeLimit) + throws IOException { + return next(results, batchLimit, sizeLimit, -1); + } + + @Override + public NextState next(List results, int batchLimit, long sizeLimit, long timeLimit) throws IOException { List internalResults = new ArrayList(); boolean hasMore; NextState state; do { - state = scanner.next(internalResults, limit, remainingResultSize); + state = scanner.next(internalResults, batchLimit, sizeLimit, timeLimit); hasMore = state != null && state.hasMoreValues(); if (!internalResults.isEmpty()) { long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java new file mode 100644 index 0000000..735e22b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Here we test to make sure that scans return the expected Results when the server is sending the + * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent + * the scanner on the client side from timing out). A heartbeat message is sent from the server to + * the client when the server has exceeded the time limit during the processing of the scan. When + * the time limit is reached, the server will return to the Client whatever Results it has + * accumulated (potentially empty). + */ +@Category(MediumTests.class) +public class TestScannerHeartbeatMessages { + final Log LOG = LogFactory.getLog(getClass()); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Table TABLE = null; + + /** + * Table configuration + */ + private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); + + private static int NUM_ROWS = 5; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + private static int NUM_FAMILIES = 10; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + private static int NUM_QUALIFIERS = 10; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + private static int VALUE_SIZE = 128; + private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static int CLIENT_TIMEOUT = 500; + + // The server limits itself to running for half of the CLIENT_TIMEOUT value. 
+ private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2; + + private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2; + private static int DEFAULT_CF_SLEEP_TIME = SERVER_TIME_LIMIT; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + Configuration conf = TEST_UTIL.getConfiguration(); + + conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + TEST_UTIL.startMiniCluster(1); + + TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + static Table createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + Table ht = TEST_UTIL.createTable(name, families); + List puts = createPuts(rows, families, qualifiers, cellValue); + ht.put(puts); + + return ht; + } + + /** + * Make puts to put the input value into each combination of row, family, and qualifier + * @param rows + * @param families + * @param qualifiers + * @param value + * @return + * @throws IOException + */ + static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + Put put; + ArrayList puts = new ArrayList<>(); + + for (int row = 0; row < rows.length; row++) { + put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + + return puts; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setupBeforeTest() throws Exception { + disableSleeping(); + } + + @After + public void teardownAfterTest() throws Exception { + disableSleeping(); + } + + /** + * Test a variety of scan configurations to ensure that they return the expected Results in the + * presence of heartbeat messages. These tests are accumulated under one test case to ensure that + * they don't run in parallel. If the tests ran in parallel, they may conflict with each other due + * to changes of the static variables in {@link HeartbeatHRegion} necessary to trigger timeouts + */ + @Test + public void testScannerHeartbeatMessages() throws Exception { + testHeartbeatBetweenRows(); + testHeartbeatBetweenColumnFamilies(); + } + + /** + * Test the case that the time limit for the scan is reached after each full row + * @throws Exception + */ + public void testHeartbeatBetweenRows() throws Exception { + // Configure the scan so that it can read the entire table in a single RPC. We want to test the + // case where a scan stops on the server side due to a time limit (not a caching or size limit) + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + + testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1); + } + + /** + * Test the case that the time limit for scans is reached in between column families + * @throws Exception + */ + public void testHeartbeatBetweenColumnFamilies() throws Exception { + // Configure the scan so that it can read the entire table in a single RPC. 
We want to test the + // case where a scan stops on the server side due to a time limit (not a caching or size limit) + Scan scan = new Scan(); + scan.setMaxResultSize(Long.MAX_VALUE); + scan.setCaching(Integer.MAX_VALUE); + + testEquivalenceOfScanWithHeartbeats(scan, -1, DEFAULT_CF_SLEEP_TIME); + } + + /** + * Test the equivalence of a scan versus the same scan executed in the presence of heartbeat + * messages + * @param scan + * @param rowSleepTime + * @param cfSleepTime + * @throws Exception + */ + public void testEquivalenceOfScanWithHeartbeats(Scan scan, int rowSleepTime, int cfSleepTime) + throws Exception { + ResultScanner scanner = TABLE.getScanner(scan); + ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan); + + Result r1 = null; + Result r2 = null; + + while ((r1 = scanner.next()) != null) { + // Enforce the specified sleep conditions during calls to this scanner's next + configureSleepTime(rowSleepTime, cfSleepTime); + r2 = scannerWithHeartbeats.next(); + disableSleeping(); + + assertTrue(r2 != null); + try { + Result.compareResults(r1, r2); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + assertTrue(scannerWithHeartbeats.next() == null); + + scanner.close(); + scannerWithHeartbeats.close(); + } + + /** + * Helper method for setting the time to sleep between rows and column families. If a sleep time + * is negative then that sleep will be disabled + * @param rowSleepTime + * @param cfSleepTime + */ + private static void configureSleepTime(final int rowSleepTime, final int cfSleepTime) { + HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; + HeartbeatHRegion.rowSleepTime = rowSleepTime; + + HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0; + HeartbeatHRegion.columnFamilySleepTime = cfSleepTime; + } + + /** + * Disable the sleeping mechanism server side. + */ + private static void disableSleeping() { + HeartbeatHRegion.sleepBetweenRows = false; + HeartbeatHRegion.sleepBetweenColumnFamilies = false; + } + + /** + * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times + * between fetches of row Results and/or column family cells. Useful for emulating an instance + * where the server is taking a long time to process a client's scan request + */ + private static class HeartbeatHRegion extends HRegion { + // Row sleeps occur AFTER each row worth of cells is retrieved.
+ private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; + private static boolean sleepBetweenRows = false; + + // Column family sleeps occur AFTER all of the cells for a particular cf are retrieved + private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; + private static boolean sleepBetweenColumnFamilies = false; + + public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, + HTableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + // Instantiate the custom heartbeat region scanners + @Override + protected RegionScanner instantiateRegionScanner(Scan scan, + List additionalScanners) throws IOException { + if (scan.isReversed()) { + if (scan.getFilter() != null) { + scan.getFilter().setReversed(true); + } + return new HeartbeatReversedRegionScanner(scan, additionalScanners, this); + } + return new HeartbeatRegionScanner(scan, additionalScanners, this); + } + } + + /** + * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results + * and/or column family cells + */ + private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl { + HeartbeatReversedRegionScanner(Scan scan, List additionalScanners, + HRegion region) throws IOException { + super(scan, additionalScanners, region); + } + + @Override + public NextState nextRaw(List outResults, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + NextState state = super.nextRaw(outResults, batchLimit, sizeLimit, timeLimit); + try { + if (HeartbeatHRegion.sleepBetweenRows) { + Thread.sleep(HeartbeatHRegion.rowSleepTime); + } + } catch (InterruptedException e) { + } + return state; + } + + @Override + void postHeapNext(List results) { + if (HeartbeatHRegion.sleepBetweenColumnFamilies) { + try { + Thread.sleep(HeartbeatHRegion.columnFamilySleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + super.postHeapNext(results); + } + } + + /** + * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or + * column family cells + */ + private static class HeartbeatRegionScanner extends RegionScannerImpl { + HeartbeatRegionScanner(Scan scan, List additionalScanners, HRegion region) + throws IOException { + region.super(scan, additionalScanners, region); + } + + @Override + public NextState nextRaw(List outResults, int batchLimit, long sizeLimit, long timeLimit) + throws IOException { + NextState state = super.nextRaw(outResults, batchLimit, sizeLimit, timeLimit); + try { + if (HeartbeatHRegion.sleepBetweenRows) { + Thread.sleep(HeartbeatHRegion.rowSleepTime); + } + } catch (InterruptedException e) { + } + return state; + } + + @Override + void postHeapNext(List results) { + if (HeartbeatHRegion.sleepBetweenColumnFamilies) { + try { + Thread.sleep(HeartbeatHRegion.columnFamilySleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + super.postHeapNext(results); + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index 06bbd54..2f2a9cf 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -247,12 +247,18 @@ public class TestStripeCompactor { } } @Override - public NextState next(List result, int limit) throws IOException { + public NextState next(List result, int batchLimit) throws IOException { return next(result); } @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, int batchLimit, long sizeLimit) + throws IOException { + return next(result); + } + + @Override + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException { return next(result); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 3294f6d..9aa8173 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -786,12 +786,18 @@ public class TestStripeCompactionPolicy { } @Override - public NextState next(List result, int limit) throws IOException { + public NextState next(List result, int batchLimit) throws IOException { return next(result); } @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, int batchLimit, long sizeLimit) + throws IOException { + return next(result); + } + + @Override + public NextState next(List result, int batchLimit, long sizeLimit, long timeLimit) throws IOException { return next(result); } -- 1.9.3 (Apple Git-50)
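
The change above boils down to a deadline check: the server derives a time limit of half the scanner lease timeout (scannerLeaseTimeoutPeriod / 2), stops accumulating Results once that deadline passes, and marks the ScanResponse as a heartbeat message; the client treats a heartbeat response, even an empty one, as "keep scanning" rather than "region exhausted". The sketch below shows that pattern in plain Java. Every name in it (HeartbeatSketch, Response, RowSource, scanWithTimeLimit, driveScan) is an illustrative stand-in, not one of the HBase classes touched by this patch.

import java.util.ArrayList;
import java.util.List;

/**
 * Minimal, self-contained sketch of the heartbeat pattern introduced by this patch.
 * All names are illustrative stand-ins, not the HBase classes changed above.
 */
public class HeartbeatSketch {

  /** Simplified stand-in for a ScanResponse: some rows plus a heartbeat flag. */
  static final class Response {
    final List<String> rows;
    final boolean heartbeat;

    Response(List<String> rows, boolean heartbeat) {
      this.rows = rows;
      this.heartbeat = heartbeat;
    }
  }

  /** Trivial row source, only here to make the sketch runnable. */
  interface RowSource {
    /** @return the next row, or null when the source is exhausted */
    String nextRow();
  }

  /**
   * Server side: accumulate rows until a row-count limit or the time limit is hit.
   * The time limit is half the lease timeout, mirroring the patch's use of
   * scannerLeaseTimeoutPeriod / 2, so a response always goes out before the client
   * scanner timeout or the server-side lease can expire.
   */
  static Response scanWithTimeLimit(RowSource source, int maxRows, long leaseTimeoutMillis) {
    long timeLimit = System.currentTimeMillis() + leaseTimeoutMillis / 2;
    List<String> rows = new ArrayList<>();
    while (rows.size() < maxRows) {
      String row = source.nextRow(); // may be slow for wide or heavily filtered rows
      if (row == null) {
        return new Response(rows, false); // source exhausted: a normal, final response
      }
      rows.add(row);
      if (System.currentTimeMillis() >= timeLimit) {
        // Deadline reached: return whatever has been accumulated so far (possibly
        // very little) and mark the response as a heartbeat message.
        return new Response(rows, true);
      }
    }
    return new Response(rows, false);
  }

  /**
   * Client side: a heartbeat response means "still alive, keep going", so the client
   * issues another request instead of treating a small or empty response as the end
   * of the scan.
   */
  static List<String> driveScan(RowSource source, int maxRows, long leaseTimeoutMillis) {
    List<String> all = new ArrayList<>();
    while (true) {
      Response rsp = scanWithTimeLimit(source, maxRows, leaseTimeoutMillis);
      all.addAll(rsp.rows);
      if (rsp.heartbeat) {
        continue; // keep scanning; do not interpret this response as exhaustion
      }
      if (rsp.rows.isEmpty()) {
        return all; // non-heartbeat, empty response: the scan is done
      }
    }
  }

  public static void main(String[] args) {
    final int[] remaining = {10};
    RowSource slowSource = () -> {
      if (remaining[0]-- <= 0) {
        return null;
      }
      try {
        Thread.sleep(30); // emulate a slow row fetch
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return "row-" + remaining[0];
    };
    // A 100 ms "lease timeout" gives a 50 ms deadline, so heartbeats are forced.
    System.out.println(driveScan(slowSource, 100, 100));
  }
}

Halving the lease timeout in the sketch mirrors the choice made in RSRpcServices above: it leaves headroom to build and ship the response before either the client-side scanner timeout (CLIENT_TIMEOUT in the test) or the server-side lease can expire.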