diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index ab5484c..fcd567e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -37,26 +37,41 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
 
   @Override
   public int getGeneralQueueLength() {
-    if (this.server == null || this.server.callQueue == null) {
+    if (this.server == null
+        || this.server.scheduler == null
+        || !(this.server.scheduler instanceof RpcServer.SimpleRpcScheduler)) {
       return 0;
     }
-    return server.callQueue.size();
+    RpcServer.SimpleRpcScheduler scheduler = (RpcServer.SimpleRpcScheduler) server.scheduler;
+    return scheduler.callQueue.size();
   }
 
   @Override
   public int getReplicationQueueLength() {
-    if (this.server == null || this.server.replicationQueue == null) {
+    if (this.server == null
+        || this.server.scheduler == null
+        || !(this.server.scheduler instanceof RpcServer.SimpleRpcScheduler)) {
       return 0;
     }
-    return server.replicationQueue.size();
+    RpcServer.SimpleRpcScheduler scheduler = (RpcServer.SimpleRpcScheduler) server.scheduler;
+    if (scheduler.replicationQueue == null) {
+      return 0;
+    }
+    return scheduler.replicationQueue.size();
  }
 
   @Override
   public int getPriorityQueueLength() {
-    if (this.server == null || this.server.priorityCallQueue == null) {
+    if (this.server == null
+        || this.server.scheduler == null
+        || !(this.server.scheduler instanceof RpcServer.SimpleRpcScheduler)) {
+      return 0;
+    }
+    RpcServer.SimpleRpcScheduler scheduler = (RpcServer.SimpleRpcScheduler) server.scheduler;
+    if (scheduler.priorityCallQueue == null) {
       return 0;
     }
-    return server.priorityCallQueue.size();
+    return scheduler.priorityCallQueue.size();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index dbafafb..bb8a78e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -61,7 +61,9 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +71,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException;
@@ -79,6 +82,7 @@ import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
@@ -102,7 +106,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -123,7 +126,6 @@ import org.cloudera.htrace.impl.NullSpan;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.Descriptors.MethodDescriptor;
@@ -186,10 +188,11 @@ public class RpcServer implements RpcServerInterface {
    */
   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
 
+  /** Keeps MonitoredRPCHandler per handler thread. */
+  private static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<MonitoredRPCHandler>();
+
   protected final InetSocketAddress isa;
   protected int port;                             // port we listen on
-  private int handlerCount;                       // number of handler threads
-  private int priorityHandlerCount;
   private int readThreads;                        // number of read threads
   protected int maxIdleTime;                      // the maximum idle time after
                                                   // which a client may be
@@ -214,9 +217,7 @@ public class RpcServer implements RpcServerInterface {
   protected final long purgeTimeout;              // in milliseconds
 
   volatile protected boolean running = true;      // true while server runs
-  protected BlockingQueue<Call> callQueue;        // queued calls
   protected final Counter callQueueSize = new Counter();
-  protected BlockingQueue<Call> priorityCallQueue;
 
   protected int highPriorityLevel;  // what level a high priority call is at
@@ -227,12 +228,6 @@ public class RpcServer implements RpcServerInterface {
   private Listener listener = null;
   protected Responder responder = null;
   protected int numConnections = 0;
-  private Handler[] handlers = null;
-  private Handler[] priorityHandlers = null;
-  /** replication related queue; */
-  protected BlockingQueue<Call> replicationQueue;
-  private int numOfReplicationHandlers = 0;
-  private Handler[] replicationHandlers = null;
 
   protected HBaseRPCErrorHandler errorHandler = null;
@@ -248,6 +243,8 @@ public class RpcServer implements RpcServerInterface {
   private final Object serverInstance;
   private final List services;
+  final RpcScheduler scheduler;
+
   /**
    * Datastructure that holds all necessary to a method invocation and then afterward, carries
    * the result.
   */
@@ -256,6 +253,7 @@ public class RpcServer implements RpcServerInterface {
     protected int id;                             // the client's call id
     protected BlockingService service;
     protected MethodDescriptor md;
+    protected RequestHeader header;
     protected Message param;                      // the parameter passed
     // Optional cell data passed outside of protobufs.
     protected CellScanner cellScanner;
@@ -271,12 +269,13 @@ public class RpcServer implements RpcServerInterface {
     protected boolean isError;
     protected TraceInfo tinfo;
 
-    Call(int id, final BlockingService service, final MethodDescriptor md, Message param,
+    Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param,
         CellScanner cellScanner, Connection connection, Responder responder, long size,
         TraceInfo tinfo) {
       this.id = id;
       this.service = service;
       this.md = md;
+      this.header = header;
       this.param = param;
       this.cellScanner = cellScanner;
       this.connection = connection;
@@ -737,8 +736,7 @@ public class RpcServer implements RpcServerInterface {
         }
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + ": connection from " + c.toString() +
-              "; # active connections: " + numConnections +
-              "; # queued calls: " + callQueue.size());
+              "; # active connections: " + numConnections);
       } finally {
         reader.finishAdd();
       }
@@ -1139,13 +1137,13 @@ public class RpcServer implements RpcServerInterface {
     // Fake 'call' for failed authorization response
     private static final int AUTHROIZATION_FAILED_CALLID = -1;
     private final Call authFailedCall =
-      new Call(AUTHROIZATION_FAILED_CALLID, this.service, null, null, null, this, null, 0, null);
+      new Call(AUTHROIZATION_FAILED_CALLID, this.service, null, null, null, null, this, null, 0, null);
     private ByteArrayOutputStream authFailedResponse =
       new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     private static final int SASL_CALLID = -33;
     private final Call saslCall =
-      new Call(SASL_CALLID, this.service, null, null, null, this, null, 0, null);
+      new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null);
 
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1498,7 +1496,7 @@ public class RpcServer implements RpcServerInterface {
     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
       LOG.warn(msg);
-      Call fakeCall = new Call(-1, null, null, null, null, this, responder, -1, null);
+      Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null);
       setupResponse(null, fakeCall, e, msg);
       responder.doRespond(fakeCall);
       // Returning -1 closes out the connection.
@@ -1648,7 +1646,7 @@ public class RpcServer implements RpcServerInterface {
       // This is a bit late to be doing this check - we have already read in the total request.
       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
         final Call callTooBig =
-          new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null);
+          new Call(id, this.service, null, null, null, null, this, responder, totalRequestSize, null);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
           "Call queue is full, is ipc.server.max.callqueue.size too small?");
@@ -1679,7 +1677,7 @@ public class RpcServer implements RpcServerInterface {
         String msg = "Unable to read call parameter from client " + getHostAddress();
         LOG.warn(msg, t);
         final Call readParamsFailedCall =
-          new Call(id, this.service, null, null, null, this, responder, totalRequestSize, null);
+          new Call(id, this.service, null, null, null, null, this, responder, totalRequestSize, null);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         setupResponse(responseBuffer, readParamsFailedCall, t,
           msg + "; " + t.getMessage());
@@ -1689,22 +1687,14 @@ public class RpcServer implements RpcServerInterface {
 
       Call call = null;
       if (header.hasTraceInfo()) {
-        call = new Call(id, this.service, md, param, cellScanner, this, responder, totalRequestSize,
+        call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize,
           new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()));
       } else {
-        call = new Call(id, this.service, md, param, cellScanner, this, responder,
+        call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
           totalRequestSize, null);
       }
       callQueueSize.add(totalRequestSize);
-      Pair<RequestHeader, Message> headerAndParam = new Pair<RequestHeader, Message>(header, param);
-      if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
-        priorityCallQueue.put(call);
-      } else if (replicationQueue != null &&
-          getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
-        replicationQueue.put(call);
-      } else {
-        callQueue.put(call); // queue the call; maybe blocked here
-      }
+      scheduler.dispatch(new CallTask(call));
     }
 
     private boolean authorizeConnection() throws IOException {
@@ -1773,106 +1763,204 @@ public class RpcServer implements RpcServerInterface {
     }
   }
 
-  /** Handles queued calls . */
-  private class Handler extends Thread {
-    private final BlockingQueue<Call> myCallQueue;
-    private MonitoredRPCHandler status;
+  /**
+   * The real request processing logic, which is usually executed in
+   * thread pools provided by an {@link RpcScheduler}.
+   */
+  public class CallTask implements Runnable {
+    private final Call call;
 
-    public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
-      this.myCallQueue = cq;
-      this.setDaemon(true);
+    public CallTask(Call call) {
+      this.call = call;
+    }
 
-      String threadName = "RpcServer.handler=" + instanceNumber + ",port=" + port;
-      if (cq == priorityCallQueue) {
-        // this is just an amazing hack, but it works.
-        threadName = "Priority." + threadName;
-      } else if (cq == replicationQueue) {
-        threadName = "Replication." + threadName;
-      }
-      this.setName(threadName);
-      this.status = TaskMonitor.get().createRPCStatus(threadName);
+    public Call getCall() {
+      return call;
     }
 
     @Override
     public void run() {
-      LOG.info(getName() + ": starting");
-      status.setStatus("starting");
+      MonitoredRPCHandler status = getStatus();
       SERVER.set(RpcServer.this);
-      while (running) {
+      try {
+        status.setStatus("Setting up call");
+        status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
+        if (LOG.isDebugEnabled()) {
+          UserGroupInformation remoteUser = call.connection.user;
+          LOG.debug(call.toShortString() + " executing as " +
+              ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
+        }
+        Throwable errorThrowable = null;
+        String error = null;
+        Pair<Message, CellScanner> resultPair = null;
+        CurCall.set(call);
+        Span currentRequestSpan = NullSpan.getInstance();
         try {
-          status.pause("Waiting for a call");
-          Call call = myCallQueue.take(); // pop the queue; maybe blocked here
-          status.setStatus("Setting up call");
-          status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
-          if (LOG.isDebugEnabled()) {
-            UserGroupInformation remoteUser = call.connection.user;
-            LOG.debug(call.toShortString() + " executing as " +
-                ((remoteUser == null)? "NULL principal": remoteUser.getUserName()));
+          if (!started) {
+            throw new ServerNotRunningYetException("Server is not running yet");
           }
-          Throwable errorThrowable = null;
-          String error = null;
-          Pair<Message, CellScanner> resultPair = null;
-          CurCall.set(call);
-          Span currentRequestSpan = NullSpan.getInstance();
-          try {
-            if (!started) {
-              throw new ServerNotRunningYetException("Server is not running yet");
-            }
-            if (call.tinfo != null) {
-              currentRequestSpan = Trace.startSpan(
-                  "handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS);
-            }
-            RequestContext.set(User.create(call.connection.user), getRemoteIp(),
+          if (call.tinfo != null) {
+            currentRequestSpan = Trace.startSpan(
+                "handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS);
+          }
+          RequestContext.set(User.create(call.connection.user), getRemoteIp(),
               call.connection.service);
-            // make the call
-            resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
+          // make the call
+          resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
               status);
-          } catch (Throwable e) {
-            LOG.debug(getName() + ": " + call.toShortString(), e);
-            errorThrowable = e;
-            error = StringUtils.stringifyException(e);
-          } finally {
-            currentRequestSpan.stop();
-            // Must always clear the request context to avoid leaking
-            // credentials between requests.
-            RequestContext.clear();
-          }
-          CurCall.set(null);
-          callQueueSize.add(call.getSize() * -1);
-          // Set the response for undelayed calls and delayed calls with
-          // undelayed responses.
-          if (!call.isDelayed() || !call.isReturnValueDelayed()) {
-            Message param = resultPair != null? resultPair.getFirst(): null;
-            CellScanner cells = resultPair != null? resultPair.getSecond(): null;
-            call.setResponse(param, cells, errorThrowable, error);
-          }
-          call.sendResponseIfReady();
-          status.markComplete("Sent response");
-        } catch (InterruptedException e) {
-          if (running) { // unexpected -- log it
-            LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e));
-          }
-        } catch (OutOfMemoryError e) {
-          if (errorHandler != null) {
-            if (errorHandler.checkOOME(e)) {
-              LOG.info(getName() + ": exiting on OutOfMemoryError");
-              return;
-            }
-          } else {
-            // rethrow if no handler
-            throw e;
+        } catch (Throwable e) {
+          LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
+          errorThrowable = e;
+          error = StringUtils.stringifyException(e);
+        } finally {
+          currentRequestSpan.stop();
+          // Must always clear the request context to avoid leaking
+          // credentials between requests.
+          RequestContext.clear();
+        }
+        CurCall.set(null);
+        callQueueSize.add(call.getSize() * -1);
+        // Set the response for undelayed calls and delayed calls with
+        // undelayed responses.
+        if (!call.isDelayed() || !call.isReturnValueDelayed()) {
+          Message param = resultPair != null ? resultPair.getFirst() : null;
+          CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
+          call.setResponse(param, cells, errorThrowable, error);
+        }
+        call.sendResponseIfReady();
+        status.markComplete("Sent response");
+        status.pause("Waiting for a call");
+      } catch (OutOfMemoryError e) {
+        if (errorHandler != null) {
+          if (errorHandler.checkOOME(e)) {
+            LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
+            return;
           }
-        } catch (ClosedChannelException cce) {
-          LOG.warn(getName() + ": caught a ClosedChannelException, " +
+        } else {
+          // rethrow if no handler
+          throw e;
+        }
+      } catch (ClosedChannelException cce) {
+        LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
             "this means that the server was processing a " +
             "request but the client went away. The error message was: " + cce.getMessage());
-        } catch (Exception e) {
-          LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e));
+      } catch (Exception e) {
+        LOG.warn(Thread.currentThread().getName() + ": caught: " + StringUtils.stringifyException(e));
+      }
+    }
+
+    public MonitoredRPCHandler getStatus() {
+      MonitoredRPCHandler status = MONITORED_RPC.get();
+      if (status != null) {
+        return status;
+      }
+      status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
+      status.pause("Waiting for a call");
+      MONITORED_RPC.set(status);
+      return status;
+    }
+  }
+
+  /**
+   * An interface for an RPC request scheduling algorithm.
+   */
+  public interface RpcScheduler {
+
+    void start();
+    void stop();
+
+    /** Dispatches an RPC request asynchronously. */
+    void dispatch(CallTask task) throws IOException, InterruptedException;
+  }
+
+  public class SimpleRpcScheduler implements RpcScheduler {
+
+    private final int handlerCount;
+    private final int priorityHandlerCount;
+    private final int replicationHandlerCount;
+    final BlockingQueue<CallTask> callQueue;
+    final BlockingQueue<CallTask> priorityCallQueue;
+    final BlockingQueue<CallTask> replicationQueue;
+    private volatile boolean running = false;
+    private List<Thread> handlers = Lists.newArrayList();
+
+    public SimpleRpcScheduler(
+        int handlerCount,
+        int priorityHandlerCount,
+        int replicationHandlerCount,
+        int maxQueueLength) {
+      this.handlerCount = handlerCount;
+      this.priorityHandlerCount = priorityHandlerCount;
+      this.replicationHandlerCount = replicationHandlerCount;
+      this.callQueue = new LinkedBlockingQueue<CallTask>(maxQueueLength);
+      this.priorityCallQueue = priorityHandlerCount > 0
+          ? new LinkedBlockingQueue<CallTask>(maxQueueLength)
+          : null;
+      this.replicationQueue = replicationHandlerCount > 0
+          ? new LinkedBlockingQueue<CallTask>(maxQueueLength)
+          : null;
+    }
+
+    @Override
+    public void start() {
+      running = true;
+      startHandlers(handlerCount, callQueue, null);
+      if (priorityCallQueue != null) {
+        startHandlers(priorityHandlerCount, priorityCallQueue, "Priority.");
+      }
+      if (replicationQueue != null) {
+        startHandlers(replicationHandlerCount, replicationQueue, "Replication.");
+      }
+    }
+
+    private void startHandlers(int handlerCount, final BlockingQueue<CallTask> callQueue,
+        String threadNamePrefix) {
+      for (int i = 0; i < handlerCount; i++) {
+        Thread t = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            consumerLoop(callQueue);
+          }
+        });
+        t.setDaemon(true);
+        t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port);
+        t.start();
+        handlers.add(t);
+      }
+    }
+
+    @Override
+    public void stop() {
+      running = false;
+      for (Thread handler : handlers) {
+        handler.interrupt();
+      }
+    }
+
+    @Override
+    public void dispatch(CallTask callTask) throws InterruptedException {
+      Call call = callTask.call;
+      Pair<RequestHeader, Message> headerAndParam =
+          new Pair<RequestHeader, Message>(call.header, call.param);
+      if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
+        priorityCallQueue.put(callTask);
+      } else if (replicationQueue != null &&
+          getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
+        replicationQueue.put(callTask);
+      } else {
+        callQueue.put(callTask); // queue the call; maybe blocked here
+      }
+    }
+
+    private void consumerLoop(BlockingQueue<CallTask> myQueue) {
+      while (running) {
+        try {
+          CallTask task = myQueue.take();
+          task.run();
+        } catch (InterruptedException e) {
+          Thread.interrupted();
         }
       }
-      LOG.info(getName() + ": exiting");
     }
   }
@@ -1936,31 +2024,18 @@ public class RpcServer implements RpcServerInterface {
     this.services = services;
     this.isa = isa;
     this.conf = conf;
-    this.handlerCount = handlerCount;
-    this.priorityHandlerCount = priorityHandlerCount;
     this.socketSendBufferSize = 0;
     this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length",
      handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
     this.maxQueueSize =
      this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
     this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);
-    this.callQueue = new LinkedBlockingQueue<Call>(maxQueueLength);
-    if (priorityHandlerCount > 0) {
-      this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueLength); // TODO hack on size
-    } else {
-      this.priorityCallQueue = null;
-    }
     this.highPriorityLevel = highPriorityLevel;
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
     this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-    this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3);
-    if (numOfReplicationHandlers > 0) {
-      this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
-    }
-
     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
@@ -1984,6 +2059,13 @@ public class RpcServer implements RpcServerInterface {
     if (isSecurityEnabled) {
       HBaseSaslRpcServer.init(conf);
     }
+
+    int numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3);
+    this.scheduler = new SimpleRpcScheduler(
+        handlerCount,
+        priorityHandlerCount,
+        numOfReplicationHandlers,
+        maxQueueLength);
   }
 
   /**
@@ -2058,9 +2140,7 @@ public class RpcServer implements RpcServerInterface {
     HBasePolicyProvider.init(conf, authManager);
     responder.start();
     listener.start();
-    handlers = startHandlers(callQueue, handlerCount);
-    priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
-    replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
+    scheduler.start();
   }
 
   @Override
@@ -2068,18 +2148,6 @@ public class RpcServer implements RpcServerInterface {
     this.authManager.refresh(this.conf, pp);
   }
 
-  private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
-    if (numOfHandlers <= 0) {
-      return null;
-    }
-    Handler[] handlers = new Handler[numOfHandlers];
-    for (int i = 0; i < numOfHandlers; i++) {
-      handlers[i] = new Handler(queue, i);
-      handlers[i].start();
-    }
-    return handlers;
-  }
-
   private AuthenticationTokenSecretManager createSecretManager() {
     if (!isSecurityEnabled) return null;
     if (serverInstance == null) return null;
@@ -2223,25 +2291,12 @@ public class RpcServer implements RpcServerInterface {
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
     running = false;
-    stopHandlers(handlers);
-    stopHandlers(priorityHandlers);
-    stopHandlers(replicationHandlers);
     listener.interrupt();
     listener.doStop();
     responder.interrupt();
     notifyAll();
   }
 
-  private void stopHandlers(Handler[] handlers) {
-    if (handlers != null) {
-      for (Handler handler : handlers) {
-        if (handler != null) {
-          handler.interrupt();
-        }
-      }
-    }
-  }
-
   /** Wait for the server to be stopped.
    * Does not wait for all subthreads to finish.
    *  See {@link #stop()}.
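
Note on the new seam: this revision still hard-wires SimpleRpcScheduler in the RpcServer constructor, but all request dispatch now goes through the small RpcScheduler interface (start/stop/dispatch) and the per-request work lives in CallTask.run(). The sketch below is not part of the patch; it is a minimal illustration, under the assumption that a later change exposes a way to supply the scheduler, of what an alternative implementation of the interface could look like. The class name FixedPoolRpcScheduler and its pool size are made up for the example.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.ipc.RpcServer;

// Illustrative only: routes every CallTask to one fixed-size pool instead of
// the per-QoS handler queues used by SimpleRpcScheduler in the patch above.
public class FixedPoolRpcScheduler implements RpcServer.RpcScheduler {

  private final int poolSize;   // assumed to come from configuration
  private ExecutorService pool; // created on start(), shut down on stop()

  public FixedPoolRpcScheduler(int poolSize) {
    this.poolSize = poolSize;
  }

  @Override
  public void start() {
    pool = Executors.newFixedThreadPool(poolSize);
  }

  @Override
  public void stop() {
    if (pool != null) {
      pool.shutdownNow();
    }
  }

  @Override
  public void dispatch(RpcServer.CallTask task) {
    // CallTask is a Runnable; its run() performs the call and sends the response,
    // so scheduling is just a matter of choosing the executing thread.
    pool.execute(task);
  }
}

One caveat with such a scheduler: dropping the bounded LinkedBlockingQueue also drops the per-queue back-pressure and QoS routing that SimpleRpcScheduler keeps, so a real implementation would likely still want bounded queues per priority level.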