diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java index fd505d7..31a1b9d 100644 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java +++ b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java @@ -199,11 +199,11 @@ public class SecureRpcEngine implements RpcEngine { Class[] ifaces, final String bindAddress, final int port, final int numHandlers, - int metaHandlerCount, final boolean verbose, Configuration conf, + int metaHandlerCount, int readerHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { Server server = new Server(instance, ifaces, conf, bindAddress, port, - numHandlers, metaHandlerCount, verbose, + numHandlers, metaHandlerCount, readerHandlerCount, verbose, highPriorityLevel); return server; } @@ -234,10 +234,10 @@ public class SecureRpcEngine implements RpcEngine { */ public Server(Object instance, final Class[] ifaces, Configuration conf, String bindAddress, int port, - int numHandlers, int metaHandlerCount, boolean verbose, + int numHandlers, int metaHandlerCount, int readerHandlerCount, boolean verbose, int highPriorityLevel) throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf, + super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, readerHandlerCount, conf, classNameBase(instance.getClass().getName()), highPriorityLevel); this.instance = instance; this.implementation = instance.getClass(); diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java index 3448d99..93e5d22 100644 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java +++ b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java @@ -22,6 +22,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; @@ -626,9 +634,74 @@ public abstract class SecureServer extends HBaseServer { replicationQueue.put(call); updateCallQueueLenMetrics(replicationQueue); } else { - callQueue.put(call); // queue the call; maybe blocked here - updateCallQueueLenMetrics(callQueue); + if (seperateReaderWriterHandlerThreadPool) + { + Invocation curcall = (Invocation)call.param; + String methodName = curcall.methodName; + + if (methodName.contains("multi")) + { + boolean isWriteAction = false; + Class[] paramClass = curcall.getParameterClasses(); + Object[] params = curcall.getParameters(); + for (int i = 0; i< params.length; i++) + { + MultiAction mAction = (MultiAction)params[i]; + List allActions = mAction.allActions(); + for (int j = 0; j < allActions.size(); j++) + { + Action a = allActions.get(j); + Row action = a.getAction(); + if (action instanceof Delete || action instanceof Put || action instanceof Increment + || action instanceof Append || action instanceof RowMutations) { + //LOG.info("TianYing multi action is write related"); + isWriteAction = true; + break; + } else { + //LOG.info("TianYing multi action is read related"); + } + if (isWriteAction) + break; + } + if (isWriteAction) + { + callQueue.put(call); + //LOG.info("TianYing write queue len" + callQueue.toArray().length); + } else + { + readCallQueue.put(call); + //LOG.info("TianYing read queue len" + readCallQueue.toArray().length); + } + } + + } else + { + + //LOG.info("TianYing method name " + methodName); + if (methodName.contains("put") || methodName.contains("mutate") || methodName.contains("increment") + || methodName.contains("delete")) + { + callQueue.put(call); // queue the write related call; maybe blocked here + //LOG.info("TianYing enqueue write call "+ callQueue.toArray().length); + updateCallQueueLenMetrics(callQueue); + } else + { + readCallQueue.put(call); // queue the others, hopefully are all read related call, need to verify later + //LOG.info("TianYing euqueue read call " + readCallQueue.toArray().length); + updateCallQueueLenMetrics(readCallQueue); + } + } + } else + { + //LOG.info("TianYing no seperating"); + callQueue.put(call); // queue the call; maybe blocked here + //LOG.info("TianYing*** euqueue shared call " + callQueue.toArray().length); + updateCallQueueLenMetrics(callQueue); + } + } + + } private boolean authorizeConnection() throws IOException { @@ -683,10 +756,10 @@ public abstract class SecureServer extends HBaseServer { @SuppressWarnings("unchecked") protected SecureServer(String bindAddress, int port, Class paramClass, int handlerCount, - int priorityHandlerCount, Configuration conf, String serverName, + int priorityHandlerCount, int readerHandlerCount, Configuration conf, String serverName, int highPriorityLevel) throws IOException { - super(bindAddress, port, paramClass, handlerCount, priorityHandlerCount, + super(bindAddress, port, paramClass, handlerCount, priorityHandlerCount, readerHandlerCount, conf, serverName, highPriorityLevel); this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); @@ -753,4 +826,4 @@ public abstract class SecureServer extends HBaseServer { protocol, getConf(), addr); } } -} \ No newline at end of file +} diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index 27884f9..71212cc 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -314,9 +314,9 @@ public class HBaseRPC { final Class[] ifaces, final String bindAddress, final int port, final int numHandlers, - int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) + int metaHandlerCount, int readerHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { - return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel); + return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, readerHandlerCount, verbose, conf, highPriorityLevel); } /** Construct a server for a protocol implementation instance. */ @@ -325,10 +325,9 @@ public class HBaseRPC { final Class[] ifaces, String bindAddress, int port, final int numHandlers, - int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) + int metaHandlerCount, int readerHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { - return getProtocolEngine(conf) - .getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel); + return getProtocolEngine(conf).getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, readerHandlerCount, verbose, conf, highPriorityLevel); } public static void setRpcTimeout(int rpcTimeout) { diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java index bc897d9..a5979e5 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java @@ -86,6 +86,8 @@ public class HBaseRpcMetrics implements Updater { new MetricsIntValue("NumOpenConnections", registry); public final MetricsIntValue callQueueLen = new MetricsIntValue("callQueueLen", registry); + public final MetricsIntValue readCallQueueLen = + new MetricsIntValue("readCallQueueLen", registry); public final MetricsIntValue priorityCallQueueLen = new MetricsIntValue("priorityCallQueueLen", registry); public final MetricsTimeVaryingInt authenticationFailures = diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 8813a00..ab56f33 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -60,6 +60,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.coprocessor.Exec; +import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; @@ -67,6 +79,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SizeBasedThrottler; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -116,6 +129,8 @@ public abstract class HBaseServer implements RpcServer { private final int warnDelayedCalls; private AtomicInteger delayedCalls; + + boolean seperateReaderWriterHandlerThreadPool = true; public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); @@ -185,6 +200,7 @@ public abstract class HBaseServer implements RpcServer { protected String bindAddress; protected int port; // port we listen on private int handlerCount; // number of handler threads + private int readerHandlerCount; // number of handler threads for read requests private int priorityHandlerCount; private int readThreads; // number of read threads protected Class paramClass; // class of call parameters @@ -220,6 +236,7 @@ public abstract class HBaseServer implements RpcServer { volatile protected boolean running = true; // true while server runs protected BlockingQueue callQueue; // queued calls + protected BlockingQueue readCallQueue; // queued calls protected final Counter callQueueSize = new Counter(); protected BlockingQueue priorityCallQueue; @@ -233,6 +250,7 @@ public abstract class HBaseServer implements RpcServer { protected Responder responder = null; protected int numConnections = 0; private Handler[] handlers = null; + private Handler[] readHandlers = null; private Handler[] priorityHandlers = null; /** replication related queue; */ protected BlockingQueue replicationQueue; @@ -1169,7 +1187,10 @@ public abstract class HBaseServer implements RpcServer { return isIdle() && currentTime - lastContact > maxIdleTime; } + + public int readAndProcess() throws IOException, InterruptedException { + //LOG.info("TianYingTianYing readAndProcess"); while (true) { /* Read at most one RPC. If the header is not read completely yet * then iterate until we read first RPC or until there is no data left. @@ -1288,7 +1309,8 @@ public abstract class HBaseServer implements RpcServer { new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id long callSize = buf.length; - + //LOG.info("TianYing"); + if (LOG.isDebugEnabled()) { LOG.debug(" got call #" + id + ", " + callSize + " bytes"); } @@ -1332,8 +1354,72 @@ public abstract class HBaseServer implements RpcServer { replicationQueue.put(call); updateCallQueueLenMetrics(replicationQueue); } else { - callQueue.put(call); // queue the call; maybe blocked here - updateCallQueueLenMetrics(callQueue); + //LOG.info("TianYing"); + if (seperateReaderWriterHandlerThreadPool) + { + Invocation curcall = (Invocation)call.param; + String methodName = curcall.methodName; + + if (methodName.contains("multi")) + { + boolean isWriteAction = false; + Class[] paramClass = curcall.getParameterClasses(); + Object[] params = curcall.getParameters(); + for (int i = 0; i< params.length; i++) + { + MultiAction mAction = (MultiAction)params[i]; + List allActions = mAction.allActions(); + for (int j = 0; j < allActions.size(); j++) + { + Action a = allActions.get(j); + Row action = a.getAction(); + if (action instanceof Delete || action instanceof Put || action instanceof Increment + || action instanceof Append || action instanceof RowMutations) { + //LOG.info("TianYing multi action is write related"); + isWriteAction = true; + break; + } else { + //LOG.info("TianYing multi action is read related"); + } + if (isWriteAction) + break; + } + if (isWriteAction) + { + callQueue.put(call); + //LOG.info("TianYing write queue len" + callQueue.toArray().length); + } else + { + readCallQueue.put(call); + //LOG.info("TianYing read queue len" + readCallQueue.toArray().length); + } + } + + } else + { + + //LOG.info("TianYing method name " + methodName); + if (methodName.contains("put") || methodName.contains("mutate") || methodName.contains("increment") + || methodName.contains("delete")) + { + callQueue.put(call); // queue the write related call; maybe blocked here + //LOG.info("TianYing enqueue write call "+ callQueue.toArray().length); + updateCallQueueLenMetrics(callQueue); + } else + { + readCallQueue.put(call); // queue the others, hopefully are all read related call, need to verify later + //LOG.info("TianYing euqueue read call " + readCallQueue.toArray().length); + updateCallQueueLenMetrics(readCallQueue); + } + } + } else + { + //LOG.info("TianYing no seperating"); + callQueue.put(call); // queue the call; maybe blocked here + //LOG.info("TianYing*** euqueue shared call " + callQueue.toArray().length); + updateCallQueueLenMetrics(callQueue); + } + } } @@ -1362,6 +1448,8 @@ public abstract class HBaseServer implements RpcServer { rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size()); } else if (queue == replicationQueue) { rpcMetrics.replicationCallQueueLen.set(replicationQueue.size()); + } else if (queue == readCallQueue){ + rpcMetrics.readCallQueueLen.set(readCallQueue.size()); } else { LOG.warn("Unknown call queue"); } @@ -1508,7 +1596,7 @@ public abstract class HBaseServer implements RpcServer { */ protected HBaseServer(String bindAddress, int port, Class paramClass, int handlerCount, - int priorityHandlerCount, Configuration conf, String serverName, + int priorityHandlerCount, int readerHandlerCount, Configuration conf, String serverName, int highPriorityLevel) throws IOException { this.bindAddress = bindAddress; @@ -1516,8 +1604,17 @@ public abstract class HBaseServer implements RpcServer { this.port = port; this.paramClass = paramClass; this.handlerCount = handlerCount; + this.readerHandlerCount = readerHandlerCount; this.priorityHandlerCount = priorityHandlerCount; this.socketSendBufferSize = 0; + this.seperateReaderWriterHandlerThreadPool = conf.getBoolean("regionserver.seperateReaderWriterThreadPool", true); + if (this.seperateReaderWriterHandlerThreadPool) + { + LOG.info("TianYingTianYing seperateReader Pool is true"); + } else + { + LOG.info("TianYingTianYing seperateReader Pool is false"); + } // temporary backward compatibility String oldMaxQueueSize = this.conf.get("ipc.server.max.queue.size"); @@ -1539,6 +1636,7 @@ public abstract class HBaseServer implements RpcServer { "ipc.server.read.threadpool.size", 10); this.callQueue = new LinkedBlockingQueue(maxQueueLength); + this.readCallQueue = new LinkedBlockingQueue(maxQueueLength); if (priorityHandlerCount > 0) { this.priorityCallQueue = new LinkedBlockingQueue(maxQueueLength); // TODO hack on size } else { @@ -1666,6 +1764,8 @@ public abstract class HBaseServer implements RpcServer { responder.start(); listener.start(); handlers = startHandlers(callQueue, handlerCount); + LOG.info("TianYingTianYing reader handler count " + readerHandlerCount); + readHandlers = startHandlers(readCallQueue, readerHandlerCount); priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount); replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers); } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java b/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java index faf725c..95c92ea 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java @@ -51,7 +51,7 @@ public interface RpcEngine extends Configurable { /** Construct a server for a protocol implementation instance. */ RpcServer getServer(Class protocol, Object instance, Class[] ifaces, String bindAddress, - int port, int numHandlers, int metaHandlerCount, + int port, int numHandlers, int metaHandlerCount, int readerHandlerCount, boolean verbose, Configuration conf, int highPriorityLevel) throws IOException; diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java b/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java index a93fa49..e046a10 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java @@ -195,11 +195,11 @@ class WritableRpcEngine implements RpcEngine { Class[] ifaces, String bindAddress, int port, int numHandlers, - int metaHandlerCount, boolean verbose, + int metaHandlerCount, int readerHandlerCount, boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, - metaHandlerCount, verbose, highPriorityLevel); + metaHandlerCount, readerHandlerCount, verbose, highPriorityLevel); } /** An RPC Server. */ @@ -247,9 +247,9 @@ class WritableRpcEngine implements RpcEngine { */ public Server(Object instance, final Class[] ifaces, Configuration conf, String bindAddress, int port, - int numHandlers, int metaHandlerCount, boolean verbose, + int numHandlers, int metaHandlerCount, int readerHandlerCount, boolean verbose, int highPriorityLevel) throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, + super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, readerHandlerCount, conf, classNameBase(instance.getClass().getName()), highPriorityLevel); this.instance = instance; diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 8c62fc1..ae1c3e8 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -309,6 +309,7 @@ Server { initialIsa.getPort(), numHandlers, 0, // we dont use high priority handlers in master + 1, // we dont use reader handlers in master conf.getBoolean("hbase.rpc.verbose", false), conf, 0); // this is a DNC w/o high priority handlers // Set our address. diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b168855..15b7409 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -456,6 +456,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, initialIsa.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), conf.getInt("hbase.regionserver.metahandler.count", 10), + conf.getInt("hbase.regionserver.readerHandler.count", 10), conf.getBoolean("hbase.rpc.verbose", false), conf, HConstants.QOS_THRESHOLD); if (rpcServer instanceof HBaseServer) server = (HBaseServer) rpcServer; diff --git a/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index ee390cd..a35f2e8 100644 --- a/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -69,7 +69,7 @@ public class TestDelayedRpc { rpcServer = HBaseRPC.getServer(new TestRpcImpl(delayReturnValue), new Class[]{ TestRpcImpl.class }, - isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); + isa.getHostName(), isa.getPort(), 1, 0, 0, true, conf, 0); RpcEngine rpcEngine = null; try { rpcServer.start(); @@ -140,7 +140,7 @@ public class TestDelayedRpc { InetSocketAddress isa = new InetSocketAddress("localhost", 0); rpcServer = HBaseRPC.getServer(new TestRpcImpl(true), new Class[]{ TestRpcImpl.class }, - isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); + isa.getHostName(), isa.getPort(), 1, 0, 0, true, conf, 0); RpcEngine rpcEngine = null; try { rpcServer.start(); @@ -271,7 +271,7 @@ public class TestDelayedRpc { rpcServer = HBaseRPC.getServer(new FaultyTestRpc(), new Class[]{ TestRpcImpl.class }, - isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); + isa.getHostName(), isa.getPort(), 1, 0, 0, true, conf, 0); RpcEngine rpcEngine = null; try { rpcServer.start(); diff --git a/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java b/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java index d0eb78b..0d7e5a9 100644 --- a/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java +++ b/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java @@ -86,6 +86,7 @@ public class TestPBOnWritableRpc { 0, // port number 2, // number of handlers 0, // we dont use high priority handlers in master + 0, // don't use reader handler conf.getBoolean("hbase.rpc.verbose", false), conf, 0); RpcEngine rpcEngine = null; diff --git a/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java b/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java index a8a0c6c..cfd1551 100644 --- a/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java +++ b/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java @@ -78,7 +78,7 @@ public class TestProtocolExtension { new Class[]{ProtocolExtention.class}, ADDRESS, 6016, - 10, 10, false, + 10, 10, 0, false, conf, 10); RpcEngine rpcEngine = null; try {