From 5fc8a6adedb799033fb1c271fff7b04cef8124b2 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Thu, 7 Apr 2016 13:06:28 +0800 Subject: [PATCH] HBASE-15593 Time limit of scanning should be offered by client --- .../hadoop/hbase/client/ScannerCallable.java | 2 +- .../hadoop/hbase/protobuf/RequestConverter.java | 3 +- .../hbase/protobuf/generated/ClientProtos.java | 262 +++++++++++++++------ hbase-protocol/src/main/protobuf/Client.proto | 3 + .../hadoop/hbase/regionserver/RSRpcServices.java | 15 +- .../regionserver/TestScannerHeartbeatMessages.java | 19 +- 6 files changed, 219 insertions(+), 85 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 72d69ec..300c695 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -214,7 +214,7 @@ public class ScannerCallable extends RegionServerCallable { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, - this.scanMetrics != null, renew); + this.scanMetrics != null, renew, callTimeout / 2); ScanResponse response = null; try { response = getStub().scan(controller, request); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 99e993d..c08206a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -502,7 +502,7 @@ public final class RequestConverter { */ public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics, - final boolean renew) { + final boolean renew, int timeLimit) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); @@ -512,6 +512,7 @@ public final class RequestConverter { builder.setClientHandlesHeartbeats(true); builder.setTrackScanMetrics(trackMetrics); builder.setRenew(renew); + builder.setTimeLimit(timeLimit); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 4deab19..54936f9 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -17181,6 +17181,24 @@ public final class ClientProtos { * optional bool renew = 10 [default = false]; */ boolean getRenew(); + + // optional uint32 time_limit = 11; + /** + * optional uint32 time_limit = 11; + * + *
+     * We should use timeout setting from client because it may be different from server's.
+     * 
+ */ + boolean hasTimeLimit(); + /** + * optional uint32 time_limit = 11; + * + *
+     * We should use timeout setting from client because it may be different from server's.
+     * 
+ */ + int getTimeLimit(); } /** * Protobuf type {@code hbase.pb.ScanRequest} @@ -17312,6 +17330,11 @@ public final class ClientProtos { renew_ = input.readBool(); break; } + case 88: { + bitField0_ |= 0x00000400; + timeLimit_ = input.readUInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -17524,6 +17547,30 @@ public final class ClientProtos { return renew_; } + // optional uint32 time_limit = 11; + public static final int TIME_LIMIT_FIELD_NUMBER = 11; + private int timeLimit_; + /** + * optional uint32 time_limit = 11; + * + *
+     * We should use timeout setting from client because it may be different from server's.
+     * 
+ */ + public boolean hasTimeLimit() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional uint32 time_limit = 11; + * + *
+     * We should use timeout setting from client because it may be different from server's.
+     * 
+ */ + public int getTimeLimit() { + return timeLimit_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -17535,6 +17582,7 @@ public final class ClientProtos { clientHandlesHeartbeats_ = false; trackScanMetrics_ = false; renew_ = false; + timeLimit_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -17590,6 +17638,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeBool(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeUInt32(11, timeLimit_); + } getUnknownFields().writeTo(output); } @@ -17639,6 +17690,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(11, timeLimit_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -17712,6 +17767,11 @@ public final class ClientProtos { result = result && (getRenew() == other.getRenew()); } + result = result && (hasTimeLimit() == other.hasTimeLimit()); + if (hasTimeLimit()) { + result = result && (getTimeLimit() + == other.getTimeLimit()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -17765,6 +17825,10 @@ public final class ClientProtos { hash = (37 * hash) + RENEW_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getRenew()); } + if (hasTimeLimit()) { + hash = (37 * hash) + TIME_LIMIT_FIELD_NUMBER; + hash = (53 * hash) + getTimeLimit(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17917,6 +17981,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000100); renew_ = false; bitField0_ = (bitField0_ & ~0x00000200); + timeLimit_ = 0; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -17993,6 +18059,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000200; } result.renew_ = renew_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.timeLimit_ = timeLimit_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18039,6 +18109,9 @@ public final class ClientProtos { if (other.hasRenew()) { setRenew(other.getRenew()); } + if (other.hasTimeLimit()) { + setTimeLimit(other.getTimeLimit()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -18576,6 +18649,55 @@ public final class ClientProtos { return this; } + // optional uint32 time_limit = 11; + private int timeLimit_ ; + /** + * optional uint32 time_limit = 11; + * + *
+       * We should use timeout setting from client because it may be different from server's.
+       * 
+ */ + public boolean hasTimeLimit() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional uint32 time_limit = 11; + * + *
+       * We should use timeout setting from client because it may be different from server's.
+       * 
+ */ + public int getTimeLimit() { + return timeLimit_; + } + /** + * optional uint32 time_limit = 11; + * + *
+       * We should use timeout setting from client because it may be different from server's.
+       * 
+ */ + public Builder setTimeLimit(int value) { + bitField0_ |= 0x00000400; + timeLimit_ = value; + onChanged(); + return this; + } + /** + * optional uint32 time_limit = 11; + * + *
+       * We should use timeout setting from client because it may be different from server's.
+       * 
+ */ + public Builder clearTimeLimit() { + bitField0_ = (bitField0_ & ~0x00000400); + timeLimit_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ScanRequest) } @@ -35553,81 +35675,81 @@ public final class ClientProtos { " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" + "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " + "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + - "lumnFamilyTimeRange\"\246\002\n\013ScanRequest\022)\n\006r" + + "lumnFamilyTimeRange\"\272\002\n\013ScanRequest\022)\n\006r" + "egion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034", "\n\004scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_" + "id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclos" + "e_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037" + "\n\027client_handles_partials\030\007 \001(\010\022!\n\031clien" + "t_handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan" + - "_metrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\"\232\002\n" + - "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" + - "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + - "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase." + - "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag", - "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + - "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + - "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + - "s\"\305\001\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + - "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" + - "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" + - "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n" + - "\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(" + - "\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002" + - "(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(", - "\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003" + - " \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServ" + - "iceResult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.Name" + - "BytesPair\"v\n\031CoprocessorServiceRequest\022)" + - "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" + - "r\022.\n\004call\030\002 \002(\0132 .hbase.pb.CoprocessorSe" + - "rviceCall\"o\n\032CoprocessorServiceResponse\022" + - ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + - "er\022&\n\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPa" + - "ir\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation", - "\030\002 \001(\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003" + - " \001(\0132\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(" + - "\0132 .hbase.pb.CoprocessorServiceCall\"k\n\014R" + - "egionAction\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" + - "egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action" + - "\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017RegionLoadSt" + - "ats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccu" + - "pancy\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001" + - "(\005:\0010\"j\n\024MultiRegionLoadStats\022)\n\006region\030" + - "\001 \003(\0132\031.hbase.pb.RegionSpecifier\022\'\n\004stat", - "\030\002 \003(\0132\031.hbase.pb.RegionLoadStats\"\336\001\n\021Re" + - "sultOrException\022\r\n\005index\030\001 \001(\r\022 \n\006result" + - "\030\002 \001(\0132\020.hbase.pb.Result\022*\n\texception\030\003 " + - "\001(\0132\027.hbase.pb.NameBytesPair\022:\n\016service_" + - "result\030\004 \001(\0132\".hbase.pb.CoprocessorServi" + - "ceResult\0220\n\tloadStats\030\005 \001(\0132\031.hbase.pb.R" + - "egionLoadStatsB\002\030\001\"x\n\022RegionActionResult" + - "\0226\n\021resultOrException\030\001 \003(\0132\033.hbase.pb.R" + - "esultOrException\022*\n\texception\030\002 \001(\0132\027.hb" + - "ase.pb.NameBytesPair\"x\n\014MultiRequest\022,\n\014", - "regionAction\030\001 \003(\0132\026.hbase.pb.RegionActi" + - "on\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(" + - "\0132\023.hbase.pb.Condition\"\226\001\n\rMultiResponse" + - "\0228\n\022regionActionResult\030\001 \003(\0132\034.hbase.pb." + - "RegionActionResult\022\021\n\tprocessed\030\002 \001(\010\0228\n" + - "\020regionStatistics\030\003 \001(\0132\036.hbase.pb.Multi" + - "RegionLoadStats*\'\n\013Consistency\022\n\n\006STRONG" + - "\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClientService\0222\n\003Ge" + - "t\022\024.hbase.pb.GetRequest\032\025.hbase.pb.GetRe" + - "sponse\022;\n\006Mutate\022\027.hbase.pb.MutateReques", - "t\032\030.hbase.pb.MutateResponse\0225\n\004Scan\022\025.hb" + - "ase.pb.ScanRequest\032\026.hbase.pb.ScanRespon" + - "se\022P\n\rBulkLoadHFile\022\036.hbase.pb.BulkLoadH" + - "FileRequest\032\037.hbase.pb.BulkLoadHFileResp" + - "onse\022X\n\013ExecService\022#.hbase.pb.Coprocess" + - "orServiceRequest\032$.hbase.pb.CoprocessorS" + - "erviceResponse\022d\n\027ExecRegionServerServic" + - "e\022#.hbase.pb.CoprocessorServiceRequest\032$" + - ".hbase.pb.CoprocessorServiceResponse\0228\n\005" + - "Multi\022\026.hbase.pb.MultiRequest\032\027.hbase.pb", - ".MultiResponseBB\n*org.apache.hadoop.hbas" + - "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" + - "\001\001" + "_metrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\022\022\n\n" + + "time_limit\030\013 \001(\r\"\232\002\n\014ScanResponse\022\030\n\020cel" + + "ls_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022" + + "\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007re" + + "sults\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006", + " \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(\010\022\036\n" + + "\026more_results_in_region\030\010 \001(\010\022\031\n\021heartbe" + + "at_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025" + + ".hbase.pb.ScanMetrics\"\305\001\n\024BulkLoadHFileR" + + "equest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Region" + + "Specifier\022>\n\013family_path\030\002 \003(\0132).hbase.p" + + "b.BulkLoadHFileRequest.FamilyPath\022\026\n\016ass" + + "ign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006famil" + + "y\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRe" + + "sponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorSer", + "viceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 " + + "\002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(" + + "\014\"B\n\030CoprocessorServiceResult\022&\n\005value\030\001" + + " \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031Coproce" + + "ssorServiceRequest\022)\n\006region\030\001 \002(\0132\031.hba" + + "se.pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hb" + + "ase.pb.CoprocessorServiceCall\"o\n\032Coproce" + + "ssorServiceResponse\022)\n\006region\030\001 \002(\0132\031.hb" + + "ase.pb.RegionSpecifier\022&\n\005value\030\002 \002(\0132\027." + + "hbase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005ind", + "ex\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mu" + + "tationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\022" + + "6\n\014service_call\030\004 \001(\0132 .hbase.pb.Coproce" + + "ssorServiceCall\"k\n\014RegionAction\022)\n\006regio" + + "n\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016\n\006at" + + "omic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Ac" + + "tion\"c\n\017RegionLoadStats\022\027\n\014memstoreLoad\030" + + "\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022co" + + "mpactionPressure\030\003 \001(\005:\0010\"j\n\024MultiRegion" + + "LoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.Reg", + "ionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.Re" + + "gionLoadStats\"\336\001\n\021ResultOrException\022\r\n\005i" + + "ndex\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Re" + + "sult\022*\n\texception\030\003 \001(\0132\027.hbase.pb.NameB" + + "ytesPair\022:\n\016service_result\030\004 \001(\0132\".hbase" + + ".pb.CoprocessorServiceResult\0220\n\tloadStat" + + "s\030\005 \001(\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x" + + "\n\022RegionActionResult\0226\n\021resultOrExceptio" + + "n\030\001 \003(\0132\033.hbase.pb.ResultOrException\022*\n\t" + + "exception\030\002 \001(\0132\027.hbase.pb.NameBytesPair", + "\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003(\0132\026" + + ".hbase.pb.RegionAction\022\022\n\nnonceGroup\030\002 \001" + + "(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Conditi" + + "on\"\226\001\n\rMultiResponse\0228\n\022regionActionResu" + + "lt\030\001 \003(\0132\034.hbase.pb.RegionActionResult\022\021" + + "\n\tprocessed\030\002 \001(\010\0228\n\020regionStatistics\030\003 " + + "\001(\0132\036.hbase.pb.MultiRegionLoadStats*\'\n\013C" + + "onsistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n" + + "\rClientService\0222\n\003Get\022\024.hbase.pb.GetRequ" + + "est\032\025.hbase.pb.GetResponse\022;\n\006Mutate\022\027.h", + "base.pb.MutateRequest\032\030.hbase.pb.MutateR" + + "esponse\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026" + + ".hbase.pb.ScanResponse\022P\n\rBulkLoadHFile\022" + + "\036.hbase.pb.BulkLoadHFileRequest\032\037.hbase." + + "pb.BulkLoadHFileResponse\022X\n\013ExecService\022" + + "#.hbase.pb.CoprocessorServiceRequest\032$.h" + + "base.pb.CoprocessorServiceResponse\022d\n\027Ex" + + "ecRegionServerService\022#.hbase.pb.Coproce" + + "ssorServiceRequest\032$.hbase.pb.Coprocesso" + + "rServiceResponse\0228\n\005Multi\022\026.hbase.pb.Mul", + "tiRequest\032\027.hbase.pb.MultiResponseBB\n*or" + + "g.apache.hadoop.hbase.protobuf.generated" + + "B\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -35723,7 +35845,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "TimeLimit", }); internal_static_hbase_pb_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 8a4d459..63d5442 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -278,6 +278,9 @@ message ScanRequest { optional bool client_handles_heartbeats = 8; optional bool track_scan_metrics = 9; optional bool renew = 10 [default = false]; + + // We should use timeout setting from client because it may be different from server's. + optional uint32 time_limit = 11; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 2d27219..9bf032a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2703,10 +2703,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler, timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; } - // Use half of whichever timeout value was more restrictive... But don't allow - // the time limit to be less than the allowable minimum (could cause an - // immediatate timeout before scanning any data). - timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); + // Use half of whichever timeout value was more restrictive... + timeLimitDelta = timeLimitDelta / 2; + + if (request.hasTimeLimit() && request.getTimeLimit() > 0) { + // If ScanRequest contains a time limit and it is smaller than server's conf, + // we should use it to prevent responding heartbeat too late. + timeLimitDelta = Math.min(request.getTimeLimit(), timeLimitDelta); + } + // Don't allow the time limit to be less than the allowable minimum (could cause + // an immediate timeout before scanning any data). + timeLimitDelta = Math.max(timeLimitDelta, minimumScanTimeLimitDelta); timeLimit = System.currentTimeMillis() + timeLimitDelta; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 1935c0a..21164da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -105,16 +106,16 @@ public class TestScannerHeartbeatMessages { private static int VALUE_SIZE = 128; private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static int SERVER_TIMEOUT = 2000; + // Time, in milliseconds, that the client will wait for a response from the server before timing // out. This value is used server side to determine when it is necessary to send a heartbeat // message to the client - private static int CLIENT_TIMEOUT = 2000; - - // The server limits itself to running for half of the CLIENT_TIMEOUT value. - private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2; + private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3; // By default, at most one row's worth of cells will be retrieved before the time limit is reached - private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2; + private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5; // By default, at most cells for two column families are retrieved before the time limit is // reached private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES; @@ -127,8 +128,8 @@ public class TestScannerHeartbeatMessages { conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // Check the timeout condition after every cell @@ -143,7 +144,7 @@ public class TestScannerHeartbeatMessages { Table ht = TEST_UTIL.createTable(name, families); List puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts); - + ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); return ht; } @@ -285,7 +286,7 @@ public class TestScannerHeartbeatMessages { @Override public ReturnCode filterKeyValue(Cell v) throws IOException { try { - Thread.sleep(SERVER_TIME_LIMIT + 10); + Thread.sleep(CLIENT_TIMEOUT/2 + 10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } -- 2.6.4 (Apple Git-63)