.../java/org/apache/hadoop/hbase/client/Scan.java | 11 +
.../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 8 +-
.../hbase/protobuf/generated/ClientProtos.java | 281 ++++++++++++---
hbase-protocol/src/main/protobuf/Client.proto | 1 +
.../hbase/ipc/FairShareBalancedRPCExecutor.java | 58 +++
.../hadoop/hbase/ipc/FairShareBlockingQueue.java | 61 ++++
.../ipc/FairSharePriorityBasedBlockingQueue.java | 67 ++++
.../hbase/ipc/FairShareRWQueueRPCExecutor.java | 56 +++
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 24 +-
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 45 ++-
.../util/AbstractPriorityBasedRoundRobinQueue.java | 329 ++++++++++++++++++
.../hadoop/hbase/util/AbstractRoundRobinQueue.java | 387 +++++++++++++++++++++
.../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 136 ++++++--
13 files changed, 1349 insertions(+), 115 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 3b6194f..3467d29 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -114,6 +114,9 @@ public class Scan extends Query {
private int storeLimit = -1;
private int storeOffset = 0;
private boolean getScan;
+  // Set this groupingId so that the RpcExecutor layer can do a fair-share (round-robin)
+  // schedule among the parallel scans
+ private String groupingId;
/**
* @deprecated since 1.0.0. Use {@link #setScanMetricsEnabled(boolean)}
@@ -971,4 +974,12 @@ public class Scan extends Query {
if (bytes == null) return null;
return ProtobufUtil.toScanMetrics(bytes);
}
+
+ public void setGroupingId(String groupingId) {
+ this.groupingId = groupingId;
+ }
+
+ public String getGroupingId() {
+ return this.groupingId;
+ }
}
\ No newline at end of file
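
For reference, a minimal client-side sketch of how the new accessor is meant to be used; the connection setup, table name, and id value below are illustrative:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class GroupingIdClientSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
        Table table = conn.getTable(TableName.valueOf("t1"))) {  // "t1" is illustrative
      Scan scan = new Scan();
      // Every parallel scan belonging to the same logical query carries the same id,
      // so the server-side fair-share queues can round-robin between queries.
      scan.setGroupingId("query-1234");                          // illustrative id
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          // consume results...
        }
      }
    }
  }
}
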
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index bad3cb4..044dea3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -120,12 +120,12 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.quotas.QuotaScope;
import org.apache.hadoop.hbase.quotas.QuotaType;
@@ -929,6 +929,9 @@ public final class ProtobufUtil {
if (scan.getCaching() > 0) {
scanBuilder.setCaching(scan.getCaching());
}
+ if (scan.getGroupingId() != null) {
+ scanBuilder.setGroupingId(scan.getGroupingId());
+ }
return scanBuilder.build();
}
@@ -1014,6 +1017,9 @@ public final class ProtobufUtil {
if (proto.hasCaching()) {
scan.setCaching(proto.getCaching());
}
+ if (proto.hasGroupingId()) {
+ scan.setGroupingId(proto.getGroupingId());
+ }
return scan;
}
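
In effect, these two hunks give the grouping id a full round trip through the PB Scan (field 18); a sketch using the ProtobufUtil entry points modified above (both throw IOException, so this would sit inside a method that declares it):

Scan scan = new Scan();
scan.setGroupingId("query-1234");                    // illustrative id
ClientProtos.Scan proto = ProtobufUtil.toScan(scan); // groupingId serialized as field 18
Scan restored = ProtobufUtil.toScan(proto);          // server side gets the same id back
assert "query-1234".equals(restored.getGroupingId());
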
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 5bcba26..97e85e9 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -13814,6 +13814,21 @@ public final class ClientProtos {
* optional uint32 caching = 17;
*/
int getCaching();
+
+ // optional string groupingId = 18;
+ /**
+ * optional string groupingId = 18;
+ */
+ boolean hasGroupingId();
+ /**
+ * optional string groupingId = 18;
+ */
+ java.lang.String getGroupingId();
+ /**
+ * optional string groupingId = 18;
+ */
+ com.google.protobuf.ByteString
+ getGroupingIdBytes();
}
/**
* Protobuf type {@code Scan}
@@ -13990,6 +14005,11 @@ public final class ClientProtos {
caching_ = input.readUInt32();
break;
}
+ case 146: {
+ bitField0_ |= 0x00008000;
+ groupingId_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14368,6 +14388,49 @@ public final class ClientProtos {
return caching_;
}
+ // optional string groupingId = 18;
+ public static final int GROUPINGID_FIELD_NUMBER = 18;
+ private java.lang.Object groupingId_;
+ /**
+ * optional string groupingId = 18;
+ */
+ public boolean hasGroupingId() {
+ return ((bitField0_ & 0x00008000) == 0x00008000);
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public java.lang.String getGroupingId() {
+ java.lang.Object ref = groupingId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ groupingId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public com.google.protobuf.ByteString
+ getGroupingIdBytes() {
+ java.lang.Object ref = groupingId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ groupingId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@@ -14386,6 +14449,7 @@ public final class ClientProtos {
reversed_ = false;
consistency_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Consistency.STRONG;
caching_ = 0;
+ groupingId_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -14468,6 +14532,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00004000) == 0x00004000)) {
output.writeUInt32(17, caching_);
}
+ if (((bitField0_ & 0x00008000) == 0x00008000)) {
+ output.writeBytes(18, getGroupingIdBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -14545,6 +14612,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(17, caching_);
}
+ if (((bitField0_ & 0x00008000) == 0x00008000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(18, getGroupingIdBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -14647,6 +14718,11 @@ public final class ClientProtos {
result = result && (getCaching()
== other.getCaching());
}
+ result = result && (hasGroupingId() == other.hasGroupingId());
+ if (hasGroupingId()) {
+ result = result && getGroupingId()
+ .equals(other.getGroupingId());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -14728,6 +14804,10 @@ public final class ClientProtos {
hash = (37 * hash) + CACHING_FIELD_NUMBER;
hash = (53 * hash) + getCaching();
}
+ if (hasGroupingId()) {
+ hash = (37 * hash) + GROUPINGID_FIELD_NUMBER;
+ hash = (53 * hash) + getGroupingId().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -14902,6 +14982,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00008000);
caching_ = 0;
bitField0_ = (bitField0_ & ~0x00010000);
+ groupingId_ = "";
+ bitField0_ = (bitField0_ & ~0x00020000);
return this;
}
@@ -15016,6 +15098,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00004000;
}
result.caching_ = caching_;
+ if (((from_bitField0_ & 0x00020000) == 0x00020000)) {
+ to_bitField0_ |= 0x00008000;
+ }
+ result.groupingId_ = groupingId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -15129,6 +15215,11 @@ public final class ClientProtos {
if (other.hasCaching()) {
setCaching(other.getCaching());
}
+ if (other.hasGroupingId()) {
+ bitField0_ |= 0x00020000;
+ groupingId_ = other.groupingId_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -16342,6 +16433,80 @@ public final class ClientProtos {
return this;
}
+ // optional string groupingId = 18;
+ private java.lang.Object groupingId_ = "";
+ /**
+ * optional string groupingId = 18;
+ */
+ public boolean hasGroupingId() {
+ return ((bitField0_ & 0x00020000) == 0x00020000);
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public java.lang.String getGroupingId() {
+ java.lang.Object ref = groupingId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ groupingId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public com.google.protobuf.ByteString
+ getGroupingIdBytes() {
+ java.lang.Object ref = groupingId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ groupingId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public Builder setGroupingId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00020000;
+ groupingId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public Builder clearGroupingId() {
+ bitField0_ = (bitField0_ & ~0x00020000);
+ groupingId_ = getDefaultInstance().getGroupingId();
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public Builder setGroupingIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00020000;
+ groupingId_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Scan)
}
@@ -32540,7 +32705,7 @@ public final class ClientProtos {
"pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" +
"to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" +
"ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" +
- "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" +
+ "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\315\003\n" +
"\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" +
"ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003",
" \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" +
@@ -32552,62 +32717,62 @@ public final class ClientProtos {
"lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
"eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
"\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
- "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
- "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
- "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
- "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
- "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\251\001\n\014S" +
- "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
- "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
- "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
- "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" +
- "\030\007 \003(\010\"\263\001\n\024BulkLoadHFileRequest\022 \n\006regio" +
- "n\030\001 \002(\0132\020.RegionSpecifier\0225\n\013family_path",
- "\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyPath" +
- "\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016" +
- "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" +
- "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026Coproce" +
- "ssorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_" +
- "name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reque" +
- "st\030\004 \002(\014\"9\n\030CoprocessorServiceResult\022\035\n\005" +
- "value\030\001 \001(\0132\016.NameBytesPair\"d\n\031Coprocess" +
- "orServiceRequest\022 \n\006region\030\001 \002(\0132\020.Regio" +
- "nSpecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorSe",
- "rviceCall\"]\n\032CoprocessorServiceResponse\022" +
- " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005val" +
- "ue\030\002 \002(\0132\016.NameBytesPair\"{\n\006Action\022\r\n\005in" +
- "dex\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132\016.MutationPr" +
- "oto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004" +
- " \001(\0132\027.CoprocessorServiceCall\"Y\n\014RegionA" +
- "ction\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
- "\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action" +
- "\"D\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(" +
- "\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021Resul",
- "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " +
- "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" +
- "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" +
- "essorServiceResult\022#\n\tloadStats\030\005 \001(\0132\020." +
- "RegionLoadStats\"f\n\022RegionActionResult\022-\n" +
- "\021resultOrException\030\001 \003(\0132\022.ResultOrExcep" +
- "tion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" +
- "f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r." +
- "RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcond" +
- "ition\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse",
- "\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionAct" +
- "ionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consiste" +
- "ncy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClient" +
- "Service\022 \n\003Get\022\013.GetRequest\032\014.GetRespons" +
- "e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" +
- "onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" +
- "e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" +
- "t\032\026.BulkLoadHFileResponse\022F\n\013ExecService" +
- "\022\032.CoprocessorServiceRequest\032\033.Coprocess" +
- "orServiceResponse\022R\n\027ExecRegionServerSer",
- "vice\022\032.CoprocessorServiceRequest\032\033.Copro" +
- "cessorServiceResponse\022&\n\005Multi\022\r.MultiRe" +
- "quest\032\016.MultiResponseBB\n*org.apache.hado" +
- "op.hbase.protobuf.generatedB\014ClientProto" +
- "sH\001\210\001\001\240\001\001"
+ "\022\022\n\ngroupingId\030\022 \001(\t\"\277\001\n\013ScanRequest\022 \n\006",
+ "region\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002" +
+ " \001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016numbe" +
+ "r_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025" +
+ "\n\rnext_call_seq\030\006 \001(\004\022\037\n\027client_handles_" +
+ "partials\030\007 \001(\010\"\251\001\n\014ScanResponse\022\030\n\020cells" +
+ "_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n" +
+ "\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007resu" +
+ "lts\030\005 \003(\0132\007.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027par" +
+ "tial_flag_per_result\030\007 \003(\010\"\263\001\n\024BulkLoadH" +
+ "FileRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec",
+ "ifier\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFi" +
+ "leRequest.FamilyPath\022\026\n\016assign_seq_num\030\003" +
+ " \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa" +
+ "th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" +
+ "ded\030\001 \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003r" +
+ "ow\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method" +
+ "_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030Coproces" +
+ "sorServiceResult\022\035\n\005value\030\001 \001(\0132\016.NameBy" +
+ "tesPair\"d\n\031CoprocessorServiceRequest\022 \n\006" +
+ "region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002",
+ " \002(\0132\027.CoprocessorServiceCall\"]\n\032Coproce" +
+ "ssorServiceResponse\022 \n\006region\030\001 \002(\0132\020.Re" +
+ "gionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytes" +
+ "Pair\"{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutatio" +
+ "n\030\002 \001(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.G" +
+ "et\022-\n\014service_call\030\004 \001(\0132\027.CoprocessorSe" +
+ "rviceCall\"Y\n\014RegionAction\022 \n\006region\030\001 \002(" +
+ "\0132\020.RegionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006a" +
+ "ction\030\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022" +
+ "\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupanc",
+ "y\030\002 \001(\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005inde" +
+ "x\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texce" +
+ "ption\030\003 \001(\0132\016.NameBytesPair\0221\n\016service_r" +
+ "esult\030\004 \001(\0132\031.CoprocessorServiceResult\022#" +
+ "\n\tloadStats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022R" +
+ "egionActionResult\022-\n\021resultOrException\030\001" +
+ " \003(\0132\022.ResultOrException\022!\n\texception\030\002 " +
+ "\001(\0132\016.NameBytesPair\"f\n\014MultiRequest\022#\n\014r" +
+ "egionAction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonc" +
+ "eGroup\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condit",
+ "ion\"S\n\rMultiResponse\022/\n\022regionActionResu" +
+ "lt\030\001 \003(\0132\023.RegionActionResult\022\021\n\tprocess" +
+ "ed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010T" +
+ "IMELINE\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.Get" +
+ "Request\032\014.GetResponse\022)\n\006Mutate\022\016.Mutate" +
+ "Request\032\017.MutateResponse\022#\n\004Scan\022\014.ScanR" +
+ "equest\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025" +
+ ".BulkLoadHFileRequest\032\026.BulkLoadHFileRes" +
+ "ponse\022F\n\013ExecService\022\032.CoprocessorServic" +
+ "eRequest\032\033.CoprocessorServiceResponse\022R\n",
+ "\027ExecRegionServerService\022\032.CoprocessorSe" +
+ "rviceRequest\032\033.CoprocessorServiceRespons" +
+ "e\022&\n\005Multi\022\r.MultiRequest\032\016.MultiRespons" +
+ "eBB\n*org.apache.hadoop.hbase.protobuf.ge" +
+ "neratedB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -32697,7 +32862,7 @@ public final class ClientProtos {
internal_static_Scan_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Scan_descriptor,
- new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", });
+ new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "GroupingId", });
internal_static_ScanRequest_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_ScanRequest_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 5142e53..9d0cee2 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -254,6 +254,7 @@ message Scan {
optional bool reversed = 15 [default = false];
optional Consistency consistency = 16 [default = STRONG];
optional uint32 caching = 17;
+ optional string groupingId = 18;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java
new file mode 100644
index 0000000..4ff48f2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * An {@link RpcExecutor} that balances requests round-robin across all its queues, grouped by
+ * the grouping id set on the 'scan' request
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
+@InterfaceStability.Evolving
+public class FairShareBalancedRPCExecutor extends BalancedQueueRpcExecutor {
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, maxQueueLength, null, null);
+ }
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final int maxQueueLength, final Configuration conf,
+ final Abortable abortable) {
+ this(name, handlerCount, numQueues, conf, abortable, FairShareBlockingQueue.class,
+ maxQueueLength);
+ }
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ this(name, handlerCount, numQueues, null, null, queueClass, initargs);
+ }
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final Configuration conf, final Abortable abortable,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ super(name, handlerCount, numQueues, conf, abortable, queueClass, initargs);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java
new file mode 100644
index 0000000..1c54390
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.AbstractPriorityBasedRoundRobinQueue;
+import org.apache.hadoop.hbase.util.AbstractRoundRobinQueue;
+
+/**
+ * An implementation of {@link AbstractRoundRobinQueue} that iterates through each producer's
+ * queue in a round-robin fashion, keyed by the grouping id set in the
+ * {@link org.apache.hadoop.hbase.client.Scan} request
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FairShareBlockingQueue extends AbstractRoundRobinQueue<CallRunner> {
+
+ public FairShareBlockingQueue(int maxSize) {
+ super(false, maxSize);
+ }
+ public FairShareBlockingQueue(boolean newProducerToFront, int maxSize) {
+ super(newProducerToFront, maxSize);
+ }
+
+ private static final String NO_GROUPING_ID = "_NO_GROUPING_ID";
+ @Override
+ protected Object extractProducer(CallRunner o) {
+ if (o.getCall() != null) {
+ RequestHeader header = o.getCall().getHeader();
+ if (header != null) {
+ String methodName = header.getMethodName();
+ if (methodName.equalsIgnoreCase("scan")) {
+ ScanRequest request = (ScanRequest) o.getCall().param;
+ if (request.getScan().hasGroupingId()) {
+ return request.getScan().getGroupingId();
+ }
+ }
+ }
+ }
+ return NO_GROUPING_ID;
+ }
+}
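
To make the fair-share semantics concrete, here is a self-contained toy sketch of round-robin-by-producer dequeueing. It is illustrative only; the real behavior lives in AbstractRoundRobinQueue below, which additionally handles blocking, capacity bounds, and the priority variant:

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy model: items are bucketed by producer key (here, the scan's grouping id)
// and dequeued round-robin across buckets, so one busy query cannot starve others.
public class RoundRobinByProducerSketch {
  private final LinkedHashMap<String, Queue<String>> buckets =
      new LinkedHashMap<String, Queue<String>>();

  public void offer(String producerKey, String item) {
    Queue<String> q = buckets.get(producerKey);
    if (q == null) {
      q = new ArrayDeque<String>();
      buckets.put(producerKey, q);    // new producers go to the end of the rotation
    }
    q.add(item);
  }

  public String poll() {
    Iterator<Map.Entry<String, Queue<String>>> it = buckets.entrySet().iterator();
    if (!it.hasNext()) {
      return null;
    }
    Map.Entry<String, Queue<String>> e = it.next();
    String item = e.getValue().poll();       // buckets only ever hold non-empty queues
    it.remove();
    if (!e.getValue().isEmpty()) {
      buckets.put(e.getKey(), e.getValue()); // rotate the producer to the tail
    }
    return item;
  }

  public static void main(String[] args) {
    RoundRobinByProducerSketch q = new RoundRobinByProducerSketch();
    q.offer("queryA", "A1"); q.offer("queryA", "A2"); q.offer("queryA", "A3");
    q.offer("queryB", "B1");
    // Prints "A1 B1 A2 A3": queryB gets a turn even though queryA arrived first.
    for (String s; (s = q.poll()) != null; ) {
      System.out.print(s + " ");
    }
  }
}
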
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java
new file mode 100644
index 0000000..acceef5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.AbstractPriorityBasedRoundRobinQueue;
+
+/**
+ * An implementation of {@link AbstractPriorityBasedRoundRobinQueue} that iterates through each
+ * producer's queue in a round-robin fashion, keyed by the grouping id set in the
+ * {@link org.apache.hadoop.hbase.client.Scan} request
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FairSharePriorityBasedBlockingQueue extends
+ AbstractPriorityBasedRoundRobinQueue<CallRunner> {
+
+ public FairSharePriorityBasedBlockingQueue(int maxSize) {
+ super(false, maxSize, null);
+ }
+ public FairSharePriorityBasedBlockingQueue(boolean newProducerToFront, int maxSize) {
+ super(newProducerToFront, maxSize, null);
+ }
+
+ public FairSharePriorityBasedBlockingQueue(int maxSize,
+ Comparator<? super CallRunner> comparator) {
+ super(maxSize, comparator);
+ }
+ private static final String NO_GROUPING_ID = "_NO_GROUPING_ID";
+ @Override
+ protected Object extractProducer(CallRunner o) {
+ if (o.getCall() != null) {
+ RequestHeader header = o.getCall().getHeader();
+ if (header != null) {
+ String methodName = header.getMethodName();
+ if (methodName.equalsIgnoreCase("scan")) {
+ ScanRequest request = (ScanRequest) o.getCall().param;
+ if (request.getScan().hasGroupingId()) {
+ return request.getScan().getGroupingId();
+ }
+ }
+ }
+ }
+ return NO_GROUPING_ID;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java
new file mode 100644
index 0000000..3769944
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java
@@ -0,0 +1,56 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A {@link RWQueueRpcExecutor} extension whose read/scan queues are fair-share queues such as
+ * {@link FairShareBlockingQueue}
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FairShareRWQueueRPCExecutor extends RWQueueRpcExecutor {
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+ conf, abortable, FairShareBlockingQueue.class);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
+ conf, abortable, FairShareBlockingQueue.class);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable,
+ final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
+ readQueueClass, readQueueInitArgs);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable,
+ final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ super(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
+ calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
+ LinkedBlockingQueue.class, new Object[] {maxQueueLength},
+ readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 2b58680..0ec59ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -50,16 +50,16 @@ import com.google.protobuf.Message;
public class RWQueueRpcExecutor extends RpcExecutor {
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
- private final List<BlockingQueue<CallRunner>> queues;
- private final QueueBalancer writeBalancer;
- private final QueueBalancer readBalancer;
- private final QueueBalancer scanBalancer;
- private final int writeHandlersCount;
- private final int readHandlersCount;
- private final int scanHandlersCount;
- private final int numWriteQueues;
- private final int numReadQueues;
- private final int numScanQueues;
+ protected final List<BlockingQueue<CallRunner>> queues;
+ protected final QueueBalancer writeBalancer;
+ protected final QueueBalancer readBalancer;
+ protected final QueueBalancer scanBalancer;
+ protected final int writeHandlersCount;
+ protected final int readHandlersCount;
+ protected final int scanHandlersCount;
+ protected final int numWriteQueues;
+ protected final int numReadQueues;
+ protected final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
@@ -218,7 +218,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
* Calculate the number of writers based on the "total count" and the read share.
* You'll get at least one writer.
*/
- private static int calcNumWriters(final int count, final float readShare) {
+ protected static int calcNumWriters(final int count, final float readShare) {
return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
}
@@ -226,7 +226,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
* Calculate the number of readers based on the "total count" and the read share.
* You'll get at least one reader.
*/
- private static int calcNumReaders(final int count, final float readShare) {
+ protected static int calcNumReaders(final int count, final float readShare) {
return count - calcNumWriters(count, readShare);
}
}
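
Since subclasses now reuse calcNumWriters/calcNumReaders, a worked example of the split these two formulas produce (values are illustrative):

// Formulas copied from the two methods above, evaluated for a concrete case.
int count = 10;          // total handler (or queue) count
float readShare = 0.5f;  // fraction reserved for reads
int writers = Math.max(1, count - Math.max(1, (int) Math.round(count * readShare))); // = 5
int readers = count - writers;                                                       // = 5
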
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index d8ae3ba..f5af46b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -50,6 +50,8 @@ public class SimpleRpcScheduler extends RpcScheduler {
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
+ public static final String CALL_QUEUE_GROUPING = "hbase.ipc.server.callqueue.grouping";
+ public static final boolean CALL_QUEUE_GROUPING_DEFAULT_VALUE = false;
/** max delay in msec used to bound the deprioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
@@ -127,26 +129,51 @@ public class SimpleRpcScheduler extends RpcScheduler {
LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
+ boolean callQueueGrouping = conf.getBoolean(CALL_QUEUE_GROUPING, CALL_QUEUE_GROUPING_DEFAULT_VALUE);
if (numCallQueues > 1 && callqReadShare > 0) {
// multiple read/write queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
- callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
- callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
- BoundedPriorityBlockingQueue.class, callPriority);
+ if (callQueueGrouping) {
+ callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
+ FairSharePriorityBasedBlockingQueue.class, callPriority);
+ } else {
+ callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
+ BoundedPriorityBlockingQueue.class, callPriority);
+ }
} else {
- callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
- callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
+ // TODO: Introduce a configuration that enforces the balanced write queue executor;
+ // the fair-share RPC executor is not used for the write path.
+ if (callQueueGrouping) {
+ callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
+ } else {
+ callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
+ }
}
} else {
// multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
- callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
- conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+ if (callQueueGrouping) {
+ callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues,
+ conf, abortable, FairSharePriorityBasedBlockingQueue.class, maxQueueLength, callPriority);
+ } else {
+ callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
+ conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+ }
} else {
- callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
- numCallQueues, maxQueueLength, conf, abortable);
+ // TODO: Introduce a configuration that enforces the balanced write queue executor.
+ if (callQueueGrouping) {
+ callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues,
+ maxQueueLength, conf, abortable);
+ } else {
+ callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
+ maxQueueLength, conf, abortable);
+ }
}
}
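
To route traffic through the new executors, the grouping flag has to be set server-side. A configuration sketch using the keys defined in this file; pairing it with the deadline queue type additionally selects the priority-based fair-share queue:

Configuration conf = HBaseConfiguration.create();
conf.setBoolean(SimpleRpcScheduler.CALL_QUEUE_GROUPING, true); // "hbase.ipc.server.callqueue.grouping"
conf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,          // "hbase.ipc.server.callqueue.type"
    SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);   // "deadline"
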
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java
new file mode 100644
index 0000000..ad5108e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A bounded queue implementation that sorts elements by priority (based on the element E) and
+ * groups them by producer key, round-robining among the different producers within
+ * the same priority
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public abstract class AbstractPriorityBasedRoundRobinQueue<E> extends AbstractQueue<E> implements
+ BlockingQueue<E> {
+ public AbstractPriorityBasedRoundRobinQueue(int maxSize) {
+ this(false, maxSize, null);
+ }
+
+ /**
+ * @param newProducerToFront
+ * If true, new producers go to the front of the round-robin list, if
+ * false, they go to the end.
+ */
+ public AbstractPriorityBasedRoundRobinQueue(boolean newProducerToFront, int maxSize,
+ Comparator<? super E> comparator) {
+ this.producerMap = new TreeMap<E, LinkedHashMap<Object, LinkedList<E>>>(comparator);
+ this.maxSize = maxSize;
+ }
+
+ public AbstractPriorityBasedRoundRobinQueue(int maxSize, Comparator<? super E> comparator) {
+ this(true, maxSize, comparator);
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ lock.lockInterruptibly();
+ try {
+ while (remainingCapacity() == 0) {
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ boolean ret = offer(o);
+ notEmpty.signal();
+ return ret;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean offer(E o) {
+ if (o == null)
+ throw new NullPointerException();
+
+ final Object producerKey = extractProducer(o);
+
+ ProducerList<E> producerList = null;
+ lock.lock();
+ try {
+ if (remainingCapacity() > 0) {
+ HashMap