Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (revision 1470122) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (working copy) @@ -40,6 +40,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; + /** * A test class to cover aggregate functions, that can be implemented using * Coprocessors. @@ -139,6 +141,7 @@ long median = aClient.median(TEST_TABLE, ci, scan); assertEquals(8L, median); + aClient.close(); } /** @@ -161,6 +164,7 @@ new LongColumnInterpreter(); long rowCount = aClient.rowCount(TEST_TABLE, ci, scan); assertEquals(12, rowCount); + aClient.close(); } /** @@ -178,6 +182,7 @@ long rowCount = aClient.rowCount(TEST_TABLE, ci, scan); assertEquals(ROWSIZE, rowCount); + aClient.close(); } /** @@ -186,7 +191,7 @@ * @throws Throwable */ @Test - public void testRowCountWithInvalidRange1() { + public void testRowCountWithInvalidRange1() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); @@ -203,6 +208,7 @@ + e.getStackTrace()); } assertEquals(-1, rowCount); + aClient.close(); } /** @@ -211,7 +217,7 @@ * @throws Throwable */ @Test - public void testRowCountWithInvalidRange2() { + public void testRowCountWithInvalidRange2() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); @@ -227,13 +233,14 @@ rowCount = 0; } assertEquals(0, rowCount); + aClient.close(); } /** * This should return a 0 */ @Test - public void testRowCountWithNullCF() { + public void testRowCountWithNullCF() throws IOException, InterruptedException { 
AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.setStartRow(ROWS[5]); @@ -247,6 +254,7 @@ rowCount = 0; } assertEquals(0, rowCount); + aClient.close(); } @Test @@ -259,6 +267,7 @@ long rowCount = aClient.rowCount(TEST_TABLE, ci, scan); assertEquals(20, rowCount); + aClient.close(); } @Test @@ -273,6 +282,7 @@ long rowCount = aClient.rowCount(TEST_TABLE, ci, scan); assertEquals(0, rowCount); + aClient.close(); } /** @@ -292,6 +302,7 @@ new LongColumnInterpreter(); long maximum = aClient.max(TEST_TABLE, ci, scan); assertEquals(19, maximum); + aClient.close(); } /** @@ -308,6 +319,7 @@ new LongColumnInterpreter(); long max = aClient.max(TEST_TABLE, ci, scan); assertEquals(14, max); + aClient.close(); } @Test @@ -319,6 +331,7 @@ new LongColumnInterpreter(); long maximum = aClient.max(TEST_TABLE, ci, scan); assertEquals(190, maximum); + aClient.close(); } @Test @@ -332,10 +345,11 @@ new LongColumnInterpreter(); long max = aClient.max(TEST_TABLE, ci, scan); assertEquals(60, max); + aClient.close(); } @Test - public void testMaxWithValidRangeWithNullCF() { + public void testMaxWithValidRangeWithNullCF() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); final ColumnInterpreter ci = new LongColumnInterpreter(); @@ -348,10 +362,11 @@ } assertEquals(null, max);// CP will throw an IOException about the // null column family, and max will be set to 0 + aClient.close(); } @Test - public void testMaxWithInvalidRange() { + public void testMaxWithInvalidRange() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); final ColumnInterpreter ci = new LongColumnInterpreter(); @@ -366,17 +381,18 @@ max = 0; } assertEquals(0, max);// control should go to the catch block + aClient.close(); } @Test public void testMaxWithInvalidRange2() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); long max = Long.MIN_VALUE; Scan scan = new 
Scan(); scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); scan.setStartRow(ROWS[4]); scan.setStopRow(ROWS[4]); try { - AggregationClient aClient = new AggregationClient(conf); final ColumnInterpreter ci = new LongColumnInterpreter(); max = aClient.max(TEST_TABLE, ci, scan); @@ -384,6 +400,7 @@ max = 0; } assertEquals(0, max);// control should go to the catch block + aClient.close(); } @Test @@ -398,6 +415,7 @@ new LongColumnInterpreter(); max = aClient.max(TEST_TABLE, ci, scan); assertEquals(null, max); + aClient.close(); } /** @@ -419,6 +437,7 @@ Long min = aClient.min(TEST_TABLE, ci, scan); assertEquals(0l, min.longValue()); + aClient.close(); } /** @@ -435,6 +454,7 @@ new LongColumnInterpreter(); long min = aClient.min(TEST_TABLE, ci, scan); assertEquals(5, min); + aClient.close(); } @Test @@ -449,6 +469,7 @@ long min = aClient.min(TEST_TABLE, ci, scan); assertEquals(0, min); + aClient.close(); } @Test @@ -462,10 +483,11 @@ new LongColumnInterpreter(); long min = aClient.min(TEST_TABLE, ci, scan); assertEquals(6, min); + aClient.close(); } @Test - public void testMinWithValidRangeWithNullCF() { + public void testMinWithValidRangeWithNullCF() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.setStartRow(ROWS[5]); @@ -479,10 +501,11 @@ } assertEquals(null, min);// CP will throw an IOException about the // null column family, and max will be set to 0 + aClient.close(); } @Test - public void testMinWithInvalidRange() { + public void testMinWithInvalidRange() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Long min = null; Scan scan = new Scan(); @@ -496,10 +519,11 @@ } catch (Throwable e) { } assertEquals(null, min);// control should go to the catch block + aClient.close(); } @Test - public void testMinWithInvalidRange2() { + public void testMinWithInvalidRange2() throws IOException, InterruptedException { AggregationClient aClient = new 
AggregationClient(conf); Scan scan = new Scan(); scan.addFamily(TEST_FAMILY); @@ -513,6 +537,7 @@ } catch (Throwable e) { } assertEquals(null, min);// control should go to the catch block + aClient.close(); } @Test @@ -527,6 +552,7 @@ Long min = null; min = aClient.min(TEST_TABLE, ci, scan); assertEquals(null, min); + aClient.close(); } /** @@ -545,6 +571,7 @@ long sum = aClient.sum(TEST_TABLE, ci, scan); assertEquals(190, sum); + aClient.close(); } /** @@ -561,6 +588,7 @@ new LongColumnInterpreter(); long sum = aClient.sum(TEST_TABLE, ci, scan); assertEquals(95, sum); + aClient.close(); } @Test @@ -573,6 +601,7 @@ long sum = aClient.sum(TEST_TABLE, ci, scan); assertEquals(190 + 1900, sum); + aClient.close(); } @Test @@ -586,10 +615,11 @@ new LongColumnInterpreter(); long sum = aClient.sum(TEST_TABLE, ci, scan); assertEquals(6 + 60, sum); + aClient.close(); } @Test - public void testSumWithValidRangeWithNullCF() { + public void testSumWithValidRangeWithNullCF() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.setStartRow(ROWS[6]); @@ -603,10 +633,11 @@ } assertEquals(null, sum);// CP will throw an IOException about the // null column family, and max will be set to 0 + aClient.close(); } @Test - public void testSumWithInvalidRange() { + public void testSumWithInvalidRange() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.addFamily(TEST_FAMILY); @@ -620,6 +651,7 @@ } catch (Throwable e) { } assertEquals(null, sum);// control should go to the catch block + aClient.close(); } @Test @@ -634,6 +666,7 @@ Long sum = null; sum = aClient.sum(TEST_TABLE, ci, scan); assertEquals(null, sum); + aClient.close(); } /** @@ -652,6 +685,7 @@ double avg = aClient.avg(TEST_TABLE, ci, scan); assertEquals(9.5, avg, 0); + aClient.close(); } /** @@ -668,6 +702,7 @@ new LongColumnInterpreter(); double avg = aClient.avg(TEST_TABLE, 
ci, scan); assertEquals(9.5, avg, 0); + aClient.close(); } @Test @@ -680,6 +715,7 @@ double avg = aClient.avg(TEST_TABLE, ci, scan); assertEquals(104.5, avg, 0); + aClient.close(); } @Test @@ -693,10 +729,11 @@ new LongColumnInterpreter(); double avg = aClient.avg(TEST_TABLE, ci, scan); assertEquals(6 + 60, avg, 0); + aClient.close(); } @Test - public void testAvgWithValidRangeWithNullCF() { + public void testAvgWithValidRangeWithNullCF() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); final ColumnInterpreter ci = @@ -708,10 +745,11 @@ } assertEquals(null, avg);// CP will throw an IOException about the // null column family, and max will be set to 0 + aClient.close(); } @Test - public void testAvgWithInvalidRange() { + public void testAvgWithInvalidRange() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); @@ -725,6 +763,7 @@ } catch (Throwable e) { } assertEquals(null, avg);// control should go to the catch block + aClient.close(); } @Test @@ -739,6 +778,7 @@ Double avg = null; avg = aClient.avg(TEST_TABLE, ci, scan); assertEquals(Double.NaN, avg, 0); + aClient.close(); } /** @@ -757,6 +797,7 @@ double std = aClient.std(TEST_TABLE, ci, scan); assertEquals(5.766, std, 0.05d); + aClient.close(); } /** @@ -773,6 +814,7 @@ new LongColumnInterpreter(); double std = aClient.std(TEST_TABLE, ci, scan); assertEquals(2.87, std, 0.05d); + aClient.close(); } @Test @@ -785,6 +827,7 @@ double std = aClient.std(TEST_TABLE, ci, scan); assertEquals(63.42, std, 0.05d); + aClient.close(); } @Test @@ -798,10 +841,11 @@ new LongColumnInterpreter(); double std = aClient.std(TEST_TABLE, ci, scan); assertEquals(0, std, 0); + aClient.close(); } @Test - public void testStdWithValidRangeWithNullCF() { + public void testStdWithValidRangeWithNullCF() throws IOException, InterruptedException { 
AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.setStartRow(ROWS[6]); @@ -815,10 +859,11 @@ } assertEquals(null, std);// CP will throw an IOException about the // null column family, and max will be set to 0 + aClient.close(); } @Test - public void testStdWithInvalidRange() { + public void testStdWithInvalidRange() throws IOException, InterruptedException { AggregationClient aClient = new AggregationClient(conf); Scan scan = new Scan(); scan.addFamily(TEST_FAMILY); @@ -832,6 +877,7 @@ } catch (Throwable e) { } assertEquals(null, std);// control should go to the catch block + aClient.close(); } @Test @@ -846,6 +892,7 @@ Double std = null; std = aClient.std(TEST_TABLE, ci, scan); assertEquals(Double.NaN, std, 0); + aClient.close(); } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 1470122) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (working copy) @@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import java.io.IOException; import java.nio.ByteBuffer; @@ -50,6 +53,10 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.TreeMap; +import 
java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -77,6 +84,9 @@
 
   private static final Log log = LogFactory.getLog(AggregationClient.class);
   Configuration conf;
+  private volatile HConnection connection;
+  private final ExecutorService threadPool;
+  private boolean closed;
 
   /**
    * Constructor with Conf object
@@ -84,7 +94,45 @@
    */
   public AggregationClient(Configuration cfg) {
     this.conf = cfg;
+    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
+    maxThreads = Math.max(1, maxThreads); // ThreadPoolExecutor rejects a max pool size < 1
+    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
+    this.threadPool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        Threads.newDaemonThreadFactory("aggregation-client-shared-executor"));
+    ((ThreadPoolExecutor) this.threadPool).allowCoreThreadTimeOut(true);
   }
+
+  /** Returns the shared connection, creating it lazily on first use. */
+  protected HConnection getConnection() throws IOException {
+    if (this.connection == null) {
+      synchronized (this) {
+        if (this.connection == null) {
+          this.connection = HConnectionManager.createConnection(this.conf);
+        }
+      }
+    }
+    return this.connection;
+  }
+
+  /** Waits up to 60s for pooled tasks, then stops the pool and closes the connection. */
+  public void close() throws IOException, InterruptedException {
+    if (!this.closed) {
+      this.threadPool.shutdown();
+      try {
+        this.threadPool.awaitTermination(60000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt(); // keep the flag set, but still finish cleanup
+      }
+      if (!this.threadPool.isTerminated()) {
+        this.threadPool.shutdownNow();
+      }
+      if (connection != null && !connection.isClosed()) {
+        connection.close();
+      }
+      this.closed = true;
+    }
+  }
 
   /**
    * It gives the maximum value of a column for a given column family for the
@@ -117,7 +165,7 @@
     MaxCallBack aMaxCallBack = new MaxCallBack();
HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call() { @Override @@ -189,7 +237,7 @@ MinCallBack minCallBack = new MinCallBack(); HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call() { @@ -252,7 +300,7 @@ RowNumCallback rowNum = new RowNumCallback(); HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call() { @Override @@ -308,7 +356,7 @@ SumCallBack sumCallBack = new SumCallBack(); HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call() { @Override @@ -367,7 +415,7 @@ AvgCallBack avgCallBack = new AvgCallBack(); HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call>() { @@ -463,7 +511,7 @@ StdCallback stdCallback = new StdCallback(); HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call, Long>>() { @@ -571,7 +619,7 @@ StdCallback stdCallback = new StdCallback(); HTable table = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), 
this.threadPool); table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call>() { @Override @@ -646,7 +694,7 @@ HTable table = null; ResultScanner scanner = null; try { - table = new HTable(conf, tableName); + table = new HTable(tableName, getConnection(), this.threadPool); int cacheSize = scan2.getCaching(); if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) { scan2.setCacheBlocks(true);