commit 7b52165efef147f8d70ed343f2939e945e9f9b3e Author: Enis Soztutar Date: Tue Oct 11 17:42:59 2016 -0700 v0 diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/CachedConnectionFactory.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/CachedConnectionFactory.java new file mode 100644 index 0000000..aeb8b85 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/CachedConnectionFactory.java @@ -0,0 +1,384 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.StoppableImplementation; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; + + +/** + * CachedConnectionFactory is a utility class to manage creation, caching and closing of + * {@link Connection} objects in a cached manner. {@link Connection} is a heavy-weight object that + * holds the individual socket connections to server, meta cache and ZooKeeper connections. + * Connection objects are usually created via {@link ConnectionFactory} and their lifecycle is + * explicitly managed by the application(see the javadoc). Typically a Connection is created once + * per application, it is used from multiple threads, and only closed when application finishes. + * In this case, the application keeps a reference to the created Connection in a shared space. + * CachedConnectionFactory, on the other hand, creates ref-counted Connection objects that are + * shared automatically between different calls of {@link #getConnection(Configuration)}. Returned + * Connection objects are thread safe and can be used safely without the fear of any other thread + * closing the connection behind. However, users are responsible to explicitly call {@link #close()} + * method to release the Connection so that resources can be released when ref count reaches zero. + *

+ * CachedConnectionFactory can be created with maxIdleTime property which defines the behavior for + * the Connection lifecycle. If maxIdleTime is set to a non-positive value, the Connections returned + * by {@link #getConnection(Configuration)} calls will be ref-counted and actually closed when ref + * count reaches zero. + * Else, if maxIdleTime is a positive value, connections will be kept open for maxIdleTime after the + * last reference to the connection is closed. Since Connection creation is costly, it maybe + * preferable for some online / low latency applications to not close the connection in case a later + * call for getConnection() can arrive. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CachedConnectionFactory implements Closeable { + + private static final Log LOG = LogFactory.getLog(CachedConnectionFactory.class); + + /** + * Denotes a unique key to an {@link Connection} instance. + * + * In essence, this class captures the properties in {@link Configuration} + * that may be used in the process of establishing a connection. In light of + * that, if any new such properties are introduced into the mix, they must be + * added to the list. + */ +// @InterfaceAudience.Private + class ConnectionKey { +// final static String[] CONNECTION_PROPERTIES = new String[] { +// HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, +// HConstants.ZOOKEEPER_CLIENT_PORT, +// HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, +// HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, +// HConstants.HBASE_RPC_TIMEOUT_KEY, +// HConstants.HBASE_META_SCANNER_CACHING, +// HConstants.HBASE_CLIENT_INSTANCE_ID, +// HConstants.RPC_CODEC_CONF_KEY, +// HConstants.USE_META_REPLICAS, +// RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY}; + + private String userName; + + public ConnectionKey(Configuration conf, String userName) { + this.userName = userName; + } + //TODO: equals and hashCode + } + + @InterfaceAudience.Private + class DelegatingConnection implements Connection { + protected final Connection connection; + + DelegatingConnection(Connection connection) { + this.connection = connection; + } + + @Override + public void abort(String why, Throwable e) { + connection.abort(why, e); + } + + @Override + public boolean isAborted() { + return connection.isAborted(); + } + + @Override + public Configuration getConfiguration() { + return connection.getConfiguration(); + } + + @Override + public Table getTable(TableName tableName) throws IOException { + return connection.getTable(tableName); + } + + @Override + public Table getTable(TableName tableName, ExecutorService pool) throws IOException { + return connection.getTable(tableName, pool); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + return connection.getBufferedMutator(tableName); + } + + @Override + public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { + return connection.getBufferedMutator(params); + } + + @Override + public RegionLocator getRegionLocator(TableName tableName) throws IOException { + return connection.getRegionLocator(tableName); + } + + @Override + public Admin getAdmin() throws IOException { + return connection.getAdmin(); + } + + @Override + public void close() throws IOException { + connection.close(); + } + + @Override + public boolean isClosed() { + return connection.isClosed(); + } + } + + @InterfaceAudience.Private + class RefCountedConnection extends DelegatingConnection { + private final ConnectionKey key; + private int refCount = 0; + private long lastAccessTime; + + RefCountedConnection(ConnectionKey key, Connection conn, String user) { + super(conn); + this.key = key; + } + + void updateAccessTime() { + lastAccessTime = EnvironmentEdgeManager.currentTime(); + } + + /** + * Increment this client's reference count. + */ + private void incCount() { + ++refCount; + } + + /** + * Decrement this client's reference count. + */ + private void decCount() { + if (refCount > 0) { + --refCount; + } + } + + /** + * Return if this client has no reference + * + * @return true if this client has no reference; false otherwise + */ + private boolean isZeroReference() { + return refCount == 0; + } + + @Override + public void close() throws IOException { + // this is exposed to the user + releaseConnection(key, this); + } + + /** + * Whether the connection should close (timed out and no references) + * @return whether the connection should be closed or not + */ + private boolean shouldClose() { + if (!isZeroReference()) { + return false; + } + if (maxIdleTime <= 0) { + return true; + } + long timeoutTime = lastAccessTime + maxIdleTime; + return EnvironmentEdgeManager.currentTime() > timeoutTime; + } + + boolean checkAndClose() throws UncheckedIOException { + updateAccessTime(); + if (connection.isAborted() || connection.isClosed()) { + LOG.warn("Unexpected: cached Connection is aborted/closed, removed from cache"); + actualClose(); + return true; + } + if (shouldClose()) { + actualClose(); + return true; + } + return false; + } + + boolean decCheckAndClose() throws UncheckedIOException { + decCount(); + return checkAndClose(); + } + + public void actualClose() throws UncheckedIOException { + try { + super.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + /** + * Keep a map of connections keyed by ConnectionKey. This map is the main synchronization point + * and the authoritative source for deciding on whether a Connection can be closed or not. + */ + private final ConcurrentHashMap connections + = new ConcurrentHashMap<>(); + private final String realUserName; + private final UserGroupInformation realUser; + private final UserProvider userProvider; + private final Configuration conf; + private final ChoreService choreService; + private final int maxIdleTime; + + protected CachedConnectionFactory(final Configuration conf, final UserProvider userProvider, + final int cleanInterval, final int maxIdleTime) throws IOException { + Stoppable stoppable = new StoppableImplementation(); + + if (maxIdleTime > 0) { + this.choreService = new ChoreService("CachedConnectionFactory"); + ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) { + @Override + protected void chore() { + for (Map.Entry entry : connections.entrySet()) { + checkAndClose(entry.getKey(), entry.getValue()); + } + } + }; + // Start the daemon cleaner chore + choreService.scheduleChore(cleaner); + } else { + choreService = null; + } + + this.maxIdleTime = maxIdleTime; + this.realUser = userProvider.getCurrent().getUGI(); + this.realUserName = realUser.getShortUserName(); + this.userProvider = userProvider; + this.conf = conf; + } + + /** + * Get the cached connection for the current user or create a new one. + */ + private RefCountedConnection getOrCreateConnection(Configuration conf, String userName, Function<>) + throws IOException { + + // TODO: This ConnectionKey parsing is also very expensive since it searches tens of values + // in the Hadoop Configuration object. Have a WeakReferencePool to cache from Conf -> ConnKey ? + ConnectionKey key = new ConnectionKey(conf, userName); + try { + return connections.compute(key, (k, conn) -> { + if (conn == null) { + // create a new connection atomically if it does not exists in map + UserGroupInformation ugi = realUser; + if (!userName.equals(realUserName)) { + ugi = UserGroupInformation.createProxyUser(userName, realUser); + } + User user = userProvider.create(ugi); + Connection c = null; + try { + c = ConnectionFactory.createConnection(conf, user); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + conn = new RefCountedConnection(key, c, userName); + } + conn.updateAccessTime(); // atomically increment update time and ref count + conn.incCount(); + return conn; + }); + } catch (UncheckedIOException ex) { + throw ex.getCause() == null? new IOException(ex) : ex.getCause(); + } + } + + private void checkAndClose(ConnectionKey key, RefCountedConnection conn) { + try { + connections.computeIfPresent(key, (k, v) -> { + return v.checkAndClose() ? null : v; + }); + } catch (UncheckedIOException ex) { + LOG.warn(ex); // do not throw it again + } + } + + private void releaseConnection(ConnectionKey key, RefCountedConnection conn) throws IOException { + try { + connections.computeIfPresent(key, (k, v) -> { + return v.decCheckAndClose() ? null : v; + }); + } catch (UncheckedIOException ex) { + throw ex.getCause() == null ? new IOException(ex) : ex.getCause(); + } + } + + /** + * Obtain a cached connection. + * @param conf + * @return + * @throws IOException + */ + public Connection getConnection(Configuration conf) throws IOException { + return getOrCreateConnection(conf, this.realUserName); + } + + /** + * Obtain a cached connection for the proxy user. + * @param conf + * @param proxyUser the proxy user. + * @return + * @throws IOException + */ + public Connection getConnection(Configuration conf, String proxyUser) throws IOException { + return getOrCreateConnection(conf, proxyUser); + } + + /** + * Called when cache is no longer needed so that it can perform cleanup operations, stop + * all threads and close all remaining connections. + */ + public void close() { + // TODO: close all connections + + if (choreService != null) { + choreService.shutdown(); + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/StoppableImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/util/StoppableImplementation.java similarity index 100% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/util/StoppableImplementation.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/util/StoppableImplementation.java