Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 1463112) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (working copy) @@ -27,6 +27,10 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -34,6 +38,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; /** * This client class is for invoking the aggregate functions deployed on the @@ -66,6 +73,8 @@ private static final Log log = LogFactory.getLog(AggregationClient.class); Configuration conf; + private volatile HConnection connection; + private final ExecutorService threadPool; /** * Constructor with Conf object @@ -73,7 +82,34 @@ */ public AggregationClient(Configuration cfg) { this.conf = cfg; + int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); + if (maxThreads == 0) { + maxThreads = 1; // is there a better default? + } + long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); + this.threadPool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, + new SynchronousQueue(), + Threads.newDaemonThreadFactory("aggregation-client-shared-executor")); + ((ThreadPoolExecutor) this.threadPool).allowCoreThreadTimeOut(true); } + + protected HConnection getConnection() throws IOException { + if (this.connection == null || this.connection.isClosed()) { + synchronized (this) { + if (this.connection == null || this.connection.isClosed()) { + this.connection = HConnectionManager.createConnection(this.conf); + } + } + } + return this.connection; + } + + public void close() throws IOException { + this.threadPool.shutdown(); + if (connection != null && !connection.isClosed()) { + connection.close(); + } + } /** * It gives the maximum value of a column for a given column family for the @@ -105,7 +141,7 @@ MaxCallBack aMaxCallBack = new MaxCallBack(); HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call() { @Override