Share one HConnection and one thread pool across all tables opened by an
AggregationClient instead of building both per HTable, and add close() so
callers can release them. The touched tests close their client after use.

close() releases the connection in a finally block so that an interrupt
during the thread-pool wait cannot leak it.

NOTE(review): indentation of the lines below was reconstructed from a
whitespace-collapsed copy of this patch; context lines are otherwise kept
as they appeared. Confirm against the target revisions before applying.

Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (revision 1467937)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (working copy)
@@ -292,6 +292,7 @@
         new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(19, maximum);
+    aClient.close();
   }
 
   /**
@@ -308,6 +309,7 @@
         new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(14, max);
+    aClient.close();
   }
 
   @Test
@@ -319,6 +321,7 @@
         new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(190, maximum);
+    aClient.close();
   }
 
   @Test
@@ -332,6 +335,7 @@
         new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(60, max);
+    aClient.close();
   }
 
   @Test
@@ -398,6 +402,7 @@
         new LongColumnInterpreter();
     max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(null, max);
+    aClient.close();
   }
 
   /**
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 1467851)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (working copy)
@@ -28,6 +28,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,6 +53,10 @@
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -77,6 +84,9 @@
 
   private static final Log log = LogFactory.getLog(AggregationClient.class);
   Configuration conf;
+  private volatile HConnection connection;
+  private final ExecutorService threadPool;
+  private boolean closed;
 
   /**
    * Constructor with Conf object
@@ -84,7 +94,60 @@
    */
   public AggregationClient(Configuration cfg) {
     this.conf = cfg;
+    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
+    if (maxThreads == 0) {
+      maxThreads = 1; // is there a better default?
+    }
+    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        Threads.newDaemonThreadFactory("aggregation-client-shared-executor"));
+    // Let idle core threads time out as well, not only the extra ones.
+    pool.allowCoreThreadTimeOut(true);
+    this.threadPool = pool;
+    this.closed = false;
   }
+
+  /**
+   * Returns the connection shared by every table this client opens, creating it from the
+   * client's configuration on first use.
+   * @return the shared connection
+   * @throws IOException if the connection cannot be created
+   */
+  protected HConnection getConnection() throws IOException {
+    if (this.connection == null) {
+      synchronized (this) {
+        if (this.connection == null) {
+          this.connection = HConnectionManager.createConnection(this.conf);
+        }
+      }
+    }
+    return this.connection;
+  }
+
+  /**
+   * Shuts down the shared thread pool, waits for its running tasks to finish and then closes
+   * the shared connection if one was opened. Calling it again after a completed close is a
+   * no-op.
+   * @throws IOException if closing the connection fails
+   * @throws InterruptedException if interrupted while waiting for the thread pool to drain
+   */
+  public void close() throws IOException, InterruptedException {
+    if (this.closed) {
+      return;
+    }
+    this.threadPool.shutdown();
+    try {
+      // Wait for all running tasks to exit the thread pool.
+      this.threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    } finally {
+      // Release the connection even when the wait above is interrupted.
+      if (this.connection != null && !this.connection.isClosed()) {
+        this.connection.close();
+      }
+      this.closed = true;
+    }
+  }
 
   /**
    * It gives the maximum value of a column for a given column family for the
@@ -117,7 +180,7 @@
     MaxCallBack aMaxCallBack = new MaxCallBack();
     HTable table = null;
     try {
-      table = new HTable(conf, tableName);
+      table = new HTable(tableName, getConnection(), this.threadPool);
       table.coprocessorService(AggregateService.class, scan.getStartRow(),
           scan.getStopRow(), new Batch.Call() {
             @Override