Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (revision ) @@ -295,6 +295,11 @@ protected Connection connection; // connection to client protected long timestamp; // the time received when response is null // the time served when response is not null + // Deadline for this call; + // - timestamp + conn. header timeout, when connection header timeout is set + // - timestamp + default HConstants#HBASE_RPC_TIMEOUT_KEY otherwise + // Calls in each queue set are ordered as per the EDF. + protected long deadline; /** * Chain of buffers to send as response. */ @@ -326,6 +331,12 @@ this.cellScanner = cellScanner; this.connection = connection; this.timestamp = System.currentTimeMillis(); + + if (connection != null && connection.connectionHeader != null) { + this.deadline = timestamp + connection.connectionHeader.getCallTimeout(); + } else { + this.deadline = timestamp + HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + } this.response = null; this.responder = responder; this.isError = false; @@ -483,6 +494,10 @@ public long getSize() { return this.size; + } + + public long getDeadline() { + return this.deadline; } public long getResponseCellSize() { Index: hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java (revision ) @@ -219,7 +219,9 @@ .addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME, NUM_OPEN_CONNECTIONS_DESC), wrapper.getNumOpenConnections()) .addGauge(Interns.info(NUM_ACTIVE_HANDLER_NAME, - NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount()); + NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount()) + .addGauge(Interns.info(NUM_CALLS_MISSED_DEADLINE_NAME, + NUM_CALLS_MISSED_DEADLINE_DESC), wrapper.getNumCallsMissedDeadline()); } metricsRegistry.snapshot(mrb, all); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java (revision ) @@ -223,5 +223,10 @@ (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) + (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); } + + @Override + public long getNumCallsMissedDeadline() { + return callExecutor.getNumCallsMissedDeadline(); + } } Index: hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java (revision ) @@ -64,6 +64,9 @@ String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections."; String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler"; String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers."; + String NUM_CALLS_MISSED_DEADLINE_NAME = "numCallsMissedDeadline"; + String NUM_CALLS_MISSED_DEADLINE_DESC = "Total number of calls in general queue which missed" + + " their deadline and were not attempted."; String EXCEPTIONS_NAME="exceptions"; String EXCEPTIONS_DESC="Exceptions caused by requests"; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java (revision ) @@ -48,4 +48,9 @@ public int getActiveRpcHandlerCount() { return 106; } + + @Override + public long getNumCallsMissedDeadline() { + return 22; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java (revision ) @@ -71,4 +71,10 @@ /** Retrieves the number of active handler. */ public abstract int getActiveRpcHandlerCount(); + + /** + * Retrieves the number of tasks (Calls) which missed their deadline and were skipped + * by scheduler, returns 0 is scheduler doesn't support deadline-based scheduling. + */ + public abstract long getNumCallsMissedDeadline(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java (revision ) @@ -104,4 +104,9 @@ public int getActiveRpcHandlerCount() { return executor.getActiveCount(); } + + @Override + public long getNumCallsMissedDeadline() { + return 0; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/DeadlineRpcScheduler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/DeadlineRpcScheduler.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/DeadlineRpcScheduler.java (revision ) @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; + +import java.util.Comparator; + +/** + * A scheduler that maintains isolated handler pools for general, + * high-priority and replication requests and tries to enforce deadlines + * on the requests in general queue based on timeouts provided by client. + */ +public class DeadlineRpcScheduler extends RpcScheduler { + private static final Log LOG = LogFactory.getLog(DeadlineRpcScheduler.class); + + public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = + "hbase.ipc.server.callqueue.handler.factor"; + + /** + * Comparator used by the "normal callQueue", which uses deadlines set on Call objects + * to priority request processing. It does NOT at the moment use PriorityFunction#getDeadLine. + * + * If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in + * FIFO (first-in-first-out) manner. + */ + private static class CallPriorityComparator implements Comparator { + @Override + public int compare(CallRunner a, CallRunner b) { + return (int) (a.getCall().getDeadline() - b.getCall().getDeadline()); + } + } + + private int port; + private final PriorityFunction priority; + private final RpcExecutor callExecutor; + private final RpcExecutor priorityExecutor; + private final RpcExecutor replicationExecutor; + + /** What level a high priority call is at. */ + private final int highPriorityLevel; + + private Abortable abortable = null; + + /** + * @param conf Configuration to be used + * @param handlerCount the number of handler threads that will be used to process calls + * @param priorityHandlerCount How many threads for priority handling. + * @param replicationHandlerCount How many threads for replication handling. + * @param highPriorityLevel priority of high priority calls + * @param priority Function to extract request priority + */ + public DeadlineRpcScheduler( + Configuration conf, + int handlerCount, + int priorityHandlerCount, + int replicationHandlerCount, + PriorityFunction priority, + Abortable server, + int highPriorityLevel) { + int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", + handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + this.priority = priority; + this.highPriorityLevel = highPriorityLevel; + this.abortable = server; + + float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); + int numCallQueues = Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor)); + + LOG.info("Using deadline scheduling for user call queue, count=" + numCallQueues); + + // multiple queues w/o read/write separation + CallPriorityComparator callPriority = new CallPriorityComparator(); + callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, + conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + + // Create 2 queues to help priorityExecutor be more scalable. + this.priorityExecutor = priorityHandlerCount > 0 ? + new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null; + + this.replicationExecutor = + replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", + replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; + } + + public DeadlineRpcScheduler( + Configuration conf, + int handlerCount, + int priorityHandlerCount, + int replicationHandlerCount, + PriorityFunction priority, + int highPriorityLevel) { + this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority, + null, highPriorityLevel); + } + + @Override + public void init(RpcScheduler.Context context) { + this.port = context.getListenerAddress().getPort(); + } + + @Override + public void start() { + callExecutor.start(port); + if (priorityExecutor != null) priorityExecutor.start(port); + if (replicationExecutor != null) replicationExecutor.start(port); + } + + @Override + public void stop() { + callExecutor.stop(); + if (priorityExecutor != null) priorityExecutor.stop(); + if (replicationExecutor != null) replicationExecutor.stop(); + } + + @Override + public boolean dispatch(CallRunner callTask) throws InterruptedException { + RpcServer.Call call = callTask.getCall(); + int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser()); + if (priorityExecutor != null && level > highPriorityLevel) { + return priorityExecutor.dispatch(callTask); + } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { + return replicationExecutor.dispatch(callTask); + } else { + return callExecutor.dispatch(callTask); + } + } + + @Override + public int getGeneralQueueLength() { + return callExecutor.getQueueLength(); + } + + @Override + public int getPriorityQueueLength() { + return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength(); + } + + @Override + public int getReplicationQueueLength() { + return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); + } + + @Override + public int getActiveRpcHandlerCount() { + return callExecutor.getActiveHandlerCount() + + (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) + + (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); + } + + @Override + public long getNumCallsMissedDeadline() { + return callExecutor.getNumCallsMissedDeadline(); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java (revision ) @@ -78,4 +78,12 @@ } return server.getScheduler().getActiveRpcHandlerCount(); } + + @Override + public long getNumCallsMissedDeadline() { + if (!isServerStarted() || this.server.getScheduler() == null) { + return 0; + } + return server.getScheduler().getNumCallsMissedDeadline(); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java (revision ) @@ -18,25 +18,32 @@ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.CallMissedDeadlineException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.regionserver.DeadlineRpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import static org.apache.hadoop.hbase.regionserver.RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS; + @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class RpcExecutor { @@ -47,6 +54,7 @@ private final int handlerCount; private final String name; private final AtomicInteger failedHandlerCount = new AtomicInteger(0); + private final AtomicLong numCallsMissedDeadline = new AtomicLong(0); private boolean running; @@ -82,6 +90,10 @@ return activeHandlerCount.get(); } + public long getNumCallsMissedDeadline() { + return numCallsMissedDeadline.get(); + } + /** Returns the length of the pending queue */ public abstract int getQueueLength(); @@ -128,36 +140,56 @@ MonitoredRPCHandler status = RpcServer.getStatus(); CallRunner task = myQueue.take(); task.setStatus(status); + + if (isFullDeadlineSchedulerInUse() && + task.getCall().getDeadline() < System.currentTimeMillis()) { + + numCallsMissedDeadline.incrementAndGet(); + + StringBuilder errorMsg = new StringBuilder(); + errorMsg.append("Call ").append(task.getCall()).append(" missed its deadline by "). + append(System.currentTimeMillis() - task.getCall().getDeadline()).append(" millis"); + task.getCall().setResponse(null, null, new CallMissedDeadlineException(), + errorMsg.toString()); - try { + try { + task.getCall().sendResponseIfReady(); + } catch (IOException e) { + RpcServer.LOG.warn(Thread.currentThread().getName() + + ": caught: " + StringUtils.stringifyException(e)); + } + status.markComplete("Sent response"); + } else { + try { - activeHandlerCount.incrementAndGet(); - task.run(); - } catch (Throwable e) { - if (e instanceof Error) { - int failedCount = failedHandlerCount.incrementAndGet(); - if (handlerFailureThreshhold >= 0 + activeHandlerCount.incrementAndGet(); + task.run(); + } catch (Throwable e) { + if (e instanceof Error) { + int failedCount = failedHandlerCount.incrementAndGet(); + if (handlerFailureThreshhold >= 0 && failedCount > handlerCount * handlerFailureThreshhold) { - String message = + String message = "Number of failed RpcServer handler exceeded threshhold " - + handlerFailureThreshhold + " with failed reason: " - + StringUtils.stringifyException(e); - if (abortable != null) { - abortable.abort(message, e); - } else { - LOG.error("Received " + StringUtils.stringifyException(e) - + " but not aborting due to abortable being null"); - throw e; - } - } else { - LOG.warn("RpcServer handler threads encountered errors " + + handlerFailureThreshhold + " with failed reason: " + + StringUtils.stringifyException(e); + if (abortable != null) { + abortable.abort(message, e); + } else { + LOG.error("Received " + StringUtils.stringifyException(e) + + " but not aborting due to abortable being null"); + throw e; + } + } else { + LOG.warn("RpcServer handler threads encountered errors " + StringUtils.stringifyException(e)); - } - } else { - LOG.warn("RpcServer handler threads encountered exceptions " + } + } else { + LOG.warn("RpcServer handler threads encountered exceptions " + StringUtils.stringifyException(e)); - } - } finally { - activeHandlerCount.decrementAndGet(); - } + } + } finally { + activeHandlerCount.decrementAndGet(); + } + } } catch (InterruptedException e) { interrupted = true; } @@ -167,6 +199,12 @@ Thread.currentThread().interrupt(); } } + } + + private boolean isFullDeadlineSchedulerInUse() { + return conf == null? false : conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class).getSimpleName().equals( + DeadlineRpcSchedulerFactory.class.getSimpleName()); } public static abstract class QueueBalancer { Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (revision ) @@ -803,6 +803,26 @@ * optional .VersionInfo version_info = 5; */ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfoOrBuilder getVersionInfoOrBuilder(); + + // optional uint32 call_timeout = 6; + /** + * optional uint32 call_timeout = 6; + * + *
+     * Indicates timeout (in milliseconds) for the requests send over this connection;
+     * used for deadline scheduling by RPC server.
+     * 
+ */ + boolean hasCallTimeout(); + /** + * optional uint32 call_timeout = 6; + * + *
+     * Indicates timeout (in milliseconds) for the requests send over this connection;
+     * used for deadline scheduling by RPC server.
+     * 
+ */ + int getCallTimeout(); } /** * Protobuf type {@code ConnectionHeader} @@ -900,8 +920,13 @@ bitField0_ |= 0x00000010; break; } + case 48: { + bitField0_ |= 0x00000020; + callTimeout_ = input.readUInt32(); + break; - } - } + } + } + } } catch (com.google.protobuf.InvalidProtocolBufferException e) { throw e.setUnfinishedMessage(this); } catch (java.io.IOException e) { @@ -1143,12 +1168,39 @@ return versionInfo_; } + // optional uint32 call_timeout = 6; + public static final int CALL_TIMEOUT_FIELD_NUMBER = 6; + private int callTimeout_; + /** + * optional uint32 call_timeout = 6; + * + *
+     * Indicates timeout (in milliseconds) for the requests send over this connection;
+     * used for deadline scheduling by RPC server.
+     * 
+ */ + public boolean hasCallTimeout() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional uint32 call_timeout = 6; + * + *
+     * Indicates timeout (in milliseconds) for the requests send over this connection;
+     * used for deadline scheduling by RPC server.
+     * 
+ */ + public int getCallTimeout() { + return callTimeout_; + } + private void initFields() { userInfo_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation.getDefaultInstance(); serviceName_ = ""; cellBlockCodecClass_ = ""; cellBlockCompressorClass_ = ""; versionInfo_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo.getDefaultInstance(); + callTimeout_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1189,6 +1241,9 @@ if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeMessage(5, versionInfo_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeUInt32(6, callTimeout_); + } getUnknownFields().writeTo(output); } @@ -1218,6 +1273,10 @@ size += com.google.protobuf.CodedOutputStream .computeMessageSize(5, versionInfo_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(6, callTimeout_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1266,6 +1325,11 @@ result = result && getVersionInfo() .equals(other.getVersionInfo()); } + result = result && (hasCallTimeout() == other.hasCallTimeout()); + if (hasCallTimeout()) { + result = result && (getCallTimeout() + == other.getCallTimeout()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1299,6 +1363,10 @@ hash = (37 * hash) + VERSION_INFO_FIELD_NUMBER; hash = (53 * hash) + getVersionInfo().hashCode(); } + if (hasCallTimeout()) { + hash = (37 * hash) + CALL_TIMEOUT_FIELD_NUMBER; + hash = (53 * hash) + getCallTimeout(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1432,6 +1500,8 @@ versionInfoBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000010); + callTimeout_ = 0; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -1488,6 +1558,10 @@ } else { result.versionInfo_ = versionInfoBuilder_.build(); } + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.callTimeout_ = callTimeout_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1525,6 +1599,9 @@ if (other.hasVersionInfo()) { mergeVersionInfo(other.getVersionInfo()); } + if (other.hasCallTimeout()) { + setCallTimeout(other.getCallTimeout()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2080,6 +2157,59 @@ return versionInfoBuilder_; } + // optional uint32 call_timeout = 6; + private int callTimeout_ ; + /** + * optional uint32 call_timeout = 6; + * + *
+       * Indicates timeout (in milliseconds) for the requests send over this connection;
+       * used for deadline scheduling by RPC server.
+       * 
+ */ + public boolean hasCallTimeout() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional uint32 call_timeout = 6; + * + *
+       * Indicates timeout (in milliseconds) for the requests send over this connection;
+       * used for deadline scheduling by RPC server.
+       * 
+ */ + public int getCallTimeout() { + return callTimeout_; + } + /** + * optional uint32 call_timeout = 6; + * + *
+       * Indicates timeout (in milliseconds) for the requests send over this connection;
+       * used for deadline scheduling by RPC server.
+       * 
+ */ + public Builder setCallTimeout(int value) { + bitField0_ |= 0x00000020; + callTimeout_ = value; + onChanged(); + return this; + } + /** + * optional uint32 call_timeout = 6; + * + *
+       * Indicates timeout (in milliseconds) for the requests send over this connection;
+       * used for deadline scheduling by RPC server.
+       * 
+ */ + public Builder clearCallTimeout() { + bitField0_ = (bitField0_ & ~0x00000020); + callTimeout_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ConnectionHeader) } @@ -6132,24 +6262,25 @@ java.lang.String[] descriptorData = { "\n\tRPC.proto\032\rTracing.proto\032\013HBase.proto\"" + "<\n\017UserInformation\022\026\n\016effective_user\030\001 \002" + - "(\t\022\021\n\treal_user\030\002 \001(\t\"\266\001\n\020ConnectionHead" + + "(\t\022\021\n\treal_user\030\002 \001(\t\"\314\001\n\020ConnectionHead" + "er\022#\n\tuser_info\030\001 \001(\0132\020.UserInformation\022" + "\024\n\014service_name\030\002 \001(\t\022\036\n\026cell_block_code" + "c_class\030\003 \001(\t\022#\n\033cell_block_compressor_c" + "lass\030\004 \001(\t\022\"\n\014version_info\030\005 \001(\0132\014.Versi" + - "onInfo\"\037\n\rCellBlockMeta\022\016\n\006length\030\001 \001(\r\"" + - "|\n\021ExceptionResponse\022\034\n\024exception_class_" + - "name\030\001 \001(\t\022\023\n\013stack_trace\030\002 \001(\t\022\020\n\010hostn", - "ame\030\003 \001(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_retry\030" + - "\005 \001(\010\"\246\001\n\rRequestHeader\022\017\n\007call_id\030\001 \001(\r" + - "\022\035\n\ntrace_info\030\002 \001(\0132\t.RPCTInfo\022\023\n\013metho" + - "d_name\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\022\'\n\017c" + - "ell_block_meta\030\005 \001(\0132\016.CellBlockMeta\022\020\n\010" + - "priority\030\006 \001(\r\"q\n\016ResponseHeader\022\017\n\007call" + - "_id\030\001 \001(\r\022%\n\texception\030\002 \001(\0132\022.Exception" + - "Response\022\'\n\017cell_block_meta\030\003 \001(\0132\016.Cell" + - "BlockMetaB<\n*org.apache.hadoop.hbase.pro" + - "tobuf.generatedB\tRPCProtosH\001\240\001\001" + "onInfo\022\024\n\014call_timeout\030\006 \001(\r\"\037\n\rCellBloc" + + "kMeta\022\016\n\006length\030\001 \001(\r\"|\n\021ExceptionRespon" + + "se\022\034\n\024exception_class_name\030\001 \001(\t\022\023\n\013stac", + "k_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001(\t\022\014\n\004port\030" + + "\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\246\001\n\rRequestH" + + "eader\022\017\n\007call_id\030\001 \001(\r\022\035\n\ntrace_info\030\002 \001" + + "(\0132\t.RPCTInfo\022\023\n\013method_name\030\003 \001(\t\022\025\n\rre" + + "quest_param\030\004 \001(\010\022\'\n\017cell_block_meta\030\005 \001" + + "(\0132\016.CellBlockMeta\022\020\n\010priority\030\006 \001(\r\"q\n\016" + + "ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022%\n\texcep" + + "tion\030\002 \001(\0132\022.ExceptionResponse\022\'\n\017cell_b" + + "lock_meta\030\003 \001(\0132\016.CellBlockMetaB<\n*org.a" + + "pache.hadoop.hbase.protobuf.generatedB\tR", + "PCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6167,7 +6298,7 @@ internal_static_ConnectionHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ConnectionHeader_descriptor, - new java.lang.String[] { "UserInfo", "ServiceName", "CellBlockCodecClass", "CellBlockCompressorClass", "VersionInfo", }); + new java.lang.String[] { "UserInfo", "ServiceName", "CellBlockCodecClass", "CellBlockCompressorClass", "VersionInfo", "CallTimeout", }); internal_static_CellBlockMeta_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_CellBlockMeta_fieldAccessorTable = new Index: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java (revision ) @@ -356,6 +356,8 @@ ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); builder.setServiceName(remoteId.getServiceName()); + builder.setCallTimeout(conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); UserInformation userInfoPB = getUserInfo(ticket); if (userInfoPB != null) { builder.setUserInfo(userInfoPB); Index: hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java (revision ) @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.exceptions; +import org.apache.hadoop.hbase.CallMissedDeadlineException; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.RegionTooBusyException; @@ -47,7 +48,7 @@ return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException - || cur instanceof CallQueueTooBigException); + || cur instanceof CallQueueTooBigException || cur instanceof CallMissedDeadlineException); } Index: hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java (revision ) @@ -26,4 +26,5 @@ int getPriorityQueueLength(); int getNumOpenConnections(); int getActiveRpcHandlerCount(); + long getNumCallsMissedDeadline(); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/CallMissedDeadlineException.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/CallMissedDeadlineException.java (revision ) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/CallMissedDeadlineException.java (revision ) @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.IOException; + +/** + * Returned to client if RpcServer's executor skipped the task on the basis that it's + * already missed its deadline. + * + * if you get that it's guaranteed that no modifications has been made on the server. + */ +@SuppressWarnings("serial") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CallMissedDeadlineException extends IOException{ + public CallMissedDeadlineException() { + super(); + } +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeadlineRpcSchedulerFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeadlineRpcSchedulerFactory.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeadlineRpcSchedulerFactory.java (revision ) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.DeadlineRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; + +/** + * Constructs a {@link DeadlineRpcScheduler}. + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class DeadlineRpcSchedulerFactory implements RpcSchedulerFactory { + + @Override + @Deprecated + public RpcScheduler create(Configuration conf, PriorityFunction priority) { + return create(conf, priority, null); + } + + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + + return new DeadlineRpcScheduler( + conf, + handlerCount, + conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), + conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), + priority, + server, + HConstants.QOS_THRESHOLD); + } +} Index: hbase-protocol/src/main/protobuf/RPC.proto IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-protocol/src/main/protobuf/RPC.proto (revision 5cae3f66a910042a564e741d74007253ad2ab10b) +++ hbase-protocol/src/main/protobuf/RPC.proto (revision ) @@ -87,6 +87,9 @@ // Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec. optional string cell_block_compressor_class = 4; optional VersionInfo version_info = 5; + // Indicates timeout (in milliseconds) for the requests send over this connection; + // used for deadline scheduling by RPC server. + optional uint32 call_timeout = 6; } // Optional Cell block Message. Included in client RequestHeader