diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index 0e6e69e..90377c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -17,11 +17,12 @@ */ package org.apache.hadoop.hbase.master; +import java.util.concurrent.ThreadLocalRandom; import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -31,7 +32,8 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -54,10 +56,10 @@ public class RegionStateStore { protected static final char META_REPLICA_ID_DELIMITER = '_'; private volatile HRegion metaRegion; - private volatile HTableInterface metaTable; private volatile boolean initialized; private final Server server; + private HConnection[] hConnectionPool; /** * Returns the {@link ServerName} from catalog table {@link Result} @@ -132,33 +134,43 @@ public class RegionStateStore { initialized = false; } - @SuppressWarnings("deprecation") void start() throws IOException { if (server instanceof RegionServerServices) { metaRegion = ((RegionServerServices)server).getFromOnlineRegions( HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); } if (metaRegion == null) { - metaTable = new HTable(TableName.META_TABLE_NAME, - server.getShortCircuitConnection()); + Configuration conf = server.getConfiguration(); + int count = conf.getInt("hbase.statestore.meta.connection", 1); + hConnectionPool = new HConnection[count]; + for (int i = 0; i < count; i++) { + HConnection conn = HConnectionManager.createConnection(conf); + hConnectionPool[i] = conn; + } } initialized = true; } void stop() { initialized = false; - if (metaTable != null) { - try { - metaTable.close(); - } catch (IOException e) { - LOG.info("Got exception in closing meta table", e); - } finally { - metaTable = null; + // Closing connections should be enough as it shuts the batch pool + // and stops the rpc client. + threadLocalHTable = null; + if (hConnectionPool != null) { + for (HConnection conn : hConnectionPool) { + if (conn != null) { + try { + conn.close(); + } catch (IOException e) { + LOG.info("Got exception in closing connection to meta table", e); + } finally { + conn = null; + } + } } } } - @SuppressWarnings("deprecation") void updateRegionState(long openSeqNum, RegionState newState, RegionState oldState) { if (!initialized) { @@ -210,21 +222,48 @@ public class RegionStateStore { synchronized (this) { if (metaRegion != null) { LOG.info("Meta region shortcut failed", t); - metaTable = new HTable(TableName.META_TABLE_NAME, - server.getShortCircuitConnection()); + if (hConnectionPool == null) { + hConnectionPool = new HConnection[]{HConnectionManager.createConnection(server.getConfiguration())}; + } metaRegion = null; } } } } - synchronized(metaTable) { - metaTable.put(put); - } + HTableInterface htable = getThreadLocalHTable(); + htable.put(put); } catch (IOException ioe) { LOG.error("Failed to persist region state " + newState, ioe); server.abort("Failed to update region location", ioe); } } + + private ThreadLocal threadLocalHTable = + new ThreadLocal() { + @Override + protected HTableInterface initialValue() { + try { + int connectionId = ThreadLocalRandom.current().nextInt(hConnectionPool.length); + LOG.info("Creating htable for thread " + Thread.currentThread().getName() + + " using connection id " + connectionId); + HTableInterface htable = + hConnectionPool[connectionId].getTable(TableName.META_TABLE_NAME); + return htable; + } catch (IOException ioe) { + LOG.error("Failed to build HTable instance ", ioe); + return null; + } + } + }; + + private HTableInterface getThreadLocalHTable() throws IOException { + HTableInterface htable = threadLocalHTable.get(); + if (htable != null) { + return htable; + } else { + throw new IOException("Failed to build HTable instance "); + } + } void splitRegion(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index 6b1cc23..c128411 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -1051,7 +1053,63 @@ public class TestAssignmentManagerOnCluster { TEST_UTIL.deleteTable(Bytes.toBytes(table)); } } - + + /** + * Test concurrent updates to meta when meta is not on master + * @throws Exception + */ + @Test(timeout = 30000) + public void testUpdateStateMetaNotOnMaster() throws Exception { + conf.setInt("hbase.statestore.meta.connection", 3); + final RegionStateStore rss = + new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager())); + rss.start(); + // Create 10 threads and make each do 10 puts related to region state update + Thread[] th = new Thread[10]; + List nameList = new ArrayList(); + List tableNameList = new ArrayList(); + for (int i = 0; i < th.length; i++) { + th[i] = new Thread() { + @Override + public void run() { + HRegionInfo[] hri = new HRegionInfo[10]; + ServerName serverName = ServerName.valueOf("dummyhost", 1000, 1234); + for (int i = 0; i < 10; i++) { + hri[i] = new HRegionInfo(TableName.valueOf(Thread.currentThread().getName() + "_" + i)); + RegionState newState = new RegionState(hri[i], RegionState.State.OPEN, serverName); + RegionState oldState = + new RegionState(hri[i], RegionState.State.PENDING_OPEN, serverName); + rss.updateRegionState(1, newState, oldState); + } + } + }; + th[i].start(); + nameList.add(th[i].getName()); + } + for (int i = 0; i < th.length; i++) { + th[i].join(); + } + // Add all the expected table names in meta to tableNameList + for (String name : nameList) { + for (int i = 0; i < 10; i++) { + tableNameList.add(TableName.valueOf(name + "_" + i)); + } + } + List metaRows = MetaTableAccessor.fullScanOfMeta(admin.getConnection()); + int count = 0; + // Check all 100 rows are in meta + for (Result result : metaRows) { + if (tableNameList.contains(HRegionInfo.getTable(result.getRow()))) { + count++; + if (count == 100) { + break; + } + } + } + assertTrue(count == 100); + rss.stop(); + } + static class MyLoadBalancer extends StochasticLoadBalancer { // For this region, if specified, always assign to nowhere static volatile String controledRegion = null;