commit 9abef9ecbd587b0a9d2f3cd5b9f24d134eb82e3d Author: Karthick Sankarachary Date: Wed May 4 13:02:47 2011 -0700 HBASE-2938 Add Thread-Local Behavior To HTable Pool diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 92b45c9..fde42e3 100755 --- a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -21,14 +21,13 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; /** * A simple pool of HTable instances.

@@ -44,10 +43,10 @@ import org.apache.hadoop.hbase.util.Bytes; *

Pool will manage its own cluster to the cluster. See {@link HConnectionManager}. */ public class HTablePool implements Closeable { - private final Map> tables = - new ConcurrentHashMap>(); - private final Configuration config; + private final PoolMap tables; private final int maxSize; + private final PoolType poolType; + private final Configuration config; private final HTableInterfaceFactory tableFactory; /** @@ -63,16 +62,82 @@ public class HTablePool implements Closeable { * @param maxSize maximum number of references to keep for each table */ public HTablePool(final Configuration config, final int maxSize) { - this(config, maxSize, null); + this(config, maxSize, null, null); } + /** + * Constructor to set maximum versions and use the specified configuration and + * table factory. + * + * @param config + * configuration + * @param maxSize + * maximum number of references to keep for each table + * @param tableFactory + * table factory + */ public HTablePool(final Configuration config, final int maxSize, final HTableInterfaceFactory tableFactory) { + this(config, maxSize, null, PoolType.Reusable); + } + + /** + * Constructor to set maximum versions and use the specified configuration and + * pool type. + * + * @param config + * configuration + * @param maxSize + * maximum number of references to keep for each table + * @param tableFactory + * table factory + * @param poolType + * pool type which is one of {@link PoolType#Reusable} or + * {@link PoolType#ThreadLocal} + */ + public HTablePool(final Configuration config, final int maxSize, + final PoolType poolType) { + this(config, maxSize, null, poolType); + } + + /** + * Constructor to set maximum versions and use the specified configuration, + * table factory and pool type. The HTablePool supports the + * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool + * type is null or not one of those two values, then it will default to + * {@link PoolType#Reusable}. + * + * @param config + * configuration + * @param maxSize + * maximum number of references to keep for each table + * @param tableFactory + * table factory + * @param poolType + * pool type which is one of {@link PoolType#Reusable} or + * {@link PoolType#ThreadLocal} + */ + public HTablePool(final Configuration config, final int maxSize, + final HTableInterfaceFactory tableFactory, PoolType poolType) { // Make a new configuration instance so I can safely cleanup when // done with the pool. this.config = config == null? new Configuration(): config; this.maxSize = maxSize; this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory; + if (poolType == null) { + this.poolType = PoolType.Reusable; + } else { + switch (poolType) { + case Reusable: + case ThreadLocal: + this.poolType = poolType; + break; + default: + this.poolType = PoolType.Reusable; + break; + } + } + this.tables = new PoolMap(this.poolType, this.maxSize); } /** @@ -84,15 +149,9 @@ public class HTablePool implements Closeable { * @throws RuntimeException if there is a problem instantiating the HTable */ public HTableInterface getTable(String tableName) { - Queue queue = tables.get(tableName); - if(queue == null) { - queue = new ConcurrentLinkedQueue(); - tables.put(tableName, queue); - return createHTable(tableName); - } - HTableInterface table = queue.poll(); + HTableInterface table = tables.get(tableName); if(table == null) { - return createHTable(tableName); + table = createHTable(tableName); } return table; } @@ -117,13 +176,13 @@ public class HTablePool implements Closeable { * @param table table */ public void putTable(HTableInterface table) throws IOException { - Queue queue = tables.get(Bytes.toString(table.getTableName())); - if(queue.size() >= maxSize) { + String tableName = Bytes.toString(table.getTableName()); + if(tables.size(tableName) >= maxSize) { // release table instance since we're not reusing it this.tableFactory.releaseHTableInterface(table); return; } - queue.add(table); + tables.put(tableName, table); } protected HTableInterface createHTable(String tableName) { @@ -140,14 +199,13 @@ public class HTablePool implements Closeable { * @param tableName */ public void closeTablePool(final String tableName) throws IOException { - Queue queue = tables.get(tableName); - if (queue != null) { - HTableInterface table = queue.poll(); - while (table != null) { + Collection tables = this.tables.values(tableName); + if (tables != null) { + for (HTableInterface table : tables) { this.tableFactory.releaseHTableInterface(table); - table = queue.poll(); } } + this.tables.remove(tableName); } /** @@ -171,7 +229,6 @@ public class HTablePool implements Closeable { } int getCurrentPoolSize(String tableName) { - Queue queue = tables.get(tableName); - return queue.size(); + return tables.size(tableName); } } diff --git a/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java index 354d49a..5732cc4 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java +++ b/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java @@ -21,13 +21,15 @@ package org.apache.hadoop.hbase.util; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** @@ -53,8 +55,7 @@ public class PoolMap implements Map { private int poolMaxSize; - private Map> pools = Collections - .synchronizedMap(new HashMap>()); + private Map> pools = new ConcurrentHashMap>(); public PoolMap(PoolType poolType, int poolMaxSize) { this.poolType = poolType; @@ -102,6 +103,19 @@ public class PoolMap implements Map { return values; } + public Collection values(K key) { + Collection values = new ArrayList(); + Pool pool = pools.get(key); + if (pool != null) { + Collection poolValues = pool.values(); + if (poolValues != null) { + values.addAll(poolValues); + } + } + return values; + } + + @Override public boolean isEmpty() { return pools.isEmpty(); @@ -270,7 +284,7 @@ public class PoolMap implements Map { * the type of the resource */ @SuppressWarnings("serial") - public class ReusablePool extends LinkedList implements Pool { + public class ReusablePool extends ConcurrentLinkedQueue implements Pool { private int maxSize; public ReusablePool(int maxSize) { @@ -314,7 +328,7 @@ public class PoolMap implements Map { * */ @SuppressWarnings("serial") - class RoundRobinPool extends ArrayList implements Pool { + class RoundRobinPool extends CopyOnWriteArrayList implements Pool { private int maxSize; private int nextResource = 0; diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java b/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java index d01043c..e3cc787 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java @@ -22,147 +22,236 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import junit.framework.Assert; +import junit.framework.TestCase; +import junit.framework.TestSuite; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; import org.junit.Test; /** * Tests HTablePool. */ -public class TestHTablePool { - private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private final static byte [] TABLENAME = Bytes.toBytes("TestHTablePool"); - - @BeforeClass - public static void beforeClass() throws Exception { - TEST_UTIL.startMiniCluster(1); - TEST_UTIL.createTable(TABLENAME, HConstants.CATALOG_FAMILY); - } +public class TestHTablePool { + private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte[] TABLENAME = Bytes.toBytes("TestHTablePool"); + + public abstract static class TestHTablePoolType extends TestCase { + protected void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLENAME, HConstants.CATALOG_FAMILY); + } - @AfterClass - public static void afterClass() throws IOException { - TEST_UTIL.shutdownMiniCluster(); - } + protected void tearDown() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } - @Test - public void testTableWithStringName() throws Exception { - HTablePool pool = - new HTablePool(TEST_UTIL.getConfiguration(), Integer.MAX_VALUE); - String tableName = Bytes.toString(TABLENAME); + protected abstract PoolType getPoolType(); - // Request a table from an empty pool - HTableInterface table = pool.getTable(tableName); - Assert.assertNotNull(table); + @Test + public void testTableWithStringName() throws Exception { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), + Integer.MAX_VALUE, getPoolType()); + String tableName = Bytes.toString(TABLENAME); - // Return the table to the pool - pool.putTable(table); + // Request a table from an empty pool + HTableInterface table = pool.getTable(tableName); + Assert.assertNotNull(table); - // Request a table of the same name - HTableInterface sameTable = pool.getTable(tableName); - Assert.assertSame(table, sameTable); - } + // Return the table to the pool + pool.putTable(table); - @Test - public void testTableWithByteArrayName() throws IOException { - HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), Integer.MAX_VALUE); + // Request a table of the same name + HTableInterface sameTable = pool.getTable(tableName); + Assert.assertSame(table, sameTable); + } - // Request a table from an empty pool - HTableInterface table = pool.getTable(TABLENAME); - Assert.assertNotNull(table); + @Test + public void testTableWithByteArrayName() throws IOException { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), + Integer.MAX_VALUE, getPoolType()); - // Return the table to the pool - pool.putTable(table); + // Request a table from an empty pool + HTableInterface table = pool.getTable(TABLENAME); + Assert.assertNotNull(table); - // Request a table of the same name - HTableInterface sameTable = pool.getTable(TABLENAME); - Assert.assertSame(table, sameTable); - } + // Return the table to the pool + pool.putTable(table); - @Test - public void testTableWithMaxSize() throws Exception { - HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2); - - // Request tables from an empty pool - HTableInterface table1 = pool.getTable(TABLENAME); - HTableInterface table2 = pool.getTable(TABLENAME); - HTableInterface table3 = pool.getTable(TABLENAME); - - // Return the tables to the pool - pool.putTable(table1); - pool.putTable(table2); - // The pool should reject this one since it is already full - pool.putTable(table3); - - // Request tables of the same name - HTableInterface sameTable1 = pool.getTable(TABLENAME); - HTableInterface sameTable2 = pool.getTable(TABLENAME); - HTableInterface sameTable3 = pool.getTable(TABLENAME); - Assert.assertSame(table1, sameTable1); - Assert.assertSame(table2, sameTable2); - Assert.assertNotSame(table3, sameTable3); - } + // Request a table of the same name + HTableInterface sameTable = pool.getTable(TABLENAME); + Assert.assertSame(table, sameTable); + } - @Test - public void testTablesWithDifferentNames() throws IOException { - HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), Integer.MAX_VALUE); - byte [] otherTable = Bytes.toBytes("OtherTable"); - TEST_UTIL.createTable(otherTable, HConstants.CATALOG_FAMILY); - - // Request a table from an empty pool - HTableInterface table1 = pool.getTable(TABLENAME); - HTableInterface table2 = pool.getTable(otherTable); - Assert.assertNotNull(table2); - - // Return the tables to the pool - pool.putTable(table1); - pool.putTable(table2); - - // Request tables of the same names - HTableInterface sameTable1 = pool.getTable(TABLENAME); - HTableInterface sameTable2 = pool.getTable(otherTable); - Assert.assertSame(table1, sameTable1); - Assert.assertSame(table2, sameTable2); - } + @Test + public void testTablesWithDifferentNames() throws IOException { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), + Integer.MAX_VALUE, getPoolType()); + byte[] otherTable = Bytes.toBytes("OtherTable"); + TEST_UTIL.createTable(otherTable, HConstants.CATALOG_FAMILY); + + // Request a table from an empty pool + HTableInterface table1 = pool.getTable(TABLENAME); + HTableInterface table2 = pool.getTable(otherTable); + Assert.assertNotNull(table2); + + // Return the tables to the pool + pool.putTable(table1); + pool.putTable(table2); + + // Request tables of the same names + HTableInterface sameTable1 = pool.getTable(TABLENAME); + HTableInterface sameTable2 = pool.getTable(otherTable); + Assert.assertSame(table1, sameTable1); + Assert.assertSame(table2, sameTable2); + } + } - @Test - public void testCloseTablePool() throws IOException { - HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4); - HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + public static class TestHTableReusablePool extends TestHTablePoolType { + @Override + protected PoolType getPoolType() { + return PoolType.Reusable; + } - if (admin.tableExists(TABLENAME)) { - admin.disableTable(TABLENAME); - admin.deleteTable(TABLENAME); + @Test + public void testTableWithMaxSize() throws Exception { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, + getPoolType()); + + // Request tables from an empty pool + HTableInterface table1 = pool.getTable(TABLENAME); + HTableInterface table2 = pool.getTable(TABLENAME); + HTableInterface table3 = pool.getTable(TABLENAME); + + // Return the tables to the pool + pool.putTable(table1); + pool.putTable(table2); + // The pool should reject this one since it is already full + pool.putTable(table3); + + // Request tables of the same name + HTableInterface sameTable1 = pool.getTable(TABLENAME); + HTableInterface sameTable2 = pool.getTable(TABLENAME); + HTableInterface sameTable3 = pool.getTable(TABLENAME); + Assert.assertSame(table1, sameTable1); + Assert.assertSame(table2, sameTable2); + Assert.assertNotSame(table3, sameTable3); } - HTableDescriptor tableDescriptor = new HTableDescriptor(TABLENAME); - tableDescriptor.addFamily(new HColumnDescriptor("randomFamily")); - admin.createTable(tableDescriptor); + @Test + public void testCloseTablePool() throws IOException { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, + getPoolType()); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + + if (admin.tableExists(TABLENAME)) { + admin.disableTable(TABLENAME); + admin.deleteTable(TABLENAME); + } + + HTableDescriptor tableDescriptor = new HTableDescriptor(TABLENAME); + tableDescriptor.addFamily(new HColumnDescriptor("randomFamily")); + admin.createTable(tableDescriptor); + // Request tables from an empty pool + HTableInterface[] tables = new HTableInterface[4]; + for (int i = 0; i < 4; ++i) { + tables[i] = pool.getTable(TABLENAME); + } - // Request tables from an empty pool - HTableInterface[] tables = new HTableInterface[4]; - for (int i = 0; i < 4; ++i ) { - tables[i] = pool.getTable(TABLENAME); + pool.closeTablePool(TABLENAME); + + for (int i = 0; i < 4; ++i) { + pool.putTable(tables[i]); + } + + Assert + .assertEquals(4, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + + pool.closeTablePool(TABLENAME); + + Assert + .assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); } + } - pool.closeTablePool(TABLENAME); + public static class TestHTableThreadLocalPool extends TestHTablePoolType { + @Override + protected PoolType getPoolType() { + return PoolType.ThreadLocal; + } - for (int i = 0; i < 4; ++i ) { - pool.putTable(tables[i]); + @Test + public void testTableWithMaxSize() throws Exception { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, + getPoolType()); + + // Request tables from an empty pool + HTableInterface table1 = pool.getTable(TABLENAME); + HTableInterface table2 = pool.getTable(TABLENAME); + HTableInterface table3 = pool.getTable(TABLENAME); + + // Return the tables to the pool + pool.putTable(table1); + pool.putTable(table2); + // The pool should not reject this one since the number of threads <= 2 + pool.putTable(table3); + + // Request tables of the same name + HTableInterface sameTable1 = pool.getTable(TABLENAME); + HTableInterface sameTable2 = pool.getTable(TABLENAME); + HTableInterface sameTable3 = pool.getTable(TABLENAME); + Assert.assertSame(table3, sameTable1); + Assert.assertSame(table3, sameTable2); + Assert.assertSame(table3, sameTable3); } - Assert.assertEquals(4, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + @Test + public void testCloseTablePool() throws IOException { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, + getPoolType()); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + + if (admin.tableExists(TABLENAME)) { + admin.disableTable(TABLENAME); + admin.deleteTable(TABLENAME); + } + + HTableDescriptor tableDescriptor = new HTableDescriptor(TABLENAME); + tableDescriptor.addFamily(new HColumnDescriptor("randomFamily")); + admin.createTable(tableDescriptor); - pool.closeTablePool(TABLENAME); + // Request tables from an empty pool + HTableInterface[] tables = new HTableInterface[4]; + for (int i = 0; i < 4; ++i) { + tables[i] = pool.getTable(TABLENAME); + } + + pool.closeTablePool(TABLENAME); + + for (int i = 0; i < 4; ++i) { + pool.putTable(tables[i]); + } + + Assert + .assertEquals(1, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + + pool.closeTablePool(TABLENAME); + + Assert + .assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + } + } - Assert.assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + public static junit.framework.Test suite() { + TestSuite suite = new TestSuite(); + suite.addTestSuite(TestHTableReusablePool.class); + suite.addTestSuite(TestHTableThreadLocalPool.class); + return suite; } } \ No newline at end of file