diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 7957cc8..9b070a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -47,9 +47,12 @@ public class RpcRetryingCallerFactory {
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
+    String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
-        configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
-          RpcRetryingCallerFactory.class.getName());
+        configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
+    if (rpcCallerFactoryClazz.equals(clazzName)) {
+      return new RpcRetryingCallerFactory(configuration);
+    }
     return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
       new Class[] { Configuration.class }, new Object[] { configuration });
   }
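The change above short-circuits reflection whenever no custom caller factory is configured, which is the common case. A minimal caller-side sketch (illustrative only, not part of this patch; the example class and printout are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;

    public class CallerFactoryExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Nothing is set under CUSTOM_CALLER_CONF_KEY, so instantiate() now returns
        // new RpcRetryingCallerFactory(conf) directly instead of going through
        // ReflectionUtils.instantiateWithCustomCtor().
        RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
        // A custom subclass would still be loaded reflectively:
        // conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, MyFactory.class.getName());
        System.out.println(factory.getClass().getName());
      }
    }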
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 0e6e69e..fb16dd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -31,15 +33,13 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.util.MultiHConnection;
 import com.google.common.base.Preconditions;
 
 /**
@@ -54,9 +54,8 @@ public class RegionStateStore {
   protected static final char META_REPLICA_ID_DELIMITER = '_';
 
   private volatile HRegion metaRegion;
-  private volatile HTableInterface metaTable;
   private volatile boolean initialized;
-
+  private MultiHConnection multiHConnection;
   private final Server server;
 
   /**
@@ -132,33 +131,31 @@ public class RegionStateStore {
     initialized = false;
   }
 
-  @SuppressWarnings("deprecation")
   void start() throws IOException {
     if (server instanceof RegionServerServices) {
      metaRegion = ((RegionServerServices)server).getFromOnlineRegions(
        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
     }
+    // When meta is not colocated on master
     if (metaRegion == null) {
-      metaTable = new HTable(TableName.META_TABLE_NAME,
-        server.getShortCircuitConnection());
+      Configuration conf = server.getConfiguration();
+      // Config to determine the number of HConnections to META.
+      // A single HConnection should be sufficient in most cases. Only if
+      // you are doing a lot of writes (>1M) to META,
+      // increasing this value might improve the write throughput.
+      multiHConnection =
+          new MultiHConnection(conf, conf.getInt("hbase.regionstatestore.meta.connection", 1));
     }
     initialized = true;
   }
 
   void stop() {
     initialized = false;
-    if (metaTable != null) {
-      try {
-        metaTable.close();
-      } catch (IOException e) {
-        LOG.info("Got exception in closing meta table", e);
-      } finally {
-        metaTable = null;
-      }
+    if (multiHConnection != null) {
+      multiHConnection.close();
     }
   }
 
-  @SuppressWarnings("deprecation")
   void updateRegionState(long openSeqNum,
       RegionState newState, RegionState oldState) {
     if (!initialized) {
@@ -210,22 +207,23 @@ public class RegionStateStore {
         synchronized (this) {
           if (metaRegion != null) {
             LOG.info("Meta region shortcut failed", t);
-            metaTable = new HTable(TableName.META_TABLE_NAME,
-              server.getShortCircuitConnection());
+            if (multiHConnection == null) {
+              multiHConnection = new MultiHConnection(server.getConfiguration(), 1);
+            }
             metaRegion = null;
           }
         }
       }
-      synchronized(metaTable) {
-        metaTable.put(put);
-      }
+      // Called when meta is not on master
+      multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
+
     } catch (IOException ioe) {
       LOG.error("Failed to persist region state " + newState, ioe);
       server.abort("Failed to update region location", ioe);
     }
   }
-
+
   void splitRegion(HRegionInfo p,
       HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
     MetaTableAccessor.splitRegion(server.getShortCircuitConnection(), p, a, b, sn);
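Because the connection count is read straight from the Configuration, a deployment that really does push heavy write volume at meta can raise it without code changes. A sketch (the value 3 is arbitrary; the test added at the end of this patch sets the same key):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MetaConnectionTuning {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Defaults to 1; RegionStateStore.start() hands this value to MultiHConnection.
        conf.setInt("hbase.regionstatestore.meta.connection", 3);
      }
    }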
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
new file mode 100644
index 0000000..ea44c1d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+
+/**
+ * Provides the ability to create multiple HConnection instances and to process a batch of
+ * actions using HConnection.processBatchCallback().
+ */
+public class MultiHConnection {
+  private static final Log LOG = LogFactory.getLog(MultiHConnection.class);
+  private HConnection[] hConnections;
+  private int noOfConnections;
+  private ExecutorService batchPool;
+
+  /**
+   * Create multiple HConnection instances and initialize a thread pool executor
+   * @param conf configuration
+   * @param noOfConnections total number of HConnections to create
+   * @throws IOException
+   */
+  public MultiHConnection(Configuration conf, int noOfConnections)
+      throws IOException {
+    this.noOfConnections = noOfConnections;
+    hConnections = new HConnection[noOfConnections];
+    for (int i = 0; i < noOfConnections; i++) {
+      HConnection conn = HConnectionManager.createConnection(conf);
+      hConnections[i] = conn;
+    }
+    createBatchPool(conf);
+  }
+
+  /**
+   * Close the open connections and shut down the batch pool
+   */
+  public void close() {
+    if (hConnections != null) {
+      for (HConnection conn : hConnections) {
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (IOException e) {
+            LOG.info("Got exception in closing connection", e);
+          } finally {
+            conn = null;
+          }
+        }
+      }
+      hConnections = null;
+    }
+    if (this.batchPool != null && !this.batchPool.isShutdown()) {
+      this.batchPool.shutdown();
+      try {
+        if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
+          this.batchPool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        this.batchPool.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Randomly pick a connection and process the batch of actions for a given table
+   * @param actions the actions
+   * @param tableName table name
+   * @param results the results array
+   * @param callback batch callback
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
+      Object[] results, Batch.Callback<R> callback) throws IOException {
+    // Currently used by RegionStateStore
+    // A deprecated method is used because multiple threads accessing RegionStateStore each do a
+    // single put and HTable is not thread-safe. The alternative would be to create an HTable
+    // instance for each put, but that is not very efficient.
+    // See HBASE-11610 for more details.
+    try {
+      hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback(
+        actions, tableName, this.batchPool, results, callback);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
+    }
+  }
+
+  // Copied from HConnectionImplementation.getBatchPool()
+  // We should get rid of this when HConnection.processBatchCallback is un-deprecated and provides
+  // an API to manage a batch pool
+  private ExecutorService createBatchPool(Configuration conf) {
+    // Use the same config for keep alive as in HConnectionImplementation.getBatchPool()
+    int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256);
+    int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256);
+    if (maxThreads == 0) {
+      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+    }
+    if (coreThreads == 0) {
+      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+    }
+    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+    LinkedBlockingQueue<Runnable> workQueue =
+        new LinkedBlockingQueue<Runnable>(maxThreads
+            * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+    ThreadPoolExecutor tpe =
+        new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
+            Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-"));
+    tpe.allowCoreThreadTimeOut(true);
+    return this.batchPool = tpe;
+  }
+
+}
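A usage sketch of the class above (illustrative only; the table, row, and column names are hypothetical, and the null results/callback arguments mirror how RegionStateStore invokes it):

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.MultiHConnection;

    public class MultiHConnectionExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Two shared connections; each batch picks one of them at random.
        MultiHConnection multi = new MultiHConnection(conf, 2);
        try {
          Put put = new Put(Bytes.toBytes("row1"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          // Thread-safe alternative to sharing one HTable across writer threads.
          multi.processBatchCallback(Arrays.asList(put), TableName.valueOf("t1"), null, null);
        } finally {
          // Closes every HConnection and shuts down the shared batch pool.
          multi.close();
        }
      }
    }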
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index f8e87dd..02847d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -1050,7 +1052,63 @@ public class TestAssignmentManagerOnCluster {
       TEST_UTIL.deleteTable(Bytes.toBytes(table));
     }
   }
-
+
+  /**
+   * Test concurrent updates to meta when meta is not on master
+   * @throws Exception
+   */
+  @Test(timeout = 30000)
+  public void testUpdatesRemoteMeta() throws Exception {
+    conf.setInt("hbase.regionstatestore.meta.connection", 3);
+    final RegionStateStore rss =
+        new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
+    rss.start();
+    // Create 10 threads and make each do 10 puts related to region state update
+    Thread[] th = new Thread[10];
+    List<String> nameList = new ArrayList<String>();
+    List<TableName> tableNameList = new ArrayList<TableName>();
+    for (int i = 0; i < th.length; i++) {
+      th[i] = new Thread() {
+        @Override
+        public void run() {
+          HRegionInfo[] hri = new HRegionInfo[10];
+          ServerName serverName = ServerName.valueOf("dummyhost", 1000, 1234);
+          for (int i = 0; i < 10; i++) {
+            hri[i] = new HRegionInfo(TableName.valueOf(Thread.currentThread().getName() + "_" + i));
+            RegionState newState = new RegionState(hri[i], RegionState.State.OPEN, serverName);
+            RegionState oldState =
+                new RegionState(hri[i], RegionState.State.PENDING_OPEN, serverName);
+            rss.updateRegionState(1, newState, oldState);
+          }
+        }
+      };
+      th[i].start();
+      nameList.add(th[i].getName());
+    }
+    for (int i = 0; i < th.length; i++) {
+      th[i].join();
+    }
+    // Add all the expected table names in meta to tableNameList
+    for (String name : nameList) {
+      for (int i = 0; i < 10; i++) {
+        tableNameList.add(TableName.valueOf(name + "_" + i));
+      }
+    }
+    List<Result> metaRows = MetaTableAccessor.fullScanOfMeta(admin.getConnection());
+    int count = 0;
+    // Check all 100 rows are in meta
+    for (Result result : metaRows) {
+      if (tableNameList.contains(HRegionInfo.getTable(result.getRow()))) {
+        count++;
+        if (count == 100) {
+          break;
+        }
+      }
+    }
+    assertTrue(count == 100);
+    rss.stop();
+  }
+
   static class MyLoadBalancer extends StochasticLoadBalancer {
     // For this region, if specified, always assign to nowhere
     static volatile String controledRegion = null;