From 8a90e9f5d6256ccf57907579c0a74fa68511658e Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Tue, 12 Apr 2016 16:39:30 -0700 Subject: [PATCH] HBASE-15637 TSHA Thrift-2 server should allow limiting call queue size --- .../apache/hadoop/hbase/thrift2/ThriftServer.java | 23 +++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index 695c74b..e35ba73 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -143,6 +143,8 @@ public class ThriftServer extends Configured implements Tool { options.addOption("f", "framed", false, "Use framed transport"); options.addOption("c", "compact", false, "Use the compact protocol"); options.addOption("w", "workers", true, "How many worker threads to use."); + options.addOption("q", "callQueueSize", true, + "Max size of request queue (unbounded by default)"); options.addOption("h", "help", false, "Print help information"); options.addOption(null, "infoport", true, "Port for web UI"); options.addOption("t", READ_TIMEOUT_OPTION, true, @@ -251,7 +253,7 @@ public class ThriftServer extends Configured implements Tool { private static TServer getTHsHaServer(TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, - int workerThreads, + int workerThreads, int maxCallQueueSize, InetSocketAddress inetSocketAddress, ThriftMetrics metrics) throws TTransportException { TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); @@ -262,7 +264,7 @@ public class ThriftServer extends Configured implements Tool { serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads); } ExecutorService executorService = createExecutor( - workerThreads, metrics); + workerThreads, maxCallQueueSize, metrics); serverArgs.executorService(executorService); serverArgs.processor(processor); serverArgs.transportFactory(transportFactory); @@ -271,9 +273,14 @@ public class ThriftServer extends Configured implements Tool { } private static ExecutorService createExecutor( - int workerThreads, ThriftMetrics metrics) { - CallQueue callQueue = new CallQueue( - new LinkedBlockingQueue(), metrics); + int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) { + CallQueue callQueue; + if (maxCallQueueSize > 0) { + callQueue = new CallQueue(new LinkedBlockingQueue(maxCallQueueSize), metrics); + } else { + callQueue = new CallQueue(new LinkedBlockingQueue(), metrics); + } + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift2-worker-%d"); @@ -342,6 +349,7 @@ public class ThriftServer extends Configured implements Tool { Options options = getOptions(); CommandLine cmd = parseArguments(conf, options, args); int workerThreads = 0; + int maxCallQueueSize = -1; // use unbounded queue by default /** * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase @@ -475,6 +483,10 @@ public class ThriftServer extends Configured implements Tool { workerThreads = Integer.parseInt(cmd.getOptionValue("w")); } + if (cmd.hasOption("q")) { + maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q")); + } + // check for user-defined info server port setting, if so override the conf try { if (cmd.hasOption("infoport")) { @@ -508,6 +520,7 @@ public class ThriftServer extends Configured implements Tool { processor, transportFactory, workerThreads, + maxCallQueueSize, inetSocketAddress, metrics); } else { -- 2.8.0-rc2