.../java/org/apache/hadoop/hbase/client/Scan.java | 19 + .../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 6 + .../hbase/protobuf/generated/ClientProtos.java | 308 ++++++++++++---- hbase-protocol/src/main/protobuf/Client.proto | 1 + .../hadoop/hbase/ipc/BalancedQueueRpcExecutor.java | 12 +- .../apache/hadoop/hbase/ipc/CallRunnerWrapper.java | 54 +++ .../hbase/ipc/FairShareBalancedRPCExecutor.java | 58 +++ .../hadoop/hbase/ipc/FairShareBlockingQueue.java | 46 +++ .../ipc/FairSharePriorityBasedBlockingQueue.java | 56 +++ .../hbase/ipc/FairShareRWQueueRPCExecutor.java | 57 +++ .../java/org/apache/hadoop/hbase/ipc/RPCUtil.java | 53 +++ .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 21 +- .../hadoop/hbase/ipc/RoundRobinRPCScheduler.java | 195 +++++++++++ .../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 14 +- .../org/apache/hadoop/hbase/ipc/RpcScheduler.java | 16 + .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 76 +--- .../hadoop/hbase/regionserver/RSRpcServices.java | 1 + .../RoundRobinRPCSchedulerFactory.java | 54 +++ .../util/AbstractPriorityBasedRoundRobinQueue.java | 360 +++++++++++++++++++ .../hadoop/hbase/util/AbstractRoundRobinQueue.java | 390 +++++++++++++++++++++ .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 128 ++++++- .../util/TestAbstractRoundRobinPriorityQueue.java | 273 +++++++++++++++ .../hbase/util/TestAbstractRoundRobinQueue.java | 247 +++++++++++++ 23 files changed, 2278 insertions(+), 167 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 9d46bc7..2aef7d0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -107,6 +107,11 @@ public class Scan extends Query { private int storeLimit = -1; private int storeOffset = 0; private boolean getScan; + /** + * A group id set on the scan requests enables the RPC handlers to round-robin + * among the requests with the same group id + */ + private String groupingId; /** * @deprecated since 1.0.0. Use {@link #setScanMetricsEnabled(boolean)} @@ -230,6 +235,7 @@ public class Scan extends Query { reversed = scan.isReversed(); asyncPrefetch = scan.isAsyncPrefetch(); small = scan.isSmall(); + groupingId = scan.getGroupingId(); TimeRange ctr = scan.getTimeRange(); tr = new TimeRange(ctr.getMin(), ctr.getMax()); Map> fams = scan.getFamilyMap(); @@ -955,6 +961,19 @@ public class Scan extends Query { } /** + * A group id set on the scan requests enables the RPC handlers to round-robin among the requests + * with the same group id + * @param groupingId + */ + public void setGroupingId(String groupingId) { + this.groupingId = groupingId; + } + + public String getGroupingId() { + return this.groupingId; + } + + /** * @return Metrics on this Scan, if metrics were enabled. 
* @see #setScanMetricsEnabled(boolean) */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 1ffdfaf..330e3f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -932,6 +932,9 @@ public final class ProtobufUtil { if (scan.getCaching() > 0) { scanBuilder.setCaching(scan.getCaching()); } + if (scan.getGroupingId() != null) { + scanBuilder.setGroupingId(scan.getGroupingId()); + } return scanBuilder.build(); } @@ -1017,6 +1020,9 @@ public final class ProtobufUtil { if (proto.hasCaching()) { scan.setCaching(proto.getCaching()); } + if (proto.hasGroupingId()) { + scan.setGroupingId(proto.getGroupingId()); + } return scan; } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index c4b1eec..7ec9899 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -13682,6 +13682,21 @@ public final class ClientProtos { * optional uint32 caching = 17; */ int getCaching(); + + // optional string groupingId = 18; + /** + * optional string groupingId = 18; + */ + boolean hasGroupingId(); + /** + * optional string groupingId = 18; + */ + java.lang.String getGroupingId(); + /** + * optional string groupingId = 18; + */ + com.google.protobuf.ByteString + getGroupingIdBytes(); } /** * Protobuf type {@code hbase.pb.Scan} @@ -13858,6 +13873,11 @@ public final class ClientProtos { caching_ = input.readUInt32(); break; } + case 146: { + bitField0_ |= 0x00008000; + groupingId_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -14236,6 +14256,49 @@ public final class ClientProtos { return caching_; } + // optional string groupingId = 18; + public static final int GROUPINGID_FIELD_NUMBER = 18; + private java.lang.Object groupingId_; + /** + * optional string groupingId = 18; + */ + public boolean hasGroupingId() { + return ((bitField0_ & 0x00008000) == 0x00008000); + } + /** + * optional string groupingId = 18; + */ + public java.lang.String getGroupingId() { + java.lang.Object ref = groupingId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + groupingId_ = s; + } + return s; + } + } + /** + * optional string groupingId = 18; + */ + public com.google.protobuf.ByteString + getGroupingIdBytes() { + java.lang.Object ref = groupingId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + groupingId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -14254,6 +14317,7 @@ public final class ClientProtos { reversed_ = false; consistency_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Consistency.STRONG; caching_ = 0; + groupingId_ = ""; } private byte memoizedIsInitialized = -1; public final 
boolean isInitialized() { @@ -14336,6 +14400,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00004000) == 0x00004000)) { output.writeUInt32(17, caching_); } + if (((bitField0_ & 0x00008000) == 0x00008000)) { + output.writeBytes(18, getGroupingIdBytes()); + } getUnknownFields().writeTo(output); } @@ -14413,6 +14480,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(17, caching_); } + if (((bitField0_ & 0x00008000) == 0x00008000)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(18, getGroupingIdBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -14515,6 +14586,11 @@ public final class ClientProtos { result = result && (getCaching() == other.getCaching()); } + result = result && (hasGroupingId() == other.hasGroupingId()); + if (hasGroupingId()) { + result = result && getGroupingId() + .equals(other.getGroupingId()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -14596,6 +14672,10 @@ public final class ClientProtos { hash = (37 * hash) + CACHING_FIELD_NUMBER; hash = (53 * hash) + getCaching(); } + if (hasGroupingId()) { + hash = (37 * hash) + GROUPINGID_FIELD_NUMBER; + hash = (53 * hash) + getGroupingId().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -14770,6 +14850,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00008000); caching_ = 0; bitField0_ = (bitField0_ & ~0x00010000); + groupingId_ = ""; + bitField0_ = (bitField0_ & ~0x00020000); return this; } @@ -14884,6 +14966,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00004000; } result.caching_ = caching_; + if (((from_bitField0_ & 0x00020000) == 0x00020000)) { + to_bitField0_ |= 0x00008000; + } + result.groupingId_ = groupingId_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -14997,6 +15083,11 @@ public final class ClientProtos { if (other.hasCaching()) { setCaching(other.getCaching()); } + if (other.hasGroupingId()) { + bitField0_ |= 0x00020000; + groupingId_ = other.groupingId_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -16210,6 +16301,80 @@ public final class ClientProtos { return this; } + // optional string groupingId = 18; + private java.lang.Object groupingId_ = ""; + /** + * optional string groupingId = 18; + */ + public boolean hasGroupingId() { + return ((bitField0_ & 0x00020000) == 0x00020000); + } + /** + * optional string groupingId = 18; + */ + public java.lang.String getGroupingId() { + java.lang.Object ref = groupingId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + groupingId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string groupingId = 18; + */ + public com.google.protobuf.ByteString + getGroupingIdBytes() { + java.lang.Object ref = groupingId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + groupingId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string groupingId = 18; + */ + public Builder setGroupingId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00020000; + groupingId_ = value; + onChanged(); + return this; + } + /** + * optional 
string groupingId = 18; + */ + public Builder clearGroupingId() { + bitField0_ = (bitField0_ & ~0x00020000); + groupingId_ = getDefaultInstance().getGroupingId(); + onChanged(); + return this; + } + /** + * optional string groupingId = 18; + */ + public Builder setGroupingIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00020000; + groupingId_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.Scan) } @@ -33168,7 +33333,7 @@ public final class ClientProtos { "b.MutationProto\022&\n\tcondition\030\003 \001(\0132\023.hba", "se.pb.Condition\022\023\n\013nonce_group\030\004 \001(\004\"E\n\016" + "MutateResponse\022 \n\006result\030\001 \001(\0132\020.hbase.p" + - "b.Result\022\021\n\tprocessed\030\002 \001(\010\"\346\003\n\004Scan\022 \n\006" + + "b.Result\022\021\n\tprocessed\030\002 \001(\010\"\372\003\n\004Scan\022 \n\006" + "column\030\001 \003(\0132\020.hbase.pb.Column\022*\n\tattrib" + "ute\030\002 \003(\0132\027.hbase.pb.NameBytesPair\022\021\n\tst" + "art_row\030\003 \001(\014\022\020\n\010stop_row\030\004 \001(\014\022 \n\006filte" + @@ -33180,75 +33345,76 @@ public final class ClientProtos { "t\030\014 \001(\r\022&\n\036load_column_families_on_deman" + "d\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010reversed\030\017 \001(\010" + ":\005false\0222\n\013consistency\030\020 \001(\0162\025.hbase.pb." + - "Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r\"\220\002\n" + - "\013ScanRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb." + - "RegionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb" + - ".Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_r" + - "ows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext" + - "_call_seq\030\006 \001(\004\022\037\n\027client_handles_partia", - "ls\030\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 " + - "\001(\010\022\032\n\022track_scan_metrics\030\t \001(\010\"\232\002\n\014Scan" + - "Response\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nsc" + - "anner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003" + - "ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase.pb.Re" + - "sult\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_" + - "result\030\007 \003(\010\022\036\n\026more_results_in_region\030\010" + - " \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014scan_" + - "metrics\030\n \001(\0132\025.hbase.pb.ScanMetrics\"\305\001\n" + - "\024BulkLoadHFileRequest\022)\n\006region\030\001 \002(\0132\031.", - "hbase.pb.RegionSpecifier\022>\n\013family_path\030" + - "\002 \003(\0132).hbase.pb.BulkLoadHFileRequest.Fa" + - "milyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFami" + - "lyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025" + - "BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n" + - "\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014" + - "service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022" + - "\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServiceRe" + - "sult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.NameBytes" + - "Pair\"v\n\031CoprocessorServiceRequest\022)\n\006reg", - "ion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022.\n\004" + - "call\030\002 \002(\0132 .hbase.pb.CoprocessorService" + - 
"Call\"o\n\032CoprocessorServiceResponse\022)\n\006re" + - "gion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022&\n" + - "\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226\001" + - "\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(" + - "\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003 \001(\0132" + - "\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(\0132 .h" + - "base.pb.CoprocessorServiceCall\"k\n\014Region" + - "Action\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Region", - "Specifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(" + - "\0132\020.hbase.pb.Action\"D\n\017RegionLoadStats\022\027" + - "\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy" + - "\030\002 \001(\005:\0010\"\332\001\n\021ResultOrException\022\r\n\005index" + - "\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result" + - "\022*\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytes" + - "Pair\022:\n\016service_result\030\004 \001(\0132\".hbase.pb." + - "CoprocessorServiceResult\022,\n\tloadStats\030\005 " + - "\001(\0132\031.hbase.pb.RegionLoadStats\"x\n\022Region" + - "ActionResult\0226\n\021resultOrException\030\001 \003(\0132", - "\033.hbase.pb.ResultOrException\022*\n\texceptio" + - "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" + - "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" + - "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" + - "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\\\n\rMu" + - "ltiResponse\0228\n\022regionActionResult\030\001 \003(\0132" + - "\034.hbase.pb.RegionActionResult\022\021\n\tprocess" + - "ed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010T" + - "IMELINE\020\0012\203\004\n\rClientService\0222\n\003Get\022\024.hba" + - "se.pb.GetRequest\032\025.hbase.pb.GetResponse\022", - ";\n\006Mutate\022\027.hbase.pb.MutateRequest\032\030.hba" + - "se.pb.MutateResponse\0225\n\004Scan\022\025.hbase.pb." + - "ScanRequest\032\026.hbase.pb.ScanResponse\022P\n\rB" + - "ulkLoadHFile\022\036.hbase.pb.BulkLoadHFileReq" + - "uest\032\037.hbase.pb.BulkLoadHFileResponse\022X\n" + - "\013ExecService\022#.hbase.pb.CoprocessorServi" + - "ceRequest\032$.hbase.pb.CoprocessorServiceR" + - "esponse\022d\n\027ExecRegionServerService\022#.hba" + - "se.pb.CoprocessorServiceRequest\032$.hbase." 
+ - "pb.CoprocessorServiceResponse\0228\n\005Multi\022\026", - ".hbase.pb.MultiRequest\032\027.hbase.pb.MultiR" + - "esponseBB\n*org.apache.hadoop.hbase.proto" + - "buf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r\022\022\n\n" + + "groupingId\030\022 \001(\t\"\220\002\n\013ScanRequest\022)\n\006regi" + + "on\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004s" + + "can\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id\030" + + "\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_s" + + "canner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027c", + "lient_handles_partials\030\007 \001(\010\022!\n\031client_h" + + "andles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_me" + + "trics\030\t \001(\010\"\232\002\n\014ScanResponse\022\030\n\020cells_pe" + + "r_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mo" + + "re_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results" + + "\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022" + + "\037\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more" + + "_results_in_region\030\010 \001(\010\022\031\n\021heartbeat_me" + + "ssage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbas" + + "e.pb.ScanMetrics\"\305\001\n\024BulkLoadHFileReques", + "t\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpeci" + + "fier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bul" + + "kLoadHFileRequest.FamilyPath\022\026\n\016assign_s" + + "eq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" + + "(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" + + "e\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServiceC" + + "all\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023" + + "\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030" + + "CoprocessorServiceResult\022&\n\005value\030\001 \001(\0132" + + "\027.hbase.pb.NameBytesPair\"v\n\031CoprocessorS", + "erviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb" + + ".RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.p" + + "b.CoprocessorServiceCall\"o\n\032CoprocessorS" + + "erviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.p" + + "b.RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase" + + ".pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 " + + "\001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutatio" + + "nProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014se" + + "rvice_call\030\004 \001(\0132 .hbase.pb.CoprocessorS" + + "erviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002", + "(\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030" + + "\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"" + + "D\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005" + + ":\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\"\332\001\n\021Result" + + "OrException\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001" + + "(\0132\020.hbase.pb.Result\022*\n\texception\030\003 \001(\0132" + + "\027.hbase.pb.NameBytesPair\022:\n\016service_resu" + + "lt\030\004 \001(\0132\".hbase.pb.CoprocessorServiceRe" + + "sult\022,\n\tloadStats\030\005 \001(\0132\031.hbase.pb.Regio" + + "nLoadStats\"x\n\022RegionActionResult\0226\n\021resu", + 
"ltOrException\030\001 \003(\0132\033.hbase.pb.ResultOrE" + + "xception\022*\n\texception\030\002 \001(\0132\027.hbase.pb.N" + + "ameBytesPair\"x\n\014MultiRequest\022,\n\014regionAc" + + "tion\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022\n\nno" + + "nceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbas" + + "e.pb.Condition\"\\\n\rMultiResponse\0228\n\022regio" + + "nActionResult\030\001 \003(\0132\034.hbase.pb.RegionAct" + + "ionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consiste" + + "ncy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClient" + + "Service\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.h", + "base.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb" + + ".MutateRequest\032\030.hbase.pb.MutateResponse" + + "\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase." + + "pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase" + + ".pb.BulkLoadHFileRequest\032\037.hbase.pb.Bulk" + + "LoadHFileResponse\022X\n\013ExecService\022#.hbase" + + ".pb.CoprocessorServiceRequest\032$.hbase.pb" + + ".CoprocessorServiceResponse\022d\n\027ExecRegio" + + "nServerService\022#.hbase.pb.CoprocessorSer" + + "viceRequest\032$.hbase.pb.CoprocessorServic", + "eResponse\0228\n\005Multi\022\026.hbase.pb.MultiReque" + + "st\032\027.hbase.pb.MultiResponseBB\n*org.apach" + + "e.hadoop.hbase.protobuf.generatedB\014Clien" + + "tProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -33338,7 +33504,7 @@ public final class ClientProtos { internal_static_hbase_pb_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "GroupingId", }); internal_static_hbase_pb_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 101854d..e55e5a5 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -251,6 +251,7 @@ message Scan { optional bool reversed = 15 [default = false]; optional Consistency consistency = 16 [default = STRONG]; optional uint32 caching = 17; + optional string groupingId = 18; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 56424df..017891d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceStability.Evolving public class BalancedQueueRpcExecutor extends RpcExecutor { - protected final List> 
queues; + private final List> queues; private final QueueBalancer balancer; public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, @@ -59,7 +59,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { final Configuration conf, final Abortable abortable, final Class queueClass, Object... initargs) { super(name, Math.max(handlerCount, numQueues), conf, abortable); - queues = new ArrayList>(numQueues); + queues = new ArrayList>(numQueues); this.balancer = getBalancer(numQueues); initializeQueues(numQueues, queueClass, initargs); } @@ -67,12 +67,12 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { protected void initializeQueues(final int numQueues, final Class queueClass, Object... initargs) { for (int i = 0; i < numQueues; ++i) { - queues.add((BlockingQueue) ReflectionUtils.newInstance(queueClass, initargs)); + queues.add((BlockingQueue) ReflectionUtils.newInstance(queueClass, initargs)); } } @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { + public void dispatch(final CallRunnerWrapper callTask) throws InterruptedException { int queueIndex = balancer.getNextQueue(); queues.get(queueIndex).put(callTask); } @@ -80,14 +80,14 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { @Override public int getQueueLength() { int length = 0; - for (final BlockingQueue queue : queues) { + for (final BlockingQueue queue : queues) { length += queue.size(); } return length; } @Override - public List> getQueues() { + public List> getQueues() { return queues; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunnerWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunnerWrapper.java new file mode 100644 index 0000000..48355ad --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunnerWrapper.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A simple wrapper over the CallRunner that explicitly calculates the deadline of the call + * underlying the CallRunner, using the {@link PriorityFunction} + * + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class CallRunnerWrapper { + private CallRunner o; + private long deadLine; + + public CallRunnerWrapper(CallRunner o, PriorityFunction priority, int maxDelay) { + this.o = o; + this.deadLine = calcDeadLine(o, priority, maxDelay); + } + + CallRunner getCallRunner() { + return this.o; + } + + long getDeadLine() { + return this.deadLine; + } + + private long calcDeadLine(CallRunner o, PriorityFunction priority, int maxDelay) { + RpcServer.Call call = o.getCall(); + long deadline = priority.getDeadline(call.getHeader(), call.param); + deadline = call.timestamp + Math.min(deadline, maxDelay); + return deadline; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java new file mode 100644 index 0000000..6feaa34 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * An {@link RpcExecutor} that will balance requests in a round robin way across + * all its queues based on unique group Id set on these incoming requests + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FairShareBalancedRPCExecutor extends BalancedQueueRpcExecutor { + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final int maxQueueLength) { + this(name, handlerCount, numQueues, maxQueueLength, null, null); + } + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final int maxQueueLength, final Configuration conf, + final Abortable abortable) { + this(name, handlerCount, numQueues, conf, abortable, FairShareBlockingQueue.class, + maxQueueLength); + } + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final Class queueClass, Object... initargs) { + this(name, handlerCount, numQueues, null, null, queueClass, initargs); + } + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final Configuration conf, final Abortable abortable, + final Class queueClass, Object... initargs) { + super(name, handlerCount, numQueues, conf, abortable, queueClass, initargs); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java new file mode 100644 index 0000000..9571b9c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.AbstractRoundRobinQueue; + +/** + * An implementation of {@link AbstractRoundRobinQueue} that tries + * to iterate through each producer queue in a round robin fashion based on the + * Grouping Id set in the incoming request + */ +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FairShareBlockingQueue extends AbstractRoundRobinQueue { + + public FairShareBlockingQueue(int maxSize) { + super(true, maxSize); + } + + public FairShareBlockingQueue(boolean newProducerToFront, int maxSize) { + super(newProducerToFront, maxSize); + } + + @Override + protected Object extractProducerId(CallRunnerWrapper o) { + return RPCUtil.extractGroupId(o.getCallRunner()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java new file mode 100644 index 0000000..6d77199 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.AbstractPriorityBasedRoundRobinQueue; + + +/** + * An implementation of {@link AbstractPriorityBasedRoundRobinQueue} that tries + * to iterate through each producer queue in a round robin fashion based on the + * Grouping Id set in the incoming request + */ +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FairSharePriorityBasedBlockingQueue extends + AbstractPriorityBasedRoundRobinQueue { + + + public FairSharePriorityBasedBlockingQueue(int maxSize) { + super(maxSize); + } + + // Unused + public FairSharePriorityBasedBlockingQueue(boolean newProducerToFront, int maxSize) { + super(maxSize); + } + + @Override + protected Object extractProducerId(CallRunnerWrapper o) { + return RPCUtil.extractGroupId(o.getCallRunner()); + } + + @Override + protected long extractPriority(CallRunnerWrapper o) { + return o.getDeadLine(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java new file mode 100644 index 0000000..5549c45 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java @@ -0,0 +1,57 @@ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A {@link RWQueueRpcExecutor} extension that creates a queue based on + * {@link FairSharePriorityBasedBlockingQueue} which round-robins among the + * parallel requests based on unique grouping ID set on these requests + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FairShareRWQueueRPCExecutor extends RWQueueRpcExecutor { + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, + final int numQueues, final float readShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable) { + this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, conf, abortable, + FairShareBlockingQueue.class); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, + final int numQueues, final float readShare, final float scanShare, final int maxQueueLength) { + this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, + final int numQueues, final float readShare, final float scanShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable) { + this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, conf, abortable, + FairShareBlockingQueue.class); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, + final int numQueues, final float readShare, 
final int maxQueueLength, + final Configuration conf, final Abortable abortable, + final Class readQueueClass, Object... readQueueInitArgs) { + this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable, + readQueueClass, readQueueInitArgs); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, + final int numQueues, final float readShare, final float scanShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable, + final Class readQueueClass, Object... readQueueInitArgs) { + super(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare), + calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare, + LinkedBlockingQueue.class, new Object[] { maxQueueLength }, readQueueClass, ArrayUtils + .addAll(new Object[] { maxQueueLength }, readQueueInitArgs)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RPCUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RPCUtil.java new file mode 100644 index 0000000..71af7ce --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RPCUtil.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; + +import com.google.protobuf.Message; + +/** + * A utility class for RPC-related classes. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class RPCUtil { + public static final String NO_GROUPING_ID = "_NO_GROUPING_ID"; + +/** + * Extracts the groupId when {@link FairShareBlockingQueue} and + * {@link FairSharePriorityBasedBlockingQueue} are used.
This groupId + * helps in determining the round robin policy + * @param o + * @return + */ + public static Object extractGroupId(CallRunner o) { + if (o.getCall() != null) { + Message m = o.getCall().param; + if (m instanceof ScanRequest) { + ScanRequest request = (ScanRequest) m; + if (request.getScan().hasGroupingId()) { + return request.getScan().getGroupingId(); + } + } + } + return NO_GROUPING_ID; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 1be8c65..7ec0d9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -52,7 +52,7 @@ import com.google.protobuf.Message; public class RWQueueRpcExecutor extends RpcExecutor { private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class); - private final List> queues; + private final List> queues; private final QueueBalancer writeBalancer; private final QueueBalancer readBalancer; private final QueueBalancer scanBalancer; @@ -134,19 +134,20 @@ public class RWQueueRpcExecutor extends RpcExecutor { this.readBalancer = getBalancer(numReadQueues); this.scanBalancer = getBalancer(numScanQueues); - queues = new ArrayList>(numWriteQueues + numReadQueues + numScanQueues); + queues = new ArrayList>( + numWriteQueues + numReadQueues + numScanQueues); LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues + " scanHandlers=" + scanHandlersCount)); for (int i = 0; i < numWriteQueues; ++i) { - queues.add((BlockingQueue) + queues.add((BlockingQueue) ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs)); } for (int i = 0; i < (numReadQueues + numScanQueues); ++i) { - queues.add((BlockingQueue) + queues.add((BlockingQueue) ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs)); } } @@ -160,8 +161,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { } @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { - RpcServer.Call call = callTask.getCall(); + public void dispatch(final CallRunnerWrapper callTask) throws InterruptedException { + RpcServer.Call call = callTask.getCallRunner().getCall(); int queueIndex; if (isWriteRequest(call.getHeader(), call.param)) { queueIndex = writeBalancer.getNextQueue(); @@ -218,14 +219,14 @@ public class RWQueueRpcExecutor extends RpcExecutor { @Override public int getQueueLength() { int length = 0; - for (final BlockingQueue queue: queues) { + for (final BlockingQueue queue: queues) { length += queue.size(); } return length; } @Override - protected List> getQueues() { + protected List> getQueues() { return queues; } @@ -233,7 +234,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { * Calculate the number of writers based on the "total count" and the read share. * You'll get at least one writer. */ - private static int calcNumWriters(final int count, final float readShare) { + protected static int calcNumWriters(final int count, final float readShare) { return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare))); } @@ -241,7 +242,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { * Calculate the number of readers based on the "total count" and the read share. 
* You'll get at least one reader. */ - private static int calcNumReaders(final int count, final float readShare) { + protected static int calcNumReaders(final int count, final float readShare) { return count - calcNumWriters(count, readShare); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RoundRobinRPCScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RoundRobinRPCScheduler.java new file mode 100644 index 0000000..fb4ba88 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RoundRobinRPCScheduler.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; +import java.util.Comparator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A scheduler that maintains isolated handler pools for general, + * high-priority, and replication requests. Requests in the general pool are + * scheduled round-robin, keyed by the grouping ID set on the request. + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class RoundRobinRPCScheduler extends RpcScheduler { + public static final Log LOG = LogFactory.getLog(RoundRobinRPCScheduler.class); + + private int port; + private final PriorityFunction priority; + private final RpcExecutor callExecutor; + private final RpcExecutor priorityExecutor; + private final RpcExecutor replicationExecutor; + + /** What level a high priority call is at. */ + private final int highPriorityLevel; + + private Abortable abortable = null; + private int maxDelay; + /** + * @param conf + * @param handlerCount + * the number of handler threads that will be used to process calls + * @param priorityHandlerCount + * How many threads for priority handling. + * @param replicationHandlerCount + * How many threads for replication handling. + * @param highPriorityLevel + * @param priority + * Function to extract request priority. 
+ */ + public RoundRobinRPCScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, + int replicationHandlerCount, PriorityFunction priority, Abortable server, + int highPriorityLevel) { + int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", handlerCount + * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + this.priority = priority; + this.highPriorityLevel = highPriorityLevel; + this.abortable = server; + this.maxDelay = conf.getInt(RpcScheduler.QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY); + + String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); + float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + + float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); + int numCallQueues = Math.max(1, (int) Math.round(handlerCount * callQueuesHandlersFactor)); + + LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues); + + if (numCallQueues > 1 && callqReadShare > 0) { + // multiple read/write queues + if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues, + callqReadShare, callqScanShare, maxQueueLength, conf, abortable, + FairSharePriorityBasedBlockingQueue.class); + } else { + callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues, + callqReadShare, callqScanShare, maxQueueLength, conf, abortable); + } + } else { + // multiple queues + if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues, + conf, abortable, FairSharePriorityBasedBlockingQueue.class, maxQueueLength); + } else { + callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues, + maxQueueLength, conf, abortable); + } + } + + // Create 2 queues to help priorityExecutor be more scalable. + this.priorityExecutor = priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", + priorityHandlerCount, 2, maxQueueLength) : null; + + this.replicationExecutor = replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor( + "Replication", replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; + } + + public RoundRobinRPCScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, + int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) { + this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority, null, + highPriorityLevel); + } + + @Override + public void init(Context context) { + this.port = context.getListenerAddress().getPort(); + } + + @Override + public void start() { + callExecutor.start(port); + if (priorityExecutor != null) + priorityExecutor.start(port); + if (replicationExecutor != null) + replicationExecutor.start(port); + } + + @Override + public void stop() { + callExecutor.stop(); + if (priorityExecutor != null) + priorityExecutor.stop(); + if (replicationExecutor != null) + replicationExecutor.stop(); + } + + @Override + public void dispatch(CallRunner callTask) throws InterruptedException { + RpcServer.Call call = callTask.getCall(); + // The deadline is computed even for calls that do not use a deadline queue. + // We could avoid that by passing a flag down, but the cost is acceptable as it + // happens only once per call + CallRunnerWrapper wrap = new CallRunnerWrapper(callTask, this.priority, this.maxDelay); + int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser()); + if (priorityExecutor != null && level > highPriorityLevel) { + priorityExecutor.dispatch(wrap); + } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { + replicationExecutor.dispatch(wrap); + } else { + callExecutor.dispatch(wrap); + } + } + + @Override + public int getGeneralQueueLength() { + return callExecutor.getQueueLength(); + } + + @Override + public int getPriorityQueueLength() { + return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); + } + + @Override + public int getReplicationQueueLength() { + return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); + } + + @Override + public int getActiveRpcHandlerCount() { + return callExecutor.getActiveHandlerCount() + + (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) + + (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); + } + + /* + * Comparator used by the "normal callQueue" when CALL_QUEUE_TYPE_CONF_KEY + * is set to "deadline". It uses the calculated "deadline" e.g. to deprioritize + * long-running jobs. + * + * If multiple requests have the same deadline BoundedPriorityBlockingQueue + * will order them in FIFO (first-in-first-out) manner. + */ + public static class CallRunnerDeadLinePriority implements Comparator { + + @Override + public int compare(CallRunnerWrapper o1, CallRunnerWrapper o2) { + return Long.compare(o1.getDeadLine(), o2.getDeadLine()); + } + + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 709429d..88ad922 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -85,18 +85,18 @@ public abstract class RpcExecutor { public abstract int getQueueLength(); /** Add the request to the executor queue */ - public abstract void dispatch(final CallRunner callTask) throws InterruptedException; + public abstract void dispatch(final CallRunnerWrapper callTask) throws InterruptedException; /** Returns the list of request queues */ - protected abstract List> getQueues(); + protected abstract List> getQueues(); protected void startHandlers(final int port) { - List> callQueues = getQueues(); + List> callQueues = getQueues(); startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port); } protected void startHandlers(final String nameSuffix, final int numHandlers, - final List> callQueues, + final List> callQueues, final int qindex, final int qsize, final int port) { final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); for (int i = 0; i < numHandlers; i++) { @@ -116,7 +116,7 @@ } } - protected void consumerLoop(final BlockingQueue myQueue) { + protected void consumerLoop(final BlockingQueue myQueue) { boolean interrupted = false; double handlerFailureThreshhold = conf == null ? 
1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, @@ -124,10 +124,10 @@ public abstract class RpcExecutor { try { while (running) { try { - CallRunner task = myQueue.take(); + CallRunnerWrapper task = myQueue.take(); try { activeHandlerCount.incrementAndGet(); - task.run(); + task.getCallRunner().run(); } catch (Throwable e) { if (e instanceof Error) { int failedCount = failedHandlerCount.incrementAndGet(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index f273865..b80cb4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -31,6 +31,22 @@ import java.net.InetSocketAddress; @InterfaceStability.Evolving public abstract class RpcScheduler { + /** max delay in msec used to bound the deprioritized requests */ + public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY + = "hbase.ipc.server.queue.max.call.delay"; + public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = + "hbase.ipc.server.callqueue.read.ratio"; + public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = + "hbase.ipc.server.callqueue.scan.ratio"; + public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = + "hbase.ipc.server.callqueue.handler.factor"; + public final static int DEFAULT_MAX_CALL_DELAY = 5000; + + /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */ + public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; + public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; + public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; + /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. 
*/ static abstract class Context { public abstract InetSocketAddress getListenerAddress(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index b8e9c52..5239bae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; - -import java.util.Comparator; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RoundRobinRPCScheduler.CallRunnerDeadLinePriority; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; /** @@ -39,52 +37,6 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; public class SimpleRpcScheduler extends RpcScheduler { private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); - public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = - "hbase.ipc.server.callqueue.read.ratio"; - public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = - "hbase.ipc.server.callqueue.scan.ratio"; - public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = - "hbase.ipc.server.callqueue.handler.factor"; - - /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */ - public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; - public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; - public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; - - /** max delay in msec used to bound the deprioritized requests */ - public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY - = "hbase.ipc.server.queue.max.call.delay"; - - /** - * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. - * It uses the calculated "deadline" e.g. to deprioritize long-running job - * - * If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in - * FIFO (first-in-first-out) manner. 
- */ - private static class CallPriorityComparator implements Comparator { - private final static int DEFAULT_MAX_CALL_DELAY = 5000; - - private final PriorityFunction priority; - private final int maxDelay; - - public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) { - this.priority = priority; - this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY); - } - - @Override - public int compare(CallRunner a, CallRunner b) { - RpcServer.Call callA = a.getCall(); - RpcServer.Call callB = b.getCall(); - long deadlineA = priority.getDeadline(callA.getHeader(), callA.param); - long deadlineB = priority.getDeadline(callB.getHeader(), callB.param); - deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay); - deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay); - return (int)(deadlineA - deadlineB); - } - } - private int port; private final PriorityFunction priority; private final RpcExecutor callExecutor; @@ -95,6 +47,7 @@ public class SimpleRpcScheduler extends RpcScheduler { private final int highPriorityLevel; private Abortable abortable = null; + private int maxDelay; /** * @param conf @@ -117,6 +70,7 @@ public class SimpleRpcScheduler extends RpcScheduler { this.priority = priority; this.highPriorityLevel = highPriorityLevel; this.abortable = server; + this.maxDelay = conf.getInt(RpcScheduler.QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY); String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); @@ -130,23 +84,23 @@ public class SimpleRpcScheduler extends RpcScheduler { if (numCallQueues > 1 && callqReadShare > 0) { // multiple read/write queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { - CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); + CallRunnerDeadLinePriority callPriority = new CallRunnerDeadLinePriority(); callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength, conf, abortable, BoundedPriorityBlockingQueue.class, callPriority); } else { callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, - callqReadShare, callqScanShare, maxQueueLength, conf, abortable); + callqReadShare, callqScanShare, maxQueueLength, conf, abortable); } } else { // multiple queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { - CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, - conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + CallRunnerDeadLinePriority callPriority = new CallRunnerDeadLinePriority(); + callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, conf, + abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); } else { - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, - numCallQueues, maxQueueLength, conf, abortable); + callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, + maxQueueLength, conf, abortable); } } @@ -192,13 +146,17 @@ public class SimpleRpcScheduler extends RpcScheduler { @Override public void dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); + // The deadline calculation will even happen for non deadline 
based calls.
+    // We could avoid that by passing a flag, but the cost is acceptable since the
+    // calculation happens only once per call.
+    CallRunnerWrapper wrap = new CallRunnerWrapper(callTask, this.priority, this.maxDelay);
     int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
     if (priorityExecutor != null && level > highPriorityLevel) {
-      priorityExecutor.dispatch(callTask);
+      priorityExecutor.dispatch(wrap);
     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
-      replicationExecutor.dispatch(callTask);
+      replicationExecutor.dispatch(wrap);
     } else {
-      callExecutor.dispatch(callTask);
+      callExecutor.dispatch(wrap);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3c0f50a..9469678 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -916,6 +916,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
           REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
           SimpleRpcSchedulerFactory.class);
+      LOG.info("Using RpcScheduler factory " + rpcSchedulerFactoryClass.getName());
       rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
     } catch (InstantiationException e) {
       throw new IllegalArgumentException(e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RoundRobinRPCSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RoundRobinRPCSchedulerFactory.java
new file mode 100644
index 0000000..4092279
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RoundRobinRPCSchedulerFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RoundRobinRPCScheduler;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+
+/** Constructs a {@link RoundRobinRPCScheduler}.
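+ * To enable it, point the region server's scheduler factory configuration (the
+ * {@code REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS} key read by {@code RSRpcServices},
+ * see above) at this class; otherwise the default {@link SimpleRpcSchedulerFactory}
+ * stays in effect.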
*/ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class RoundRobinRPCSchedulerFactory implements RpcSchedulerFactory { + + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + + return new RoundRobinRPCScheduler(conf, handlerCount, conf.getInt( + HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), conf.getInt( + HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), priority, server, + HConstants.QOS_THRESHOLD); + } + + @Override + @Deprecated + public RpcScheduler create(Configuration conf, PriorityFunction priority) { + return create(conf, priority, null); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java new file mode 100644 index 0000000..6a2f7f0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A bounded blocking queue implementation that orders its elements by priority,
+ * groups them by a producer key, and round-robins among the different producers
+ * within the same priority.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public abstract class AbstractPriorityBasedRoundRobinQueue<E> extends AbstractQueue<E> implements
+    BlockingQueue<E> {
+  private static final Log LOG = LogFactory.getLog(AbstractPriorityBasedRoundRobinQueue.class
+      .getName());
+
+  private final TreeMap<Long, LinkedHashMap<Object, LinkedList<E>>> producerMap;
+  // Lock used for all operations
+  private final ReentrantLock lock = new ReentrantLock();
+  private final TreeMap<Long, Pair<LinkedList<Object>, Integer>> currentProducer;
+  private int size;
+  private int maxSize;
+
+  // Condition for blocking when empty
+  private final Condition notEmpty = lock.newCondition();
+
+  // Wait queue for waiting puts
+  private final Condition notFull = lock.newCondition();
+
+  /**
+   * @param maxSize the maximum size of this queue
+   */
+  public AbstractPriorityBasedRoundRobinQueue(int maxSize) {
+    this.producerMap = new TreeMap<Long, LinkedHashMap<Object, LinkedList<E>>>();
+    this.maxSize = maxSize;
+    this.currentProducer = new TreeMap<Long, Pair<LinkedList<Object>, Integer>>();
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
+    long nanos = unit.toNanos(timeout);
+    lock.lockInterruptibly();
+    try {
+      while (remainingCapacity() == 0) {
+        if (nanos <= 0) {
+          return false;
+        }
+        nanos = notFull.awaitNanos(nanos);
+      }
+      boolean ret = offer(o);
+      notEmpty.signal();
+      return ret;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public boolean offer(E o) {
+    if (o == null) {
+      throw new NullPointerException();
+    }
+    final Object producerKey = extractProducerId(o);
+    if (producerKey == null) {
+      throw new NullPointerException();
+    }
+
+    LinkedList<E> producerList = null;
+    lock.lock();
+    try {
+      if ((maxSize - size) > 0) {
+        long priority = extractPriority(o);
+        LinkedHashMap<Object, LinkedList<E>> groupMap = this.producerMap.get(priority);
+        if (groupMap != null) {
+          producerList = groupMap.get(producerKey);
+          if (producerList == null) {
+            producerList = new LinkedList<E>();
+            groupMap.put(producerKey, producerList);
+            // Since we do not allow null keys, the producer keys are guaranteed
+            // to be unique
+            populateCurrentProducer(priority, producerKey);
+          }
+        } else {
+          LinkedHashMap<Object, LinkedList<E>> map = new LinkedHashMap<Object, LinkedList<E>>();
+          producerList = new LinkedList<E>();
+          map.put(producerKey, producerList);
+          this.producerMap.put(priority, map);
+          populateCurrentProducer(priority, producerKey);
+        }
+        producerList.add(o);
+        this.size++;
+        notEmpty.signal();
+        return true;
+      }
+    } finally {
+      lock.unlock();
+    }
+    return false;
+  }
+
+  protected void populateCurrentProducer(long priority, final Object producerKey) {
+    Pair<LinkedList<Object>, Integer> linkedList = this.currentProducer.get(priority);
+    if (linkedList == null) {
+      LinkedList<Object> objectList = new LinkedList<Object>();
+      objectList.add(producerKey);
+      Pair<LinkedList<Object>, Integer> pair = new Pair<LinkedList<Object>, Integer>();
+      pair.setFirst(objectList);
+      // Start the round-robin index at 0
+      pair.setSecond(0);
+      this.currentProducer.put(priority, pair);
+    } else {
+      linkedList.getFirst().add(producerKey);
+    }
+  }
+
+  /**
+   * Implementations must extract the producer object which is used as the key
+   * to identify a unique producer.
+   */
+  protected abstract Object extractProducerId(E o);
+
+  /**
+   * Implementations must extract the priority of the given element.
+   * @param o the element being enqueued
+   * @return the priority of the element
+   */
+  protected abstract long extractPriority(E o);
+
+  @Override
+  public void put(E o) throws InterruptedException {
+    lock.lock();
+    try {
+      while (remainingCapacity() == 0) {
+        notFull.await();
+      }
+      offer(o);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public E take() throws InterruptedException {
+    lock.lock();
+    try {
+      while (size() == 0) {
+        notEmpty.await();
+      }
+      E element = poll();
+      // assert element != null;
+      notFull.signal();
+      return element;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+    long nanos = unit.toNanos(timeout);
+    lock.lockInterruptibly();
+    try {
+      while (size() == 0 && nanos > 0) {
+        nanos = notEmpty.awaitNanos(nanos);
+      }
+      E result = poll();
+      notFull.signal();
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public E poll() {
+    lock.lock();
+    try {
+      if (this.size > 0) {
+        E element = null;
+        // Cannot simply poll a single list here because the same priority can
+        // hold elements from more than one producer
+        Entry<Long, LinkedHashMap<Object, LinkedList<E>>> lastEntry = this.producerMap.lastEntry();
+        if (lastEntry != null) {
+          LinkedHashMap<Object, LinkedList<E>> value = lastEntry.getValue();
+          if (value.size() == 0) {
+            this.producerMap.remove(lastEntry.getKey());
+            this.currentProducer.remove(lastEntry.getKey());
+            assert this.size == 0;
+            return null;
+          } else {
+            // Here is the actual round robin: for the producers under the same
+            // priority we rotate based on the number of producers and the index
+            // we currently point to
+            Pair<LinkedList<Object>, Integer> pair = currentProducer.get(lastEntry.getKey());
+            Object object = pair.getFirst().get(pair.getSecond());
+            LinkedList<E> tList = value.get(object);
+            element = tList.pollFirst();
+            int newProducer = 0;
+            if (tList.size() == 0) {
+              value.remove(object);
+              // remove and then calculate the new producer's index
+              pair.getFirst().remove(object);
+              if (pair.getFirst().size() > 1) {
+                newProducer = (pair.getSecond()) % (pair.getFirst().size());
+              }
+            } else {
+              if (pair.getFirst().size() > 1) {
+                newProducer = (pair.getSecond() + 1) % (pair.getFirst().size());
+              }
+            }
+            pair.setSecond(newProducer);
+            this.size--;
+            if (value.size() == 0) {
+              this.producerMap.remove(lastEntry.getKey());
+              this.currentProducer.remove(lastEntry.getKey());
+            }
+          }
+        }
+        notFull.signal();
+        return element;
+      }
+    } finally {
+      lock.unlock();
+    }
+    // assert this.size == 0;
+    return null;
+  }
+
+  @Override
+  public E peek() {
+    lock.lock();
+    try {
+      E element = null;
+      Entry<Long, LinkedHashMap<Object, LinkedList<E>>> firstEntry = this.producerMap.firstEntry();
+      if (firstEntry != null) {
+        LinkedHashMap<Object, LinkedList<E>> value = firstEntry.getValue();
+        if (value.size() == 0) {
+          this.producerMap.remove(firstEntry.getKey());
+          // assert this.size == 0;
+        } else {
+          Iterator<Entry<Object, LinkedList<E>>> iterator = value.entrySet().iterator();
+          while (iterator.hasNext()) {
+            Entry<Object, LinkedList<E>> next = iterator.next();
+            // ends up with the head element of the last producer list under
+            // this priority
+            element = next.getValue().getFirst();
+          }
+        }
+      } else {
+        // assert this.size == 0;
+      }
+      return element;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+    if (c == this) {
+      throw new IllegalArgumentException();
+    }
+
+    lock.lock();
+    try {
+      int originalSize = this.size;
+      int drained = drainTo(c, this.size);
+      assert drained == originalSize;
+      assert this.size == 0;
+      assert this.producerMap.isEmpty();
+      return drained;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c, int maxElements) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+    if (c == this) {
+      throw new IllegalArgumentException();
+    }
+
+    lock.lock();
+    try {
+      int i = 0;
+      while (i < maxElements) {
+        E element = poll();
+        if (element != null) {
+          c.add(element);
+          i++;
+        } else {
+          break;
+        }
+      }
+      return i;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int remainingCapacity() {
+    lock.lock();
+    try {
+      return (maxSize - size);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int size() {
+    lock.lock();
+    try {
+      return this.size;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractRoundRobinQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractRoundRobinQueue.java
new file mode 100644
index 0000000..1c10343
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractRoundRobinQueue.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A bounded blocking queue implementation that keeps a virtual queue of
+ * elements on a per-producer basis and iterates through each producer queue in
+ * round-robin fashion.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public abstract class AbstractRoundRobinQueue<E> extends AbstractQueue<E> implements
+    BlockingQueue<E> {
+  // The map that holds the different producers based on the group id
+  private final HashMap<Object, ProducerList<E>> producerMap;
+  // The list of producers under each group id
+  private final LinkedList<ProducerList<E>> producerLists;
+  // Lock used for all operations
+  private final ReentrantLock lock = new ReentrantLock();
+  private final boolean newProducerToFront;
+  private int currentProducer;
+  private int size;
+  private int maxSize;
+
+  // Condition for blocking when empty
+  private final Condition notEmpty = lock.newCondition();
+
+  // Wait queue for waiting puts
+  private final Condition notFull = lock.newCondition();
+
+  public AbstractRoundRobinQueue(int maxSize) {
+    this(true, maxSize);
+  }
+
+  /**
+   * @param newProducerToFront
+   *          If true, new producers go to the front of the round-robin list;
+   *          if false, they go to the end.
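+   * @param maxSize
+   *          the maximum number of elements the queue may hold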
+ */ + public AbstractRoundRobinQueue(boolean newProducerToFront, int maxSize) { + this.producerMap = new HashMap>(); + this.producerLists = new LinkedList>(); + this.newProducerToFront = newProducerToFront; + this.maxSize = maxSize; + } + + @Override + public Iterator iterator() { + lock.lock(); + try { + ArrayList allElements = new ArrayList(this.size); + ListIterator> iter = this.producerLists.listIterator(this.currentProducer); + while (iter.hasNext()) { + ProducerList tList = iter.next(); + allElements.addAll(tList.list); + } + return allElements.iterator(); + } finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (remainingCapacity() == 0) { + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + boolean ret = offer(o); + notEmpty.signal(); + return ret; + } finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E o) { + if (o == null) + throw new NullPointerException(); + + final Object producerKey = extractProducerId(o); + + ProducerList producerList = null; + lock.lock(); + try { + if ((maxSize - size) > 0) { + producerList = this.producerMap.get(producerKey); + if (producerList == null) { + producerList = new ProducerList(producerKey); + this.producerMap.put(producerKey, producerList); + this.producerLists.add(this.currentProducer, producerList); + if (!this.newProducerToFront) { + incrementCurrentProducerPointer(); + } + } + producerList.list.add(o); + this.size++; + notEmpty.signal(); + return true; + } + } finally { + lock.unlock(); + } + return false; + } + + /** + * Implementations must extracts the producer object which is used as the key + * to identify a unique producer. + */ + protected abstract Object extractProducerId(E o); + + @Override + public void put(E o) throws InterruptedException { + lock.lock(); + try { + while (remainingCapacity() == 0) { + notFull.await(); + } + offer(o); + } finally { + lock.unlock(); + } + } + + @Override + public E take() throws InterruptedException { + lock.lock(); + try { + while (size() == 0) { + notEmpty.await(); + } + E element = poll(); + assert element != null; + notFull.signal(); + return element; + } finally { + lock.unlock(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (size() == 0 && nanos > 0) { + nanos = notEmpty.awaitNanos(nanos); + } + E result = poll(); + notFull.signal(); + return result; + } finally { + lock.unlock(); + } + } + + @Override + public E poll() { + lock.lock(); + try { + if (this.size > 0) { + ListIterator> iter = this.producerLists.listIterator(this.currentProducer); + while (iter.hasNext()) { + ProducerList tList = iter.next(); + if (tList.list.isEmpty()) { + iter.remove(); + this.producerMap.remove(tList.producer); + adjustCurrentProducerPointer(); + } else { + E element = tList.list.removeFirst(); + this.size--; + assert element != null; + // This is the round robin part. When we take an element from the + // current thread's queue + // we move on to the next thread. 
+            if (tList.list.isEmpty()) {
+              iter.remove();
+              this.producerMap.remove(tList.producer);
+              adjustCurrentProducerPointer();
+            } else {
+              incrementCurrentProducerPointer();
+            }
+            notFull.signal();
+            return element;
+          }
+        }
+        assert this.size == 0;
+      }
+    } finally {
+      lock.unlock();
+    }
+    return null;
+  }
+
+  /**
+   * Polls using the given producer key.
+   */
+  protected E pollProducer(Object producer) {
+    lock.lock();
+    try {
+      ProducerList<E> tList = this.producerMap.get(producer);
+      if (tList != null && !tList.list.isEmpty()) {
+        E element = tList.list.removeFirst();
+        this.size--;
+        if (tList.list.isEmpty()) {
+          this.producerLists.remove(tList);
+          this.producerMap.remove(tList.producer);
+          // we need to adjust the current producer pointer in case it pointed
+          // to this producer list, which is now removed
+          adjustCurrentProducerPointer();
+        }
+        assert element != null;
+        // Since this only processes the given producer's work, we leave the
+        // round-robin pointer alone and just return the element
+        return element;
+      }
+    } finally {
+      lock.unlock();
+    }
+    return null;
+  }
+
+  @Override
+  public E peek() {
+    lock.lock();
+    try {
+      ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
+      while (iter.hasNext()) {
+        ProducerList<E> tList = iter.next();
+        if (tList.list.isEmpty()) {
+          iter.remove();
+          this.producerMap.remove(tList.producer);
+          adjustCurrentProducerPointer();
+        } else {
+          E element = tList.list.getFirst();
+          assert element != null;
+          return element;
+        }
+      }
+      assert this.size == 0;
+    } finally {
+      lock.unlock();
+    }
+    return null;
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+    if (c == this) {
+      throw new IllegalArgumentException();
+    }
+
+    lock.lock();
+    try {
+      int originalSize = this.size;
+      int drained = drainTo(c, this.size);
+      assert drained == originalSize;
+      assert this.size == 0;
+      assert this.producerLists.isEmpty();
+      assert this.producerMap.isEmpty();
+      return drained;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c, int maxElements) {
+    if (c == null) {
+      throw new NullPointerException();
+    }
+    if (c == this) {
+      throw new IllegalArgumentException();
+    }
+
+    lock.lock();
+    try {
+      int i = 0;
+      while (i < maxElements) {
+        E element = poll();
+        if (element != null) {
+          c.add(element);
+          i++;
+        } else {
+          break;
+        }
+      }
+      return i;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int remainingCapacity() {
+    lock.lock();
+    try {
+      return (maxSize - size);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int size() {
+    lock.lock();
+    try {
+      return this.size;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void incrementCurrentProducerPointer() {
+    lock.lock();
+    try {
+      if (this.producerLists.size() == 0) {
+        this.currentProducer = 0;
+      } else {
+        this.currentProducer = (this.currentProducer + 1) % this.producerLists.size();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Adjusts the current producer pointer after a decrease in the list size.
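+   * Without the modulo correction the pointer could end up pointing one past
+   * the end of producerLists after a removal.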
+ */ + private void adjustCurrentProducerPointer() { + lock.lock(); + try { + if (this.producerLists.size() == 0) { + this.currentProducer = 0; + } else { + this.currentProducer = (this.currentProducer) % this.producerLists.size(); + } + } finally { + lock.unlock(); + } + } + + private static class ProducerList { + public ProducerList(Object producer) { + this.producer = producer; + this.list = new LinkedList(); + } + + private final Object producer; + private final LinkedList list; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 2b7ffb2..727993d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -35,9 +36,27 @@ import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -153,11 +172,13 @@ public class TestSimpleRpcScheduler { @Test public void testRpcScheduler() throws Exception { - testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); - testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); + testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, false); + testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE, false); + testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, true); + testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE, true); } - private void testRpcScheduler(final String queueType) throws Exception { + private void testRpcScheduler(final String queueType, boolean grouping) throws Exception { Configuration schedConf = HBaseConfiguration.create(); schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); @@ -166,8 +187,12 @@ public class TestSimpleRpcScheduler { any(Message.class), any(User.class))) 
.thenReturn(HConstants.NORMAL_QOS); - RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, - HConstants.QOS_THRESHOLD); + RpcScheduler scheduler; + if (grouping) { + scheduler = new RoundRobinRPCScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); + } else { + scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); + } try { scheduler.start(); @@ -176,6 +201,12 @@ public class TestSimpleRpcScheduler { RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); when(smallCallTask.getCall()).thenReturn(smallCall); when(smallCall.getHeader()).thenReturn(smallHead); + + CallRunner smallCallTask1 = mock(CallRunner.class); + RpcServer.Call smallCall1 = mock(RpcServer.Call.class); + RequestHeader smallHead1 = RequestHeader.newBuilder().setCallId(25).build(); + when(smallCallTask1.getCall()).thenReturn(smallCall1); + when(smallCall1.getHeader()).thenReturn(smallHead1); CallRunner largeCallTask = mock(CallRunner.class); RpcServer.Call largeCall = mock(RpcServer.Call.class); @@ -190,24 +221,54 @@ public class TestSimpleRpcScheduler { when(hugeCall.getHeader()).thenReturn(hugeHead); when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L); + when(priority.getDeadline(eq(smallHead1), any(Message.class))).thenReturn(0L); when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L); when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L); + Scan scan = new Scan(); + scan.setGroupingId("ABC"); + ClientProtos.Scan clientScan = ClientProtos.Scan.newBuilder().setGroupingId("ABC").build(); + smallCall.param = ScanRequest.newBuilder().setScannerId(1).setScan(clientScan).build(); + RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); + when(smallCallTask.getCall()).thenReturn(smallCall); + when(smallCall.getHeader()).thenReturn(scanHead); + + // Same grouping Id - but different priority + scan = new Scan(); + scan.setGroupingId("123"); + clientScan = ClientProtos.Scan.newBuilder().setGroupingId("123").build(); + smallCall1.param = ScanRequest.newBuilder().setScannerId(2).setScan(clientScan).build(); + RequestHeader scanHead1 = RequestHeader.newBuilder().setMethodName("scan").build(); + when(smallCallTask1.getCall()).thenReturn(smallCall1); + when(smallCall1.getHeader()).thenReturn(scanHead1); + + scan = new Scan(); + scan.setGroupingId("XYZ"); + ClientProtos.Scan clientScan1 = ClientProtos.Scan.newBuilder().setGroupingId("XYZ").build(); + largeCall.param = ScanRequest.newBuilder().setScannerId(1).setScan(clientScan1).build(); + RequestHeader scanHead2 = RequestHeader.newBuilder().setMethodName("scan").build(); + when(largeCallTask.getCall()).thenReturn(largeCall); + when(largeCall.getHeader()).thenReturn(scanHead2); + final ArrayList work = new ArrayList(); doAnswerTaskExecution(smallCallTask, work, 10, 250); doAnswerTaskExecution(largeCallTask, work, 50, 250); + doAnswerTaskExecution(smallCallTask1, work, 25, 250); doAnswerTaskExecution(hugeCallTask, work, 100, 250); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(hugeCallTask); + scheduler.dispatch(smallCallTask1); scheduler.dispatch(smallCallTask); scheduler.dispatch(largeCallTask); scheduler.dispatch(smallCallTask); + scheduler.dispatch(largeCallTask); + scheduler.dispatch(smallCallTask1); scheduler.dispatch(smallCallTask); - while (work.size() < 8) { + while (work.size() < 11) { 
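+        // 11 == the total number of calls dispatched above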
         Threads.sleepWithoutInterrupt(100);
       }
@@ -220,13 +281,22 @@
       }
       LOG.debug("Total Time: " + totalTime);
 
-      // -> [small small small huge small large small small]
-      // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
-      // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
+      // [10 10 10 100 25 10 50 10 50 25 10] -> 1835 (FIFO Queue with no GROUPING)
+      // [10 10 10 25 10 50 10 50 25 10 100] -> 1315 (Deadline Queue with no GROUPING)
+      // [10 10 100 25 50 10 25 50 10 10 10] -> 2105 (FIFO Queue with GROUPING)
+      // [10 10 25 50 10 25 10 50 10 10 100] -> 1455 (Deadline Queue with GROUPING)
       if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
-        assertEquals(530, totalTime);
+        if (grouping) {
+          assertEquals(1455, totalTime);
+        } else {
+          assertEquals(1315, totalTime);
+        }
       } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ {
-        assertEquals(930, totalTime);
+        if (grouping) {
+          assertEquals(2105, totalTime);
+        } else {
+          assertEquals(1835, totalTime);
+        }
       }
     } finally {
       scheduler.stop();
@@ -235,6 +305,11 @@
 
   @Test
   public void testScanQueues() throws Exception {
+    testScanQueuesInternal(false);
+    testScanQueuesInternal(true);
+  }
+
+  private void testScanQueuesInternal(boolean grouping) throws IOException, InterruptedException {
     Configuration schedConf = HBaseConfiguration.create();
     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
@@ -244,8 +319,15 @@
     when(priority.getPriority(any(RequestHeader.class), any(Message.class),
       any(User.class))).thenReturn(HConstants.NORMAL_QOS);
 
-    RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
-        HConstants.QOS_THRESHOLD);
+    RpcScheduler scheduler;
+    if (grouping) {
+      scheduler = new RoundRobinRPCScheduler(schedConf, 3, 1, 1, priority,
+          HConstants.QOS_THRESHOLD);
+    } else {
+      scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
+          HConstants.QOS_THRESHOLD);
+    }
+
     try {
       scheduler.start();
@@ -265,15 +347,30 @@
 
       CallRunner scanCallTask = mock(CallRunner.class);
       RpcServer.Call scanCall = mock(RpcServer.Call.class);
-      scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
+
+      Scan scan = new Scan();
+      scan.setGroupingId("ABC");
+      ClientProtos.Scan clientScan = ClientProtos.Scan.newBuilder().setGroupingId("ABC").build();
+      scanCall.param = ScanRequest.newBuilder().setScannerId(1).setScan(clientScan).build();
       RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
       when(scanCallTask.getCall()).thenReturn(scanCall);
       when(scanCall.getHeader()).thenReturn(scanHead);
+
+      CallRunner scanCallTask1 = mock(CallRunner.class);
+      RpcServer.Call scanCall1 = mock(RpcServer.Call.class);
+      scan = new Scan();
+      scan.setGroupingId("XYZ");
+      ClientProtos.Scan clientScan1 = ClientProtos.Scan.newBuilder().setGroupingId("XYZ").build();
+      scanCall1.param = ScanRequest.newBuilder().setScannerId(2).setScan(clientScan1).build();
+      RequestHeader scanHead1 = RequestHeader.newBuilder().setMethodName("scan").build();
+      when(scanCallTask1.getCall()).thenReturn(scanCall1);
+      when(scanCall1.getHeader()).thenReturn(scanHead1);
 
       ArrayList<Integer> work = new ArrayList<Integer>();
       doAnswerTaskExecution(putCallTask, work, 1, 1000);
       doAnswerTaskExecution(getCallTask, work, 2, 1000);
       doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+      doAnswerTaskExecution(scanCallTask1, work, 4, 1000);
 
       // There are 3 queues: [puts], [gets], [scans]
       // so the calls will be interleaved
@@ -284,8 +381,11 @@
       scheduler.dispatch(getCallTask);
       scheduler.dispatch(getCallTask);
       scheduler.dispatch(scanCallTask);
+      scheduler.dispatch(scanCallTask1);
       scheduler.dispatch(scanCallTask);
+      scheduler.dispatch(scanCallTask1);
       scheduler.dispatch(scanCallTask);
+      scheduler.dispatch(scanCallTask1);
 
       while (work.size() < 6) {
         Threads.sleepWithoutInterrupt(100);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestAbstractRoundRobinPriorityQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestAbstractRoundRobinPriorityQueue.java
new file mode 100644
index 0000000..903b870
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestAbstractRoundRobinPriorityQueue.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Comparator;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class })
+public class TestAbstractRoundRobinPriorityQueue {
+  private final static int CAPACITY = 16;
+  private static String[] groupId = new String[CAPACITY];
+  static {
+    for (int i = 1; i <= CAPACITY; i += 2) {
+      groupId[i] = "ABC";
+    }
+    for (int i = 0; i < CAPACITY; i += 2) {
+      groupId[i] = "XYZ";
+    }
+  }
+
+  class TestObject {
+    private final int priority;
+    private final int seqId;
+    private final String groupId;
+
+    public TestObject(final int priority, final int seqId, final String groupId) {
+      this.priority = priority;
+      this.seqId = seqId;
+      this.groupId = groupId;
+    }
+
+    public int getSeqId() {
+      return this.seqId;
+    }
+
+    public int getPriority() {
+      return this.priority;
+    }
+
+    public String getGroupId() {
+      return this.groupId;
+    }
+  }
+
+  class TestObjectComparator implements Comparator<TestObject> {
+    public TestObjectComparator() {}
+
+    @Override
+    public int compare(TestObject a, TestObject b) {
+      return a.getPriority() - b.getPriority();
+    }
+  }
+
+  private AbstractPriorityBasedRoundRobinQueue<TestObject> queue;
+
+  @Before
+  public void setUp() throws Exception {
+    this.queue = new AbstractRoundRobinPriorityQueueImpl(CAPACITY,
+        new TestObjectComparator());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  public static class AbstractRoundRobinPriorityQueueImpl extends
+      AbstractPriorityBasedRoundRobinQueue<TestObject> {
+
+    public AbstractRoundRobinPriorityQueueImpl(int maxSize,
+        Comparator<TestObject> comparator) {
+      super(maxSize);
+    }
+
+    @Override
+    protected Object extractProducerId(TestObject o) {
+      return o.getGroupId();
+    }
+
+    @Override
+    protected long extractPriority(TestObject o) {
+      return o.getPriority();
+    }
+  }
+
+  @Test
+  public void testAppend() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; ++i) {
+      assertTrue(queue.offer(new TestObject(i, i, groupId[i - 1])));
+      assertEquals(i, queue.size());
+      assertEquals(CAPACITY - i, queue.remainingCapacity());
+    }
+    assertFalse(queue.offer(new TestObject(0, -1, ""), 5, TimeUnit.MILLISECONDS));
+
+    // Pop
+    int count = CAPACITY - 1;
+    for (int i = CAPACITY; i >= 1; --i) {
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      assertEquals(count--, queue.size());
+      assertEquals(CAPACITY - (i - 1), queue.remainingCapacity());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testAppendSamePriority() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; ++i) {
+      assertTrue(queue.offer(new TestObject(0, i, groupId[i - 1])));
+      assertEquals(i, queue.size());
+      assertEquals(CAPACITY - i, queue.remainingCapacity());
+    }
+    assertFalse(queue.offer(new TestObject(0, -1, ""), 5, TimeUnit.MILLISECONDS));
+
+    // Pop
+    for (int i = 1; i <= CAPACITY; ++i) {
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      assertEquals(CAPACITY - i, queue.size());
+      assertEquals(i, queue.remainingCapacity());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testPrepend() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; ++i) {
+      assertTrue(queue.offer(new TestObject(CAPACITY - i, i, groupId[i - 1])));
+      assertEquals(i, queue.size());
+      assertEquals(CAPACITY - i, queue.remainingCapacity());
+    }
+
+    // Pop
+    int count = CAPACITY - 1;
+    for (int i = CAPACITY; i >= 1; --i) {
+      TestObject obj = queue.poll();
+      assertEquals(CAPACITY - (i - 1), obj.getSeqId());
+      assertEquals(count--, queue.size());
+      assertEquals(CAPACITY - (i - 1), queue.remainingCapacity());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; i += 2) {
+      assertTrue(queue.offer(new TestObject(i, i, groupId[i - 1])));
+      assertEquals((1 + i) / 2, queue.size());
+    }
+    for (int i = 2; i <= CAPACITY; i += 2) {
+      assertTrue(queue.offer(new TestObject(i, i, groupId[i - 1])));
+      assertEquals(CAPACITY / 2 + (i / 2), queue.size());
+    }
+    assertFalse(queue.offer(new TestObject(0, -1, ""), 5, TimeUnit.MILLISECONDS));
+
+    // Pop
+    int count = CAPACITY - 1;
+    for (int i = CAPACITY; i >= 1; --i) {
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      assertEquals(count--, queue.size());
+      assertEquals(CAPACITY - (i - 1), queue.remainingCapacity());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testFifoSamePriority() throws Exception {
+    assertTrue(CAPACITY >= 6);
+    for (int i = 0; i < 6; ++i) {
+      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i, groupId[i])));
+    }
+
+    for (int i = 1; i < 6; i += 2) {
+      TestObject obj = queue.poll();
+      assertEquals(20, obj.getPriority());
+      assertEquals(i, obj.getSeqId());
+    }
+
+    for (int i = 0; i < 6; i += 2) {
+      TestObject obj = queue.poll();
+      assertEquals(10, obj.getPriority());
+      assertEquals(i, obj.getSeqId());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testPoll() {
+    assertNull(queue.poll());
+    AbstractPriorityBasedRoundRobinQueue<TestObject> testList = new AbstractRoundRobinPriorityQueueImpl(
+        CAPACITY, new TestObjectComparator());
+
+    for (int i = 0; i < CAPACITY; ++i) {
+      TestObject obj = new TestObject(i, i, groupId[i]);
+      testList.add(obj);
+      queue.offer(obj);
+    }
+
+    for (int i = 0; i < CAPACITY; ++i) {
+      assertEquals(testList.poll(), queue.poll());
+    }
+
+    assertNull(queue.poll());
+  }
+
+  @Test(timeout = 10000)
+  public void testPollInExecutor() throws InterruptedException {
+    final TestObject testObj = new TestObject(0, 0, "");
+
+    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    executor.execute(new Runnable() {
+      public void run() {
+        try {
+          assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
+          threadsStarted.await();
+          assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
+          assertTrue(queue.isEmpty());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    executor.execute(new Runnable() {
+      public void run() {
+        try {
+          threadsStarted.await();
+          queue.offer(testObj);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestAbstractRoundRobinQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestAbstractRoundRobinQueue.java
new file mode 100644
index 0000000..288728c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestAbstractRoundRobinQueue.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class })
+public class TestAbstractRoundRobinQueue {
+  private final static int CAPACITY = 16;
+
+  class TestObject {
+    private final int priority;
+    private final int seqId;
+
+    public TestObject(final int priority, final int seqId) {
+      this.priority = priority;
+      this.seqId = seqId;
+    }
+
+    public int getSeqId() {
+      return this.seqId;
+    }
+
+    public int getPriority() {
+      return this.priority;
+    }
+  }
+
+  private AbstractRoundRobinQueue<TestObject> queue;
+
+  @Before
+  public void setUp() throws Exception {
+    this.queue = new AbstractRoundRobinQueueImpl(false, CAPACITY);
+  }
+
+  private static class AbstractRoundRobinQueueImpl extends
+      AbstractRoundRobinQueue<TestObject> {
+
+    public AbstractRoundRobinQueueImpl(boolean newProducerToFront, int maxSize) {
+      super(newProducerToFront, maxSize);
+    }
+
+    @Override
+    protected Object extractProducerId(TestObject o) {
+      return "ABC";
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testAppend() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; ++i) {
+      assertTrue(queue.offer(new TestObject(i, i)));
+      assertEquals(i, queue.size());
+      assertEquals(CAPACITY - i, queue.remainingCapacity());
+    }
+    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
+
+    // Pop
+    for (int i = 1; i <= CAPACITY; ++i) {
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      assertEquals(CAPACITY - i, queue.size());
+      assertEquals(i, queue.remainingCapacity());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testAppendSamePriority() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; ++i) {
+      assertTrue(queue.offer(new TestObject(0, i)));
+      assertEquals(i, queue.size());
+      assertEquals(CAPACITY - i, queue.remainingCapacity());
+    }
+    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
+
+    // Pop
+    for (int i = 1; i <= CAPACITY; ++i) {
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      assertEquals(CAPACITY - i, queue.size());
+      assertEquals(i, queue.remainingCapacity());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testPrepend() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; ++i) {
+      assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
+      assertEquals(i, queue.size());
+      assertEquals(CAPACITY - i, queue.remainingCapacity());
+    }
+
+    // Pop
+    for (int i = 1; i <= CAPACITY; ++i) {
+      // This will pop out in the order of insertion
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      assertEquals(CAPACITY - i, queue.size());
+      assertEquals(i, queue.remainingCapacity());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+    // Push
+    for (int i = 1; i <= CAPACITY; i += 2) {
+      assertTrue(queue.offer(new TestObject(i, i)));
+      assertEquals((1 + i) / 2, queue.size());
+    }
+    for (int i = 2; i <= CAPACITY; i += 2) {
+      assertTrue(queue.offer(new TestObject(i, i)));
+      assertEquals(CAPACITY / 2 + (i / 2), queue.size());
+    }
+    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
+
+    // Pop
+    int sizeCounter = CAPACITY;
+    for (int i = 1; i <= CAPACITY; i += 2) {
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      sizeCounter--;
+      assertEquals(sizeCounter, queue.size());
+    }
+
+    for (int i = 2; i <= CAPACITY; i += 2) {
+      TestObject obj = queue.poll();
+      assertEquals(i, obj.getSeqId());
+      sizeCounter--;
+      assertEquals(sizeCounter, queue.size());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testSamePriority() throws Exception {
+    assertTrue(CAPACITY >= 6);
+    for (int i = 0; i < 6; ++i) {
+      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
+    }
+
+    for (int i = 0; i < 6; ++i) {
+      TestObject obj = queue.poll();
+      // Elements come back in FIFO order, so the priorities alternate exactly
+      // as they were inserted
+      if (i % 2 == 0) {
+        assertEquals(10, obj.getPriority());
+      } else {
+        assertEquals(20, obj.getPriority());
+      }
+      assertEquals(i, obj.getSeqId());
+    }
+    assertEquals(null, queue.poll());
+  }
+
+  @Test
+  public void testPoll() {
+    assertNull(queue.poll());
+    AbstractRoundRobinQueue<TestObject> testList = new AbstractRoundRobinQueueImpl(
+        false, CAPACITY);
+
+    for (int i = 0; i < CAPACITY; ++i) {
+      TestObject obj = new TestObject(i, i);
+      testList.add(obj);
+      queue.offer(obj);
+    }
+
+    for (int i = 0; i < CAPACITY; ++i) {
+      assertEquals(testList.poll(), queue.poll());
+    }
+
+    assertNull(queue.poll());
+  }
+
+  @Test(timeout = 10000)
+  public void testPollInExecutor() throws InterruptedException {
+    final TestObject testObj = new TestObject(0, 0);
+
+    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    executor.execute(new Runnable() {
+      public void run() {
+        try {
+          assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
+          threadsStarted.await();
+          assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
+          assertTrue(queue.isEmpty());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    executor.execute(new Runnable() {
+      public void run() {
+        try {
+          threadsStarted.await();
+          queue.offer(testObj);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
+  }
+}
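
Reviewer note, not part of the patch: a minimal end-to-end sketch of how the new pieces fit together. It assumes the factory is enabled through the region server scheduler-factory key read by RSRpcServices (the key string in the comment below is quoted from memory of REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS and should be double-checked); the table name, grouping id, and processing loop are illustrative only.

// hbase-site.xml on the region servers:
//   <property>
//     <name>hbase.region.server.rpc.scheduler.factory.class</name>
//     <value>org.apache.hadoop.hbase.regionserver.RoundRobinRPCSchedulerFactory</value>
//   </property>

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class GroupedScanExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("t1"))) { // "t1" is illustrative
      Scan scan = new Scan();
      // Every scan tagged with the same grouping id lands in one virtual
      // producer queue on the server, so the RPC handlers round-robin between
      // this job and other concurrently running jobs instead of draining one
      // job's backlog to completion first.
      scan.setGroupingId("nightly-report"); // illustrative grouping id
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // process each row
        }
      }
    }
  }
}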