diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 7957cc8..9b070a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -47,9 +47,12 @@ public class RpcRetryingCallerFactory { } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = - configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, - RpcRetryingCallerFactory.class.getName()); + configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); + if (rpcCallerFactoryClazz.equals(clazzName)) { + return new RpcRetryingCallerFactory(configuration); + } return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index 0e6e69e..424f2f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.Server; import 
org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -39,7 +38,7 @@ import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; - +import org.apache.hadoop.hbase.util.MultiHConnection; import com.google.common.base.Preconditions; /** @@ -54,8 +53,8 @@ public class RegionStateStore { protected static final char META_REPLICA_ID_DELIMITER = '_'; private volatile HRegion metaRegion; - private volatile HTableInterface metaTable; private volatile boolean initialized; + private MultiHConnection multiHConnection; private final Server server; @@ -132,33 +131,31 @@ public class RegionStateStore { initialized = false; } - @SuppressWarnings("deprecation") void start() throws IOException { if (server instanceof RegionServerServices) { metaRegion = ((RegionServerServices)server).getFromOnlineRegions( HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); } + // When meta is not colocated on master if (metaRegion == null) { - metaTable = new HTable(TableName.META_TABLE_NAME, - server.getShortCircuitConnection()); + Configuration conf = server.getConfiguration(); + // Config to determine the number of HConnections to META. + // A single HConnection should be sufficient in most cases. Only if + // you are doing a lot of writes (>1M) to META, + // increasing this value might improve the write throughput. 
+ int count = conf.getInt("hbase.regionstatestore.meta.connection", 1); + multiHConnection = new MultiHConnection(conf, count); } initialized = true; } void stop() { initialized = false; - if (metaTable != null) { - try { - metaTable.close(); - } catch (IOException e) { - LOG.info("Got exception in closing meta table", e); - } finally { - metaTable = null; - } + if (multiHConnection != null) { + multiHConnection.closeConnections(); } } - @SuppressWarnings("deprecation") void updateRegionState(long openSeqNum, RegionState newState, RegionState oldState) { if (!initialized) { @@ -210,22 +207,24 @@ public class RegionStateStore { synchronized (this) { if (metaRegion != null) { LOG.info("Meta region shortcut failed", t); - metaTable = new HTable(TableName.META_TABLE_NAME, - server.getShortCircuitConnection()); + if (multiHConnection == null) { + multiHConnection = new MultiHConnection(server.getConfiguration(), 1); + } metaRegion = null; } } } } - synchronized(metaTable) { - metaTable.put(put); - } + // Called when meta is not on master + HTableInterface htable = multiHConnection.getHTable(TableName.META_TABLE_NAME); + htable.put(put); + htable.close(); } catch (IOException ioe) { LOG.error("Failed to persist region state " + newState, ioe); server.abort("Failed to update region location", ioe); } } - + void splitRegion(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { MetaTableAccessor.splitRegion(server.getShortCircuitConnection(), p, a, b, sn); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java new file mode 100644 index 0000000..a466106 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; + +/** + * Provides ability to create multiple HConnection instances and allows retrieving an HTable + * instance from a connection picked at random. 
+ */ +public class MultiHConnection { + + private static final Log LOG = LogFactory.getLog(MultiHConnection.class); + private HConnection[] hConnections; + private int noOfConnections; + + /** + * Create HConnection instances based on the passed configuration + * @param conf configuration + * @param noOfConnections total no of connections to create + * @throws IOException + */ + public MultiHConnection(Configuration conf, int noOfConnections) throws IOException { + hConnections = new HConnection[noOfConnections]; + for (int i = 0; i < noOfConnections; i++) { + HConnection conn = HConnectionManager.createConnection(conf); + hConnections[i] = conn; + } + this.noOfConnections = noOfConnections; + } + + /** + * Close the open connections + */ + public void closeConnections() { + if (hConnections != null) { + for (HConnection conn : hConnections) { + if (conn != null) { + try { + conn.close(); + } catch (IOException e) { + LOG.info("Got exception in closing connection", e); + } finally { + conn = null; + } + } + } + } + hConnections = null; + } + + /** + * Return HTable instance for a connection picked randomly + * @param tableName table name + * @return HTable instance + * @throws IOException + */ + public HTableInterface getHTable(TableName tableName) throws IOException { + return hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].getTable(tableName); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index f8e87dd..02847d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import 
org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -1050,7 +1052,63 @@ public class TestAssignmentManagerOnCluster { TEST_UTIL.deleteTable(Bytes.toBytes(table)); } } - + + /** + * Test concurrent updates to meta when meta is not on master + * @throws Exception + */ + @Test(timeout = 30000) + public void testUpdatesRemoteMeta() throws Exception { + conf.setInt("hbase.regionstatestore.meta.connection", 3); + final RegionStateStore rss = + new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager())); + rss.start(); + // Create 10 threads and make each do 10 puts related to region state update + Thread[] th = new Thread[10]; + List nameList = new ArrayList(); + List tableNameList = new ArrayList(); + for (int i = 0; i < th.length; i++) { + th[i] = new Thread() { + @Override + public void run() { + HRegionInfo[] hri = new HRegionInfo[10]; + ServerName serverName = ServerName.valueOf("dummyhost", 1000, 1234); + for (int i = 0; i < 10; i++) { + hri[i] = new HRegionInfo(TableName.valueOf(Thread.currentThread().getName() + "_" + i)); + RegionState newState = new RegionState(hri[i], RegionState.State.OPEN, serverName); + RegionState oldState = + new RegionState(hri[i], RegionState.State.PENDING_OPEN, serverName); + rss.updateRegionState(1, newState, oldState); + } + } + }; + th[i].start(); + nameList.add(th[i].getName()); + } + for (int i = 0; i < th.length; i++) { + th[i].join(); + } + // Add all the expected table names in meta to tableNameList + for (String name : nameList) { + for (int i = 0; i < 10; i++) { + tableNameList.add(TableName.valueOf(name + "_" + i)); + } + } + List metaRows = MetaTableAccessor.fullScanOfMeta(admin.getConnection()); + int count = 0; + // 
Check all 100 rows are in meta + for (Result result : metaRows) { + if (tableNameList.contains(HRegionInfo.getTable(result.getRow()))) { + count++; + if (count == 100) { + break; + } + } + } + assertTrue(count == 100); + rss.stop(); + } + static class MyLoadBalancer extends StochasticLoadBalancer { // For this region, if specified, always assign to nowhere static volatile String controledRegion = null;