diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index bc2d51a..2328bde 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -79,6 +79,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -364,7 +365,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       synchronized (this) {
         if (batchPool == null) {
           this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
-              conf.getInt("hbase.hconnection.threads.core", 256), "-shared-");
+              conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
           this.cleanupPool = true;
         }
       }
@@ -372,7 +373,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return this.batchPool;
   }
 
-  private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint) {
+  private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
+      BlockingQueue<Runnable> passedWorkQueue) {
     // shared HTable thread executor not yet initialized
     if (maxThreads == 0) {
       maxThreads = Runtime.getRuntime().availableProcessors() * 8;
@@ -381,10 +383,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       coreThreads = Runtime.getRuntime().availableProcessors() * 8;
     }
     long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
-    LinkedBlockingQueue workQueue =
+    BlockingQueue<Runnable> workQueue = passedWorkQueue;
+    // Build the default bounded queue only when the caller did not supply one.
+    if (workQueue == null) {
+      workQueue =
       new LinkedBlockingQueue(maxThreads *
         conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
             HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+    }
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(
         coreThreads,
         maxThreads,
@@ -400,14 +406,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     if (this.metaLookupPool == null) {
       synchronized (this) {
         if (this.metaLookupPool == null) {
-          //The meta lookup can happen on replicas of the meta (if the appropriate configs
-          //are enabled).In a replicated-meta setup, the number '3' is assumed as the max
-          //number of replicas by default (unless it is configured to be of a higher value).
-          //In a non-replicated-meta setup, only one thread would be active.
+          //Some of the threads would be used for meta replicas
+          //To start with, threads.max.core threads can hit the meta (including replicas).
+          //After that, requests will get queued up in the passed queue, and only after
+          //the queue is full, a new thread will be started
           this.metaLookupPool = getThreadPool(
-             conf.getInt("hbase.hconnection.meta.lookup.threads.max", 3),
-             conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 3),
-             "-metaLookup-shared-");
+             conf.getInt("hbase.hconnection.meta.lookup.threads.max", Integer.MAX_VALUE),
+             conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 10),
+             "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
         }
       }
     }