From af0f4811005d5e0336cbafc4cf8585306f608f01 Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Fri, 27 Mar 2015 18:23:19 -0700 Subject: [PATCH] HBASE-13252 Get rid of managed connections and connection caching --- conf/log4j.properties | 4 +- .../org/apache/hadoop/hbase/MetaTableAccessor.java | 15 - .../apache/hadoop/hbase/client/AsyncProcess.java | 6 +- .../hadoop/hbase/client/ClusterConnection.java | 8 +- .../hadoop/hbase/client/ConnectionAdapter.java | 7 +- .../hadoop/hbase/client/ConnectionFactory.java | 12 +- .../hbase/client/ConnectionImplementation.java | 201 ++++++--- .../hadoop/hbase/client/ConnectionManager.java | 465 --------------------- .../hadoop/hbase/client/ConnectionUtils.java | 4 +- .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 4 +- .../apache/hadoop/hbase/client/HConnection.java | 23 +- .../apache/hadoop/hbase/client/HConnectionKey.java | 146 ------- .../org/apache/hadoop/hbase/client/HTable.java | 9 +- .../apache/hadoop/hbase/client/MultiAction.java | 2 +- .../client/NeedUnmanagedConnectionException.java | 31 -- .../hbase/client/ReversedScannerCallable.java | 2 - .../hadoop/hbase/client/ScannerCallable.java | 2 - .../hadoop/hbase/client/TestAsyncProcess.java | 2 +- .../hadoop/hbase/client/TestClientNoCluster.java | 44 +- hbase-common/src/test/resources/log4j.properties | 2 +- .../hbase/client/CoprocessorHConnection.java | 10 +- .../hbase/mapreduce/TableInputFormatBase.java | 22 +- .../hbase/ServerResourceCheckerJUnitListener.java | 15 - .../hbase/TestMetaTableAccessorNoCluster.java | 4 +- .../hbase/client/HConnectionTestingUtility.java | 53 +-- .../org/apache/hadoop/hbase/client/TestAdmin2.java | 21 - .../hadoop/hbase/client/TestFromClientSide.java | 2 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 174 +------- .../hadoop/hbase/client/TestMetaWithReplicas.java | 14 +- 29 files changed, 219 insertions(+), 1085 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/NeedUnmanagedConnectionException.java diff --git a/conf/log4j.properties b/conf/log4j.properties index 8d6badb..cd417d7 100644 --- a/conf/log4j.properties +++ b/conf/log4j.properties @@ -83,7 +83,7 @@ log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO #log4j.logger.org.apache.hadoop.dfs=DEBUG # Set this class to log INFO only otherwise its OTT # Enable this to get detailed connection error/retry logging. -# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=TRACE +# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE # Uncomment this line to enable tracing on _every_ RPC call (this can be a lot of output) @@ -91,4 +91,4 @@ log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO # Uncomment the below if you want to remove logging of client region caching' # and scan of hbase:meta messages -# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=INFO +# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=INFO diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 88cc25e..d18239b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -40,14 +40,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -254,19 +252,6 @@ public class MetaTableAccessor { if (connection == null || connection.isClosed()) { throw new NullPointerException("No connection"); } - // If the passed in 'connection' is 'managed' -- i.e. every second test uses - // a Table or an HBaseAdmin with managed connections -- then doing - // connection.getTable will throw an exception saying you are NOT to use - // managed connections getting tables. Leaving this as it is for now. Will - // revisit when inclined to change all tests. User code probaby makes use of - // managed connections too so don't change it till post hbase 1.0. - // - // There should still be a way to use this method with an unmanaged connection. - if (connection instanceof ClusterConnection) { - if (((ClusterConnection) connection).isManaged()) { - throw new NeedUnmanagedConnectionException(); - } - } return connection.getTable(TableName.META_TABLE_NAME); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 1900a25..10f2330 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -744,7 +744,7 @@ class AsyncProcess { private final Batch.Callback callback; private final BatchErrors errors; - private final ConnectionManager.ServerErrorTracker errorsByServer; + private final ConnectionImplementation.ServerErrorTracker errorsByServer; private final ExecutorService pool; private final Set> callsInProgress; @@ -1743,8 +1743,8 @@ class AsyncProcess { * We may benefit from connection-wide tracking of server errors. * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection */ - protected ConnectionManager.ServerErrorTracker createServerErrorTracker() { - return new ConnectionManager.ServerErrorTracker( + protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { + return new ConnectionImplementation.ServerErrorTracker( this.serverTrackerTimeout, this.numTries); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index f0398f9..07b055a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -288,12 +288,6 @@ public interface ClusterConnection extends HConnection { RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf); /** - * - * @return true if this is a managed connection. - */ - boolean isManaged(); - - /** * @return the current statistics tracker associated with this connection */ ServerStatisticTracker getStatisticsTracker(); @@ -302,4 +296,4 @@ public interface ClusterConnection extends HConnection { * @return the configured client backoff policy */ ClientBackoffPolicy getBackoffPolicy(); -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index e1458b8..1d8a793 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; * A convenience to override when customizing method implementations. * * - * @see ConnectionUtils#createShortCircuitHConnection(HConnection, ServerName, + * @see ConnectionUtils#createShortCircuitHConnection(Connection, ServerName, * AdminService.BlockingInterface, ClientService.BlockingInterface) for case where we make * Connections skip RPC if request is to local server. */ @@ -456,11 +456,6 @@ abstract class ConnectionAdapter implements ClusterConnection { } @Override - public boolean isManaged() { - return wrappedConnection.isManaged(); - } - - @Override public ServerStatisticTracker getStatisticsTracker() { return wrappedConnection.getStatisticsTracker(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index fd23d58..3e8ca31 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -214,15 +214,9 @@ public class ConnectionFactory { user = provider.getCurrent(); } - return createConnection(conf, false, pool, user); - } - - static Connection createConnection(final Configuration conf, final boolean managed, - final ExecutorService pool, final User user) - throws IOException { String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL, ConnectionImplementation.class.getName()); - Class clazz = null; + Class clazz; try { clazz = Class.forName(className); } catch (ClassNotFoundException e) { @@ -232,9 +226,9 @@ public class ConnectionFactory { // Default HCM#HCI is not accessible; make it so before invoking. Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, - boolean.class, ExecutorService.class, User.class); + ExecutorService.class, User.class); constructor.setAccessible(true); - return (Connection) constructor.newInstance(conf, managed, pool, user); + return (Connection) constructor.newInstance(conf, pool, user); } catch (Exception e) { throw new IOException(e); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 93ddea9..71653af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -73,8 +74,10 @@ import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -90,6 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger; justification="Access to the conncurrent hash map is under a lock so should be fine.") @InterfaceAudience.Private class ConnectionImplementation implements ClusterConnection, Closeable { + public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; static final Log LOG = LogFactory.getLog(ConnectionImplementation.class); private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled"; @@ -149,9 +153,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private int refCount; - // indicates whether this connection's life cycle is managed (by us) - private boolean managed; - private User user; private RpcRetryingCallerFactory rpcCallerFactory; @@ -167,27 +168,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private final ClientBackoffPolicy backoffPolicy; - ConnectionImplementation(Configuration conf, boolean managed) throws IOException { - this(conf, managed, null, null); - } - /** * constructor * @param conf Configuration object - * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection - * to zk and shutdown of all services; we just close down the resources this connection was - * responsible for and decrement usage counters. It is up to the caller to do the full - * cleanup. It is set when we want have connection sharing going on -- reuse of zk connection, - * and cached region locations, established regionserver connections, etc. When connections - * are shared, we have reference counting going on and will only do full cleanup when no more - * users of an ConnectionImplementation instance. */ - ConnectionImplementation(Configuration conf, boolean managed, + ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException { this(conf); this.user = user; this.batchPool = pool; - this.managed = managed; this.registry = setupRegistry(); retrieveClusterId(); @@ -228,11 +217,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, - HConstants.DEFAULT_USE_META_REPLICAS); + HConstants.DEFAULT_USE_META_REPLICAS); this.numTries = tableConfig.getRetriesNumber(); this.rpcTimeout = conf.getInt( - HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { synchronized (nonceGeneratorCreateLock) { if (nonceGenerator == null) { @@ -240,7 +229,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } } else { - nonceGenerator = new ConnectionManager.NoNonceGenerator(); + nonceGenerator = new NoNonceGenerator(); } stats = ServerStatisticTracker.create(conf); this.asyncProcess = createAsyncProcess(this.conf); @@ -259,12 +248,52 @@ class ConnectionImplementation implements ClusterConnection, Closeable { ClusterConnection conn, NonceGenerator cnm) { ConnectionImplementation connImpl = (ConnectionImplementation)conn; NonceGenerator ng = connImpl.getNonceGenerator(); - ConnectionManager.LOG.warn("Nonce generator is being replaced by test code for " + LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName()); nonceGenerator = cnm; return ng; } + /** + * Look for an exception we know in the remote exception: + * - hadoop.ipc wrapped exceptions + * - nested exceptions + * + * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException + * @return null if we didn't find the exception, the exception otherwise. + */ + public static Throwable findException(Object exception) { + if (exception == null || !(exception instanceof Throwable)) { + return null; + } + Throwable cur = (Throwable) exception; + while (cur != null) { + if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException + || cur instanceof RegionTooBusyException) { + return cur; + } + if (cur instanceof RemoteException) { + RemoteException re = (RemoteException) cur; + cur = re.unwrapRemoteException( + RegionOpeningException.class, RegionMovedException.class, + RegionTooBusyException.class); + if (cur == null) { + cur = re.unwrapRemoteException(); + } + // unwrapRemoteException can return the exception given as a parameter when it cannot + // unwrap it. In this case, there is no need to look further + // noinspection ObjectEquality + if (cur == re) { + return null; + } + } else { + cur = cur.getCause(); + } + } + + return null; + } + @Override public HTableInterface getTable(String tableName) throws IOException { return getTable(TableName.valueOf(tableName)); @@ -292,9 +321,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException { - if (managed) { - throw new NeedUnmanagedConnectionException(); - } return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool); } @@ -327,9 +353,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public Admin getAdmin() throws IOException { - if (managed) { - throw new NeedUnmanagedConnectionException(); - } return new HBaseAdmin(this); } @@ -540,14 +563,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys) throws IOException { + if (this.closed) throw new IOException(toString() + " closed"); try { if (!isTableEnabled(tableName)) { LOG.debug("Table " + tableName + " not enabled"); return false; } - ClusterConnection connection = ConnectionManager.getConnectionInternal(getConfiguration()); - List> locations = MetaTableAccessor - .getTableRegionsAndLocations(connection, tableName, true); + List> locations = + MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true); + int notDeployed = 0; int regionCount = 0; for (Pair pair : locations) { @@ -1004,6 +1028,99 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } + /** Dummy nonce generator for disabled nonces. */ + static class NoNonceGenerator implements NonceGenerator { + @Override + public long getNonceGroup() { + return HConstants.NO_NONCE; + } + @Override + public long newNonce() { + return HConstants.NO_NONCE; + } + } + + /** + * The record of errors for servers. + */ + static class ServerErrorTracker { + // We need a concurrent map here, as we could have multiple threads updating it in parallel. + private final ConcurrentMap errorsByServer = + new ConcurrentHashMap(); + private final long canRetryUntil; + private final int maxRetries; + private final long startTrackingTime; + + public ServerErrorTracker(long timeout, int maxRetries) { + this.maxRetries = maxRetries; + this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; + this.startTrackingTime = new Date().getTime(); + } + + /** + * We stop to retry when we have exhausted BOTH the number of retries and the time allocated. + */ + boolean canRetryMore(int numRetry) { + // If there is a single try we must not take into account the time. + return numRetry < maxRetries || (maxRetries > 1 && + EnvironmentEdgeManager.currentTime() < this.canRetryUntil); + } + + /** + * Calculates the back-off time for a retrying request to a particular server. + * + * @param server The server in question. + * @param basePause The default hci pause. + * @return The time to wait before sending next request. + */ + long calculateBackoffTime(ServerName server, long basePause) { + long result; + ServerErrors errorStats = errorsByServer.get(server); + if (errorStats != null) { + result = ConnectionUtils.getPauseTime(basePause, errorStats.getCount()); + } else { + result = 0; // yes, if the server is not in our list we don't wait before retrying. + } + return result; + } + + /** + * Reports that there was an error on the server to do whatever bean-counting necessary. + * + * @param server The server in question. + */ + void reportServerError(ServerName server) { + ServerErrors errors = errorsByServer.get(server); + if (errors != null) { + errors.addError(); + } else { + errors = errorsByServer.putIfAbsent(server, new ServerErrors()); + if (errors != null){ + errors.addError(); + } + } + } + + long getStartTrackingTime() { + return startTrackingTime; + } + + /** + * The record of errors for a server. + */ + private static class ServerErrors { + private final AtomicInteger retries = new AtomicInteger(0); + + public int getCount() { + return retries.get(); + } + + public void addError() { + retries.incrementAndGet(); + } + } + } + /** * Makes a client-side stub for master services. Sub-class to specialize. * Depends on hosting class so not static. Exists so we avoid duplicating a bunch of code @@ -1701,7 +1818,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } HRegionInfo regionInfo = oldLocation.getRegionInfo(); - Throwable cause = ConnectionManager.findException(exception); + Throwable cause = findException(exception); if (cause != null) { if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) { // We know that the region is still on this region server @@ -1955,15 +2072,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public void close() { - if (managed) { - if (aborted) { - ConnectionManager.deleteStaleConnection(this); - } else { - ConnectionManager.deleteConnection(this, false); - } - } else { - internalClose(); - } + internalClose(); } /** @@ -2026,7 +2135,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { MasterKeepAliveConnection master = getKeepAliveMasterService(); try { return ProtobufUtil.getTableNameArray(master.getTableNames(null, - MasterProtos.GetTableNamesRequest.newBuilder().build()) + MasterProtos.GetTableNamesRequest.newBuilder().build()) .getTableNamesList()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); @@ -2116,8 +2225,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public TableState getTableState(TableName tableName) throws IOException { - ClusterConnection conn = ConnectionManager.getConnectionInternal(getConfiguration()); - TableState tableState = MetaTableAccessor.getTableState(conn, tableName); + if (this.closed) throw new IOException(toString() + " closed"); + + TableState tableState = MetaTableAccessor.getTableState(this, tableName); if (tableState == null) throw new TableNotFoundException(tableName); return tableState; @@ -2128,9 +2238,4 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return RpcRetryingCallerFactory .instantiate(conf, this.interceptor, this.getStatisticsTracker()); } - - @Override - public boolean isManaged() { - return managed; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java deleted file mode 100644 index 4eacf7b..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ /dev/null @@ -1,465 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.RegionTooBusyException; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.RegionMovedException; -import org.apache.hadoop.hbase.exceptions.RegionOpeningException; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.ipc.RemoteException; - -/** - * An internal, non-instantiable class that manages creation of {@link HConnection}s. - */ -@SuppressWarnings("serial") -@InterfaceAudience.Private -// NOTE: DO NOT make this class public. It was made package-private on purpose. -final class ConnectionManager { - static final Log LOG = LogFactory.getLog(ConnectionManager.class); - - public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; - - // An LRU Map of HConnectionKey -> HConnection (TableServer). All - // access must be synchronized. This map is not private because tests - // need to be able to tinker with it. - static final Map CONNECTION_INSTANCES; - - public static final int MAX_CACHED_CONNECTION_INSTANCES; - - static { - // We set instances to one more than the value specified for {@link - // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max - // connections to the ensemble from the one client is 30, so in that case we - // should run into zk issues before the LRU hit this value of 31. - MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt( - HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1; - CONNECTION_INSTANCES = new LinkedHashMap( - (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) { - @Override - protected boolean removeEldestEntry( - Map.Entry eldest) { - return size() > MAX_CACHED_CONNECTION_INSTANCES; - } - }; - } - - /** Dummy nonce generator for disabled nonces. */ - static class NoNonceGenerator implements NonceGenerator { - @Override - public long getNonceGroup() { - return HConstants.NO_NONCE; - } - @Override - public long newNonce() { - return HConstants.NO_NONCE; - } - } - - /* - * Non-instantiable. - */ - private ConnectionManager() { - super(); - } - - /** - * Get the connection that goes with the passed conf configuration instance. - * If no current connection exists, method creates a new connection and keys it using - * connection-specific properties from the passed {@link Configuration}; see - * {@link HConnectionKey}. - * @param conf configuration - * @return HConnection object for conf - * @throws ZooKeeperConnectionException - * @deprecated connection caching is going away. - */ - @Deprecated - public static HConnection getConnection(final Configuration conf) throws IOException { - return getConnectionInternal(conf); - } - - - static ClusterConnection getConnectionInternal(final Configuration conf) - throws IOException { - HConnectionKey connectionKey = new HConnectionKey(conf); - synchronized (CONNECTION_INSTANCES) { - ConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); - if (connection == null) { - connection = (ConnectionImplementation) ConnectionFactory.createConnection(conf); - CONNECTION_INSTANCES.put(connectionKey, connection); - } else if (connection.isClosed()) { - ConnectionManager.deleteConnection(connectionKey, true); - connection = (ConnectionImplementation) ConnectionFactory.createConnection(conf); - CONNECTION_INSTANCES.put(connectionKey, connection); - } - connection.incCount(); - return connection; - } - } - - /** - * Create a new HConnection instance using the passed conf instance. - *

Note: This bypasses the usual HConnection life cycle management done by - * {@link #getConnection(Configuration)}. The caller is responsible for - * calling {@link HConnection#close()} on the returned connection instance. - * - * This is the recommended way to create HConnections. - * {@code - * HConnection connection = ConnectionManagerInternal.createConnection(conf); - * HTableInterface table = connection.getTable("mytable"); - * table.get(...); - * ... - * table.close(); - * connection.close(); - * } - * - * @param conf configuration - * @return HConnection object for conf - * @throws ZooKeeperConnectionException - */ - public static HConnection createConnection(Configuration conf) throws IOException { - return createConnectionInternal(conf); - } - - static ClusterConnection createConnectionInternal(Configuration conf) throws IOException { - UserProvider provider = UserProvider.instantiate(conf); - return createConnection(conf, false, null, provider.getCurrent()); - } - - /** - * Create a new HConnection instance using the passed conf instance. - *

Note: This bypasses the usual HConnection life cycle management done by - * {@link #getConnection(Configuration)}. The caller is responsible for - * calling {@link HConnection#close()} on the returned connection instance. - * This is the recommended way to create HConnections. - * {@code - * ExecutorService pool = ...; - * HConnection connection = ConnectionManager.createConnection(conf, pool); - * HTableInterface table = connection.getTable("mytable"); - * table.get(...); - * ... - * table.close(); - * connection.close(); - * } - * @param conf configuration - * @param pool the thread pool to use for batch operation in HTables used via this HConnection - * @return HConnection object for conf - * @throws ZooKeeperConnectionException - */ - public static HConnection createConnection(Configuration conf, ExecutorService pool) - throws IOException { - UserProvider provider = UserProvider.instantiate(conf); - return createConnection(conf, false, pool, provider.getCurrent()); - } - - /** - * Create a new HConnection instance using the passed conf instance. - *

Note: This bypasses the usual HConnection life cycle management done by - * {@link #getConnection(Configuration)}. The caller is responsible for - * calling {@link HConnection#close()} on the returned connection instance. - * This is the recommended way to create HConnections. - * {@code - * ExecutorService pool = ...; - * HConnection connection = ConnectionManager.createConnection(conf, pool); - * HTableInterface table = connection.getTable("mytable"); - * table.get(...); - * ... - * table.close(); - * connection.close(); - * } - * @param conf configuration - * @param user the user the connection is for - * @return HConnection object for conf - * @throws ZooKeeperConnectionException - */ - public static HConnection createConnection(Configuration conf, User user) - throws IOException { - return createConnection(conf, false, null, user); - } - - /** - * Create a new HConnection instance using the passed conf instance. - *

Note: This bypasses the usual HConnection life cycle management done by - * {@link #getConnection(Configuration)}. The caller is responsible for - * calling {@link HConnection#close()} on the returned connection instance. - * This is the recommended way to create HConnections. - * {@code - * ExecutorService pool = ...; - * HConnection connection = ConnectionManager.createConnection(conf, pool); - * HTableInterface table = connection.getTable("mytable"); - * table.get(...); - * ... - * table.close(); - * connection.close(); - * } - * @param conf configuration - * @param pool the thread pool to use for batch operation in HTables used via this HConnection - * @param user the user the connection is for - * @return HConnection object for conf - * @throws ZooKeeperConnectionException - */ - public static HConnection createConnection(Configuration conf, ExecutorService pool, User user) - throws IOException { - return createConnection(conf, false, pool, user); - } - - /** - * @deprecated instead use one of the {@link ConnectionFactory#createConnection()} methods. - */ - @Deprecated - static HConnection createConnection(final Configuration conf, final boolean managed) - throws IOException { - UserProvider provider = UserProvider.instantiate(conf); - return createConnection(conf, managed, null, provider.getCurrent()); - } - - /** - * @deprecated instead use one of the {@link ConnectionFactory#createConnection()} methods. - */ - @Deprecated - static ClusterConnection createConnection(final Configuration conf, final boolean managed, - final ExecutorService pool, final User user) - throws IOException { - return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user); - } - - /** - * Cleanup a known stale connection. - * This will then close connection to the zookeeper ensemble and let go of all resources. - * - * @param connection - * @deprecated connection caching is going away. - */ - @Deprecated - public static void deleteStaleConnection(HConnection connection) { - deleteConnection(connection, true); - } - - /** - * @deprecated connection caching is going away. - */ - @Deprecated - static void deleteConnection(HConnection connection, boolean staleConnection) { - synchronized (CONNECTION_INSTANCES) { - for (Entry e: CONNECTION_INSTANCES.entrySet()) { - if (e.getValue() == connection) { - deleteConnection(e.getKey(), staleConnection); - break; - } - } - } - } - - /** - * @deprecated connection caching is going away. -˙ */ - @Deprecated - private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) { - synchronized (CONNECTION_INSTANCES) { - ConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); - if (connection != null) { - connection.decCount(); - if (connection.isZeroReference() || staleConnection) { - CONNECTION_INSTANCES.remove(connectionKey); - connection.internalClose(); - } - } else { - LOG.error("Connection not found in the list, can't delete it "+ - "(connection key=" + connectionKey + "). May be the key was modified?", new Exception()); - } - } - } - - - /** - * This convenience method invokes the given {@link HConnectable#connect} - * implementation using a {@link HConnection} instance that lasts just for the - * duration of the invocation. - * - * @param the return type of the connect method - * @param connectable the {@link HConnectable} instance - * @return the value returned by the connect method - * @throws IOException - */ - @InterfaceAudience.Private - public static T execute(HConnectable connectable) throws IOException { - if (connectable == null || connectable.conf == null) { - return null; - } - Configuration conf = connectable.conf; - HConnection connection = getConnection(conf); - boolean connectSucceeded = false; - try { - T returnValue = connectable.connect(connection); - connectSucceeded = true; - return returnValue; - } finally { - try { - connection.close(); - } catch (Exception e) { - ExceptionUtil.rethrowIfInterrupt(e); - if (connectSucceeded) { - throw new IOException("The connection to " + connection - + " could not be deleted.", e); - } - } - } - } - - /** - * The record of errors for servers. - */ - static class ServerErrorTracker { - // We need a concurrent map here, as we could have multiple threads updating it in parallel. - private final ConcurrentMap errorsByServer = - new ConcurrentHashMap(); - private final long canRetryUntil; - private final int maxRetries; - private final long startTrackingTime; - - public ServerErrorTracker(long timeout, int maxRetries) { - this.maxRetries = maxRetries; - this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; - this.startTrackingTime = new Date().getTime(); - } - - /** - * We stop to retry when we have exhausted BOTH the number of retries and the time allocated. - */ - boolean canRetryMore(int numRetry) { - // If there is a single try we must not take into account the time. - return numRetry < maxRetries || (maxRetries > 1 && - EnvironmentEdgeManager.currentTime() < this.canRetryUntil); - } - - /** - * Calculates the back-off time for a retrying request to a particular server. - * - * @param server The server in question. - * @param basePause The default hci pause. - * @return The time to wait before sending next request. - */ - long calculateBackoffTime(ServerName server, long basePause) { - long result; - ServerErrors errorStats = errorsByServer.get(server); - if (errorStats != null) { - result = ConnectionUtils.getPauseTime(basePause, errorStats.getCount()); - } else { - result = 0; // yes, if the server is not in our list we don't wait before retrying. - } - return result; - } - - /** - * Reports that there was an error on the server to do whatever bean-counting necessary. - * - * @param server The server in question. - */ - void reportServerError(ServerName server) { - ServerErrors errors = errorsByServer.get(server); - if (errors != null) { - errors.addError(); - } else { - errors = errorsByServer.putIfAbsent(server, new ServerErrors()); - if (errors != null){ - errors.addError(); - } - } - } - - long getStartTrackingTime() { - return startTrackingTime; - } - - /** - * The record of errors for a server. - */ - private static class ServerErrors { - private final AtomicInteger retries = new AtomicInteger(0); - - public int getCount() { - return retries.get(); - } - - public void addError() { - retries.incrementAndGet(); - } - } - } - - /** - * Look for an exception we know in the remote exception: - * - hadoop.ipc wrapped exceptions - * - nested exceptions - * - * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException - * @return null if we didn't find the exception, the exception otherwise. - */ - public static Throwable findException(Object exception) { - if (exception == null || !(exception instanceof Throwable)) { - return null; - } - Throwable cur = (Throwable) exception; - while (cur != null) { - if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException - || cur instanceof RegionTooBusyException) { - return cur; - } - if (cur instanceof RemoteException) { - RemoteException re = (RemoteException) cur; - cur = re.unwrapRemoteException( - RegionOpeningException.class, RegionMovedException.class, - RegionTooBusyException.class); - if (cur == null) { - cur = re.unwrapRemoteException(); - } - // unwrapRemoteException can return the exception given as a parameter when it cannot - // unwrap it. In this case, there is no need to look further - // noinspection ObjectEquality - if (cur == re) { - return null; - } - } else { - cur = cur.getCause(); - } - } - - return null; - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index f12c33e..323915b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -147,9 +147,9 @@ public final class ConnectionUtils { * region re-lookups. */ static class MasterlessConnection extends ConnectionImplementation { - MasterlessConnection(Configuration conf, boolean managed, + MasterlessConnection(Configuration conf, ExecutorService pool, User user) throws IOException { - super(conf, managed, pool, user); + super(conf, pool, user); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index c90ce93..c0877a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -204,9 +204,7 @@ public class HBaseAdmin implements Admin { @Deprecated public HBaseAdmin(Configuration c) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { - // Will not leak connections, as the new implementation of the constructor - // does not throw exceptions anymore. - this(ConnectionManager.getConnectionInternal(new Configuration(c))); + this(ConnectionFactory.createConnection(new Configuration(c))); this.cleanupConnectionOnClose = true; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index e4f05b0..cc5e9fa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -45,15 +45,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; * connections are managed at a lower level. * *

HConnections are used by {@link HTable} mostly but also by - * {@link HBaseAdmin}, and {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}. - * HConnection instances can be shared. Sharing - * is usually what you want because rather than each HConnection instance - * having to do its own discovery of regions out on the cluster, instead, all - * clients get to share the one cache of locations. {@link ConnectionManager} does the - * sharing for you if you go by it getting connections. Sharing makes cleanup of - * HConnections awkward. See {@link ConnectionFactory} for cleanup discussion. + * {@link HBaseAdmin}, and {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}. * - * @see ConnectionManager * @see ConnectionFactory * @deprecated in favor of {@link Connection} and {@link ConnectionFactory} */ @@ -79,7 +72,6 @@ public interface HConnection extends Connection { * be created for each using thread. * This is a lightweight operation, pooling or caching of the returned HTableInterface * is neither required nor desired. - * Note that the HConnection needs to be unmanaged * (created with {@link ConnectionFactory#createConnection(Configuration)}). * @param tableName * @return an HTable to use for interactions with this table @@ -92,7 +84,6 @@ public interface HConnection extends Connection { * be created for each using thread. * This is a lightweight operation, pooling or caching of the returned HTableInterface * is neither required nor desired. - * Note that the HConnection needs to be unmanaged * (created with {@link ConnectionFactory#createConnection(Configuration)}). * @param tableName * @return an HTable to use for interactions with this table @@ -105,7 +96,6 @@ public interface HConnection extends Connection { * be created for each using thread. * This is a lightweight operation, pooling or caching of the returned HTableInterface * is neither required nor desired. - * Note that the HConnection needs to be unmanaged * (created with {@link ConnectionFactory#createConnection(Configuration)}). * @param tableName * @return an HTable to use for interactions with this table @@ -119,7 +109,6 @@ public interface HConnection extends Connection { * be created for each using thread. * This is a lightweight operation, pooling or caching of the returned HTableInterface * is neither required nor desired. - * Note that the HConnection needs to be unmanaged * (created with {@link ConnectionFactory#createConnection(Configuration)}). * @param tableName * @param pool The thread pool to use for batch operations, null to use a default pool. @@ -133,7 +122,6 @@ public interface HConnection extends Connection { * be created for each using thread. * This is a lightweight operation, pooling or caching of the returned HTableInterface * is neither required nor desired. - * Note that the HConnection needs to be unmanaged * (created with {@link ConnectionFactory#createConnection(Configuration)}). * @param tableName * @param pool The thread pool to use for batch operations, null to use a default pool. @@ -147,9 +135,8 @@ public interface HConnection extends Connection { * be created for each using thread. * This is a lightweight operation, pooling or caching of the returned HTableInterface * is neither required nor desired. - * Note that the HConnection needs to be unmanaged * (created with {@link ConnectionFactory#createConnection(Configuration)}). - * @param tableName + * @param tableName table to get interface for * @param pool The thread pool to use for batch operations, null to use a default pool. * @return an HTable to use for interactions with this table */ @@ -162,10 +149,6 @@ public interface HConnection extends Connection { * * This is a lightweight operation. Pooling or caching of the returned RegionLocator is neither * required nor desired. - * - * RegionLocator needs to be unmanaged - * (created with {@link ConnectionFactory#createConnection(Configuration)}). - * * @param tableName Name of the table who's region is to be examined * @return A RegionLocator instance */ @@ -176,7 +159,7 @@ public interface HConnection extends Connection { * Retrieve an Admin implementation to administer an HBase cluster. * The returned Admin is not guaranteed to be thread-safe. A new instance should be created for * each using thread. This is a lightweight operation. Pooling or caching of the returned - * Admin is not recommended. Note that HConnection needs to be unmanaged + * Admin is not recommended. * * @return an Admin instance for cluster administration */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java deleted file mode 100644 index f37690c..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; - -/** - * Denotes a unique key to an {@link HConnection} instance. - * - * In essence, this class captures the properties in {@link Configuration} - * that may be used in the process of establishing a connection. In light of - * that, if any new such properties are introduced into the mix, they must be - * added to the {@link HConnectionKey#properties} list. - * - */ -class HConnectionKey { - final static String[] CONNECTION_PROPERTIES = new String[] { - HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.ZOOKEEPER_CLIENT_PORT, - HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, - HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.HBASE_META_SCANNER_CACHING, - HConstants.HBASE_CLIENT_INSTANCE_ID, - HConstants.RPC_CODEC_CONF_KEY, - HConstants.USE_META_REPLICAS, - RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY}; - - private Map properties; - private String username; - - HConnectionKey(Configuration conf) { - Map m = new HashMap(); - if (conf != null) { - for (String property : CONNECTION_PROPERTIES) { - String value = conf.get(property); - if (value != null) { - m.put(property, value); - } - } - } - this.properties = Collections.unmodifiableMap(m); - - try { - UserProvider provider = UserProvider.instantiate(conf); - User currentUser = provider.getCurrent(); - if (currentUser != null) { - username = currentUser.getName(); - } - } catch (IOException ioe) { - ConnectionManager.LOG.warn( - "Error obtaining current user, skipping username in HConnectionKey", ioe); - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - if (username != null) { - result = username.hashCode(); - } - for (String property : CONNECTION_PROPERTIES) { - String value = properties.get(property); - if (value != null) { - result = prime * result + value.hashCode(); - } - } - - return result; - } - - - @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="ES_COMPARING_STRINGS_WITH_EQ", - justification="Optimization") - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HConnectionKey that = (HConnectionKey) obj; - if (this.username != null && !this.username.equals(that.username)) { - return false; - } else if (this.username == null && that.username != null) { - return false; - } - if (this.properties == null) { - if (that.properties != null) { - return false; - } - } else { - if (that.properties == null) { - return false; - } - for (String property : CONNECTION_PROPERTIES) { - String thisValue = this.properties.get(property); - String thatValue = that.properties.get(property); - //noinspection StringEquality - if (thisValue == thatValue) { - continue; - } - if (thisValue == null || !thisValue.equals(thatValue)) { - return false; - } - } - } - return true; - } - - @Override - public String toString() { - return "HConnectionKey{" + - "properties=" + properties + - ", username='" + username + '\'' + - '}'; - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index c77e2ae..f6dabd0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -319,12 +319,9 @@ public class HTable implements HTableInterface { @Deprecated public static boolean isTableEnabled(Configuration conf, final TableName tableName) throws IOException { - return ConnectionManager.execute(new HConnectable(conf) { - @Override - public Boolean connect(HConnection connection) throws IOException { - return connection.isTableEnabled(tableName); - } - }); + try(Connection conn = ConnectionFactory.createConnection(conf)) { + return conn.getAdmin().isTableEnabled(tableName); + } } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index 6110f0d..6d155ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes; /** * Container for Actions (i.e. Get, Delete, or Put), which are grouped by - * regionName. Intended to be used with ConnectionManager.processBatch() + * regionName. Intended to be used with {@link AsyncProcess}. */ @InterfaceAudience.Private public final class MultiAction { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NeedUnmanagedConnectionException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NeedUnmanagedConnectionException.java deleted file mode 100644 index 6ea0688..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NeedUnmanagedConnectionException.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Used for internal signalling that a Connection implementation needs to be - * user-managed to be used for particular request types. - */ -@InterfaceAudience.Private -public class NeedUnmanagedConnectionException extends DoNotRetryIOException { - private static final long serialVersionUID = 1876775844L; - - public NeedUnmanagedConnectionException() { - super("The connection has to be unmanaged."); - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index a9c903e..0c2d345 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -129,8 +129,6 @@ public class ReversedScannerCallable extends ScannerCallable { } // check how often we retry. - // ConnectionManager will call instantiateServer with reload==true - // if and only if for retries. if (reload && this.scanMetrics != null) { this.scanMetrics.countOfRPCRetries.incrementAndGet(); if (isRegionServerRemote) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 226782c..402cfed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -153,8 +153,6 @@ public class ScannerCallable extends RegionServerCallable { } // check how often we retry. - // ConnectionManager will call instantiateServer with reload==true - // if and only if for retries. if (reload && this.scanMetrics != null) { this.scanMetrics.countOfRPCRetries.incrementAndGet(); if (isRegionServerRemote) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 15ed6ec..52d66d9 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -798,7 +798,7 @@ public class TestAsyncProcess { ClusterConnection conn = new MyConnectionImpl(configuration); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); - configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true); + configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); mutator.ap = ap; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index 9671ea6..f085ace 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -261,45 +261,13 @@ public class TestClientNoCluster extends Configured implements Tool { /** * Override to shutdown going to zookeeper for cluster id and meta location. */ - static class ScanOpenNextThenExceptionThenRecoverConnection - extends ConnectionImplementation { - final ClientService.BlockingInterface stub; - - ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf, - boolean managed, ExecutorService pool) throws IOException { - super(conf, managed); - // Mock up my stub so open scanner returns a scanner id and then on next, we throw - // exceptions for three times and then after that, we return no more to scan. - this.stub = Mockito.mock(ClientService.BlockingInterface.class); - long sid = 12345L; - try { - Mockito.when(stub.scan((RpcController)Mockito.any(), - (ClientProtos.ScanRequest)Mockito.any())). - thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()). - thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))). - thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid). - setMoreResults(false).build()); - } catch (ServiceException e) { - throw new IOException(e); - } - } - - @Override - public BlockingInterface getClient(ServerName sn) throws IOException { - return this.stub; - } - } - - /** - * Override to shutdown going to zookeeper for cluster id and meta location. - */ static class RegionServerStoppedOnScannerOpenConnection extends ConnectionImplementation { final ClientService.BlockingInterface stub; - RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed, + RegionServerStoppedOnScannerOpenConnection(Configuration conf, ExecutorService pool, User user) throws IOException { - super(conf, managed); + super(conf, pool, user); // Mock up my stub so open scanner returns a scanner id and then on next, we throw // exceptions for three times and then after that, we return no more to scan. this.stub = Mockito.mock(ClientService.BlockingInterface.class); @@ -329,9 +297,9 @@ public class TestClientNoCluster extends Configured implements Tool { extends ConnectionImplementation { final ClientService.BlockingInterface stub; - RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user) + RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user) throws IOException { - super(conf, managed); + super(conf, pool, user); // Mock up my stub so an exists call -- which turns into a get -- throws an exception this.stub = Mockito.mock(ClientService.BlockingInterface.class); try { @@ -364,10 +332,10 @@ public class TestClientNoCluster extends Configured implements Tool { final AtomicLong sequenceids = new AtomicLong(0); private final Configuration conf; - ManyServersManyRegionsConnection(Configuration conf, boolean managed, + ManyServersManyRegionsConnection(Configuration conf, ExecutorService pool, User user) throws IOException { - super(conf, managed, pool, user); + super(conf, pool, user); int serverCount = conf.getInt("hbase.test.servers", 10); this.serversByClient = new HashMap(serverCount); diff --git a/hbase-common/src/test/resources/log4j.properties b/hbase-common/src/test/resources/log4j.properties index 69171f7..13a95b4 100644 --- a/hbase-common/src/test/resources/log4j.properties +++ b/hbase-common/src/test/resources/log4j.properties @@ -63,4 +63,4 @@ log4j.logger.org.apache.hadoop.hbase=DEBUG log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR # Enable this to get detailed connection error/retry logging. -# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=TRACE +# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java index 52d22b7..4ed8add 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java @@ -40,14 +40,14 @@ import org.apache.hadoop.hbase.security.UserProvider; @InterfaceAudience.Private @InterfaceStability.Evolving public class CoprocessorHConnection extends ConnectionImplementation { - private static final NonceGenerator NO_NONCE_GEN = new ConnectionManager.NoNonceGenerator(); + private static final NonceGenerator NO_NONCE_GEN = new NoNonceGenerator(); /** - * Create an unmanaged {@link HConnection} based on the environment in which we are running the + * Create an {@link HConnection} based on the environment in which we are running the * coprocessor. The {@link HConnection} must be externally cleaned up (we bypass the usual HTable * cleanup mechanisms since we own everything). * @param env environment hosting the {@link HConnection} - * @return an unmanaged {@link HConnection}. + * @return instance of {@link HConnection}. * @throws IOException if we cannot create the connection */ public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env) @@ -60,7 +60,7 @@ public class CoprocessorHConnection extends ConnectionImplementation { return new CoprocessorHConnection((HRegionServer) services); } } - return ConnectionManager.createConnectionInternal(env.getConfiguration()); + return (ClusterConnection) ConnectionFactory.createConnection(env.getConfiguration()); } private final ServerName serverName; @@ -95,7 +95,7 @@ public class CoprocessorHConnection extends ConnectionImplementation { * @throws IOException if we cannot create the connection */ public CoprocessorHConnection(Configuration conf, HRegionServer server) throws IOException { - super(conf, false, null, UserProvider.instantiate(conf).getCurrent()); + super(conf, null, UserProvider.instantiate(conf).getCurrent()); this.server = server; this.serverName = server.getServerName(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index e27251a..def460f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -39,9 +39,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -613,24 +611,8 @@ extends InputFormat { protected void setHTable(HTable table) throws IOException { this.table = table; this.connection = table.getConnection(); - try { - this.regionLocator = table.getRegionLocator(); - this.admin = this.connection.getAdmin(); - } catch (NeedUnmanagedConnectionException exception) { - LOG.warn("You are using an HTable instance that relies on an HBase-managed Connection. " + - "This is usually due to directly creating an HTable, which is deprecated. Instead, you " + - "should create a Connection object and then request a Table instance from it. If you " + - "don't need the Table instance for your own use, you should instead use the " + - "TableInputFormatBase.initalizeTable method directly."); - LOG.info("Creating an additional unmanaged connection because user provided one can't be " + - "used for administrative actions. We'll close it when we close out the table."); - LOG.debug("Details about our failure to request an administrative interface.", exception); - // Do we need a "copy the settings from this Connection" method? are things like the User - // properly maintained by just looking again at the Configuration? - this.connection = ConnectionFactory.createConnection(this.connection.getConfiguration()); - this.regionLocator = this.connection.getRegionLocator(table.getName()); - this.admin = this.connection.getAdmin(); - } + this.regionLocator = table.getRegionLocator(); + this.admin = this.connection.getAdmin(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ServerResourceCheckerJUnitListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ServerResourceCheckerJUnitListener.java index 4e01b5e..4b750e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ServerResourceCheckerJUnitListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ServerResourceCheckerJUnitListener.java @@ -19,24 +19,9 @@ package org.apache.hadoop.hbase; -import org.apache.hadoop.hbase.ResourceChecker.Phase; -import org.apache.hadoop.hbase.client.HConnectionTestingUtility; - /** * Monitor the resources. use by the tests All resources in {@link ResourceCheckerJUnitListener} * plus the number of connection. */ public class ServerResourceCheckerJUnitListener extends ResourceCheckerJUnitListener { - - static class ConnectionCountResourceAnalyzer extends ResourceChecker.ResourceAnalyzer { - @Override - public int getVal(Phase phase) { - return HConnectionTestingUtility.getConnectionCount(); - } - } - - @Override - protected void addResourceAnalyzer(ResourceChecker rc) { - rc.addResourceAnalyzer(new ConnectionCountResourceAnalyzer()); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java index f70a0d7..eefadd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java @@ -183,10 +183,8 @@ public class TestMetaTableAccessorNoCluster { // Return the RegionLocations object when locateRegion // The ugly format below comes of 'Important gotcha on spying real objects!' from // http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html - ClusterConnection cConnection = - HConnectionTestingUtility.getSpiedClusterConnection(UTIL.getConfiguration()); Mockito.doReturn(rl).when - (cConnection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(), + (connection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt()); // Now shove our HRI implementation into the spied-upon connection. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 0a534b0..06fdd7f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -38,7 +38,7 @@ public class HConnectionTestingUtility { /* * Not part of {@link HBaseTestingUtility} because this class is not * in same package as {@link HConnection}. Would have to reveal ugly - * {@link ConnectionManager} innards to HBaseTestingUtility to give it access. + * {@link ConnectionImplementation} innards to HBaseTestingUtility to give it access. */ /** * Get a Mocked {@link HConnection} that goes with the passed conf @@ -52,17 +52,9 @@ public class HConnectionTestingUtility { */ public static ClusterConnection getMockedConnection(final Configuration conf) throws ZooKeeperConnectionException { - HConnectionKey connectionKey = new HConnectionKey(conf); - synchronized (ConnectionManager.CONNECTION_INSTANCES) { - ConnectionImplementation connection = - ConnectionManager.CONNECTION_INSTANCES.get(connectionKey); - if (connection == null) { - connection = Mockito.mock(ConnectionImplementation.class); - Mockito.when(connection.getConfiguration()).thenReturn(conf); - ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection); - } - return connection; - } + ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class); + Mockito.when(connection.getConfiguration()).thenReturn(conf); + return connection; } /** @@ -99,7 +91,6 @@ public class HConnectionTestingUtility { throws IOException { ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class); Mockito.when(c.getConfiguration()).thenReturn(conf); - ConnectionManager.CONNECTION_INSTANCES.put(new HConnectionKey(conf), c); Mockito.doNothing().when(c).close(); // Make it so we return a particular location when asked. final HRegionLocation loc = new HRegionLocation(hri, sn); @@ -151,38 +142,8 @@ public class HConnectionTestingUtility { */ public static ClusterConnection getSpiedConnection(final Configuration conf) throws IOException { - HConnectionKey connectionKey = new HConnectionKey(conf); - synchronized (ConnectionManager.CONNECTION_INSTANCES) { - ConnectionImplementation connection = - ConnectionManager.CONNECTION_INSTANCES.get(connectionKey); - if (connection == null) { - connection = Mockito.spy(new ConnectionImplementation(conf, false)); - ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection); - } - return connection; - } - } - - public static ClusterConnection getSpiedClusterConnection(final Configuration conf) - throws IOException { - HConnectionKey connectionKey = new HConnectionKey(conf); - synchronized (ConnectionManager.CONNECTION_INSTANCES) { - ConnectionImplementation connection = - ConnectionManager.CONNECTION_INSTANCES.get(connectionKey); - if (connection == null) { - connection = Mockito.spy(new ConnectionImplementation(conf, false)); - ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection); - } - return connection; - } - } - - /** - * @return Count of extant connection instances - */ - public static int getConnectionCount() { - synchronized (ConnectionManager.CONNECTION_INSTANCES) { - return ConnectionManager.CONNECTION_INSTANCES.size(); - } + ConnectionImplementation connection = + Mockito.spy(new ConnectionImplementation(conf, null, null)); + return connection; } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index a352c4e..8a1918b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -633,20 +632,6 @@ public class TestAdmin2 { } /** - * HBASE-4417 checkHBaseAvailable() doesn't close zk connections - */ - @Test (timeout=300000) - public void testCheckHBaseAvailableClosesConnection() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - - int initialCount = HConnectionTestingUtility.getConnectionCount(); - HBaseAdmin.checkHBaseAvailable(conf); - int finalCount = HConnectionTestingUtility.getConnectionCount(); - - Assert.assertEquals(initialCount, finalCount) ; - } - - /** * Check that we have an exception if the cluster is not there. */ @Test (timeout=300000) @@ -657,8 +642,6 @@ public class TestAdmin2 { conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 9999)+10); - int initialCount = HConnectionTestingUtility.getConnectionCount(); - long start = System.currentTimeMillis(); try { HBaseAdmin.checkHBaseAvailable(conf); @@ -670,10 +653,6 @@ public class TestAdmin2 { } long end = System.currentTimeMillis(); - int finalCount = HConnectionTestingUtility.getConnectionCount(); - - Assert.assertEquals(initialCount, finalCount) ; - LOG.info("It took "+(end-start)+" ms to find out that" + " HBase was not available"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index a4ceaa8..ae30708 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -4037,7 +4037,7 @@ public class TestFromClientSide { */ HTable createUnmangedHConnectionHTable(final TableName tableName) throws IOException { TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY); - HConnection conn = ConnectionManager.createConnection(TEST_UTIL.getConfiguration()); + Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); return (HTable)conn.getTable(tableName); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 0b08562..51b9eac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -23,15 +23,12 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.net.SocketTimeoutException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; @@ -46,7 +43,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -73,7 +69,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.jboss.netty.util.internal.DetectionUtil; import org.junit.AfterClass; import org.junit.Assert; @@ -141,11 +136,6 @@ public class TestHCM { TEST_UTIL.shutdownMiniCluster(); } - - private static int getHConnectionManagerCacheSize(){ - return HConnectionTestingUtility.getConnectionCount(); - } - @Test public void testClusterConnection() throws IOException { ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, @@ -153,26 +143,26 @@ public class TestHCM { new SynchronousQueue(), Threads.newDaemonThreadFactory("test-hcm")); - HConnection con1 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration()); - HConnection con2 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool); + Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool); // make sure the internally created ExecutorService is the one passed assertTrue(otherPool == ((ConnectionImplementation)con2).getCurrentBatchPool()); String tableName = "testClusterConnection"; TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close(); - HTable t = (HTable)con1.getTable(tableName, otherPool); + HTable t = (HTable)con1.getTable(TableName.valueOf(tableName), otherPool); // make sure passing a pool to the getTable does not trigger creation of an internal pool assertNull("Internal Thread pool should be null", ((ConnectionImplementation)con1).getCurrentBatchPool()); // table should use the pool passed assertTrue(otherPool == t.getPool()); t.close(); - t = (HTable)con2.getTable(tableName); + t = (HTable)con2.getTable(TableName.valueOf(tableName)); // table should use the connectin's internal pool assertTrue(otherPool == t.getPool()); t.close(); - t = (HTable)con2.getTable(Bytes.toBytes(tableName)); + t = (HTable)con2.getTable(TableName.valueOf(tableName)); // try other API too assertTrue(otherPool == t.getPool()); t.close(); @@ -182,7 +172,7 @@ public class TestHCM { assertTrue(otherPool == t.getPool()); t.close(); - t = (HTable)con1.getTable(tableName); + t = (HTable)con1.getTable(TableName.valueOf(tableName)); ExecutorService pool = ((ConnectionImplementation)con1).getCurrentBatchPool(); // make sure an internal pool was created assertNotNull("An internal Thread pool should have been created", pool); @@ -190,7 +180,7 @@ public class TestHCM { assertTrue(t.getPool() == pool); t.close(); - t = (HTable)con1.getTable(tableName); + t = (HTable)con1.getTable(TableName.valueOf(tableName)); // still using the *same* internal pool assertTrue(t.getPool() == pool); t.close(); @@ -535,7 +525,6 @@ public class TestHCM { } finally { syncBlockingFilter.set(true); t.join(); - ConnectionManager.getConnection(c2).close(); TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true); } @@ -568,28 +557,6 @@ public class TestHCM { } } - @Test - public void abortingHConnectionRemovesItselfFromHCM() throws Exception { - // Save off current HConnections - Map oldHBaseInstances = - new HashMap(); - oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES); - - ConnectionManager.CONNECTION_INSTANCES.clear(); - - try { - HConnection connection = ConnectionManager.getConnection(TEST_UTIL.getConfiguration()); - connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception( - "test abortingHConnectionRemovesItselfFromHCM")); - Assert.assertNotSame(connection, - ConnectionManager.getConnection(TEST_UTIL.getConfiguration())); - } finally { - // Put original HConnections back - ConnectionManager.CONNECTION_INSTANCES.clear(); - ConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances); - } - } - /** * Test that when we delete a location using the first row of a region * that we really delete it. @@ -710,7 +677,7 @@ public class TestHCM { Assert.assertArrayEquals(e.getRow(0).getRow(), ROW); // Check that we unserialized the exception as expected - Throwable cause = ConnectionManager.findException(e.getCause(0)); + Throwable cause = ConnectionImplementation.findException(e.getCause(0)); Assert.assertNotNull(cause); Assert.assertTrue(cause instanceof RegionMovedException); } @@ -846,35 +813,6 @@ public class TestHCM { table.close(); } - /** - * Make sure that {@link Configuration} instances that are essentially the - * same map to the same {@link HConnection} instance. - */ - @Test - public void testConnectionSameness() throws Exception { - Connection previousConnection = null; - for (int i = 0; i < 2; i++) { - // set random key to differentiate the connection from previous ones - Configuration configuration = TEST_UTIL.getConfiguration(); - configuration.set("some_key", String.valueOf(_randy.nextInt())); - LOG.info("The hash code of the current configuration is: " - + configuration.hashCode()); - Connection currentConnection = ConnectionManager - .getConnection(configuration); - if (previousConnection != null) { - assertTrue( - "Did not get the same connection even though its key didn't change", - previousConnection == currentConnection); - } - previousConnection = currentConnection; - // change the configuration, so that it is no longer reachable from the - // client's perspective. However, since its part of the LRU doubly linked - // list, it will eventually get thrown out, at which time it should also - // close the corresponding {@link HConnection}. - configuration.set("other_key", String.valueOf(_randy.nextInt())); - } - } - @Test public void testClosing() throws Exception { Configuration configuration = @@ -911,13 +849,8 @@ public class TestHCM { // created from the same configuration, yet they are different assertTrue(c1 != c2); assertTrue(c1.getConfiguration() == c2.getConfiguration()); - // make sure these were not cached - Connection c3 = ConnectionManager.getConnection(configuration); - assertTrue(c1 != c3); - assertTrue(c2 != c3); } - /** * This test checks that one can connect to the cluster with only the * ZooKeeper quorum set. Other stuff like master address will be read @@ -929,12 +862,12 @@ public class TestHCM { Configuration c = new Configuration(); c.set(HConstants.ZOOKEEPER_QUORUM, TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); - c.set(HConstants.ZOOKEEPER_CLIENT_PORT , + c.set(HConstants.ZOOKEEPER_CLIENT_PORT, TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT)); // This should be enough to connect - HConnection conn = ConnectionManager.getConnection(c); - assertTrue( conn.isMasterRunning() ); + HConnection conn = (HConnection) ConnectionFactory.createConnection(c); + assertTrue(conn.isMasterRunning()); conn.close(); } @@ -1074,8 +1007,8 @@ public class TestHCM { try { long timeBase = timeMachine.currentTime(); long largeAmountOfTime = ANY_PAUSE * 1000; - ConnectionManager.ServerErrorTracker tracker = - new ConnectionManager.ServerErrorTracker(largeAmountOfTime, 100); + ConnectionImplementation.ServerErrorTracker tracker = + new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); // The default backoff is 0. assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE)); @@ -1127,86 +1060,7 @@ public class TestHCM { private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) { assertTrue("Value not within jitter: " + expected + " vs " + actual, - Math.abs(actual - expected) <= (0.01f * jitterBase)); - } - - /** - * Tests that a destroyed connection does not have a live zookeeper. - * Below is timing based. We put up a connection to a table and then close the connection while - * having a background thread running that is forcing close of the connection to try and - * provoke a close catastrophe; we are hoping for a car crash so we can see if we are leaking - * zk connections. - * @throws Exception - */ - @Ignore ("Flakey test: See HBASE-8996")@Test - public void testDeleteForZKConnLeak() throws Exception { - TEST_UTIL.createTable(TABLE_NAME4, FAM_NAM); - final Configuration config = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - config.setInt("zookeeper.recovery.retry", 1); - config.setInt("zookeeper.recovery.retry.intervalmill", 1000); - config.setInt("hbase.rpc.timeout", 2000); - config.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - - ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, - 5, TimeUnit.SECONDS, - new SynchronousQueue(), - Threads.newDaemonThreadFactory("test-hcm-delete")); - - pool.submit(new Runnable() { - @Override - public void run() { - while (!Thread.interrupted()) { - try { - HConnection conn = ConnectionManager.getConnection(config); - LOG.info("Connection " + conn); - ConnectionManager.deleteStaleConnection(conn); - LOG.info("Connection closed " + conn); - // TODO: This sleep time should be less than the time that it takes to open and close - // a table. Ideally we would do a few runs first to measure. For now this is - // timing based; hopefully we hit the bad condition. - Threads.sleep(10); - } catch (Exception e) { - } - } - } - }); - - // Use connection multiple times. - for (int i = 0; i < 30; i++) { - Connection c1 = null; - try { - c1 = ConnectionManager.getConnectionInternal(config); - LOG.info("HTable connection " + i + " " + c1); - Table table = c1.getTable(TABLE_NAME4, pool); - table.close(); - LOG.info("HTable connection " + i + " closed " + c1); - } catch (Exception e) { - LOG.info("We actually want this to happen!!!! So we can see if we are leaking zk", e); - } finally { - if (c1 != null) { - if (c1.isClosed()) { - // cannot use getZooKeeper as method instantiates watcher if null - Field zkwField = c1.getClass().getDeclaredField("keepAliveZookeeper"); - zkwField.setAccessible(true); - Object watcher = zkwField.get(c1); - - if (watcher != null) { - if (((ZooKeeperWatcher)watcher).getRecoverableZooKeeper().getState().isAlive()) { - // non-synchronized access to watcher; sleep and check again in case zk connection - // hasn't been cleaned up yet. - Thread.sleep(1000); - if (((ZooKeeperWatcher) watcher).getRecoverableZooKeeper().getState().isAlive()) { - pool.shutdownNow(); - fail("Live zookeeper in closed connection"); - } - } - } - } - c1.close(); - } - } - } - pool.shutdownNow(); + Math.abs(actual - expected) <= (0.01f * jitterBase)); } @Test(timeout = 60000) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java index ad595d4..7651686 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java @@ -75,7 +75,7 @@ public class TestMetaWithReplicas { TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt( StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000); - TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniCluster(3); // disable the balancer LoadBalancerTracker l = new LoadBalancerTracker(TEST_UTIL.getZooKeeperWatcher(), new Abortable() { @@ -86,7 +86,7 @@ public class TestMetaWithReplicas { } @Override public void abort(String why, Throwable e) { - aborted = true; + aborted = true; } }); l.setBalancerOn(false); @@ -108,7 +108,7 @@ public class TestMetaWithReplicas { assertTrue(TEST_UTIL.getHBaseAdmin().getTableDescriptor(TableName.META_TABLE_NAME) .getRegionReplication() == 3); } - + @Test public void testZookeeperNodesForReplicas() throws Exception { // Checks all the znodes exist when meta's replicas are enabled @@ -409,7 +409,9 @@ public class TestMetaWithReplicas { public void testShutdownOfReplicaHolder() throws Exception { // checks that the when the server holding meta replica is shut down, the meta replica // can be recovered - RegionLocations rl = ConnectionManager.getConnectionInternal(TEST_UTIL.getConfiguration()). + ClusterConnection conn = (ClusterConnection) + ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + RegionLocations rl = conn. locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); HRegionLocation hrl = rl.getRegionLocation(1); ServerName oldServer = hrl.getServerName(); @@ -418,12 +420,12 @@ public class TestMetaWithReplicas { do { LOG.debug("Waiting for the replica " + hrl.getRegionInfo() + " to come up"); Thread.sleep(30000); //wait for the detection/recovery - rl = ConnectionManager.getConnectionInternal(TEST_UTIL.getConfiguration()). - locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); + rl = conn.locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); hrl = rl.getRegionLocation(1); i++; } while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3); assertTrue(i != 3); + conn.close(); } @Test -- 1.9.5 (Apple Git-50.3)