From 51934a541d2fba16f0ec2cc75e55620ff18b86eb Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Tue, 12 Apr 2016 14:46:22 -0700 Subject: [PATCH] HBASE-15637 TSHA Thrift-2 server should allow limiting call queue size --- .../apache/hadoop/hbase/thrift2/ThriftServer.java | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index 941d5f8..26b035d 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -251,7 +251,7 @@ public class ThriftServer { private static TServer getTHsHaServer(TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, - int workerThreads, + int workerThreads, int maxCallQueueSize, InetSocketAddress inetSocketAddress, ThriftMetrics metrics) throws TTransportException { TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); @@ -262,7 +262,7 @@ public class ThriftServer { serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads); } ExecutorService executorService = createExecutor( - workerThreads, metrics); + workerThreads, maxCallQueueSize, metrics); serverArgs.executorService(executorService); serverArgs.processor(processor); serverArgs.transportFactory(transportFactory); @@ -271,9 +271,14 @@ public class ThriftServer { } private static ExecutorService createExecutor( - int workerThreads, ThriftMetrics metrics) { - CallQueue callQueue = new CallQueue( - new LinkedBlockingQueue(), metrics); + int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) { + CallQueue callQueue; + if (maxCallQueueSize > 0) { + callQueue = new CallQueue(new LinkedBlockingQueue(maxCallQueueSize), metrics); + } else { + callQueue = new CallQueue(new LinkedBlockingQueue(), metrics); + } + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift2-worker-%d"); @@ -336,6 +341,7 @@ public class ThriftServer { Configuration conf = HBaseConfiguration.create(); CommandLine cmd = parseArguments(conf, options, args); int workerThreads = 0; + int maxCallQueueSize = -1; // use unbounded queue by default /** * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase @@ -469,6 +475,10 @@ public class ThriftServer { workerThreads = Integer.parseInt(cmd.getOptionValue("w")); } + if (cmd.hasOption("q")) { + maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q")); + } + // check for user-defined info server port setting, if so override the conf try { if (cmd.hasOption("infoport")) { @@ -502,6 +512,7 @@ public class ThriftServer { processor, transportFactory, workerThreads, + maxCallQueueSize, inetSocketAddress, metrics); } else { -- 2.8.0-rc2