 .../java/org/apache/hadoop/hbase/client/Scan.java  |  11 +
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |   8 +-
 .../hbase/protobuf/generated/ClientProtos.java     | 281 ++++++++++++---
 hbase-protocol/src/main/protobuf/Client.proto      |   1 +
 .../hbase/ipc/FairShareBalancedRPCExecutor.java    |  58 +++
 .../hadoop/hbase/ipc/FairShareBlockingQueue.java   |  61 ++++
 .../ipc/FairSharePriorityBasedBlockingQueue.java   |  67 ++++
 .../hbase/ipc/FairShareRWQueueRPCExecutor.java     |  56 +++
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java       |  24 +-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java       |  45 ++-
 .../util/AbstractPriorityBasedRoundRobinQueue.java | 329 ++++++++++++++++++
 .../hadoop/hbase/util/AbstractRoundRobinQueue.java | 387 +++++++++++++++++++++
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java   | 136 ++++++--
 13 files changed, 1349 insertions(+), 115 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 3b6194f..3467d29 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -114,6 +114,9 @@ public class Scan extends Query {
   private int storeLimit = -1;
   private int storeOffset = 0;
   private boolean getScan;
+  // Set a grouping id so that the RpcExecutor layer can fair-share (round-robin)
+  // among parallel scans
+  private String groupingId;
   /**
    * @deprecated since 1.0.0. Use {@link #setScanMetricsEnabled(boolean)}
    */
@@ -971,4 +974,12 @@ public class Scan extends Query {
     if (bytes == null) return null;
     return ProtobufUtil.toScanMetrics(bytes);
   }
+
+  public void setGroupingId(String groupingId) {
+    this.groupingId = groupingId;
+  }
+
+  public String getGroupingId() {
+    return this.groupingId;
+  }
 }
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index bad3cb4..044dea3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -120,12 +120,12 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.quotas.QuotaScope;
 import org.apache.hadoop.hbase.quotas.QuotaType;
@@ -929,6 +929,9 @@ public final class ProtobufUtil {
     if (scan.getCaching() > 0) {
       scanBuilder.setCaching(scan.getCaching());
     }
+    if (scan.getGroupingId() != null) {
+
scanBuilder.setGroupingId(scan.getGroupingId()); + } return scanBuilder.build(); } @@ -1014,6 +1017,9 @@ public final class ProtobufUtil { if (proto.hasCaching()) { scan.setCaching(proto.getCaching()); } + if (proto.hasGroupingId()) { + scan.setGroupingId(proto.getGroupingId()); + } return scan; } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 5bcba26..97e85e9 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -13814,6 +13814,21 @@ public final class ClientProtos { * optional uint32 caching = 17; */ int getCaching(); + + // optional string groupingId = 18; + /** + * optional string groupingId = 18; + */ + boolean hasGroupingId(); + /** + * optional string groupingId = 18; + */ + java.lang.String getGroupingId(); + /** + * optional string groupingId = 18; + */ + com.google.protobuf.ByteString + getGroupingIdBytes(); } /** * Protobuf type {@code Scan} @@ -13990,6 +14005,11 @@ public final class ClientProtos { caching_ = input.readUInt32(); break; } + case 146: { + bitField0_ |= 0x00008000; + groupingId_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -14368,6 +14388,49 @@ public final class ClientProtos { return caching_; } + // optional string groupingId = 18; + public static final int GROUPINGID_FIELD_NUMBER = 18; + private java.lang.Object groupingId_; + /** + * optional string groupingId = 18; + */ + public boolean hasGroupingId() { + return ((bitField0_ & 0x00008000) == 0x00008000); + } + /** + * optional string groupingId = 18; + */ + public java.lang.String getGroupingId() { + java.lang.Object ref = groupingId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + groupingId_ = s; + } + return s; + } + } + /** + * optional string groupingId = 18; + */ + public com.google.protobuf.ByteString + getGroupingIdBytes() { + java.lang.Object ref = groupingId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + groupingId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -14386,6 +14449,7 @@ public final class ClientProtos { reversed_ = false; consistency_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Consistency.STRONG; caching_ = 0; + groupingId_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -14468,6 +14532,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00004000) == 0x00004000)) { output.writeUInt32(17, caching_); } + if (((bitField0_ & 0x00008000) == 0x00008000)) { + output.writeBytes(18, getGroupingIdBytes()); + } getUnknownFields().writeTo(output); } @@ -14545,6 +14612,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(17, caching_); } + if (((bitField0_ & 0x00008000) == 0x00008000)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(18, 
getGroupingIdBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -14647,6 +14718,11 @@ public final class ClientProtos { result = result && (getCaching() == other.getCaching()); } + result = result && (hasGroupingId() == other.hasGroupingId()); + if (hasGroupingId()) { + result = result && getGroupingId() + .equals(other.getGroupingId()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -14728,6 +14804,10 @@ public final class ClientProtos { hash = (37 * hash) + CACHING_FIELD_NUMBER; hash = (53 * hash) + getCaching(); } + if (hasGroupingId()) { + hash = (37 * hash) + GROUPINGID_FIELD_NUMBER; + hash = (53 * hash) + getGroupingId().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -14902,6 +14982,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00008000); caching_ = 0; bitField0_ = (bitField0_ & ~0x00010000); + groupingId_ = ""; + bitField0_ = (bitField0_ & ~0x00020000); return this; } @@ -15016,6 +15098,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00004000; } result.caching_ = caching_; + if (((from_bitField0_ & 0x00020000) == 0x00020000)) { + to_bitField0_ |= 0x00008000; + } + result.groupingId_ = groupingId_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -15129,6 +15215,11 @@ public final class ClientProtos { if (other.hasCaching()) { setCaching(other.getCaching()); } + if (other.hasGroupingId()) { + bitField0_ |= 0x00020000; + groupingId_ = other.groupingId_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -16342,6 +16433,80 @@ public final class ClientProtos { return this; } + // optional string groupingId = 18; + private java.lang.Object groupingId_ = ""; + /** + * optional string groupingId = 18; + */ + public boolean hasGroupingId() { + return ((bitField0_ & 0x00020000) == 0x00020000); + } + /** + * optional string groupingId = 18; + */ + public java.lang.String getGroupingId() { + java.lang.Object ref = groupingId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + groupingId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string groupingId = 18; + */ + public com.google.protobuf.ByteString + getGroupingIdBytes() { + java.lang.Object ref = groupingId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + groupingId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string groupingId = 18; + */ + public Builder setGroupingId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00020000; + groupingId_ = value; + onChanged(); + return this; + } + /** + * optional string groupingId = 18; + */ + public Builder clearGroupingId() { + bitField0_ = (bitField0_ & ~0x00020000); + groupingId_ = getDefaultInstance().getGroupingId(); + onChanged(); + return this; + } + /** + * optional string groupingId = 18; + */ + public Builder setGroupingIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00020000; + groupingId_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:Scan) } @@ -32540,7 +32705,7 @@ public final class 
ClientProtos { "pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" + "to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" + "ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" + - "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" + + "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\315\003\n" + "\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" + "ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003", " \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" + @@ -32552,62 +32717,62 @@ public final class ClientProtos { "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" + "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" + - "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", - "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" + - "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" + - "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" + - "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\251\001\n\014S" + - "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" + - "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" + - "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" + - "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" + - "\030\007 \003(\010\"\263\001\n\024BulkLoadHFileRequest\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\0225\n\013family_path", - "\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyPath" + - "\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016" + - "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" + - "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026Coproce" + - "ssorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_" + - "name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reque" + - "st\030\004 \002(\014\"9\n\030CoprocessorServiceResult\022\035\n\005" + - "value\030\001 \001(\0132\016.NameBytesPair\"d\n\031Coprocess" + - "orServiceRequest\022 \n\006region\030\001 \002(\0132\020.Regio" + - "nSpecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorSe", - "rviceCall\"]\n\032CoprocessorServiceResponse\022" + - " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005val" + - "ue\030\002 \002(\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005in" + - "dex\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationPr" + - "oto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004" + - " \001(\0132\027.CoprocessorServiceCall\"Y\n\014RegionA" + - "ction\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + - "\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action" + - "\"D\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(" + - "\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021Resul", - "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + - "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + - "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + - 
"essorServiceResult\022#\n\tloadStats\030\005 \001(\0132\020." + - "RegionLoadStats\"f\n\022RegionActionResult\022-\n" + - "\021resultOrException\030\001 \003(\0132\022.ResultOrExcep" + - "tion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" + - "f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r." + - "RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcond" + - "ition\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse", - "\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionAct" + - "ionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consiste" + - "ncy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClient" + - "Service\022 \n\003Get\022\013.GetRequest\032\014.GetRespons" + - "e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" + - "onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" + - "e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" + - "t\032\026.BulkLoadHFileResponse\022F\n\013ExecService" + - "\022\032.CoprocessorServiceRequest\032\033.Coprocess" + - "orServiceResponse\022R\n\027ExecRegionServerSer", - "vice\022\032.CoprocessorServiceRequest\032\033.Copro" + - "cessorServiceResponse\022&\n\005Multi\022\r.MultiRe" + - "quest\032\016.MultiResponseBB\n*org.apache.hado" + - "op.hbase.protobuf.generatedB\014ClientProto" + - "sH\001\210\001\001\240\001\001" + "\022\022\n\ngroupingId\030\022 \001(\t\"\277\001\n\013ScanRequest\022 \n\006", + "region\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002" + + " \001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016numbe" + + "r_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025" + + "\n\rnext_call_seq\030\006 \001(\004\022\037\n\027client_handles_" + + "partials\030\007 \001(\010\"\251\001\n\014ScanResponse\022\030\n\020cells" + + "_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n" + + "\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007resu" + + "lts\030\005 \003(\0132\007.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027par" + + "tial_flag_per_result\030\007 \003(\010\"\263\001\n\024BulkLoadH" + + "FileRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec", + "ifier\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFi" + + "leRequest.FamilyPath\022\026\n\016assign_seq_num\030\003" + + " \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa" + + "th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" + + "ded\030\001 \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003r" + + "ow\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method" + + "_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030Coproces" + + "sorServiceResult\022\035\n\005value\030\001 \001(\0132\016.NameBy" + + "tesPair\"d\n\031CoprocessorServiceRequest\022 \n\006" + + "region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002", + " \002(\0132\027.CoprocessorServiceCall\"]\n\032Coproce" + + "ssorServiceResponse\022 \n\006region\030\001 \002(\0132\020.Re" + + "gionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytes" + + "Pair\"{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutatio" + + "n\030\002 \001(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.G" + + "et\022-\n\014service_call\030\004 \001(\0132\027.CoprocessorSe" + + "rviceCall\"Y\n\014RegionAction\022 \n\006region\030\001 \002(" + + 
"\0132\020.RegionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006a" + + "ction\030\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022" + + "\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupanc", + "y\030\002 \001(\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005inde" + + "x\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texce" + + "ption\030\003 \001(\0132\016.NameBytesPair\0221\n\016service_r" + + "esult\030\004 \001(\0132\031.CoprocessorServiceResult\022#" + + "\n\tloadStats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022R" + + "egionActionResult\022-\n\021resultOrException\030\001" + + " \003(\0132\022.ResultOrException\022!\n\texception\030\002 " + + "\001(\0132\016.NameBytesPair\"f\n\014MultiRequest\022#\n\014r" + + "egionAction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonc" + + "eGroup\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condit", + "ion\"S\n\rMultiResponse\022/\n\022regionActionResu" + + "lt\030\001 \003(\0132\023.RegionActionResult\022\021\n\tprocess" + + "ed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010T" + + "IMELINE\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.Get" + + "Request\032\014.GetResponse\022)\n\006Mutate\022\016.Mutate" + + "Request\032\017.MutateResponse\022#\n\004Scan\022\014.ScanR" + + "equest\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025" + + ".BulkLoadHFileRequest\032\026.BulkLoadHFileRes" + + "ponse\022F\n\013ExecService\022\032.CoprocessorServic" + + "eRequest\032\033.CoprocessorServiceResponse\022R\n", + "\027ExecRegionServerService\022\032.CoprocessorSe" + + "rviceRequest\032\033.CoprocessorServiceRespons" + + "e\022&\n\005Multi\022\r.MultiRequest\032\016.MultiRespons" + + "eBB\n*org.apache.hadoop.hbase.protobuf.ge" + + "neratedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -32697,7 +32862,7 @@ public final class ClientProtos { internal_static_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "GroupingId", }); internal_static_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_ScanRequest_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 5142e53..9d0cee2 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -254,6 +254,7 @@ message Scan { optional bool reversed = 15 [default = false]; optional Consistency consistency = 16 [default = STRONG]; optional uint32 caching = 17; + optional string groupingId = 18; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java new file mode 100644 index 0000000..4ff48f2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * An {@link RpcExecutor} that will balance requests in a round robin way across all its queues based on + * a Grouping Id set on the 'scan' request + */ +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) +@InterfaceStability.Evolving +public class FairShareBalancedRPCExecutor extends BalancedQueueRpcExecutor { + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final int maxQueueLength) { + this(name, handlerCount, numQueues, maxQueueLength, null, null); + } + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final int maxQueueLength, final Configuration conf, + final Abortable abortable) { + this(name, handlerCount, numQueues, conf, abortable, FairShareBlockingQueue.class, + maxQueueLength); + } + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final Class queueClass, Object... initargs) { + this(name, handlerCount, numQueues, null, null, queueClass, initargs); + } + + public FairShareBalancedRPCExecutor(final String name, final int handlerCount, + final int numQueues, final Configuration conf, final Abortable abortable, + final Class queueClass, Object... initargs) { + super(name, handlerCount, numQueues, conf, abortable, queueClass, initargs); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java new file mode 100644 index 0000000..1c54390 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.AbstractPriorityBasedRoundRobinQueue; +import org.apache.hadoop.hbase.util.AbstractRoundRobinQueue; + +/** + * An implementation of {@link AbstractPriorityBasedRoundRobinQueue} that tries to iterate through + * each producer queue in a round robin fashion based on the Grouping Id set in the + * {@link Scan} request + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FairShareBlockingQueue extends AbstractRoundRobinQueue { + + public FairShareBlockingQueue(int maxSize) { + super(false, maxSize); + } + public FairShareBlockingQueue(boolean newProducerToFront, int maxSize) { + super(newProducerToFront, maxSize); + } + + private static final String NO_GROUPING_ID = "_NO_GROUPING_ID"; + @Override + protected Object extractProducer(CallRunner o) { + if (o.getCall() != null) { + RequestHeader header = o.getCall().getHeader(); + if (header != null) { + String methodName = header.getMethodName(); + if (methodName.equalsIgnoreCase("scan")) { + ScanRequest request = (ScanRequest) o.getCall().param; + if (request.getScan().hasGroupingId()) { + return request.getScan().getGroupingId(); + } + } + } + } + return NO_GROUPING_ID; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java new file mode 100644 index 0000000..acceef5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Comparator; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.AbstractPriorityBasedRoundRobinQueue; + +/** + * An implementation of {@link AbstractPriorityBasedRoundRobinQueue} that tries to iterate through + * each producer queue in a round robin fashion based on the Grouping Id set in the + * {@link Scan} request + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FairSharePriorityBasedBlockingQueue extends + AbstractPriorityBasedRoundRobinQueue { + + public FairSharePriorityBasedBlockingQueue(int maxSize) { + super(false, maxSize, null); + } + public FairSharePriorityBasedBlockingQueue(boolean newProducerToFront, int maxSize) { + super(newProducerToFront, maxSize, null); + } + + public FairSharePriorityBasedBlockingQueue(int maxSize, + Comparator comparator) { + super(maxSize, comparator); + } + private static final String NO_GROUPING_ID = "_NO_GROUPING_ID"; + @Override + protected Object extractProducer(CallRunner o) { + if (o.getCall() != null) { + RequestHeader header = o.getCall().getHeader(); + if (header != null) { + String methodName = header.getMethodName(); + if (methodName.equalsIgnoreCase("scan")) { + ScanRequest request = (ScanRequest) o.getCall().param; + if (request.getScan().hasGroupingId()) { + return request.getScan().getGroupingId(); + } + } + } + } + return NO_GROUPING_ID; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java new file mode 100644 index 0000000..3769944 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java @@ -0,0 +1,56 @@ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A {@link RWQueueRpcExecutor} extension that creates a queue based on + * {@link FairSharePriorityBasedBlockingQueue} + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FairShareRWQueueRPCExecutor extends RWQueueRpcExecutor { + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable) { + this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, + conf, abortable, FairShareBlockingQueue.class); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final float scanShare, final int maxQueueLength) { + this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, 
null, null); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final float scanShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable) { + this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, + conf, abortable, FairShareBlockingQueue.class); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable, + final Class readQueueClass, Object... readQueueInitArgs) { + this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable, + readQueueClass, readQueueInitArgs); + } + + public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final float scanShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable, + final Class readQueueClass, Object... readQueueInitArgs) { + super(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare), + calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare, + LinkedBlockingQueue.class, new Object[] {maxQueueLength}, + readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 2b58680..0ec59ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -50,16 +50,16 @@ import com.google.protobuf.Message; public class RWQueueRpcExecutor extends RpcExecutor { private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class); - private final List> queues; - private final QueueBalancer writeBalancer; - private final QueueBalancer readBalancer; - private final QueueBalancer scanBalancer; - private final int writeHandlersCount; - private final int readHandlersCount; - private final int scanHandlersCount; - private final int numWriteQueues; - private final int numReadQueues; - private final int numScanQueues; + protected final List> queues; + protected final QueueBalancer writeBalancer; + protected final QueueBalancer readBalancer; + protected final QueueBalancer scanBalancer; + protected final int writeHandlersCount; + protected final int readHandlersCount; + protected final int scanHandlersCount; + protected final int numWriteQueues; + protected final int numReadQueues; + protected final int numScanQueues; public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final float readShare, final int maxQueueLength, @@ -218,7 +218,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { * Calculate the number of writers based on the "total count" and the read share. * You'll get at least one writer. */ - private static int calcNumWriters(final int count, final float readShare) { + protected static int calcNumWriters(final int count, final float readShare) { return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare))); } @@ -226,7 +226,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { * Calculate the number of readers based on the "total count" and the read share. * You'll get at least one reader. 
*/ - private static int calcNumReaders(final int count, final float readShare) { + protected static int calcNumReaders(final int count, final float readShare) { return count - calcNumWriters(count, readShare); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index d8ae3ba..f5af46b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -50,6 +50,8 @@ public class SimpleRpcScheduler extends RpcScheduler { public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; + public static final String CALL_QUEUE_GROUPING = "hbase.ipc.server.callqueue.grouping"; + public static final boolean CALL_QUEUE_GROUPING_DEFAULT_VALUE = false; /** max delay in msec used to bound the deprioritized requests */ public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY @@ -127,26 +129,51 @@ public class SimpleRpcScheduler extends RpcScheduler { LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues); + boolean callQueueGrouping = conf.getBoolean(CALL_QUEUE_GROUPING, CALL_QUEUE_GROUPING_DEFAULT_VALUE); if (numCallQueues > 1 && callqReadShare > 0) { // multiple read/write queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, - callqReadShare, callqScanShare, maxQueueLength, conf, abortable, - BoundedPriorityBlockingQueue.class, callPriority); + if (callQueueGrouping) { + callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues, + callqReadShare, callqScanShare, maxQueueLength, conf, abortable, + FairSharePriorityBasedBlockingQueue.class, callPriority); + } else { + callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, + callqReadShare, callqScanShare, maxQueueLength, conf, abortable, + BoundedPriorityBlockingQueue.class, callPriority); + } } else { - callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, - callqReadShare, callqScanShare, maxQueueLength, conf, abortable); + // TODO : Introduce configuration that strictly allows the Balanced way of Write QueueExecutor + // Not adding the fair queue RPC executor for the write mode + if (callQueueGrouping) { + callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues, + callqReadShare, callqScanShare, maxQueueLength, conf, abortable); + } else { + callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, + callqReadShare, callqScanShare, maxQueueLength, conf, abortable); + } } } else { // multiple queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, - conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + if (callQueueGrouping) { + callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues, + conf, abortable, FairSharePriorityBasedBlockingQueue.class, 
maxQueueLength, callPriority); + } else { + callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, + conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + } } else { - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, - numCallQueues, maxQueueLength, conf, abortable); + // TODO : Introduce configuration that strictly allows the balanced way of write queue executor + if (callQueueGrouping) { + callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues, + maxQueueLength, conf, abortable); + } else { + callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, + maxQueueLength, conf, abortable); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java new file mode 100644 index 0000000..ad5108e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A bounded queue implementation that is sorted based on the element E and allows the producers + * to be grouped based on the producer key and round-robins among the different producers with in + * the same priority + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public abstract class AbstractPriorityBasedRoundRobinQueue extends AbstractQueue implements + BlockingQueue { + public AbstractPriorityBasedRoundRobinQueue(int maxSize) { + this(false, maxSize, null); + } + + /** + * @param newProducerToFront + * If true, new producers go to the front of the round-robin list, if + * false, they go to the end. 
+ */ + public AbstractPriorityBasedRoundRobinQueue(boolean newProducerToFront, int maxSize, + Comparator comparator) { + this.producerMap = new TreeMap>>(comparator); + this.maxSize = maxSize; + } + + public AbstractPriorityBasedRoundRobinQueue(int maxSize, Comparator comparator) { + this(true, maxSize, comparator); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (remainingCapacity() == 0) { + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + boolean ret = offer(o); + notEmpty.signal(); + return ret; + } finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E o) { + if (o == null) + throw new NullPointerException(); + + final Object producerKey = extractProducer(o); + + ProducerList producerList = null; + lock.lock(); + try { + if (remainingCapacity() > 0) { + HashMap> groupMap = this.producerMap.get(o); + if(groupMap != null) { + producerList = groupMap.get(producerKey); + if(producerList == null) { + producerList = new ProducerList(); + groupMap.put(producerKey, producerList); + } + } else { + LinkedHashMap> map = new LinkedHashMap>(); + producerList = new ProducerList(); + map.put(producerKey, producerList); + this.producerMap.put(o, map); + } + producerList.list.add(o); + this.size++; + notEmpty.signal(); + return true; + } + } finally { + lock.unlock(); + } + return false; + } + + /** + * Implementations must extracts the producer object which is used as the key + * to identify a unique producer. + */ + protected abstract Object extractProducer(E o); + + @Override + public void put(E o) throws InterruptedException { + lock.lock(); + try { + while (remainingCapacity() == 0) { + notFull.await(); + } + offer(o); + } finally { + lock.unlock(); + } + } + + @Override + public E take() throws InterruptedException { + lock.lock(); + try { + while (size() == 0) { + notEmpty.await(); + } + E element = poll(); + assert element != null; + notFull.signal(); + return element; + } finally { + lock.unlock(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (size() == 0 && nanos > 0) { + nanos = notEmpty.awaitNanos(nanos); + } + E result = poll(); + notFull.signal(); + return result; + } finally { + lock.unlock(); + } + } + + @Override + public E poll() { + lock.lock(); + try { + if (size() > 0) { + E element = null; + // Cannot use poll here because under the same priority we could have + // more elements + Entry>> firstEntry = this.producerMap.firstEntry(); + if (firstEntry != null) { + LinkedHashMap> value = firstEntry.getValue(); + if (value.size() == 0) { + this.producerMap.remove(firstEntry.getKey()); + assert this.size == 0; + return null; + } else { + Iterator>> iterator = value.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> next = iterator.next(); + ProducerList tList = next.getValue(); + element = tList.list.poll(); + if (tList.list.size() == 0) { + iterator.remove(); + } + this.size--; + break; + } + if (value.size() == 0) { + this.producerMap.remove(firstEntry.getKey()); + } + notFull.signal(); + return element; + } + } + } + } finally { + lock.unlock(); + } + assert this.size == 0; + return null; + } + + @Override + public E peek() { + lock.lock(); + try { + E element = 
null; + Entry>> firstEntry = this.producerMap.firstEntry(); + if(firstEntry != null) { + LinkedHashMap> value = firstEntry.getValue(); + if(value.size() == 0) { + this.producerMap.remove(firstEntry.getKey()); + assert this.size == 0; + } else { + Iterator>> iterator = value.entrySet().iterator(); + while(iterator.hasNext()) { + Entry> next = iterator.next(); + element = next.getValue().list.getFirst(); + } + } + } else { + assert this.size == 0; + } + return element; + } finally { + lock.unlock(); + } + } + + @Override + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + + lock.lock(); + try { + int originalSize = this.size; + int drained = drainTo(c, this.size); + assert drained == originalSize; + assert this.size == 0; + assert this.producerMap.isEmpty(); + return drained; + } finally { + lock.unlock(); + } + } + + @Override + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + + lock.lock(); + try { + int i = 0; + while (i < maxElements) { + E element = poll(); + if (element != null) { + c.add(element); + i++; + } else { + break; + } + } + return i; + } finally { + lock.unlock(); + } + } + + @Override + public int remainingCapacity() { + lock.lock(); + try { + return (maxSize - size); + } finally { + lock.unlock(); + } + } + + @Override + public int size() { + lock.lock(); + try { + return this.size; + } finally { + lock.unlock(); + } + } + + private static class ProducerList { + public ProducerList() { + // A priority queue won't be needed here because we only need to have a list for + // the producers with the same priority + this.list = new LinkedList(); + } + + private final LinkedList list; + } + + private final TreeMap>> producerMap; + // Lock used for all operations + private final ReentrantLock lock = new ReentrantLock(); + private int size; + private int maxSize; + + // Condition for blocking when empty + private final Condition notEmpty = lock.newCondition(); + + // Wait queue for waiting puts + private final Condition notFull = lock.newCondition(); +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractRoundRobinQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractRoundRobinQueue.java new file mode 100644 index 0000000..a455c72 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractRoundRobinQueue.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.ListIterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * An unbounded blocking queue implementation that keeps a virtual queue of + * elements on per-producer basis and iterates through each producer queue in + * round robin fashion. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public abstract class AbstractRoundRobinQueue extends AbstractQueue implements + BlockingQueue { + + public AbstractRoundRobinQueue(int maxSize) { + this(false, maxSize); + } + + /** + * @param newProducerToFront + * If true, new producers go to the front of the round-robin list, if + * false, they go to the end. + */ + public AbstractRoundRobinQueue(boolean newProducerToFront, int maxSize) { + this.producerMap = new HashMap>(); + this.producerLists = new LinkedList>(); + this.newProducerToFront = newProducerToFront; + this.maxSize = maxSize; + } + + @Override + public Iterator iterator() { + lock.lock(); + try { + ArrayList allElements = new ArrayList(this.size); + ListIterator> iter = this.producerLists.listIterator(this.currentProducer); + while (iter.hasNext()) { + ProducerList tList = iter.next(); + allElements.addAll(tList.list); + } + return allElements.iterator(); + } finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (remainingCapacity() == 0) { + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + boolean ret = offer(o); + notEmpty.signal(); + return ret; + } finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E o) { + if (o == null) + throw new NullPointerException(); + + final Object producerKey = extractProducer(o); + + ProducerList producerList = null; + lock.lock(); + try { + if (remainingCapacity() > 0) { + producerList = this.producerMap.get(producerKey); + if (producerList == null) { + producerList = new ProducerList(producerKey); + this.producerMap.put(producerKey, producerList); + this.producerLists.add(this.currentProducer, producerList); + if (!this.newProducerToFront) { + incrementCurrentProducerPointer(); + } + } + producerList.list.add(o); + this.size++; + notEmpty.signal(); + return true; + } + } finally { + lock.unlock(); + } + return false; + } + + /** + * Implementations must extracts the producer object which is used as the key + * to identify a unique producer. 
+ */ + protected abstract Object extractProducer(E o); + + @Override + public void put(E o) throws InterruptedException { + lock.lock(); + try { + while (remainingCapacity() == 0) { + notFull.await(); + } + offer(o); + } finally { + lock.unlock(); + } + } + + @Override + public E take() throws InterruptedException { + lock.lock(); + try { + while (size() == 0) { + notEmpty.await(); + } + E element = poll(); + assert element != null; + notFull.signal(); + return element; + } finally { + lock.unlock(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (size() == 0 && nanos > 0) { + nanos = notEmpty.awaitNanos(nanos); + } + E result = poll(); + notFull.signal(); + return result; + } finally { + lock.unlock(); + } + } + + @Override + public E poll() { + lock.lock(); + try { + if (size() > 0) { + ListIterator> iter = this.producerLists.listIterator(this.currentProducer); + while (iter.hasNext()) { + ProducerList tList = iter.next(); + if (tList.list.isEmpty()) { + iter.remove(); + this.producerMap.remove(tList.producer); + adjustCurrentProducerPointer(); + } else { + E element = tList.list.removeFirst(); + this.size--; + assert element != null; + // This is the round robin part. When we take an element from the current thread's queue + // we move on to the next thread. + if (tList.list.isEmpty()) { + iter.remove(); + this.producerMap.remove(tList.producer); + adjustCurrentProducerPointer(); + } else { + incrementCurrentProducerPointer(); + } + notFull.signal(); + return element; + } + } + assert this.size == 0; + } + } finally { + lock.unlock(); + } + return null; + } + + /** + * Polls using the given producer key. + */ + protected E pollProducer(Object producer) { + lock.lock(); + try { + ProducerList tList = this.producerMap.get(producer); + if (tList != null && !tList.list.isEmpty()) { + E element = tList.list.removeFirst(); + this.size--; + if (tList.list.isEmpty()) { + this.producerLists.remove(tList); + this.producerMap.remove(tList.producer); + // we need to adjust the current thread pointer in case it pointed to this thread list, which is now removed + adjustCurrentProducerPointer(); + } + assert element != null; + // Since this is only processing the current thread's work, we'll + // leave + // the + // round-robin part alone and just return the work + return element; + } + } finally { + lock.unlock(); + } + return null; + } + + @Override + public E peek() { + lock.lock(); + try { + ListIterator> iter = this.producerLists.listIterator(this.currentProducer); + while (iter.hasNext()) { + ProducerList tList = iter.next(); + if (tList.list.isEmpty()) { + iter.remove(); + this.producerMap.remove(tList.producer); + adjustCurrentProducerPointer(); + } else { + E element = tList.list.getFirst(); + assert element != null; + return element; + } + } + assert this.size == 0; + } finally { + lock.unlock(); + } + return null; + } + + @Override + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + + lock.lock(); + try { + int originalSize = this.size; + int drained = drainTo(c, this.size); + assert drained == originalSize; + assert this.size == 0; + assert this.producerLists.isEmpty(); + assert this.producerMap.isEmpty(); + return drained; + } finally { + lock.unlock(); + } + } + + @Override + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new 
NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + + lock.lock(); + try { + int i = 0; + while (i < maxElements) { + E element = poll(); + if (element != null) { + c.add(element); + i++; + } else { + break; + } + } + return i; + } finally { + lock.unlock(); + } + } + + @Override + public int remainingCapacity() { + // TODO : Add implementation here. + lock.lock(); + try { + return (maxSize - size); + } finally { + lock.unlock(); + } + } + + @Override + public int size() { + lock.lock(); + try { + return this.size; + } finally { + lock.unlock(); + } + } + + private void incrementCurrentProducerPointer() { + lock.lock(); + try { + if (this.producerLists.size() == 0) { + this.currentProducer = 0; + } else { + this.currentProducer = (this.currentProducer + 1) % this.producerLists.size(); + } + } finally { + lock.unlock(); + } + } + + /** + * Adjusts the current pointer to a decrease in size. + */ + private void adjustCurrentProducerPointer() { + lock.lock(); + try { + if (this.producerLists.size() == 0) { + this.currentProducer = 0; + } else { + this.currentProducer = (this.currentProducer) % this.producerLists.size(); + } + } finally { + lock.unlock(); + } + } + + private static class ProducerList { + public ProducerList(Object producer) { + this.producer = producer; + this.list = new LinkedList(); + } + + private final Object producer; + private final LinkedList list; + } + + private final HashMap> producerMap; + private final LinkedList> producerLists; + // Lock used for all operations + private final ReentrantLock lock = new ReentrantLock(); + private final boolean newProducerToFront; + private int currentProducer; + private int size; + private int maxSize; + + // Condition for blocking when empty + private final Condition notEmpty = lock.newCondition(); + + // Wait queue for waiting puts + private final Condition notFull = lock.newCondition(); +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 11ac43f..bc994bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -17,22 +17,37 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 11ac43f..bc994bb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -17,22 +17,37 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import com.google.protobuf.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,23 +55,11 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Message;
 
 @Category({RPCTests.class, SmallTests.class})
 public class TestSimpleRpcScheduler {
@@ -148,13 +151,16 @@ public class TestSimpleRpcScheduler {
 
   @Test
   public void testRpcScheduler() throws Exception {
-    testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
-    testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
+    testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, false);
+    testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE, false);
+    testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, true);
+    testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE, true);
   }
 
-  private void testRpcScheduler(final String queueType) throws Exception {
+  private void testRpcScheduler(final String queueType, boolean grouping) throws Exception {
     Configuration schedConf = HBaseConfiguration.create();
     schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
+    schedConf.setBoolean(SimpleRpcScheduler.CALL_QUEUE_GROUPING, grouping);
 
     PriorityFunction priority = mock(PriorityFunction.class);
     when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
@@ -171,6 +177,12 @@ public class TestSimpleRpcScheduler {
     when(smallCallTask.getCall()).thenReturn(smallCall);
     when(smallCall.getHeader()).thenReturn(smallHead);
 
+    CallRunner smallCallTask1 = mock(CallRunner.class);
+    RpcServer.Call smallCall1 = mock(RpcServer.Call.class);
+    RequestHeader smallHead1 = RequestHeader.newBuilder().setCallId(25).build();
+    when(smallCallTask1.getCall()).thenReturn(smallCall1);
+    when(smallCall1.getHeader()).thenReturn(smallHead1);
+
     CallRunner largeCallTask = mock(CallRunner.class);
     RpcServer.Call largeCall = mock(RpcServer.Call.class);
     RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
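The grouping tests below assume the scheduler can recover a fair-share key from each dispatched call. A minimal sketch of that extraction, using only the generated protobuf accessors this patch adds (the helper name GroupingKeys and its empty-string fallback are assumptions for illustration, not the code SimpleRpcScheduler actually ships):

import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;

import com.google.protobuf.Message;

final class GroupingKeys {
  // Assumed fallback: calls without a groupingId all land in one default group.
  static final String DEFAULT_GROUP = "";

  static String groupingKeyOf(Message param) {
    if (param instanceof ScanRequest) {
      ScanRequest req = (ScanRequest) param;
      // hasScan()/hasGroupingId() guard against requests that never set the field.
      if (req.hasScan() && req.getScan().hasGroupingId()) {
        return req.getScan().getGroupingId();
      }
    }
    return DEFAULT_GROUP;
  }
}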
@@ -184,24 +196,54 @@ public class TestSimpleRpcScheduler {
     when(hugeCall.getHeader()).thenReturn(hugeHead);
 
     when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
+    when(priority.getDeadline(eq(smallHead1), any(Message.class))).thenReturn(0L);
     when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L);
     when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
 
+    Scan scan = new Scan();
+    scan.setGroupingId("ABC");
+    ClientProtos.Scan clientScan = ClientProtos.Scan.newBuilder().setGroupingId("ABC").build();
+    smallCall.param = ScanRequest.newBuilder().setScannerId(1).setScan(clientScan).build();
+    RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
+    when(smallCallTask.getCall()).thenReturn(smallCall);
+    when(smallCall.getHeader()).thenReturn(scanHead);
+
+    // A second small scan with a different grouping id ("123") but the same priority
+    scan = new Scan();
+    scan.setGroupingId("123");
+    clientScan = ClientProtos.Scan.newBuilder().setGroupingId("123").build();
+    smallCall1.param = ScanRequest.newBuilder().setScannerId(2).setScan(clientScan).build();
+    RequestHeader scanHead1 = RequestHeader.newBuilder().setMethodName("scan").build();
+    when(smallCallTask1.getCall()).thenReturn(smallCall1);
+    when(smallCall1.getHeader()).thenReturn(scanHead1);
+
+    scan = new Scan();
+    scan.setGroupingId("XYZ");
+    ClientProtos.Scan clientScan1 = ClientProtos.Scan.newBuilder().setGroupingId("XYZ").build();
+    largeCall.param = ScanRequest.newBuilder().setScannerId(1).setScan(clientScan1).build();
+    RequestHeader scanHead2 = RequestHeader.newBuilder().setMethodName("scan").build();
+    when(largeCallTask.getCall()).thenReturn(largeCall);
+    when(largeCall.getHeader()).thenReturn(scanHead2);
+
     final ArrayList<Integer> work = new ArrayList<Integer>();
     doAnswerTaskExecution(smallCallTask, work, 10, 250);
     doAnswerTaskExecution(largeCallTask, work, 50, 250);
+    doAnswerTaskExecution(smallCallTask1, work, 25, 250);
     doAnswerTaskExecution(hugeCallTask, work, 100, 250);
 
     scheduler.dispatch(smallCallTask);
     scheduler.dispatch(smallCallTask);
     scheduler.dispatch(smallCallTask);
     scheduler.dispatch(hugeCallTask);
+    scheduler.dispatch(smallCallTask1);
     scheduler.dispatch(smallCallTask);
     scheduler.dispatch(largeCallTask);
     scheduler.dispatch(smallCallTask);
+    scheduler.dispatch(largeCallTask);
+    scheduler.dispatch(smallCallTask1);
     scheduler.dispatch(smallCallTask);
 
-    while (work.size() < 8) {
+    while (work.size() < 11) {
       Threads.sleepWithoutInterrupt(100);
     }
@@ -214,13 +256,18 @@ public class TestSimpleRpcScheduler {
     }
     LOG.debug("Total Time: " + totalTime);
 
-    // -> [small small small huge small large small small]
-    // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
-    // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
     if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
-      assertEquals(530, totalTime);
+      if (grouping) {
+        assertEquals(1085, totalTime);
+      } else {
+        assertEquals(1315, totalTime);
+      }
     } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ {
-      assertEquals(930, totalTime);
+      if (grouping) {
+        assertEquals(2105, totalTime);
+      } else {
+        assertEquals(1835, totalTime);
+      }
     }
     } finally {
       scheduler.stop();
     }
   }
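For completeness, this is how a client would tag concurrent scans so the server's fair-share queue can round-robin between them. setGroupingId is the API added by this patch; the connection setup and table name "t1" are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class GroupedScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("t1"))) {
      Scan scan = new Scan();
      // Every RPC issued by this scan carries the same groupingId, so the
      // server-side round-robin queue treats them as one producer.
      scan.setGroupingId("reportA");
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // process r
        }
      }
    }
  }
}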
@@ -233,6 +280,7 @@ public class TestSimpleRpcScheduler {
     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
+    schedConf.setBoolean(SimpleRpcScheduler.CALL_QUEUE_GROUPING, true);
 
     PriorityFunction priority = mock(PriorityFunction.class);
     when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
@@ -257,15 +305,30 @@ public class TestSimpleRpcScheduler {
 
     CallRunner scanCallTask = mock(CallRunner.class);
     RpcServer.Call scanCall = mock(RpcServer.Call.class);
-    scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
+
+    Scan scan = new Scan();
+    scan.setGroupingId("ABC");
+    ClientProtos.Scan clientScan = ClientProtos.Scan.newBuilder().setGroupingId("ABC").build();
+    scanCall.param = ScanRequest.newBuilder().setScannerId(1).setScan(clientScan).build();
     RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
     when(scanCallTask.getCall()).thenReturn(scanCall);
     when(scanCall.getHeader()).thenReturn(scanHead);
 
+    CallRunner scanCallTask1 = mock(CallRunner.class);
+    RpcServer.Call scanCall1 = mock(RpcServer.Call.class);
+    scan = new Scan();
+    scan.setGroupingId("XYZ");
+    ClientProtos.Scan clientScan1 = ClientProtos.Scan.newBuilder().setGroupingId("XYZ").build();
+    scanCall1.param = ScanRequest.newBuilder().setScannerId(2).setScan(clientScan1).build();
+    RequestHeader scanHead1 = RequestHeader.newBuilder().setMethodName("scan").build();
+    when(scanCallTask1.getCall()).thenReturn(scanCall1);
+    when(scanCall1.getHeader()).thenReturn(scanHead1);
+
     ArrayList<Integer> work = new ArrayList<Integer>();
     doAnswerTaskExecution(putCallTask, work, 1, 1000);
     doAnswerTaskExecution(getCallTask, work, 2, 1000);
     doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+    doAnswerTaskExecution(scanCallTask1, work, 4, 1000);
 
     // There are 3 queues: [puts], [gets], [scans]
     // so the calls will be interleaved
@@ -276,8 +339,11 @@ public class TestSimpleRpcScheduler {
     scheduler.dispatch(getCallTask);
     scheduler.dispatch(getCallTask);
     scheduler.dispatch(scanCallTask);
+    scheduler.dispatch(scanCallTask1);
     scheduler.dispatch(scanCallTask);
+    scheduler.dispatch(scanCallTask1);
     scheduler.dispatch(scanCallTask);
+    scheduler.dispatch(scanCallTask1);
 
     while (work.size() < 6) {
       Threads.sleepWithoutInterrupt(100);