From f17c7c310e08aa431cee371e30fd47d81023e9dd Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Wed, 2 Nov 2016 17:34:03 +0800
Subject: [PATCH] HBASE-16991 Make the initialization of AsyncConnection asynchronous

---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  19 +--
 .../apache/hadoop/hbase/client/AsyncRegistry.java  |   5 +-
 .../hadoop/hbase/client/AsyncRegistryFactory.java  |  43 +++++++
 .../hbase/client/ClusterRegistryFactory.java       |  43 -------
 .../hadoop/hbase/client/ConnectionFactory.java     | 131 +++++++++++----------
 .../hadoop/hbase/client/ZKAsyncRegistry.java       |  43 ++++---
 .../hbase/client/TestAsyncGetMultiThread.java      |   6 +-
 .../hbase/client/TestAsyncRegionLocator.java       |   4 +-
 .../TestAsyncSingleRequestRpcRetryingCaller.java   |  25 ++--
 .../apache/hadoop/hbase/client/TestAsyncTable.java |   2 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java    |   5 +-
 .../hbase/client/TestAsyncTableSmallScan.java      |   2 +-
 12 files changed, 162 insertions(+), 166 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
 delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 6cad6a2..4366d12 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
@@ -27,14 +26,11 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE
 import io.netty.util.HashedWheelTimer;
 
 import java.io.IOException;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -53,8 +49,6 @@ import org.apache.hadoop.hbase.util.Threads;
 @InterfaceAudience.Private
 class AsyncConnectionImpl implements AsyncConnection {
 
-  private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class);
-
   private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
       Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
 
@@ -68,8 +62,6 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   final AsyncRegistry registry;
 
-  private final String clusterId;
-
   private final int rpcTimeout;
 
   private final RpcClient rpcClient;
@@ -87,18 +79,13 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
 
   @SuppressWarnings("deprecation")
-  public AsyncConnectionImpl(Configuration conf, User user) {
+  public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
+      User user) {
     this.conf = conf;
     this.user = user;
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.locator = new AsyncRegionLocator(this);
-    this.registry = ClusterRegistryFactory.getRegistry(conf);
-    this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
-      }
-      return CLUSTER_ID_DEFAULT;
-    });
+    this.registry = registry;
     this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
index 731cf09..4570043 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc..
  * All stuffs that may be related to zookeeper at client side are placed here.
  *
- * Most methods are executed asynchronously except getClusterId. It will be executed synchronously
- * and should be called only once when initialization.
- *
  * Internal use only.
  */
 @InterfaceAudience.Private
@@ -46,7 +43,7 @@ interface AsyncRegistry extends Closeable {
    *
    * The upper layer should store this value somewhere as it will not be change any more.
    */
-  String getClusterId();
+  CompletableFuture<String> getClusterId();
 
   /**
    * Get the number of 'running' regionservers.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
new file mode 100644
index 0000000..2fc3322
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Get instance of configured Registry.
+ */
+@InterfaceAudience.Private
+final class AsyncRegistryFactory {
+
+  static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
+
+  private AsyncRegistryFactory() {
+  }
+
+  /**
+   * @return The cluster registry implementation to use.
+   */
+  static AsyncRegistry getRegistry(Configuration conf) {
+    Class<? extends AsyncRegistry> clazz =
+        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
deleted file mode 100644
index 48bfb18..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-
-/**
- * Get instance of configured Registry.
- */
-@InterfaceAudience.Private
-final class ClusterRegistryFactory {
-
-  static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
-
-  private ClusterRegistryFactory() {
-  }
-
-  /**
-   * @return The cluster registry implementation to use.
-   */
-  static AsyncRegistry getRegistry(Configuration conf) {
-    Class<? extends AsyncRegistry> clazz =
-        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
-    return ReflectionUtils.newInstance(clazz, conf);
-  }
-}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index ca34211..7cbcc20 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,13 +31,12 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
-
 /**
- * A non-instantiable class that manages creation of {@link Connection}s.
- * Managing the lifecycle of the {@link Connection}s to the cluster is the responsibility of
- * the caller.
- * From a {@link Connection}, {@link Table} implementations are retrieved
- * with {@link Connection#getTable(TableName)}. Example:
+ * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
+ * the {@link Connection}s to the cluster is the responsibility of the caller. From a
+ * {@link Connection}, {@link Table} implementations are retrieved with
+ * {@link Connection#getTable(TableName)}. Example:
+ *
  *
  * Connection connection = ConnectionFactory.createConnection(config);
  * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  *
  * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
  * implementations.
- *
  * @see Connection
  * @since 0.99.0
  */
@@ -58,23 +57,20 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceStability.Evolving
 public class ConnectionFactory {
 
-  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
-      "hbase.client.async.connection.impl";
+  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
 
   /** No public c.tors */
   protected ConnectionFactory() {
   }
 
   /**
-   * Create a new Connection instance using default HBaseConfiguration. Connection
-   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
+   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
+   * connection share zookeeper connection, meta cache, and connections to region servers and
+   * masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
    *
-   * Typical usage:
    *
    * Connection connection = ConnectionFactory.createConnection();
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -96,13 +92,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
    *
-   * Typical usage:
    *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -125,13 +119,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
    *
-   * Typical usage:
    *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -156,13 +148,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
    *
-   * Typical usage:
    *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -179,21 +169,18 @@ public class ConnectionFactory {
    * @param user the user the connection is for
    * @return Connection object for conf
    */
-  public static Connection createConnection(Configuration conf, User user)
-  throws IOException {
+  public static Connection createConnection(Configuration conf, User user) throws IOException {
     return createConnection(conf, null, user);
   }
 
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
-   * The caller is responsible for calling {@link Connection#close()} on the returned
-   * connection instance.
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters.
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
    *
-   * Typical usage:
    *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -212,7 +199,7 @@ public class ConnectionFactory {
    * @return Connection object for conf
    */
   public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
-  throws IOException {
+      throws IOException {
     if (user == null) {
       UserProvider provider = UserProvider.instantiate(conf);
       user = provider.getCurrent();
@@ -228,9 +215,8 @@ public class ConnectionFactory {
     }
     try {
       // Default HCM#HCI is not accessible; make it so before invoking.
-      Constructor<?> constructor =
-        clazz.getDeclaredConstructor(Configuration.class,
-          ExecutorService.class, User.class);
+      Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
+        ExecutorService.class, User.class);
       constructor.setAccessible(true);
       return (Connection) constructor.newInstance(conf, pool, user);
     } catch (Exception e) {
@@ -241,9 +227,9 @@ public class ConnectionFactory {
   /**
    * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
    * @see #createAsyncConnection(Configuration)
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    */
-  public static AsyncConnection createAsyncConnection() throws IOException {
+  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
     return createAsyncConnection(HBaseConfiguration.create());
   }
 
@@ -252,12 +238,20 @@ public class ConnectionFactory {
    * User object created by {@link UserProvider}. The given {@code conf} will also be used to
    * initialize the {@link UserProvider}.
    * @param conf configuration
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    * @see #createAsyncConnection(Configuration, User)
    * @see UserProvider
    */
-  public static AsyncConnection createAsyncConnection(Configuration conf) throws IOException {
-    return createAsyncConnection(conf, UserProvider.instantiate(conf).getCurrent());
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
+    User user;
+    try {
+      user = UserProvider.instantiate(conf).getCurrent();
+    } catch (IOException e) {
+      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
+      future.completeExceptionally(e);
+      return future;
+    }
+    return createAsyncConnection(conf, user);
   }
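
(A note on the method above: the try/catch exists only because UserProvider.getCurrent() throws a checked IOException; the failure is routed into the returned future rather than a throws clause. Factored out, the idea looks roughly like the hypothetical helper below; Java 8 has no built-in CompletableFuture.failedFuture(), which only arrived in Java 9.)

  // Hypothetical helper, not part of this patch: turn a synchronous failure into a
  // future that is already completed exceptionally.
  static <T> CompletableFuture<T> failedFuture(Throwable error) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(error);
    return future;
  }
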
 
   /**
@@ -273,17 +267,30 @@ public class ConnectionFactory {
    * as it is thread safe.
    * @param conf configuration
    * @param user the user the asynchronous connection is for
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    * @throws IOException
    */
-  public static AsyncConnection createAsyncConnection(Configuration conf, User user)
-      throws IOException {
-    Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
-      AsyncConnectionImpl.class, AsyncConnection.class);
-    try {
-      return ReflectionUtils.newInstance(clazz, conf, user);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
+      User user) {
+    CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
+    registry.getClusterId().whenComplete((clusterId, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (clusterId == null) {
+        future.completeExceptionally(new IOException("clusterid came back null"));
+        return;
+      }
+      Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
+        AsyncConnectionImpl.class, AsyncConnection.class);
+      try {
+        future.complete(ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user));
+      } catch (Exception e) {
+        future.completeExceptionally(e);
+      }
+    });
+    return future;
   }
 }
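
With these signatures, createAsyncConnection() no longer blocks while the cluster id is fetched; the caller decides whether to chain on the future or wait on it. A minimal usage sketch under the new API (the class, table name, row key and error handling below are hypothetical, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncConnectionUsageSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // The future completes once the cluster id has been fetched from the registry.
    ConnectionFactory.createAsyncConnection(conf).whenComplete((conn, error) -> {
      if (error != null) {
        error.printStackTrace();
        return;
      }
      // Still on the asynchronous path; nothing here blocks on network I/O.
      conn.getTable(TableName.valueOf("example_table"))
          .get(new Get(Bytes.toBytes("example_row")))
          .thenAccept(result -> System.out.println("got " + result));
    });
    // A real application would keep the process alive and close the connection when done.
  }
}
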
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index c76aa3e..47b68e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@@ -53,8 +54,7 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.zookeeper.data.Stat;
 
 /**
- * Cache the cluster registry data in memory and use zk watcher to update. The only exception is
- * {@link #getClusterId()}, it will fetch the data from zk directly.
+ * Fetch the registry data from zookeeper.
  */
 @InterfaceAudience.Private
 class ZKAsyncRegistry implements AsyncRegistry {
@@ -79,26 +79,6 @@ class ZKAsyncRegistry implements AsyncRegistry {
     this.zk.start();
   }
 
-  @Override
-  public String getClusterId() {
-    try {
-      byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode);
-      if (data == null || data.length == 0) {
-        return null;
-      }
-      data = removeMetaData(data);
-      return ClusterId.parseFrom(data).toString();
-    } catch (Exception e) {
-      LOG.warn("failed to get cluster id", e);
-      return null;
-    }
-  }
-
-  @Override
-  public void close() {
-    zk.close();
-  }
-
   private interface CuratorEventProcessor<T> {
     T process(CuratorEvent event) throws Exception;
   }
@@ -120,6 +100,20 @@ class ZKAsyncRegistry implements AsyncRegistry {
     return future;
   }
 
+  private static String getClusterId(CuratorEvent event) throws DeserializationException {
+    byte[] data = event.getData();
+    if (data == null || data.length == 0) {
+      return null;
+    }
+    data = removeMetaData(data);
+    return ClusterId.parseFrom(data).toString();
+  }
+
+  @Override
+  public CompletableFuture<String> getClusterId() {
+    return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
+  }
+
   private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
       throws IOException {
     byte[] data = event.getData();
@@ -249,4 +243,9 @@ class ZKAsyncRegistry implements AsyncRegistry {
     return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
         .thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
   }
+
+  @Override
+  public void close() {
+    zk.close();
+  }
 }
\ No newline at end of file
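
getClusterId() now rides on the same exec(...) helper as the meta and master lookups: a Curator background call whose callback completes a CompletableFuture. For readers unfamiliar with that bridge, a self-contained sketch of the pattern follows (an illustration only; CuratorFutures and getDataAsync are made-up names, not the helper in this file):

import java.util.concurrent.CompletableFuture;

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;

final class CuratorFutures {

  // Issue the read in the background and complete the future from the callback,
  // so no caller thread ever parks on a ZooKeeper round trip.
  static CompletableFuture<byte[]> getDataAsync(CuratorFramework zk, String path) {
    CompletableFuture<byte[]> future = new CompletableFuture<>();
    try {
      zk.getData().inBackground((client, event) -> {
        if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
          future.complete(event.getData());
        } else {
          future.completeExceptionally(
            KeeperException.create(KeeperException.Code.get(event.getResultCode()), path));
        }
      }).forPath(path);
    } catch (Exception e) {
      future.completeExceptionally(e);
    }
    return future;
  }

  private CuratorFutures() {
  }
}
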
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
index b20e616..d53cadf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
@@ -86,7 +86,7 @@ public class TestAsyncGetMultiThread {
     }
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     AsyncTable table = CONN.getTable(TABLE_NAME);
     List> futures = new ArrayList<>();
     IntStream.range(0, COUNT)
@@ -114,8 +114,8 @@ public class TestAsyncGetMultiThread {
   public void test() throws IOException, InterruptedException, ExecutionException {
     int numThreads = 20;
     AtomicBoolean stop = new AtomicBoolean(false);
-    ExecutorService executor =
-        Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads,
+      Threads.newDaemonThreadFactory("TestAsyncGet-"));
     List<Future<?>> futures = new ArrayList<>();
     IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
       run(stop);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
index 2e46d8a..e17c3c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
@@ -68,7 +68,9 @@ public class TestAsyncRegionLocator {
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().setBalancerRunning(false, true);
-    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
+        registry.getClusterId().get(), User.getCurrent());
     LOCATOR = CONN.getLocator();
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 67d2661..563048c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -83,12 +83,15 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     }
   }
 
-  private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) throws IOException {
+  private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires)
+      throws IOException, InterruptedException, ExecutionException {
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt);
     conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs);
     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires);
-    asyncConn = new AsyncConnectionImpl(conf, User.getCurrent());
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
+    asyncConn = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(),
+        User.getCurrent());
   }
 
   @Test
@@ -116,7 +119,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   }
 
   @Test
-  public void testMaxRetries() throws IOException, InterruptedException {
+  public void testMaxRetries() throws IOException, InterruptedException, ExecutionException {
     initConn(0, 10, 2);
     try {
       asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
@@ -128,7 +131,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   }
 
   @Test
-  public void testOperationTimeout() throws IOException, InterruptedException {
+  public void testOperationTimeout() throws IOException, InterruptedException, ExecutionException {
     initConn(0, 100, Integer.MAX_VALUE);
     long startNs = System.nanoTime();
     try {
@@ -177,14 +180,14 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
       void updateCachedLocation(HRegionLocation loc, Throwable exception) {
       }
     };
-    try (AsyncConnectionImpl mockedConn =
-        new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
+    try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(),
+        asyncConn.registry, asyncConn.registry.getClusterId().get(), User.getCurrent()) {
 
-          @Override
-          AsyncRegionLocator getLocator() {
-            return mockedLocator;
-          }
-        }) {
+      @Override
+      AsyncRegionLocator getLocator() {
+        return mockedLocator;
+      }
+    }) {
       AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME);
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
       assertTrue(errorTriggered.get());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 8ba3414..e4e1033 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -73,7 +73,7 @@ public class TestAsyncTable {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 840f844..3601485 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -76,13 +76,14 @@ public class TestAsyncTableNoncedRetry {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()) {
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
+        registry.getClusterId().get(), User.getCurrent()) {
 
       @Override
       public NonceGenerator getNonceGenerator() {
         return NONCE_GENERATOR;
       }
-
     };
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
index 972780e..ff5c086 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
@@ -59,7 +59,7 @@ public class TestAsyncTableSmallScan {
     }
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
     AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
     List> futures = new ArrayList<>();
     IntStream.range(0, COUNT)
-- 
2.7.4
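
The test changes all reduce to one pattern: JUnit setup is synchronous, so the tests block exactly once on the asynchronous initialization, either with createAsyncConnection(...).get() or, when constructing AsyncConnectionImpl directly, with registry.getClusterId().get(). A condensed, hypothetical version of that setup (class, table and family names invented, mirroring the tests above):

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class TestAsyncConnectionSetupSketch {

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static AsyncConnection ASYNC_CONN;

  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TableName.valueOf("sketch"), Bytes.toBytes("cf"));
    // Block here once; everything after setup can stay on the asynchronous path.
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    ASYNC_CONN.close();
    TEST_UTIL.shutdownMiniCluster();
  }
}
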