diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ade32a8..4e64831 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -182,44 +182,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable { */ ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException { - this(conf); + this.conf = conf; this.user = user; this.batchPool = pool; - this.registry = setupRegistry(); - retrieveClusterId(); - - this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); - this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); - - // Do we publish the status? - boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, - HConstants.STATUS_PUBLISHED_DEFAULT); - Class listenerClass = - conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, - ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, - ClusterStatusListener.Listener.class); - if (shouldListen) { - if (listenerClass == null) { - LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + - ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); - } else { - clusterStatusListener = new ClusterStatusListener( - new ClusterStatusListener.DeadServerHandler() { - @Override - public void newDead(ServerName sn) { - clearCaches(sn); - rpcClient.cancelConnections(sn); - } - }, conf, listenerClass); - } - } - } - - /** - * For tests. - */ - protected ConnectionImplementation(Configuration conf) { - this.conf = conf; this.tableConfig = new TableConfiguration(conf); this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, @@ -239,11 +204,49 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } else { nonceGenerator = new NoNonceGenerator(); } - stats = ServerStatisticTracker.create(conf); - this.asyncProcess = createAsyncProcess(this.conf); + + this.stats = ServerStatisticTracker.create(conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + this.asyncProcess = createAsyncProcess(this.conf); + + boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, + HConstants.STATUS_PUBLISHED_DEFAULT); + Class listenerClass = + conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, + ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, + ClusterStatusListener.Listener.class); + + try { + this.registry = setupRegistry(); + retrieveClusterId(); + + this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); + + // Do we publish the status? + if (shouldListen) { + if (listenerClass == null) { + LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); + } else { + clusterStatusListener = new ClusterStatusListener( + new ClusterStatusListener.DeadServerHandler() { + @Override + public void newDead(ServerName sn) { + clearCaches(sn); + rpcClient.cancelConnections(sn); + } + }, conf, listenerClass); + } + } + } catch (Throwable e) { + // avoid leaks: registry, rpcClient, ... + LOG.debug("connection construction failed", e); + close(); + throw e; + } } /** @@ -370,7 +373,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { synchronized (this) { if (batchPool == null) { this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), - conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); + conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); this.cleanupPool = true; } } @@ -478,7 +481,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { protected String clusterId = null; - void retrieveClusterId() { + protected void retrieveClusterId() { if (clusterId != null) return; this.clusterId = this.registry.getClusterId(); if (clusterId == null) { @@ -1979,9 +1982,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, this.batchPool, - RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false, - RpcControllerFactory.instantiate(conf)); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java index 9adcb6f..8ce1f66 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private final class RegistryFactory { + static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; private RegistryFactory() {} @@ -35,7 +36,7 @@ final class RegistryFactory { */ static Registry getRegistry(final Connection connection) throws IOException { - String registryClass = connection.getConfiguration().get("hbase.client.registry.impl", + String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY, ZooKeeperRegistry.class.getName()); Registry registry = null; try { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index fa3ed32..7e7139a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -342,11 +342,35 @@ public class TestAsyncProcess { * Returns our async process. */ static class MyConnectionImpl extends ConnectionImplementation { + public static class TestRegistry implements Registry { + @Override + public void init(Connection connection) {} + + @Override + public RegionLocations getMetaRegionLocation() throws IOException { + return null; + } + + @Override + public String getClusterId() { + return "testClusterId"; + } + + @Override + public int getCurrentNrHRS() throws IOException { + return 1; + } + } + final AtomicInteger nbThreads = new AtomicInteger(0); + protected MyConnectionImpl(Configuration conf) throws IOException { + super(setupConf(conf), null, null); + } - protected MyConnectionImpl(Configuration conf) { - super(conf); + private static Configuration setupConf(Configuration conf) { + conf.setClass(RegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, Registry.class); + return conf; } @Override @@ -363,7 +387,7 @@ public class TestAsyncProcess { List hrl; final boolean usedRegions[]; - protected MyConnectionImpl2(List hrl) { + protected MyConnectionImpl2(List hrl) throws IOException { super(conf); this.hrl = hrl; this.usedRegions = new boolean[hrl.size()]; @@ -382,7 +406,6 @@ public class TestAsyncProcess { } return null; } - } @Rule