.../java/org/apache/hadoop/hbase/client/Scan.java | 19 +
.../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 8 +-
.../hbase/protobuf/generated/ClientProtos.java | 283 +++++++++++----
hbase-protocol/src/main/protobuf/Client.proto | 1 +
.../hadoop/hbase/ipc/BalancedQueueRpcExecutor.java | 12 +-
.../apache/hadoop/hbase/ipc/CallRunnerWrapper.java | 54 +++
.../hbase/ipc/FairShareBalancedRPCExecutor.java | 58 +++
.../hadoop/hbase/ipc/FairShareBlockingQueue.java | 47 +++
.../ipc/FairSharePriorityBasedBlockingQueue.java | 55 +++
.../hbase/ipc/FairShareRWQueueRPCExecutor.java | 57 +++
.../java/org/apache/hadoop/hbase/ipc/RPCUtil.java | 55 +++
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 20 +-
.../hadoop/hbase/ipc/RoundRobinRPCScheduler.java | 198 +++++++++++
.../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 14 +-
.../org/apache/hadoop/hbase/ipc/RpcScheduler.java | 16 +
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 76 +---
.../RoundRobinRPCSchedulerFactory.java | 54 +++
.../util/AbstractPriorityBasedRoundRobinQueue.java | 357 +++++++++++++++++++
.../hadoop/hbase/util/AbstractRoundRobinQueue.java | 390 +++++++++++++++++++++
.../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 162 ++++++---
.../util/TestAbstractRoundRobinPriorityQueue.java | 266 ++++++++++++++
.../hbase/util/TestAbstractRoundRobinQueue.java | 248 +++++++++++++
22 files changed, 2269 insertions(+), 181 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 3b6194f..6b23737 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -114,6 +114,11 @@ public class Scan extends Query {
private int storeLimit = -1;
private int storeOffset = 0;
private boolean getScan;
+ /**
+ * An optional groupingId that lets the RPCExecutor schedule parallel
+ * scans in a fair-share (round-robin) manner
+ */
+ private String groupingId;
/**
* @deprecated since 1.0.0. Use {@link #setScanMetricsEnabled(boolean)}
@@ -971,4 +976,18 @@ public class Scan extends Query {
if (bytes == null) return null;
return ProtobufUtil.toScanMetrics(bytes);
}
+
+ /**
+ * Set the groupingId so that the RPCExecutor can schedule parallel
+ * scans that have the same priority in a fair-share (round-robin)
+ * manner
+ * @param groupingId the identifier used to group related scan requests
+ */
+ public void setGroupingId(String groupingId) {
+ this.groupingId = groupingId;
+ }
+
+ public String getGroupingId() {
+ return this.groupingId;
+ }
}
\ No newline at end of file
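
To see the new field in use, here is a minimal client-side sketch (the table handle, family name, and groupingId value are illustrative, not part of the patch): scans that share a groupingId compete as one producer in the fair-share queues, so concurrent queries round-robin against each other.

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("cf"));
    // All partial scans of one logical query share a groupingId, so the
    // server rotates between this query and other concurrent ones.
    scan.setGroupingId("query-42");
    ResultScanner scanner = table.getScanner(scan);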
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 8b5b2d7..8750333 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -122,12 +122,12 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.quotas.QuotaScope;
import org.apache.hadoop.hbase.quotas.QuotaType;
@@ -932,6 +932,9 @@ public final class ProtobufUtil {
if (scan.getCaching() > 0) {
scanBuilder.setCaching(scan.getCaching());
}
+ if (scan.getGroupingId() != null) {
+ scanBuilder.setGroupingId(scan.getGroupingId());
+ }
return scanBuilder.build();
}
@@ -1017,6 +1020,9 @@ public final class ProtobufUtil {
if (proto.hasCaching()) {
scan.setCaching(proto.getCaching());
}
+ if (proto.hasGroupingId()) {
+ scan.setGroupingId(proto.getGroupingId());
+ }
return scan;
}
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 60ab651..79021d9 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -13814,6 +13814,21 @@ public final class ClientProtos {
* optional uint32 caching = 17;
*/
int getCaching();
+
+ // optional string groupingId = 18;
+ /**
+ * optional string groupingId = 18;
+ */
+ boolean hasGroupingId();
+ /**
+ * optional string groupingId = 18;
+ */
+ java.lang.String getGroupingId();
+ /**
+ * optional string groupingId = 18;
+ */
+ com.google.protobuf.ByteString
+ getGroupingIdBytes();
}
/**
* Protobuf type {@code Scan}
@@ -13990,6 +14005,11 @@ public final class ClientProtos {
caching_ = input.readUInt32();
break;
}
+ case 146: {
+ bitField0_ |= 0x00008000;
+ groupingId_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14368,6 +14388,49 @@ public final class ClientProtos {
return caching_;
}
+ // optional string groupingId = 18;
+ public static final int GROUPINGID_FIELD_NUMBER = 18;
+ private java.lang.Object groupingId_;
+ /**
+ * optional string groupingId = 18;
+ */
+ public boolean hasGroupingId() {
+ return ((bitField0_ & 0x00008000) == 0x00008000);
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public java.lang.String getGroupingId() {
+ java.lang.Object ref = groupingId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ groupingId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public com.google.protobuf.ByteString
+ getGroupingIdBytes() {
+ java.lang.Object ref = groupingId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ groupingId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@@ -14386,6 +14449,7 @@ public final class ClientProtos {
reversed_ = false;
consistency_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Consistency.STRONG;
caching_ = 0;
+ groupingId_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -14468,6 +14532,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00004000) == 0x00004000)) {
output.writeUInt32(17, caching_);
}
+ if (((bitField0_ & 0x00008000) == 0x00008000)) {
+ output.writeBytes(18, getGroupingIdBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -14545,6 +14612,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(17, caching_);
}
+ if (((bitField0_ & 0x00008000) == 0x00008000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(18, getGroupingIdBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -14647,6 +14718,11 @@ public final class ClientProtos {
result = result && (getCaching()
== other.getCaching());
}
+ result = result && (hasGroupingId() == other.hasGroupingId());
+ if (hasGroupingId()) {
+ result = result && getGroupingId()
+ .equals(other.getGroupingId());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -14728,6 +14804,10 @@ public final class ClientProtos {
hash = (37 * hash) + CACHING_FIELD_NUMBER;
hash = (53 * hash) + getCaching();
}
+ if (hasGroupingId()) {
+ hash = (37 * hash) + GROUPINGID_FIELD_NUMBER;
+ hash = (53 * hash) + getGroupingId().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -14902,6 +14982,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00008000);
caching_ = 0;
bitField0_ = (bitField0_ & ~0x00010000);
+ groupingId_ = "";
+ bitField0_ = (bitField0_ & ~0x00020000);
return this;
}
@@ -15016,6 +15098,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00004000;
}
result.caching_ = caching_;
+ if (((from_bitField0_ & 0x00020000) == 0x00020000)) {
+ to_bitField0_ |= 0x00008000;
+ }
+ result.groupingId_ = groupingId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -15129,6 +15215,11 @@ public final class ClientProtos {
if (other.hasCaching()) {
setCaching(other.getCaching());
}
+ if (other.hasGroupingId()) {
+ bitField0_ |= 0x00020000;
+ groupingId_ = other.groupingId_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -16342,6 +16433,80 @@ public final class ClientProtos {
return this;
}
+ // optional string groupingId = 18;
+ private java.lang.Object groupingId_ = "";
+ /**
+ * optional string groupingId = 18;
+ */
+ public boolean hasGroupingId() {
+ return ((bitField0_ & 0x00020000) == 0x00020000);
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public java.lang.String getGroupingId() {
+ java.lang.Object ref = groupingId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ groupingId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public com.google.protobuf.ByteString
+ getGroupingIdBytes() {
+ java.lang.Object ref = groupingId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ groupingId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public Builder setGroupingId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00020000;
+ groupingId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public Builder clearGroupingId() {
+ bitField0_ = (bitField0_ & ~0x00020000);
+ groupingId_ = getDefaultInstance().getGroupingId();
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string groupingId = 18;
+ */
+ public Builder setGroupingIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00020000;
+ groupingId_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Scan)
}
@@ -32678,7 +32843,7 @@ public final class ClientProtos {
"pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" +
"to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" +
"ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" +
- "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" +
+ "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\315\003\n" +
"\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" +
"ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003",
" \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" +
@@ -32690,63 +32855,63 @@ public final class ClientProtos {
"lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
"eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
"\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
- "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
- "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
- "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
- "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
- "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" +
- "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
- "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
- "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
- "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" +
- "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" +
- "\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132",
- "\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " +
- ".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" +
- "gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" +
- "\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" +
- "ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" +
- "iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" +
- "(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" +
- "\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " +
- "\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" +
- "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi",
- "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" +
- "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" +
- "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" +
- "2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" +
- "\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" +
- "et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" +
- "oprocessorServiceCall\"Y\n\014RegionAction\022 \n" +
- "\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" +
- "c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" +
- "onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r",
- "heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" +
- "tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" +
- "sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" +
- "1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" +
- "viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" +
- "adStats\"f\n\022RegionActionResult\022-\n\021resultO" +
- "rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" +
- "exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" +
- "Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" +
- "tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ",
- "\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" +
- "onActionResult\030\001 \003(\0132\023.RegionActionResul" +
- "t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" +
- "TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" +
- " \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" +
- "ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" +
- "Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" +
- "kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" +
- "LoadHFileResponse\022F\n\013ExecService\022\032.Copro" +
- "cessorServiceRequest\032\033.CoprocessorServic",
- "eResponse\022R\n\027ExecRegionServerService\022\032.C" +
- "oprocessorServiceRequest\032\033.CoprocessorSe" +
- "rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." +
- "MultiResponseBB\n*org.apache.hadoop.hbase" +
- ".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" +
- "\001"
+ "\022\022\n\ngroupingId\030\022 \001(\t\"\277\001\n\013ScanRequest\022 \n\006",
+ "region\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002" +
+ " \001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016numbe" +
+ "r_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025" +
+ "\n\rnext_call_seq\030\006 \001(\004\022\037\n\027client_handles_" +
+ "partials\030\007 \001(\010\"\311\001\n\014ScanResponse\022\030\n\020cells" +
+ "_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n" +
+ "\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n\007resu" +
+ "lts\030\005 \003(\0132\007.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027par" +
+ "tial_flag_per_result\030\007 \003(\010\022\036\n\026more_resul" +
+ "ts_in_region\030\010 \001(\010\"\263\001\n\024BulkLoadHFileRequ",
+ "est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\0225\n" +
+ "\013family_path\030\002 \003(\0132 .BulkLoadHFileReques" +
+ "t.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\n" +
+ "FamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t" +
+ "\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(" +
+ "\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014" +
+ "\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003 " +
+ "\002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorServi" +
+ "ceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPair\"" +
+ "d\n\031CoprocessorServiceRequest\022 \n\006region\030\001",
+ " \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027.C" +
+ "oprocessorServiceCall\"]\n\032CoprocessorServ" +
+ "iceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSpec" +
+ "ifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{\n\006" +
+ "Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(\0132" +
+ "\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014se" +
+ "rvice_call\030\004 \001(\0132\027.CoprocessorServiceCal" +
+ "l\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Regi" +
+ "onSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030\003 " +
+ "\003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014memst",
+ "oreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:" +
+ "\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(\r\022" +
+ "\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030\003 " +
+ "\001(\0132\016.NameBytesPair\0221\n\016service_result\030\004 " +
+ "\001(\0132\031.CoprocessorServiceResult\022#\n\tloadSt" +
+ "ats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionAct" +
+ "ionResult\022-\n\021resultOrException\030\001 \003(\0132\022.R" +
+ "esultOrException\022!\n\texception\030\002 \001(\0132\016.Na" +
+ "meBytesPair\"f\n\014MultiRequest\022#\n\014regionAct" +
+ "ion\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup\030\002",
+ " \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n\rM" +
+ "ultiResponse\022/\n\022regionActionResult\030\001 \003(\013" +
+ "2\023.RegionActionResult\022\021\n\tprocessed\030\002 \001(\010" +
+ "*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020" +
+ "\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetRequest\032" +
+ "\014.GetResponse\022)\n\006Mutate\022\016.MutateRequest\032" +
+ "\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest\032\r" +
+ ".ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkLoa" +
+ "dHFileRequest\032\026.BulkLoadHFileResponse\022F\n" +
+ "\013ExecService\022\032.CoprocessorServiceRequest",
+ "\032\033.CoprocessorServiceResponse\022R\n\027ExecReg" +
+ "ionServerService\022\032.CoprocessorServiceReq" +
+ "uest\032\033.CoprocessorServiceResponse\022&\n\005Mul" +
+ "ti\022\r.MultiRequest\032\016.MultiResponseBB\n*org" +
+ ".apache.hadoop.hbase.protobuf.generatedB" +
+ "\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -32836,7 +33001,7 @@ public final class ClientProtos {
internal_static_Scan_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Scan_descriptor,
- new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", });
+ new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "GroupingId", });
internal_static_ScanRequest_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_ScanRequest_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index e0c370b..bc84450 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -254,6 +254,7 @@ message Scan {
optional bool reversed = 15 [default = false];
optional Consistency consistency = 16 [default = STRONG];
optional uint32 caching = 17;
+ optional string groupingId = 18;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 56424df..017891d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
@InterfaceStability.Evolving
public class BalancedQueueRpcExecutor extends RpcExecutor {
- protected final List<BlockingQueue<CallRunner>> queues;
+ private final List<BlockingQueue<CallRunnerWrapper>> queues;
private final QueueBalancer balancer;
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
@@ -59,7 +59,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, Math.max(handlerCount, numQueues), conf, abortable);
- queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
+ queues = new ArrayList<BlockingQueue<CallRunnerWrapper>>(numQueues);
this.balancer = getBalancer(numQueues);
initializeQueues(numQueues, queueClass, initargs);
}
@@ -67,12 +67,12 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
protected void initializeQueues(final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
for (int i = 0; i < numQueues; ++i) {
- queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
+ queues.add((BlockingQueue<CallRunnerWrapper>) ReflectionUtils.newInstance(queueClass, initargs));
}
}
@Override
- public void dispatch(final CallRunner callTask) throws InterruptedException {
+ public void dispatch(final CallRunnerWrapper callTask) throws InterruptedException {
int queueIndex = balancer.getNextQueue();
queues.get(queueIndex).put(callTask);
}
@@ -80,14 +80,14 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
@Override
public int getQueueLength() {
int length = 0;
- for (final BlockingQueue<CallRunner> queue : queues) {
+ for (final BlockingQueue<CallRunnerWrapper> queue : queues) {
length += queue.size();
}
return length;
}
@Override
- public List<BlockingQueue<CallRunner>> getQueues() {
+ public List<BlockingQueue<CallRunnerWrapper>> getQueues() {
return queues;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunnerWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunnerWrapper.java
new file mode 100644
index 0000000..710e8dc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunnerWrapper.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A simple wrapper around a {@link CallRunner} that precomputes the deadline
+ * of the underlying call using the {@link PriorityFunction}
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class CallRunnerWrapper {
+ private final CallRunner o;
+ private final long deadLine;
+
+ public CallRunnerWrapper(CallRunner o, PriorityFunction priority, int maxDelay) {
+ this.o = o;
+ this.deadLine = calcDeadLine(o, priority, maxDelay);
+ }
+
+ public CallRunner getCallRunner() {
+ return this.o;
+ }
+
+ public long getDeadLine() {
+ return this.deadLine;
+ }
+
+ // The deadline is the call's arrival timestamp plus its priority-derived
+ // delay, capped at maxDelay; it is kept as a long so the absolute
+ // timestamp is not truncated.
+ private long calcDeadLine(CallRunner o, PriorityFunction priority, int maxDelay) {
+ RpcServer.Call call = o.getCall();
+ long deadline = priority.getDeadline(call.getHeader(), call.param);
+ return call.timestamp + Math.min(deadline, maxDelay);
+ }
+}
\ No newline at end of file
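
A worked example of the deadline bound, with illustrative numbers: if a call arrives at timestamp 1000 and the PriorityFunction assigns it a deadline of 20000 ms while maxDelay is 5000 ms, the wrapper stores 1000 + min(20000, 5000) = 6000, so a long-running request can be deprioritized by at most maxDelay relative to its arrival.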
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java
new file mode 100644
index 0000000..a184d96
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBalancedRPCExecutor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A {@link BalancedQueueRpcExecutor} whose queues serve requests in a
+ * round-robin fashion based on the grouping Id set on the requests
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FairShareBalancedRPCExecutor extends BalancedQueueRpcExecutor {
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, maxQueueLength, null, null);
+ }
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final int maxQueueLength, final Configuration conf,
+ final Abortable abortable) {
+ this(name, handlerCount, numQueues, conf, abortable, FairShareBlockingQueue.class,
+ maxQueueLength);
+ }
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ this(name, handlerCount, numQueues, null, null, queueClass, initargs);
+ }
+
+ public FairShareBalancedRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final Configuration conf, final Abortable abortable,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ super(name, handlerCount, numQueues, conf, abortable, queueClass, initargs);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java
new file mode 100644
index 0000000..64a3da6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareBlockingQueue.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.AbstractRoundRobinQueue;
+
+/**
+ * An implementation of {@link AbstractRoundRobinQueue} that iterates through
+ * each producer queue in a round-robin fashion based on the grouping Id set
+ * on the incoming request
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FairShareBlockingQueue extends AbstractRoundRobinQueue<CallRunnerWrapper> {
+
+ public FairShareBlockingQueue(int maxSize) {
+ super(false, maxSize);
+ }
+
+ public FairShareBlockingQueue(boolean newProducerToFront, int maxSize) {
+ super(newProducerToFront, maxSize);
+ }
+
+ @Override
+ protected Object extractGroupId(CallRunnerWrapper o) {
+ return RPCUtil.extractGroupId(o.getCallRunner());
+ }
+}
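
To make the round-robin contract concrete, here is a small sketch against the abstract API (a hypothetical String-based subclass used only for illustration; assume the surrounding method may throw InterruptedException):

    // Hypothetical subclass: the first character of each element
    // identifies its producer/group.
    class PrefixRoundRobinQueue extends AbstractRoundRobinQueue<String> {
      PrefixRoundRobinQueue(int maxSize) {
        super(false, maxSize);
      }
      @Override
      protected Object extractGroupId(String o) {
        return o.substring(0, 1);
      }
    }

    BlockingQueue<String> q = new PrefixRoundRobinQueue(16);
    q.put("a1"); q.put("a2"); q.put("b1"); q.put("b2");
    // take() is expected to rotate across groups (a1, b1, a2, b2),
    // so no single producer monopolizes the handlers.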
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java
new file mode 100644
index 0000000..0199067
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairSharePriorityBasedBlockingQueue.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.AbstractPriorityBasedRoundRobinQueue;
+
+
+/**
+ * An implementation of {@link AbstractPriorityBasedRoundRobinQueue} that iterates
+ * through each producer queue in a round-robin fashion based on the grouping Id
+ * set on the incoming request
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FairSharePriorityBasedBlockingQueue extends
+ AbstractPriorityBasedRoundRobinQueue<CallRunnerWrapper> {
+
+ public FairSharePriorityBasedBlockingQueue(int maxSize) {
+ super(maxSize, null);
+ }
+
+ // newProducerToFront does not apply to the priority-ordered variant; the
+ // parameter only mirrors the FairShareBlockingQueue constructor signature
+ public FairSharePriorityBasedBlockingQueue(boolean newProducerToFront, int maxSize) {
+ super(maxSize, null);
+ }
+
+ public FairSharePriorityBasedBlockingQueue(int maxSize,
+ Comparator<? super CallRunnerWrapper> comparator) {
+ super(maxSize, comparator);
+ }
+
+ @Override
+ protected Object extractGroupId(CallRunnerWrapper o) {
+ return RPCUtil.extractGroupId(o.getCallRunner());
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java
new file mode 100644
index 0000000..5549c45
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FairShareRWQueueRPCExecutor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A {@link RWQueueRpcExecutor} extension whose read/scan queues are based on
+ * {@link FairShareBlockingQueue} (or {@link FairSharePriorityBasedBlockingQueue})
+ * and round-robin among parallel requests based on the grouping ID set on them
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FairShareRWQueueRPCExecutor extends RWQueueRpcExecutor {
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final float readShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, conf, abortable,
+ FairShareBlockingQueue.class);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final float readShare, final float scanShare, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final float readShare, final float scanShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, conf, abortable,
+ FairShareBlockingQueue.class);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final float readShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable,
+ final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
+ readQueueClass, readQueueInitArgs);
+ }
+
+ public FairShareRWQueueRPCExecutor(final String name, final int handlerCount,
+ final int numQueues, final float readShare, final float scanShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable,
+ final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ super(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
+ calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
+ LinkedBlockingQueue.class, new Object[] { maxQueueLength }, readQueueClass, ArrayUtils
+ .addAll(new Object[] { maxQueueLength }, readQueueInitArgs));
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RPCUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RPCUtil.java
new file mode 100644
index 0000000..92c5097
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RPCUtil.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+
+/**
+ * A utility class for the RPC related classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class RPCUtil {
+ public static final String NO_GROUPING_ID = "_NO_GROUPING_ID";
+
+/**
+ * Extracts the groupId used by {@link FairShareBlockingQueue} and
+ * {@link FairSharePriorityBasedBlockingQueue} to drive their round-robin
+ * policy.
+ * @param o the call to inspect
+ * @return the groupingId of the underlying Scan, or {@link #NO_GROUPING_ID}
+ */
+ public static Object extractGroupId(CallRunner o) {
+ if (o.getCall() != null) {
+ RequestHeader header = o.getCall().getHeader();
+ if (header != null) {
+ String methodName = header.getMethodName();
+ if (methodName.equalsIgnoreCase("scan")) {
+ ScanRequest request = (ScanRequest) o.getCall().param;
+ if (request.getScan().hasGroupingId()) {
+ return request.getScan().getGroupingId();
+ }
+ }
+ }
+ }
+ return NO_GROUPING_ID;
+ }
+}
\ No newline at end of file
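
Note the design choice in the fallback: every call without a groupingId maps to the single NO_GROUPING_ID group, so ungrouped traffic collectively occupies one round-robin slot instead of bypassing the fair-share rotation.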
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 2b58680..d65a231 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -50,7 +50,7 @@ import com.google.protobuf.Message;
public class RWQueueRpcExecutor extends RpcExecutor {
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
- private final List<BlockingQueue<CallRunner>> queues;
+ private final List<BlockingQueue<CallRunnerWrapper>> queues;
private final QueueBalancer writeBalancer;
private final QueueBalancer readBalancer;
private final QueueBalancer scanBalancer;
@@ -132,19 +132,19 @@ public class RWQueueRpcExecutor extends RpcExecutor {
this.readBalancer = getBalancer(numReadQueues);
this.scanBalancer = getBalancer(numScanQueues);
- queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
+ queues = new ArrayList<BlockingQueue<CallRunnerWrapper>>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
" scanHandlers=" + scanHandlersCount));
for (int i = 0; i < numWriteQueues; ++i) {
- queues.add((BlockingQueue<CallRunner>)
+ queues.add((BlockingQueue<CallRunnerWrapper>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
- queues.add((BlockingQueue<CallRunner>)
+ queues.add((BlockingQueue<CallRunnerWrapper>)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
}
}
@@ -158,8 +158,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
}
@Override
- public void dispatch(final CallRunner callTask) throws InterruptedException {
- RpcServer.Call call = callTask.getCall();
+ public void dispatch(final CallRunnerWrapper callTask) throws InterruptedException {
+ RpcServer.Call call = callTask.getCallRunner().getCall();
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
queueIndex = writeBalancer.getNextQueue();
@@ -203,14 +203,14 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
public int getQueueLength() {
int length = 0;
- for (final BlockingQueue<CallRunner> queue: queues) {
+ for (final BlockingQueue<CallRunnerWrapper> queue: queues) {
length += queue.size();
}
return length;
}
@Override
- protected List<BlockingQueue<CallRunner>> getQueues() {
+ protected List<BlockingQueue<CallRunnerWrapper>> getQueues() {
return queues;
}
@@ -218,7 +218,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
* Calculate the number of writers based on the "total count" and the read share.
* You'll get at least one writer.
*/
- private static int calcNumWriters(final int count, final float readShare) {
+ protected static int calcNumWriters(final int count, final float readShare) {
return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
}
@@ -226,7 +226,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
* Calculate the number of readers based on the "total count" and the read share.
* You'll get at least one reader.
*/
- private static int calcNumReaders(final int count, final float readShare) {
+ protected static int calcNumReaders(final int count, final float readShare) {
return count - calcNumWriters(count, readShare);
}
}
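
A quick worked example of the split these helpers produce: with count = 30 and readShare = 0.5, calcNumWriters returns max(1, 30 - max(1, round(15))) = 15 and calcNumReaders returns 30 - 15 = 15; with readShare = 0.1 the split becomes 27 writers and 3 readers.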
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RoundRobinRPCScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RoundRobinRPCScheduler.java
new file mode 100644
index 0000000..e421ab6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RoundRobinRPCScheduler.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A scheduler that maintains isolated handler pools for general,
+ * high-priority, and replication requests. General read and scan requests
+ * are scheduled round-robin based on the grouping ID set on the request
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class RoundRobinRPCScheduler extends RpcScheduler {
+ public static final Log LOG = LogFactory.getLog(RoundRobinRPCScheduler.class);
+
+ private int port;
+ private final PriorityFunction priority;
+ private final RpcExecutor callExecutor;
+ private final RpcExecutor priorityExecutor;
+ private final RpcExecutor replicationExecutor;
+
+ /** What level a high priority call is at. */
+ private final int highPriorityLevel;
+
+ private Abortable abortable = null;
+ private int maxDelay;
+ /**
+ * @param conf
+ * @param handlerCount
+ * the number of handler threads that will be used to process calls
+ * @param priorityHandlerCount
+ * How many threads for priority handling.
+ * @param replicationHandlerCount
+ * How many threads for replication handling.
+ * @param highPriorityLevel
+ * @param priority
+ * Function to extract request priority.
+ */
+ public RoundRobinRPCScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
+ int replicationHandlerCount, PriorityFunction priority, Abortable server,
+ int highPriorityLevel) {
+ int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", handlerCount
+ * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ this.priority = priority;
+ this.highPriorityLevel = highPriorityLevel;
+ this.abortable = server;
+ this.maxDelay = conf.getInt(RpcScheduler.QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
+
+ String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
+ float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+ float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
+
+ float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
+ int numCallQueues = Math.max(1, (int) Math.round(handlerCount * callQueuesHandlersFactor));
+
+ LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
+
+ if (numCallQueues > 1 && callqReadShare > 0) {
+ // multiple read/write queues
+ if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+ CallRunnerDeadLinePriority callPriority = new CallRunnerDeadLinePriority();
+ callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
+ FairSharePriorityBasedBlockingQueue.class, callPriority);
+ } else {
+ callExecutor = new FairShareRWQueueRPCExecutor("RW.default", handlerCount, numCallQueues,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
+ }
+ } else {
+ // multiple queues
+ if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+ CallRunnerDeadLinePriority callPriority = new CallRunnerDeadLinePriority();
+ callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues,
+ conf, abortable, FairSharePriorityBasedBlockingQueue.class, maxQueueLength,
+ callPriority);
+ } else {
+ callExecutor = new FairShareBalancedRPCExecutor("B.default", handlerCount, numCallQueues,
+ maxQueueLength, conf, abortable);
+ }
+ }
+
+ // Create 2 queues to help priorityExecutor be more scalable.
+ this.priorityExecutor = priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority",
+ priorityHandlerCount, 2, maxQueueLength) : null;
+
+ this.replicationExecutor = replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor(
+ "Replication", replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+ }
+
+ public RoundRobinRPCScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
+ int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
+ this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority, null,
+ highPriorityLevel);
+ }
+
+ @Override
+ public void init(Context context) {
+ this.port = context.getListenerAddress().getPort();
+ }
+
+ @Override
+ public void start() {
+ callExecutor.start(port);
+ if (priorityExecutor != null)
+ priorityExecutor.start(port);
+ if (replicationExecutor != null)
+ replicationExecutor.start(port);
+ }
+
+ @Override
+ public void stop() {
+ callExecutor.stop();
+ if (priorityExecutor != null)
+ priorityExecutor.stop();
+ if (replicationExecutor != null)
+ replicationExecutor.stop();
+ }
+
+ @Override
+ public void dispatch(CallRunner callTask) throws InterruptedException {
+ RpcServer.Call call = callTask.getCall();
+ // The deadline is computed even for calls that do not go through a
+ // deadline-based queue; this could be avoided with a flag, but the cost
+ // is a single calculation per call
+ CallRunnerWrapper wrap = new CallRunnerWrapper(callTask, this.priority, this.maxDelay);
+ int level = priority.getPriority(call.getHeader(), call.param);
+ if (priorityExecutor != null && level > highPriorityLevel) {
+ priorityExecutor.dispatch(wrap);
+ } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
+ replicationExecutor.dispatch(wrap);
+ } else {
+ callExecutor.dispatch(wrap);
+ }
+ }
+
+ @Override
+ public int getGeneralQueueLength() {
+ return callExecutor.getQueueLength();
+ }
+
+ @Override
+ public int getPriorityQueueLength() {
+ return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
+ }
+
+ @Override
+ public int getReplicationQueueLength() {
+ return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
+ }
+
+ @Override
+ public int getActiveRpcHandlerCount() {
+ return callExecutor.getActiveHandlerCount()
+ + (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount())
+ + (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
+ }
+
+ /*
+ * Comparator used by the "normal callQueue" when CALL_QUEUE_TYPE_CONF_KEY
+ * is set to "deadline". It uses the calculated "deadline" e.g. to
+ * deprioritize long-running jobs.
+ *
+ * If multiple requests have the same deadline, BoundedPriorityBlockingQueue
+ * will order them in FIFO (first-in-first-out) manner.
+ */
+ public static class CallRunnerDeadLinePriority implements Comparator<CallRunnerWrapper> {
+
+ @Override
+ public int compare(CallRunnerWrapper o1, CallRunnerWrapper o2) {
+ return Long.compare(o1.getDeadLine(), o2.getDeadLine());
+ }
+ }
+}
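
A minimal sketch of wiring the scheduler in, assuming the RoundRobinRPCSchedulerFactory added by this patch is selected through HBase's pluggable scheduler-factory key (shown here as "hbase.region.server.rpc.scheduler.factory.class"; verify the key for your HBase version):

    Configuration conf = HBaseConfiguration.create();
    // Plug in the factory that builds a RoundRobinRPCScheduler.
    conf.set("hbase.region.server.rpc.scheduler.factory.class",
        RoundRobinRPCSchedulerFactory.class.getName());
    // Use the deadline queue type so grouped scans are both fair-shared
    // and deprioritized as they run long.
    conf.set("hbase.ipc.server.callqueue.type", "deadline");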
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 709429d..88ad922 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -85,18 +85,18 @@ public abstract class RpcExecutor {
public abstract int getQueueLength();
/** Add the request to the executor queue */
- public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
+ public abstract void dispatch(final CallRunnerWrapper callTask) throws InterruptedException;
/** Returns the list of request queues */
- protected abstract List<BlockingQueue<CallRunner>> getQueues();
+ protected abstract List<BlockingQueue<CallRunnerWrapper>> getQueues();
protected void startHandlers(final int port) {
- List<BlockingQueue<CallRunner>> callQueues = getQueues();
+ List<BlockingQueue<CallRunnerWrapper>> callQueues = getQueues();
startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
}
protected void startHandlers(final String nameSuffix, final int numHandlers,
- final List<BlockingQueue<CallRunner>> callQueues,
+ final List<BlockingQueue<CallRunnerWrapper>> callQueues,
final int qindex, final int qsize, final int port) {
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
for (int i = 0; i < numHandlers; i++) {
@@ -116,7 +116,7 @@ public abstract class RpcExecutor {
}
}
- protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
+ protected void consumerLoop(final BlockingQueue<CallRunnerWrapper> myQueue) {
boolean interrupted = false;
double handlerFailureThreshhold =
conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
@@ -124,10 +124,10 @@ public abstract class RpcExecutor {
try {
while (running) {
try {
- CallRunner task = myQueue.take();
+ CallRunnerWrapper task = myQueue.take();
try {
activeHandlerCount.incrementAndGet();
- task.run();
+ task.getCallRunner().run();
} catch (Throwable e) {
if (e instanceof Error) {
int failedCount = failedHandlerCount.incrementAndGet();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index f273865..b80cb4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -31,6 +31,22 @@ import java.net.InetSocketAddress;
@InterfaceStability.Evolving
public abstract class RpcScheduler {
+ /** max delay in msec used to bound the deprioritized requests */
+ public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
+ = "hbase.ipc.server.queue.max.call.delay";
+ public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
+ "hbase.ipc.server.callqueue.read.ratio";
+ public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
+ "hbase.ipc.server.callqueue.scan.ratio";
+ public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
+ "hbase.ipc.server.callqueue.handler.factor";
+ public final static int DEFAULT_MAX_CALL_DELAY = 5000;
+
+ /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
+ public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
+ public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
+ public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
+
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
static abstract class Context {
public abstract InetSocketAddress getListenerAddress();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index d8ae3ba..c8af3f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
-
-import java.util.Comparator;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.RoundRobinRPCScheduler.CallRunnerDeadLinePriority;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
/**
@@ -39,52 +37,6 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
public class SimpleRpcScheduler extends RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
- public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
- "hbase.ipc.server.callqueue.read.ratio";
- public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
- "hbase.ipc.server.callqueue.scan.ratio";
- public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
- "hbase.ipc.server.callqueue.handler.factor";
-
- /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
- public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
- public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
- public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
-
- /** max delay in msec used to bound the deprioritized requests */
- public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
- = "hbase.ipc.server.queue.max.call.delay";
-
- /**
- * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
- * It uses the calculated "deadline" e.g. to deprioritize long-running job
- *
- * If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in
- * FIFO (first-in-first-out) manner.
- */
- private static class CallPriorityComparator implements Comparator {
- private final static int DEFAULT_MAX_CALL_DELAY = 5000;
-
- private final PriorityFunction priority;
- private final int maxDelay;
-
- public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
- this.priority = priority;
- this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
- }
-
- @Override
- public int compare(CallRunner a, CallRunner b) {
- RpcServer.Call callA = a.getCall();
- RpcServer.Call callB = b.getCall();
- long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
- long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
- deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
- deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
- return (int)(deadlineA - deadlineB);
- }
- }
-
private int port;
private final PriorityFunction priority;
private final RpcExecutor callExecutor;
@@ -95,6 +47,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
private final int highPriorityLevel;
private Abortable abortable = null;
+ private int maxDelay;
/**
* @param conf
@@ -117,6 +70,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
this.abortable = server;
+ this.maxDelay = conf.getInt(RpcScheduler.QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
@@ -130,23 +84,23 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (numCallQueues > 1 && callqReadShare > 0) {
// multiple read/write queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
- CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
+ CallRunnerDeadLinePriority callPriority = new CallRunnerDeadLinePriority();
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
- callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
}
} else {
// multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
- CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
- callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
- conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+ CallRunnerDeadLinePriority callPriority = new CallRunnerDeadLinePriority();
+ callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, conf,
+ abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
- callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
- numCallQueues, maxQueueLength, conf, abortable);
+ callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
+ maxQueueLength, conf, abortable);
}
}
@@ -192,13 +146,17 @@ public class SimpleRpcScheduler extends RpcScheduler {
@Override
public void dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
+    // The deadline calculation happens even for calls that are not deadline
+    // based. We could avoid that by passing a boolean if needed, but it should
+    // be fine since this happens only once per call.
+ CallRunnerWrapper wrap = new CallRunnerWrapper(callTask, this.priority, this.maxDelay);
int level = priority.getPriority(call.getHeader(), call.param);
if (priorityExecutor != null && level > highPriorityLevel) {
- priorityExecutor.dispatch(callTask);
+ priorityExecutor.dispatch(wrap);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
- replicationExecutor.dispatch(callTask);
+ replicationExecutor.dispatch(wrap);
} else {
- callExecutor.dispatch(callTask);
+ callExecutor.dispatch(wrap);
}
}
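
CallRunnerWrapper is added elsewhere in this patch; judging from the removed
CallPriorityComparator above, it presumably precomputes the bounded deadline
once at dispatch time rather than on every queue comparison. A sketch of that
computation, mirroring the removed comparator (an assumption, not the
wrapper's actual body):

    // Bound the priority-supplied deadline by maxDelay and offset it from the
    // call's arrival timestamp, exactly as the removed comparator did.
    static long boundedDeadline(RpcServer.Call call, PriorityFunction priority, int maxDelay) {
      long deadline = priority.getDeadline(call.getHeader(), call.param);
      return call.timestamp + Math.min(deadline, maxDelay);
    }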
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RoundRobinRPCSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RoundRobinRPCSchedulerFactory.java
new file mode 100644
index 0000000..4092279
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RoundRobinRPCSchedulerFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RoundRobinRPCScheduler;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+
+/** Constructs a {@link RoundRobinRPCScheduler}. */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class RoundRobinRPCSchedulerFactory implements RpcSchedulerFactory {
+
+ @Override
+ public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
+ int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+
+ return new RoundRobinRPCScheduler(conf, handlerCount, conf.getInt(
+ HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), conf.getInt(
+ HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), priority, server,
+ HConstants.QOS_THRESHOLD);
+ }
+
+ @Override
+ @Deprecated
+ public RpcScheduler create(Configuration conf, PriorityFunction priority) {
+ return create(conf, priority, null);
+ }
+
+}
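
To take effect, the factory has to be named by the region server's scheduler
factory key; the sketch below assumes the
"hbase.region.server.rpc.scheduler.factory.class" key that RSRpcServices reads
in this HBase line (verify against your version):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.RoundRobinRPCSchedulerFactory;

    public class InstallRoundRobinScheduler {
      public static Configuration configure() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.region.server.rpc.scheduler.factory.class",
            RoundRobinRPCSchedulerFactory.class.getName());
        return conf;
      }
    }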
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java
new file mode 100644
index 0000000..138ca8d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractPriorityBasedRoundRobinQueue.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A bounded queue implementation that orders elements by priority, groups
+ * producers by a producer key, and round-robins among the different
+ * producers within the same priority.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public abstract class AbstractPriorityBasedRoundRobinQueue<E> extends AbstractQueue<E>
+    implements BlockingQueue<E> {
+ private static final Log LOG = LogFactory.getLog(AbstractPriorityBasedRoundRobinQueue.class
+ .getName());
+
+ public AbstractPriorityBasedRoundRobinQueue(int maxSize) {
+ this(maxSize, null);
+ }
+
+  /**
+   * @param maxSize maximum number of elements the queue can hold
+   * @param comparator comparator used to order the queued elements by priority
+   */
+  public AbstractPriorityBasedRoundRobinQueue(int maxSize, Comparator<? super E> comparator) {
+    this.producerMap = new TreeMap<>(comparator);
+    this.maxSize = maxSize;
+    this.currentProducer = new TreeMap<>(comparator);
+ }
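+
+  /*
+   * Hypothetical subclass sketch (illustration only, not part of this patch):
+   * a concrete queue supplies the producer key via extractGroupId, e.g. a
+   * scan's grouping id, so that requests from the same scan share one
+   * producer list while distinct scans of equal priority are drained
+   * round-robin:
+   *
+   *   class ScanGroupedQueue extends AbstractPriorityBasedRoundRobinQueue<CallRunner> {
+   *     ScanGroupedQueue(int maxSize, Comparator<? super CallRunner> c) { super(maxSize, c); }
+   *     protected Object extractGroupId(CallRunner o) {
+   *       return ...; // hypothetical: derive the grouping key from the call
+   *     }
+   *   }
+   */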
+
+ @Override
+  public Iterator<E> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ lock.lockInterruptibly();
+ try {
+ while (remainingCapacity() == 0) {
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ boolean ret = offer(o);
+ notEmpty.signal();
+ return ret;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean offer(E o) {
+ if (o == null)
+ throw new NullPointerException();
+
+ final Object producerKey = extractGroupId(o);
+ if (producerKey == null) {
+ throw new NullPointerException();
+ }
+
+    LinkedList<E> producerList = null;
+ lock.lock();
+ try {
+ if (remainingCapacity() > 0) {
+ HashMap