diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ade32a8..c063f19 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -185,33 +185,41 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this(conf); this.user = user; this.batchPool = pool; - this.registry = setupRegistry(); - retrieveClusterId(); - - this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); - this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); - - // Do we publish the status? - boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, - HConstants.STATUS_PUBLISHED_DEFAULT); - Class listenerClass = - conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, - ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, - ClusterStatusListener.Listener.class); - if (shouldListen) { - if (listenerClass == null) { - LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + - ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); - } else { - clusterStatusListener = new ClusterStatusListener( - new ClusterStatusListener.DeadServerHandler() { - @Override - public void newDead(ServerName sn) { - clearCaches(sn); - rpcClient.cancelConnections(sn); - } - }, conf, listenerClass); + + try { + this.registry = setupRegistry(); + retrieveClusterId(); + + this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + + // Do we publish the status? + boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, + HConstants.STATUS_PUBLISHED_DEFAULT); + Class listenerClass = + conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, + ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, + ClusterStatusListener.Listener.class); + if (shouldListen) { + if (listenerClass == null) { + LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); + } else { + clusterStatusListener = new ClusterStatusListener( + new ClusterStatusListener.DeadServerHandler() { + @Override + public void newDead(ServerName sn) { + clearCaches(sn); + rpcClient.cancelConnections(sn); + } + }, conf, listenerClass); + } } + } catch (Throwable e) { + // avoid leaks: registry, rpcClient, ... + LOG.debug("connection construction failed", e); + close(); + throw e; } } @@ -239,11 +247,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } else { nonceGenerator = new NoNonceGenerator(); } - stats = ServerStatisticTracker.create(conf); - this.asyncProcess = createAsyncProcess(this.conf); - this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); - this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + try { + stats = ServerStatisticTracker.create(conf); + this.asyncProcess = createAsyncProcess(this.conf); + this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + } catch (Throwable e) { + // avoid leaks: registry, rpcClient, ... + LOG.debug("connection construction failed", e); + close(); + throw e; + } } /** @@ -370,7 +385,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { synchronized (this) { if (batchPool == null) { this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), - conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); + conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); this.cleanupPool = true; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java index 9adcb6f..8ce1f66 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private final class RegistryFactory { + static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; private RegistryFactory() {} @@ -35,7 +36,7 @@ final class RegistryFactory { */ static Registry getRegistry(final Connection connection) throws IOException { - String registryClass = connection.getConfiguration().get("hbase.client.registry.impl", + String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY, ZooKeeperRegistry.class.getName()); Registry registry = null; try {