diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 9a7dfc7..429e47d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -352,8 +352,8 @@ private ExecutorService getBatchPool() { if (batchPool == null) { synchronized (this) { if (batchPool == null) { - this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), - conf.getInt("hbase.hconnection.threads.core", 256), "-shared", null); + int threads = conf.getInt("hbase.hconnection.threads.max", 256); + this.batchPool = getThreadPool(threads, threads, "-shared", null); this.cleanupPool = true; } } @@ -377,6 +377,7 @@ private ExecutorService getThreadPool(int maxThreads, int coreThreads, String na new LinkedBlockingQueue(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + coreThreads = maxThreads; } ThreadPoolExecutor tpe = new ThreadPoolExecutor( coreThreads, @@ -397,9 +398,10 @@ private ExecutorService getMetaLookupPool() { //To start with, threads.max.core threads can hit the meta (including replicas). //After that, requests will get queued up in the passed queue, and only after //the queue is full, a new thread will be started + int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128); this.metaLookupPool = getThreadPool( - conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), - conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10), + threads, + threads, "-metaLookup-shared-", new LinkedBlockingQueue()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index 1a1044d..9893e7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -107,7 +107,7 @@ public HFileReplicator(Configuration sourceClusterConf, ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setNameFormat("HFileReplicationCallable-%1$d"); this.exec = - new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS, + new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), builder.build()); this.exec.allowCoreThreadTimeOut(true); this.copiesPerThread = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index da37cfa..3611608 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -235,20 +235,16 @@ protected void doStop() { */ private ExecutorService getDefaultThreadPool(Configuration conf) { int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); - int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors() * 8; } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); LinkedBlockingQueue workQueue = new LinkedBlockingQueue(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); ThreadPoolExecutor tpe = new ThreadPoolExecutor( - coreThreads, + maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java index ed72ea2..6efb10c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java @@ -135,20 +135,16 @@ public void close() { private void createBatchPool(Configuration conf) { // Use the same config for keep alive as in ConnectionImplementation.getBatchPool(); int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256); - int coreThreads = conf.getInt("hbase.multihconnection.threads.core", 256); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors() * 8; } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60); LinkedBlockingQueue workQueue = new LinkedBlockingQueue(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); ThreadPoolExecutor tpe = - new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-")); tpe.allowCoreThreadTimeOut(true); this.batchPool = tpe; diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java index 84613bd..1b8ebc9 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java @@ -132,7 +132,7 @@ public String toString() { } /** Executor service for handling client connections */ - private ExecutorService executorService; + private ThreadPoolExecutor executorService; /** Flag for stopping the server */ private volatile boolean stopped; @@ -142,9 +142,12 @@ public String toString() { public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) { super(options); + int minWorkerThreads = options.minWorkerThreads; + int maxWorkerThreads = options.maxWorkerThreads; if (options.maxQueuedRequests > 0) { this.callQueue = new CallQueue( new LinkedBlockingQueue(options.maxQueuedRequests), metrics); + minWorkerThreads = maxWorkerThreads; } else { this.callQueue = new CallQueue(new SynchronousQueue(), metrics); } @@ -153,9 +156,10 @@ public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) { tfb.setDaemon(true); tfb.setNameFormat("thrift-worker-%d"); executorService = - new ThreadPoolExecutor(options.minWorkerThreads, - options.maxWorkerThreads, options.threadKeepAliveTimeSec, + new ThreadPoolExecutor(minWorkerThreads, + maxWorkerThreads, options.threadKeepAliveTimeSec, TimeUnit.SECONDS, this.callQueue, tfb.build()); + executorService.allowCoreThreadTimeOut(true); serverOptions = options; } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java index 7d228dc..8767a3c 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -555,7 +555,7 @@ public boolean process(TProtocol inProt, CallQueue callQueue = new CallQueue(new LinkedBlockingQueue(), metrics); ExecutorService executorService = createExecutor( - callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads()); + callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads()); serverArgs.executorService(executorService) .processor(processor) .transportFactory(transportFactory) @@ -620,8 +620,10 @@ ExecutorService createExecutor(BlockingQueue callQueue, ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift-worker-%d"); - return new ThreadPoolExecutor(minWorkers, maxWorkers, + ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minWorkers, maxWorkers, Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); + threadPool.allowCoreThreadTimeOut(true); + return threadPool; } private InetAddress getBindAddress(Configuration conf)