--- ThriftServer-master.java 2016-11-29 10:22:11.000000000 +0800 +++ ThriftServer.java 2016-11-29 10:20:01.000000000 +0800 @@ -78,6 +78,7 @@ import org.apache.thrift.server.THsHaSer import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -144,6 +145,7 @@ public class ThriftServer extends Config options.addOption("f", "framed", false, "Use framed transport"); options.addOption("c", "compact", false, "Use the compact protocol"); options.addOption("w", "workers", true, "How many worker threads to use."); + options.addOption("s", "selectors", true, "How many selector threads to use."); options.addOption("q", "callQueueSize", true, "Max size of request queue (unbounded by default)"); options.addOption("h", "help", false, "Print help information"); @@ -156,6 +158,7 @@ public class ThriftServer extends Config servers.addOption( new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport.")); + servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport.")); servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default.")); options.addOptionGroup(servers); return options; @@ -273,6 +276,28 @@ public class ThriftServer extends Config return new THsHaServer(serverArgs); } + private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory, + TProcessor processor, TTransportFactory transportFactory, + int workerThreads, int selectorThreads, int maxCallQueueSize, + InetSocketAddress inetSocketAddress, ThriftMetrics metrics) + throws TTransportException { + TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); + log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); + TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport); + if (workerThreads > 0) + serverArgs.workerThreads(workerThreads); + if (selectorThreads > 0) + serverArgs.selectorThreads(selectorThreads); + + ExecutorService executorService = createExecutor( + workerThreads, maxCallQueueSize, metrics); + serverArgs.executorService(executorService); + serverArgs.processor(processor); + serverArgs.transportFactory(transportFactory); + serverArgs.protocolFactory(protocolFactory); + return new TThreadedSelectorServer(serverArgs); + } + private static ExecutorService createExecutor( int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) { CallQueue callQueue; @@ -350,6 +375,7 @@ public class ThriftServer extends Config Options options = getOptions(); CommandLine cmd = parseArguments(conf, options, args); int workerThreads = 0; + int selectorThreads = 0; int maxCallQueueSize = -1; // use unbounded queue by default /** @@ -432,6 +458,7 @@ public class ThriftServer extends Config boolean nonblocking = cmd.hasOption("nonblocking"); boolean hsha = cmd.hasOption("hsha"); + boolean selector = cmd.hasOption("selector"); ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO); final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource()); @@ -441,6 +468,8 @@ public class ThriftServer extends Config implType = "nonblocking"; } else if (hsha) { implType = "hsha"; + } else if (selector) { + implType = "selector"; } conf.set("hbase.regionserver.thrift.server.type", implType); @@ -484,7 +513,9 @@ public class ThriftServer extends Config if (cmd.hasOption("w")) { workerThreads = Integer.parseInt(cmd.getOptionValue("w")); } - + if (cmd.hasOption("s")) { + selectorThreads = Integer.parseInt(cmd.getOptionValue("s")); + } if (cmd.hasOption("q")) { maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q")); } @@ -525,6 +556,15 @@ public class ThriftServer extends Config maxCallQueueSize, inetSocketAddress, metrics); + } else if (selector) { + server = getTThreadedSelectorServer(protocolFactory, + processor, + transportFactory, + workerThreads, + selectorThreads, + maxCallQueueSize, + inetSocketAddress, + metrics); } else { server = getTThreadPoolServer(protocolFactory, processor,