Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java	(revision 1463112)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java	(working copy)
@@ -27,6 +27,10 @@
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -34,6 +38,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -42,6 +48,7 @@
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * This client class is for invoking the aggregate functions deployed on the
@@ -66,6 +73,9 @@
 
   private static final Log log = LogFactory.getLog(AggregationClient.class);
   Configuration conf;
+  private volatile HConnection connection;
+  private final ExecutorService threadPool;
+  private boolean closed;
 
   /**
    * Constructor with Conf object
@@ -73,7 +83,53 @@
    */
   public AggregationClient(Configuration cfg) {
     this.conf = cfg;
+    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
+    // ThreadPoolExecutor rejects a non-positive maximum pool size.
+    if (maxThreads <= 0) {
+      maxThreads = 1; // is there a better default?
+    }
+    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
+    this.threadPool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        Threads.newDaemonThreadFactory("aggregation-client-shared-executor"));
+    ((ThreadPoolExecutor) this.threadPool).allowCoreThreadTimeOut(true);
+    this.closed = false;
   }
+
+  /**
+   * Returns the connection shared by this client, creating it on first use or
+   * after the previous one has been closed.
+   */
+  protected HConnection getConnection() throws IOException {
+    if (this.connection == null || this.connection.isClosed()) {
+      synchronized (this) {
+        if (this.connection == null || this.connection.isClosed()) {
+          this.connection = HConnectionManager.createConnection(this.conf);
+        }
+      }
+    }
+    return this.connection;
+  }
+
+  /**
+   * Shuts down the shared thread pool, waits for running tasks to finish and
+   * closes the shared connection. Calls after a successful close are no-ops.
+   */
+  public void close() throws IOException, InterruptedException {
+    if (!this.closed) {
+      this.threadPool.shutdown();
+      try {
+        // wait all running task exit from thread pool
+        this.threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+      } finally {
+        // Release the connection even if the wait above is interrupted.
+        if (connection != null && !connection.isClosed()) {
+          connection.close();
+        }
+      }
+      this.closed = true;
+    }
+  }
 
   /**
    * It gives the maximum value of a column for a given column family for the
@@ -105,7 +161,7 @@
     MaxCallBack aMaxCallBack = new MaxCallBack();
     HTable table = null;
     try {
-      table = new HTable(conf, tableName);
+      table = new HTable(tableName, getConnection(), this.threadPool);
       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
           scan.getStopRow(), new Batch.Call() {
             @Override
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java	(revision 1463112)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java	(working copy)
@@ -283,6 +283,7 @@
     final ColumnInterpreter ci = new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(19, maximum);
+    aClient.close();
   }
 
   /**
@@ -298,6 +299,7 @@
     final ColumnInterpreter ci = new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(14, max);
+    aClient.close();
   }
 
   @Test
@@ -308,6 +310,7 @@
     final ColumnInterpreter ci = new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(190, maximum);
+    aClient.close();
   }
 
   @Test
@@ -320,6 +323,7 @@
     final ColumnInterpreter ci = new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(60, max);
+    aClient.close();
   }
 
   @Test
@@ -382,6 +386,7 @@
     final ColumnInterpreter ci = new LongColumnInterpreter();
     max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(null, max);
+    aClient.close();
   }
 
   /**