diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java index ab5484c..2d9567a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java @@ -37,26 +37,41 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper @Override public int getGeneralQueueLength() { - if (this.server == null || this.server.callQueue == null) { + if (this.server == null + || this.server.getScheduler() == null + || !(this.server.getScheduler() instanceof SimpleRpcScheduler)) { return 0; } - return server.callQueue.size(); + SimpleRpcScheduler scheduler = (SimpleRpcScheduler) server.getScheduler(); + return scheduler.callQueue.size(); } @Override public int getReplicationQueueLength() { - if (this.server == null || this.server.replicationQueue == null) { + if (this.server == null + || this.server.getScheduler() == null + || !(this.server.getScheduler() instanceof SimpleRpcScheduler)) { return 0; } - return server.replicationQueue.size(); + SimpleRpcScheduler scheduler = (SimpleRpcScheduler) server.getScheduler(); + if (scheduler.replicationQueue== null) { + return 0; + } + return scheduler.replicationQueue.size(); } @Override public int getPriorityQueueLength() { - if (this.server == null || this.server.priorityCallQueue == null) { + if (this.server == null + || this.server.getScheduler() == null + || !(this.server.getScheduler() instanceof SimpleRpcScheduler)) { + return 0; + } + SimpleRpcScheduler scheduler = (SimpleRpcScheduler) server.getScheduler(); + if (scheduler.priorityCallQueue == null) { return 0; } - return server.priorityCallQueue.size(); + return scheduler.priorityCallQueue.size(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index dbafafb..917e95e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -51,10 +51,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import javax.security.sasl.Sasl; @@ -62,6 +60,7 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -69,6 +68,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException; @@ -102,7 +102,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -122,8 +121,6 @@ import org.cloudera.htrace.TraceInfo; import org.cloudera.htrace.impl.NullSpan; import org.codehaus.jackson.map.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; import com.google.protobuf.CodedInputStream; import com.google.protobuf.Descriptors.MethodDescriptor; @@ -153,7 +150,7 @@ public class RpcServer implements RpcServerInterface { /** * How many calls/handler are allowed in the queue. */ - private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; + static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; /** * The maximum size that we can hold in the RPC queue @@ -186,10 +183,12 @@ public class RpcServer implements RpcServerInterface { */ protected static final ThreadLocal CurCall = new ThreadLocal(); + /** Keeps MonitoredRPCHandler per handler thread. */ + private static final ThreadLocal MONITORED_RPC + = new ThreadLocal(); + protected final InetSocketAddress isa; protected int port; // port we listen on - private int handlerCount; // number of handler threads - private int priorityHandlerCount; private int readThreads; // number of read threads protected int maxIdleTime; // the maximum idle time after // which a client may be @@ -214,11 +213,7 @@ public class RpcServer implements RpcServerInterface { protected final long purgeTimeout; // in milliseconds volatile protected boolean running = true; // true while server runs - protected BlockingQueue callQueue; // queued calls protected final Counter callQueueSize = new Counter(); - protected BlockingQueue priorityCallQueue; - - protected int highPriorityLevel; // what level a high priority call is at protected final List connectionList = Collections.synchronizedList(new LinkedList()); @@ -227,12 +222,6 @@ public class RpcServer implements RpcServerInterface { private Listener listener = null; protected Responder responder = null; protected int numConnections = 0; - private Handler[] handlers = null; - private Handler[] priorityHandlers = null; - /** replication related queue; */ - protected BlockingQueue replicationQueue; - private int numOfReplicationHandlers = 0; - private Handler[] replicationHandlers = null; protected HBaseRPCErrorHandler errorHandler = null; @@ -248,6 +237,8 @@ public class RpcServer implements RpcServerInterface { private final Object serverInstance; private final List services; + private final RpcScheduler scheduler; + /** * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. @@ -256,6 +247,7 @@ public class RpcServer implements RpcServerInterface { protected int id; // the client's call id protected BlockingService service; protected MethodDescriptor md; + protected RequestHeader header; protected Message param; // the parameter passed // Optional cell data passed outside of protobufs. protected CellScanner cellScanner; @@ -271,12 +263,13 @@ public class RpcServer implements RpcServerInterface { protected boolean isError; protected TraceInfo tinfo; - Call(int id, final BlockingService service, final MethodDescriptor md, Message param, - CellScanner cellScanner, Connection connection, Responder responder, long size, - TraceInfo tinfo) { + Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, + Message param, CellScanner cellScanner, Connection connection, Responder responder, + long size, TraceInfo tinfo) { this.id = id; this.service = service; this.md = md; + this.header = header; this.param = param; this.cellScanner = cellScanner; this.connection = connection; @@ -737,8 +730,7 @@ public class RpcServer implements RpcServerInterface { } if (LOG.isDebugEnabled()) LOG.debug(getName() + ": connection from " + c.toString() + - "; # active connections: " + numConnections + - "; # queued calls: " + callQueue.size()); + "; # active connections: " + numConnections); } finally { reader.finishAdd(); } @@ -1076,25 +1068,6 @@ public class RpcServer implements RpcServerInterface { } } - private Function, Integer> qosFunction = null; - - /** - * Gets the QOS level for this call. If it is higher than the highPriorityLevel and there - * are priorityHandlers available it will be processed in it's own thread set. - * - * @param newFunc - */ - @Override - public void setQosFunction(Function, Integer> newFunc) { - qosFunction = newFunc; - } - - protected int getQosLevel(Pair headerAndParam) { - if (qosFunction == null) return 0; - Integer res = qosFunction.apply(headerAndParam); - return res == null? 0: res; - } - /** Reads calls from a connection and queues them for handling. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="VO_VOLATILE_INCREMENT", @@ -1138,14 +1111,14 @@ public class RpcServer implements RpcServerInterface { private boolean useWrap = false; // Fake 'call' for failed authorization response private static final int AUTHROIZATION_FAILED_CALLID = -1; - private final Call authFailedCall = - new Call(AUTHROIZATION_FAILED_CALLID, this.service, null, null, null, this, null, 0, null); + private final Call authFailedCall = new Call( + AUTHROIZATION_FAILED_CALLID, this.service, null, null, null, null, this, null, 0, null); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = - new Call(SASL_CALLID, this.service, null, null, null, this, null, 0, null); + new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null); public UserGroupInformation attemptingUser = null; // user name before auth @@ -1498,7 +1471,7 @@ public class RpcServer implements RpcServerInterface { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, this, responder, -1, null); + Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1647,8 +1620,8 @@ public class RpcServer implements RpcServerInterface { // Enforcing the call queue size, this triggers a retry in the client // This is a bit late to be doing this check - we have already read in the total request. if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { - final Call callTooBig = - new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null); + final Call callTooBig = new Call( + id, this.service, null, null, null, null, this, responder, totalRequestSize, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(), "Call queue is full, is ipc.server.max.callqueue.size too small?"); @@ -1678,8 +1651,8 @@ public class RpcServer implements RpcServerInterface { } catch (Throwable t) { String msg = "Unable to read call parameter from client " + getHostAddress(); LOG.warn(msg, t); - final Call readParamsFailedCall = - new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null); + final Call readParamsFailedCall = new Call( + id, this.service, null, null, null, null, this, responder, totalRequestSize, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage()); @@ -1689,22 +1662,15 @@ public class RpcServer implements RpcServerInterface { Call call = null; if (header.hasTraceInfo()) { - call = new Call(id, this.service, md, param, cellScanner, this, responder, totalRequestSize, - new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())); + call = new Call(id, this.service, md, header, param, cellScanner, this, responder, + totalRequestSize, + new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())); } else { - call = new Call(id, this.service, md, param, cellScanner, this, responder, + call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, null); } callQueueSize.add(totalRequestSize); - Pair headerAndParam = new Pair(header, param); - if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) { - priorityCallQueue.put(call); - } else if (replicationQueue != null && - getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) { - replicationQueue.put(call); - } else { - callQueue.put(call); // queue the call; maybe blocked here - } + scheduler.dispatch(new CallRunner(call)); } private boolean authorizeConnection() throws IOException { @@ -1773,109 +1739,135 @@ public class RpcServer implements RpcServerInterface { } } - /** Handles queued calls . */ - private class Handler extends Thread { - private final BlockingQueue myCallQueue; - private MonitoredRPCHandler status; + /** + * The real request processing logic, which is usually executed in + * thread pools provided by an {@link RpcScheduler}. + */ + public class CallRunner implements Runnable { + private final Call call; - public Handler(final BlockingQueue cq, int instanceNumber) { - this.myCallQueue = cq; - this.setDaemon(true); + public CallRunner(Call call) { + this.call = call; + } - String threadName = "RpcServer.handler=" + instanceNumber + ",port=" + port; - if (cq == priorityCallQueue) { - // this is just an amazing hack, but it works. - threadName = "Priority." + threadName; - } else if (cq == replicationQueue) { - threadName = "Replication." + threadName; - } - this.setName(threadName); - this.status = TaskMonitor.get().createRPCStatus(threadName); + public Call getCall() { + return call; } @Override public void run() { - LOG.info(getName() + ": starting"); - status.setStatus("starting"); + MonitoredRPCHandler status = getStatus(); SERVER.set(RpcServer.this); - while (running) { + try { + status.setStatus("Setting up call"); + status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); + if (LOG.isDebugEnabled()) { + UserGroupInformation remoteUser = call.connection.user; + LOG.debug(call.toShortString() + " executing as " + + ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName())); + } + Throwable errorThrowable = null; + String error = null; + Pair resultPair = null; + CurCall.set(call); + Span currentRequestSpan = NullSpan.getInstance(); try { - status.pause("Waiting for a call"); - Call call = myCallQueue.take(); // pop the queue; maybe blocked here - status.setStatus("Setting up call"); - status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); - if (LOG.isDebugEnabled()) { - UserGroupInformation remoteUser = call.connection.user; - LOG.debug(call.toShortString() + " executing as " + - ((remoteUser == null)? "NULL principal": remoteUser.getUserName())); + if (!started) { + throw new ServerNotRunningYetException("Server is not running yet"); } - Throwable errorThrowable = null; - String error = null; - Pair resultPair = null; - CurCall.set(call); - Span currentRequestSpan = NullSpan.getInstance(); - try { - if (!started) { - throw new ServerNotRunningYetException("Server is not running yet"); - } - if (call.tinfo != null) { - currentRequestSpan = Trace.startSpan( - "handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS); - } - RequestContext.set(User.create(call.connection.user), getRemoteIp(), + if (call.tinfo != null) { + currentRequestSpan = Trace.startSpan( + "handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS); + } + RequestContext.set(User.create(call.connection.user), getRemoteIp(), call.connection.service); - // make the call - resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp, + // make the call + resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp, status); - } catch (Throwable e) { - LOG.debug(getName() + ": " + call.toShortString(), e); - errorThrowable = e; - error = StringUtils.stringifyException(e); - } finally { - currentRequestSpan.stop(); - // Must always clear the request context to avoid leaking - // credentials between requests. - RequestContext.clear(); - } - CurCall.set(null); - callQueueSize.add(call.getSize() * -1); - // Set the response for undelayed calls and delayed calls with - // undelayed responses. - if (!call.isDelayed() || !call.isReturnValueDelayed()) { - Message param = resultPair != null? resultPair.getFirst(): null; - CellScanner cells = resultPair != null? resultPair.getSecond(): null; - call.setResponse(param, cells, errorThrowable, error); - } - call.sendResponseIfReady(); - status.markComplete("Sent response"); - } catch (InterruptedException e) { - if (running) { // unexpected -- log it - LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e)); - } - } catch (OutOfMemoryError e) { - if (errorHandler != null) { - if (errorHandler.checkOOME(e)) { - LOG.info(getName() + ": exiting on OutOfMemoryError"); - return; - } - } else { - // rethrow if no handler - throw e; + } catch (Throwable e) { + LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); + errorThrowable = e; + error = StringUtils.stringifyException(e); + } finally { + currentRequestSpan.stop(); + // Must always clear the request context to avoid leaking + // credentials between requests. + RequestContext.clear(); + } + CurCall.set(null); + callQueueSize.add(call.getSize() * -1); + // Set the response for undelayed calls and delayed calls with + // undelayed responses. + if (!call.isDelayed() || !call.isReturnValueDelayed()) { + Message param = resultPair != null ? resultPair.getFirst() : null; + CellScanner cells = resultPair != null ? resultPair.getSecond() : null; + call.setResponse(param, cells, errorThrowable, error); + } + call.sendResponseIfReady(); + status.markComplete("Sent response"); + status.pause("Waiting for a call"); + } catch (OutOfMemoryError e) { + if (errorHandler != null) { + if (errorHandler.checkOOME(e)) { + LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError"); + return; } - } catch (ClosedChannelException cce) { - LOG.warn(getName() + ": caught a ClosedChannelException, " + + } else { + // rethrow if no handler + throw e; + } + } catch (ClosedChannelException cce) { + LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + "this means that the server was processing a " + "request but the client went away. The error message was: " + cce.getMessage()); - } catch (Exception e) { - LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e)); - } + } catch (Exception e) { + LOG.warn(Thread.currentThread().getName() + + ": caught: " + StringUtils.stringifyException(e)); + } + } + + public MonitoredRPCHandler getStatus() { + MonitoredRPCHandler status = MONITORED_RPC.get(); + if (status != null) { + return status; } - LOG.info(getName() + ": exiting"); + status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); + status.pause("Waiting for a call"); + MONITORED_RPC.set(status); + return status; } } + /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ + public interface RpcSchedulerContext { + InetSocketAddress getListenerAddress(); + } + + /** + * An interface for RPC request scheduling algorithm. + */ + public interface RpcScheduler { + + /** + * Does some quick initialization. Heavy tasks (e.g. starting threads) should be + * done in {@link #start()}. This method is called before {@code start}. + */ + void init(RpcSchedulerContext context); + + /** + * Prepares for request serving. An implementation may start some handler threads here. + */ + void start(); + + /** Stops serving new requests. */ + void stop(); + + /** Dispatches an RPC request asynchronously. */ + void dispatch(CallRunner task) throws IOException, InterruptedException; + } + /** * Datastructure for passing a {@link BlockingService} and its associated class of * protobuf service interface. For example, a server that fielded what is defined @@ -1910,8 +1902,7 @@ public class RpcServer implements RpcServerInterface { final Configuration conf) throws IOException { this(null, "generic", Lists.newArrayList(new BlockingServiceAndInterface(service, null)), - isa, 3, 3, conf, - HConstants.QOS_THRESHOLD); + isa, conf, new SimpleRpcScheduler(conf, 3, 3, 3, null, 0)); } /** @@ -1921,46 +1912,27 @@ public class RpcServer implements RpcServerInterface { * @param name Used keying this rpc servers' metrics and for naming the Listener thread. * @param services A list of services. * @param isa Where to listen - * @param handlerCount the number of handler threads that will be used to process calls - * @param priorityHandlerCount How many threads for priority handling. * @param conf - * @param highPriorityLevel * @throws IOException */ public RpcServer(final Server serverInstance, final String name, final List services, - final InetSocketAddress isa, int handlerCount, int priorityHandlerCount, Configuration conf, - int highPriorityLevel) + final InetSocketAddress isa, Configuration conf, + RpcScheduler scheduler) throws IOException { this.serverInstance = serverInstance; this.services = services; this.isa = isa; this.conf = conf; - this.handlerCount = handlerCount; - this.priorityHandlerCount = priorityHandlerCount; this.socketSendBufferSize = 0; - this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length", - handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); this.maxQueueSize = this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10); - this.callQueue = new LinkedBlockingQueue(maxQueueLength); - if (priorityHandlerCount > 0) { - this.priorityCallQueue = new LinkedBlockingQueue(maxQueueLength); // TODO hack on size - } else { - this.priorityCallQueue = null; - } - this.highPriorityLevel = highPriorityLevel; this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3); - if (numOfReplicationHandlers > 0) { - this.replicationQueue = new LinkedBlockingQueue(maxQueueSize); - } - this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); @@ -1984,6 +1956,13 @@ public class RpcServer implements RpcServerInterface { if (isSecurityEnabled) { HBaseSaslRpcServer.init(conf); } + this.scheduler = scheduler; + this.scheduler.init(new RpcSchedulerContext() { + @Override + public InetSocketAddress getListenerAddress() { + return RpcServer.this.getListenerAddress(); + } + }); } /** @@ -2058,9 +2037,7 @@ public class RpcServer implements RpcServerInterface { HBasePolicyProvider.init(conf, authManager); responder.start(); listener.start(); - handlers = startHandlers(callQueue, handlerCount); - priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount); - replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers); + scheduler.start(); } @Override @@ -2068,18 +2045,6 @@ public class RpcServer implements RpcServerInterface { this.authManager.refresh(this.conf, pp); } - private Handler[] startHandlers(BlockingQueue queue, int numOfHandlers) { - if (numOfHandlers <= 0) { - return null; - } - Handler[] handlers = new Handler[numOfHandlers]; - for (int i = 0; i < numOfHandlers; i++) { - handlers[i] = new Handler(queue, i); - handlers[i].start(); - } - return handlers; - } - private AuthenticationTokenSecretManager createSecretManager() { if (!isSecurityEnabled) return null; if (serverInstance == null) return null; @@ -2223,25 +2188,12 @@ public class RpcServer implements RpcServerInterface { public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; - stopHandlers(handlers); - stopHandlers(priorityHandlers); - stopHandlers(replicationHandlers); listener.interrupt(); listener.doStop(); responder.interrupt(); notifyAll(); } - private void stopHandlers(Handler[] handlers) { - if (handlers != null) { - for (Handler handler : handlers) { - if (handler != null) { - handler.interrupt(); - } - } - } - } - /** Wait for the server to be stopped. * Does not wait for all subthreads to finish. * See {@link #stop()}. @@ -2507,4 +2459,8 @@ public class RpcServer implements RpcServerInterface { throw e; } } + + public RpcScheduler getScheduler() { + return scheduler; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 91f09e2..de06297 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -65,8 +65,6 @@ public interface RpcServerInterface { */ MetricsHBaseServer getMetrics(); - public void setQosFunction(Function, Integer> newFunc); - /** * Refresh autentication manager policy. * @param pp diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java new file mode 100644 index 0000000..e8e4033 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.common.base.Function; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.util.Pair; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A scheduler that maintains isolated handler pools for high-priority and repliaction + * requests. + */ +public class SimpleRpcScheduler implements RpcServer.RpcScheduler { + + private int port; + private final int handlerCount; + private final int priorityHandlerCount; + private final int replicationHandlerCount; + final BlockingQueue callQueue; + final BlockingQueue priorityCallQueue; + final BlockingQueue replicationQueue; + private volatile boolean running = false; + private final List handlers = Lists.newArrayList(); + private final Function, Integer> qosFunction; + + /** What level a high priority call is at. */ + private final int highPriorityLevel; + + /** + * @param conf + * @param handlerCount the number of handler threads that will be used to process calls + * @param priorityHandlerCount How many threads for priority handling. + * @param replicationHandlerCount How many threads for replication handling. + * @param qosFunction a function that maps requests to priorities + * @param highPriorityLevel + */ + public SimpleRpcScheduler( + Configuration conf, + int handlerCount, + int priorityHandlerCount, + int replicationHandlerCount, + Function, Integer> qosFunction, + int highPriorityLevel) { + int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", + handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + this.handlerCount = handlerCount; + this.priorityHandlerCount = priorityHandlerCount; + this.replicationHandlerCount = replicationHandlerCount; + this.qosFunction = qosFunction; + this.highPriorityLevel = highPriorityLevel; + this.callQueue = new LinkedBlockingQueue(maxQueueLength); + this.priorityCallQueue = priorityHandlerCount > 0 + ? new LinkedBlockingQueue(maxQueueLength) + : null; + this.replicationQueue = replicationHandlerCount > 0 + ? new LinkedBlockingQueue(maxQueueLength) + : null; + } + + @Override + public void init(RpcServer.RpcSchedulerContext context) { + this.port = context.getListenerAddress().getPort(); + } + + @Override + public void start() { + running = true; + startHandlers(handlerCount, callQueue, null); + if (priorityCallQueue != null) { + startHandlers(priorityHandlerCount, priorityCallQueue, "Priority."); + } + if (replicationQueue != null) { + startHandlers(replicationHandlerCount, replicationQueue, "Replication."); + } + } + + private void startHandlers( + int handlerCount, + final BlockingQueue callQueue, + String threadNamePrefix) { + for (int i = 0; i < handlerCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + consumerLoop(callQueue); + } + }); + t.setDaemon(true); + t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port); + t.start(); + handlers.add(t); + } + } + + @Override + public void stop() { + running = false; + for (Thread handler : handlers) { + handler.interrupt(); + } + } + + @Override + public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException { + RpcServer.Call call = callTask.getCall(); + Pair headerAndParam = + new Pair(call.header, call.param); + if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) { + priorityCallQueue.put(callTask); + } else if (replicationQueue != null && + getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) { + replicationQueue.put(callTask); + } else { + callQueue.put(callTask); // queue the call; maybe blocked here + } + } + + private void consumerLoop(BlockingQueue myQueue) { + while (running) { + try { + RpcServer.CallRunner task = myQueue.take(); + task.run(); + } catch (InterruptedException e) { + Thread.interrupted(); + } + } + } + + private int getQosLevel(Pair headerAndParam) { + if (qosFunction == null) return 0; + Integer res = qosFunction.apply(headerAndParam); + return res == null? 0: res; + } +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 5d4303b..e35b50b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; @@ -397,12 +398,17 @@ MasterServices, Server { HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG); int numHandlers = conf.getInt("hbase.master.handler.count", conf.getInt("hbase.regionserver.handler.count", 25)); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler( + conf, + numHandlers, + 0, // we don't use high priority handlers in master + 0, // we don't use replication handlers in master + null, // this is a DNC w/o high priority handlers + 0); this.rpcServer = new RpcServer(this, name, getServices(), initialIsa, // BindAddress is IP we got for this server. - numHandlers, - 0, // we dont use high priority handlers in master conf, - 0); // this is a DNC w/o high priority handlers + scheduler); // Set our address. this.isa = this.rpcServer.getListenerAddress(); this.serverName = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cd80bab..f91b9e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -545,18 +546,24 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa String name = "regionserver/" + initialIsa.toString(); // Set how many times to retry talking to another server over HConnection. HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG); + this.qosFunction = new QosFunction(this); + int handlerCount = conf.getInt("hbase.regionserver.handler.count", 10); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler( + conf, + handlerCount, + conf.getInt("hbase.regionserver.metahandler.count", 10), + conf.getInt("hbase.regionserver.replication.handler.count", 3), + qosFunction, + HConstants.QOS_THRESHOLD); this.rpcServer = new RpcServer(this, name, getServices(), /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/ initialIsa, // BindAddress is IP we got for this server. - conf.getInt("hbase.regionserver.handler.count", 10), - conf.getInt("hbase.regionserver.metahandler.count", 10), - conf, HConstants.QOS_THRESHOLD); + conf, scheduler); // Set our address. this.isa = this.rpcServer.getListenerAddress(); this.rpcServer.setErrorHandler(this); - this.rpcServer.setQosFunction((qosFunction = new QosFunction(this))); this.startcode = System.currentTimeMillis(); // login the zookeeper client principal (if using security) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index b3520d2..bfc0e1d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -84,7 +84,9 @@ public class TestDelayedRpc { TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance); rpcServer = new RpcServer(null, "testDelayedRpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - isa, 1, 0, conf, 0); + isa, + conf, + new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { @@ -163,7 +165,9 @@ public class TestDelayedRpc { TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance); rpcServer = new RpcServer(null, "testTooManyDelayedRpcs", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - isa, 1, 0, conf, 0); + isa, + conf, + new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { @@ -283,7 +287,9 @@ public class TestDelayedRpc { TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance); rpcServer = new RpcServer(null, "testEndDelayThrowing", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - isa, 1, 0, conf, 0); + isa, + conf, + new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index da33ba5..6c229c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -81,6 +81,7 @@ public class TestIPC { public static final Log LOG = LogFactory.getLog(TestIPC.class); static byte [] CELL_BYTES = Bytes.toBytes("xyz"); static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); + private final static Configuration CONF = HBaseConfiguration.create(); // We are using the test TestRpcServiceProtos generated classes and Service because they are // available and basic with methods like 'echo', and ping. Below we make a blocking service // by passing in implementation of blocking interface. We use this service in all tests that @@ -132,11 +133,11 @@ public class TestIPC { * HBaseRpcServer directly. */ private static class TestRpcServer extends RpcServer { + TestRpcServer() throws IOException { super(null, "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("0.0.0.0", 0), 1, 1, - HBaseConfiguration.create(), 0); + new InetSocketAddress("0.0.0.0", 0), CONF, new SimpleRpcScheduler(CONF, 1, 1, 0, null, 0)); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 02bbd97..1cf17f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -98,7 +98,8 @@ public class TestProtoBufRpc { // Get RPC server for server side implementation this.server = new RpcServer(null, "testrpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - new InetSocketAddress(ADDRESS, PORT), 10, 10, conf, 0); + new InetSocketAddress(ADDRESS, PORT), conf, + new SimpleRpcScheduler(conf, 10, 10, 0, null, 0)); this.isa = server.getListenerAddress(); this.server.start(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index 7b676ee..e7ab559 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ipc.RequestContext; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -128,8 +129,10 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); sai.add(new BlockingServiceAndInterface(service, AuthenticationProtos.AuthenticationService.BlockingInterface.class)); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler( + conf, 3, 1, 0, null, HConstants.QOS_THRESHOLD); this.rpcServer = - new RpcServer(this, "tokenServer", sai, initialIsa, 3, 1, conf, HConstants.QOS_THRESHOLD); + new RpcServer(this, "tokenServer", sai, initialIsa, conf, scheduler); this.isa = this.rpcServer.getListenerAddress(); this.sleeper = new Sleeper(1000, this); }