diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 7957cc8..9b070a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -47,9 +47,12 @@ public class RpcRetryingCallerFactory { } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = - configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, - RpcRetryingCallerFactory.class.getName()); + configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); + if (rpcCallerFactoryClazz.equals(clazzName)) { + return new RpcRetryingCallerFactory(configuration); + } return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index d9f2262..c12a433 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,8 +32,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.master.RegionState.State; @@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; +import org.apache.hadoop.hbase.util.MultiHConnection; import com.google.common.base.Preconditions; @@ -52,12 +52,12 @@ public class RegionStateStore { private static final Log LOG = LogFactory.getLog(RegionStateStore.class); private volatile HRegion metaRegion; - private volatile HTableInterface metaTable; private volatile boolean initialized; private final boolean noPersistence; private final CatalogTracker catalogTracker; private final Server server; + private MultiHConnection multiHConnection; /** * Returns the {@link ServerName} from catalog table {@link Result} @@ -113,16 +113,22 @@ public class RegionStateStore { initialized = false; } - @SuppressWarnings("deprecation") void start() throws IOException { if (!noPersistence) { if (server instanceof RegionServerServices) { metaRegion = ((RegionServerServices)server).getFromOnlineRegions( HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); } + // When meta is not colocated on master if (metaRegion == null) { - metaTable = new HTable(TableName.META_TABLE_NAME, - catalogTracker.getConnection()); + Configuration conf = server.getConfiguration(); + // Config to determine the no of HConnections to META. + // A single HConnection should be sufficient in most cases. Only if + // you are doing lot of writes (>1M) to META, + // increasing this value might improve the write throughput. + multiHConnection = + new MultiHConnection(conf, conf.getInt("hbase.regionstatestore.meta.connection", 1)); + } } initialized = true; @@ -130,18 +136,11 @@ public class RegionStateStore { void stop() { initialized = false; - if (metaTable != null) { - try { - metaTable.close(); - } catch (IOException e) { - LOG.info("Got exception in closing meta table", e); - } finally { - metaTable = null; - } + if (multiHConnection != null) { + multiHConnection.close(); } } - @SuppressWarnings("deprecation") void updateRegionState(long openSeqNum, RegionState newState, RegionState oldState) { if (noPersistence || !initialized) { @@ -182,6 +181,7 @@ public class RegionStateStore { Bytes.toBytes(state.name())); LOG.info(info); + // Persist the state change to meta if (metaRegion != null) { try { @@ -197,16 +197,17 @@ public class RegionStateStore { synchronized (this) { if (metaRegion != null) { LOG.info("Meta region shortcut failed", t); - metaTable = new HTable(TableName.META_TABLE_NAME, - catalogTracker.getConnection()); + if (multiHConnection == null) { + multiHConnection = new MultiHConnection(server.getConfiguration(), 1); + } metaRegion = null; } } } } - synchronized(metaTable) { - metaTable.put(put); - } + // Called when meta is not on master + multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, + null); } catch (IOException ioe) { LOG.error("Failed to persist region state " + newState, ioe); server.abort("Failed to update region location", ioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java new file mode 100644 index 0000000..26e222a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java @@ -0,0 +1,163 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.coprocessor.Batch; + +/** + * Provides ability to create multiple HConnection instances and allows to process a batch of + * actions using HConnection.processBatchCallback() + */ +@InterfaceAudience.Private +public class MultiHConnection { + private static final Log LOG = LogFactory.getLog(MultiHConnection.class); + private HConnection[] hConnections; + private int noOfConnections; + private ExecutorService batchPool; + + /** + * Create multiple HConnection instances and initialize a thread pool executor + * @param conf configuration + * @param noOfConnections total no of HConnections to create + * @throws IOException + */ + public MultiHConnection(Configuration conf, int noOfConnections) + throws IOException { + this.noOfConnections = noOfConnections; + hConnections = new HConnection[noOfConnections]; + for (int i = 0; i < noOfConnections; i++) { + HConnection conn = HConnectionManager.createConnection(conf); + hConnections[i] = conn; + } + createBatchPool(conf); + } + + /** + * Close the open connections and shutdown the batchpool + */ + public void close() { + if (hConnections != null) { + synchronized (hConnections) { + if (hConnections != null) { + for (HConnection conn : hConnections) { + if (conn != null) { + try { + conn.close(); + } catch (IOException e) { + LOG.info("Got exception in closing connection", e); + } finally { + conn = null; + } + } + } + hConnections = null; + } + } + } + if (this.batchPool != null && !this.batchPool.isShutdown()) { + this.batchPool.shutdown(); + try { + if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) { + this.batchPool.shutdownNow(); + } + } catch (InterruptedException e) { + this.batchPool.shutdownNow(); + } + } + + } + + private static ThreadLocal threadLocalRandom = new ThreadLocal() { + @Override + protected Random initialValue() { + return new Random(); + } + }; + + /** + * Randomly pick a connection and process the batch of actions for a given table + * @param actions the actions + * @param tableName table name + * @param results the results array + * @param callback + * @throws IOException + * @throws InterruptedException + */ + @SuppressWarnings("deprecation") + public void processBatchCallback(List actions, TableName tableName, + Object[] results, Batch.Callback callback) throws IOException { + // Currently used by RegionStateStore + // A deprecated method is used as multiple threads accessing RegionStateStore do a single put + // and htable is not thread safe. Alternative would be to create an Htable instance for each + // put but that is not very efficient. + // See HBASE-11610 for more details. + try { + hConnections[threadLocalRandom.get().nextInt(noOfConnections)].processBatchCallback( + actions, tableName, this.batchPool, results, callback); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + } + + + // Copied from HConnectionImplementation.getBatchPool() + // We should get rid of this when HConnection.processBatchCallback is un-deprecated and provides + // an API to manage a batch pool + private void createBatchPool(Configuration conf) { + // Use the same config for keep alive as in HConnectionImplementation.getBatchPool(); + int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256); + int coreThreads = conf.getInt("hbase.multihconnection.threads.core", 256); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; + } + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; + } + long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60); + LinkedBlockingQueue workQueue = + new LinkedBlockingQueue(maxThreads + * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + ThreadPoolExecutor tpe = + new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, + Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-")); + tpe.allowCoreThreadTimeOut(true); + this.batchPool = tpe; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index c20347e..c42128c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -47,8 +47,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -869,6 +871,66 @@ public class TestAssignmentManagerOnCluster { } } + /** + * Test concurrent updates to meta when meta is not on master. Only for zk-less assignment + * @throws Exception + */ + @Test(timeout = 30000) + public void testUpdatesRemoteMeta() throws Exception { + // Not for zk less assignment + if (conf.getBoolean("hbase.assignment.usezk", true)) { + return; + } + conf.setInt("hbase.regionstatestore.meta.connection", 3); + final RegionStateStore rss = new RegionStateStore(new MyRegionServer(conf)); + rss.start(); + // Create 10 threads and make each do 10 puts related to region state update + Thread[] th = new Thread[10]; + List nameList = new ArrayList(); + List tableNameList = new ArrayList(); + for (int i = 0; i < th.length; i++) { + th[i] = new Thread() { + @Override + public void run() { + HRegionInfo[] hri = new HRegionInfo[10]; + ServerName serverName = ServerName.valueOf("dummyhost", 1000, 1234); + for (int i = 0; i < 10; i++) { + hri[i] = new HRegionInfo(TableName.valueOf(Thread.currentThread().getName() + "_" + i)); + RegionState newState = new RegionState(hri[i], RegionState.State.OPEN, serverName); + RegionState oldState = + new RegionState(hri[i], RegionState.State.PENDING_OPEN, serverName); + rss.updateRegionState(1, newState, oldState); + } + } + }; + th[i].start(); + nameList.add(th[i].getName()); + } + for (int i = 0; i < th.length; i++) { + th[i].join(); + } + // Add all the expected table names in meta to tableNameList + for (String name : nameList) { + for (int i = 0; i < 10; i++) { + tableNameList.add(TableName.valueOf(name + "_" + i)); + } + } + List metaRows = + MetaReader.fullScan(TEST_UTIL.getMiniHBaseCluster().getMaster().getCatalogTracker()); + int count = 0; + // Check all 100 rows are in meta + for (Result result : metaRows) { + if (tableNameList.contains(HRegionInfo.getTable(result.getRow()))) { + count++; + if (count == 100) { + break; + } + } + } + assertTrue(count == 100); + rss.stop(); + } + static class MyLoadBalancer extends StochasticLoadBalancer { // For this region, if specified, always assign to nowhere static volatile String controledRegion = null;