From be4602dba2b62221afb731f680dc99738bf0664c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 2 Nov 2016 22:13:23 +0800 Subject: [PATCH] HBASE-16991 Make the initialization of AsyncConnection asynchronous --- .../hadoop/hbase/client/AsyncConnectionImpl.java | 19 +-- .../apache/hadoop/hbase/client/AsyncRegistry.java | 5 +- .../hadoop/hbase/client/AsyncRegistryFactory.java | 43 +++++++ .../hbase/client/ClusterRegistryFactory.java | 43 ------- .../hadoop/hbase/client/ConnectionFactory.java | 131 +++++++++++---------- .../hbase/client/SingletonAsyncConnection.java | 93 +++++++++++++++ .../hadoop/hbase/client/ZKAsyncRegistry.java | 43 ++++--- .../hbase/client/TestAsyncGetMultiThread.java | 6 +- .../hbase/client/TestAsyncRegionLocator.java | 4 +- .../TestAsyncSingleRequestRpcRetryingCaller.java | 25 ++-- .../apache/hadoop/hbase/client/TestAsyncTable.java | 2 +- .../hbase/client/TestAsyncTableNoncedRetry.java | 5 +- .../hbase/client/TestAsyncTableSmallScan.java | 2 +- .../hbase/client/TestSingletonAsyncConnection.java | 82 +++++++++++++ 14 files changed, 337 insertions(+), 166 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingletonAsyncConnection.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSingletonAsyncConnection.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 6cad6a2..4366d12 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; @@ -27,14 +26,11 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE import io.netty.util.HashedWheelTimer; import java.io.IOException; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -53,8 +49,6 @@ import org.apache.hadoop.hbase.util.Threads; @InterfaceAudience.Private class AsyncConnectionImpl implements AsyncConnection { - private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class); - private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); @@ -68,8 +62,6 @@ class AsyncConnectionImpl implements AsyncConnection { final AsyncRegistry registry; - private final String clusterId; - private final int rpcTimeout; private final RpcClient rpcClient; @@ -87,18 +79,13 @@ class AsyncConnectionImpl implements AsyncConnection { private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); @SuppressWarnings("deprecation") - public AsyncConnectionImpl(Configuration conf, User user) { + public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, + User user) { this.conf = conf; this.user = user; this.connConf = new AsyncConnectionConfiguration(conf); this.locator = new AsyncRegionLocator(this); - this.registry = ClusterRegistryFactory.getRegistry(conf); - this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { - if (LOG.isDebugEnabled()) { - LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT); - } - return CLUSTER_ID_DEFAULT; - }); + this.registry = registry; this.rpcClient = RpcClientFactory.createClient(conf, clusterId); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java index 731cf09..4570043 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java @@ -28,9 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.. * All stuffs that may be related to zookeeper at client side are placed here. *

- * Most methods are executed asynchronously except getClusterId. It will be executed synchronously - * and should be called only once when initialization. - *

* Internal use only. */ @InterfaceAudience.Private @@ -46,7 +43,7 @@ interface AsyncRegistry extends Closeable { *

* The upper layer should store this value somewhere as it will not be change any more. */ - String getClusterId(); + CompletableFuture getClusterId(); /** * Get the number of 'running' regionservers. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java new file mode 100644 index 0000000..2fc3322 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * Get instance of configured Registry. + */ +@InterfaceAudience.Private +final class AsyncRegistryFactory { + + static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + + private AsyncRegistryFactory() { + } + + /** + * @return The cluster registry implementation to use. + */ + static AsyncRegistry getRegistry(Configuration conf) { + Class clazz = + conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class); + return ReflectionUtils.newInstance(clazz, conf); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java deleted file mode 100644 index 48bfb18..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ReflectionUtils; - -/** - * Get instance of configured Registry. - */ -@InterfaceAudience.Private -final class ClusterRegistryFactory { - - static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; - - private ClusterRegistryFactory() { - } - - /** - * @return The cluster registry implementation to use. - */ - static AsyncRegistry getRegistry(Configuration conf) { - Class clazz = - conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class); - return ReflectionUtils.newInstance(clazz, conf); - } -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index ca34211..7cbcc20 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.lang.reflect.Constructor; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; @@ -30,13 +31,12 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.ReflectionUtils; - /** - * A non-instantiable class that manages creation of {@link Connection}s. - * Managing the lifecycle of the {@link Connection}s to the cluster is the responsibility of - * the caller. - * From a {@link Connection}, {@link Table} implementations are retrieved - * with {@link Connection#getTable(TableName)}. Example: + * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of + * the {@link Connection}s to the cluster is the responsibility of the caller. From a + * {@link Connection}, {@link Table} implementations are retrieved with + * {@link Connection#getTable(TableName)}. Example: + * *

  * Connection connection = ConnectionFactory.createConnection(config);
  * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  *
  * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
  * implementations.
- *
  * @see Connection
  * @since 0.99.0
  */
@@ -58,23 +57,20 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceStability.Evolving
 public class ConnectionFactory {
 
-  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
-      "hbase.client.async.connection.impl";
+  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
 
   /** No public c.tors */
   protected ConnectionFactory() {
   }
 
   /**
-   * Create a new Connection instance using default HBaseConfiguration. Connection
-   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
- * The caller is responsible for calling {@link Connection#close()} on the returned - * connection instance. + * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all + * housekeeping for a connection to the cluster. All tables and interfaces created from returned + * connection share zookeeper connection, meta cache, and connections to region servers and + * masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: * - * Typical usage: *
    * Connection connection = ConnectionFactory.createConnection();
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -96,13 +92,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
- * The caller is responsible for calling {@link Connection#close()} on the returned - * connection instance. + * created from returned connection share zookeeper connection, meta cache, and connections to + * region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: * - * Typical usage: *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -125,13 +119,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
- * The caller is responsible for calling {@link Connection#close()} on the returned - * connection instance. + * created from returned connection share zookeeper connection, meta cache, and connections to + * region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: * - * Typical usage: *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("mytable"));
@@ -156,13 +148,11 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
- * The caller is responsible for calling {@link Connection#close()} on the returned - * connection instance. + * created from returned connection share zookeeper connection, meta cache, and connections to + * region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: * - * Typical usage: *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -179,21 +169,18 @@ public class ConnectionFactory {
    * @param user the user the connection is for
    * @return Connection object for conf
    */
-  public static Connection createConnection(Configuration conf, User user)
-  throws IOException {
+  public static Connection createConnection(Configuration conf, User user) throws IOException {
     return createConnection(conf, null, user);
   }
 
   /**
    * Create a new Connection instance using the passed conf instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
-   * created from returned connection share zookeeper connection, meta cache, and connections
-   * to region servers and masters.
-   * 
- * The caller is responsible for calling {@link Connection#close()} on the returned - * connection instance. + * created from returned connection share zookeeper connection, meta cache, and connections to + * region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: * - * Typical usage: *
    * Connection connection = ConnectionFactory.createConnection(conf);
    * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -212,7 +199,7 @@ public class ConnectionFactory {
    * @return Connection object for conf
    */
   public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
-  throws IOException {
+      throws IOException {
     if (user == null) {
       UserProvider provider = UserProvider.instantiate(conf);
       user = provider.getCurrent();
@@ -228,9 +215,8 @@ public class ConnectionFactory {
     }
     try {
       // Default HCM#HCI is not accessible; make it so before invoking.
-      Constructor constructor =
-        clazz.getDeclaredConstructor(Configuration.class,
-          ExecutorService.class, User.class);
+      Constructor constructor = clazz.getDeclaredConstructor(Configuration.class,
+        ExecutorService.class, User.class);
       constructor.setAccessible(true);
       return (Connection) constructor.newInstance(conf, pool, user);
     } catch (Exception e) {
@@ -241,9 +227,9 @@ public class ConnectionFactory {
   /**
    * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
    * @see #createAsyncConnection(Configuration)
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    */
-  public static AsyncConnection createAsyncConnection() throws IOException {
+  public static CompletableFuture createAsyncConnection() {
     return createAsyncConnection(HBaseConfiguration.create());
   }
 
@@ -252,12 +238,20 @@ public class ConnectionFactory {
    * User object created by {@link UserProvider}. The given {@code conf} will also be used to
    * initialize the {@link UserProvider}.
    * @param conf configuration
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    * @see #createAsyncConnection(Configuration, User)
    * @see UserProvider
    */
-  public static AsyncConnection createAsyncConnection(Configuration conf) throws IOException {
-    return createAsyncConnection(conf, UserProvider.instantiate(conf).getCurrent());
+  public static CompletableFuture createAsyncConnection(Configuration conf) {
+    User user;
+    try {
+      user = UserProvider.instantiate(conf).getCurrent();
+    } catch (IOException e) {
+      CompletableFuture future = new CompletableFuture<>();
+      future.completeExceptionally(e);
+      return future;
+    }
+    return createAsyncConnection(conf, user);
   }
 
   /**
@@ -273,17 +267,30 @@ public class ConnectionFactory {
    * as it is thread safe.
    * @param conf configuration
    * @param user the user the asynchronous connection is for
-   * @return AsyncConnection object
+   * @return AsyncConnection object wrapped by CompletableFuture
    * @throws IOException
    */
-  public static AsyncConnection createAsyncConnection(Configuration conf, User user)
-      throws IOException {
-    Class clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
-      AsyncConnectionImpl.class, AsyncConnection.class);
-    try {
-      return ReflectionUtils.newInstance(clazz, conf, user);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+  public static CompletableFuture createAsyncConnection(Configuration conf,
+      User user) {
+    CompletableFuture future = new CompletableFuture<>();
+    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
+    registry.getClusterId().whenComplete((clusterId, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (clusterId == null) {
+        future.completeExceptionally(new IOException("clusterid came back null"));
+        return;
+      }
+      Class clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
+        AsyncConnectionImpl.class, AsyncConnection.class);
+      try {
+        future.complete(ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user));
+      } catch (Exception e) {
+        future.completeExceptionally(e);
+      }
+    });
+    return future;
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingletonAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingletonAsyncConnection.java
new file mode 100644
index 0000000..85e9ae6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingletonAsyncConnection.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A holder class that holds an AsyncConnection instance.
+ * 

+ * The creation of this instance will be completed immediately without blocking so it is useful for + * writing fully asynchronous code. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class SingletonAsyncConnection { + + private final Configuration conf; + + private final AtomicReference> future = + new AtomicReference<>(); + + public SingletonAsyncConnection(Configuration conf) { + this.conf = conf; + } + + /** + * The AsyncConnection gotten from the returned CompletableFuture will always be the same + * instance. + */ + public CompletableFuture get() { + CompletableFuture f = future.get(); + if (f != null) { + return f; + } + for (;;) { + if (future.compareAndSet(null, new CompletableFuture<>())) { + CompletableFuture toComplete = future.get(); + ConnectionFactory.createAsyncConnection(conf).whenComplete((conn, error) -> { + if (error != null) { + toComplete.completeExceptionally(error); + // we need to reset the future holder so we will get a chance to recreate an async + // connection at next try. + future.set(null); + return; + } + toComplete.complete(conn); + }); + return toComplete; + } else { + f = future.get(); + if (f != null) { + return f; + } + } + } + } + + public CompletableFuture close() { + CompletableFuture f = future.get(); + if (f == null) { + return CompletableFuture.completedFuture(null); + } + CompletableFuture closeFuture = new CompletableFuture<>(); + f.whenComplete((conn, error) -> { + if (error == null) { + IOUtils.closeQuietly(conn); + } + closeFuture.complete(null); + }); + return closeFuture; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java index c76aa3e..47b68e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; @@ -53,8 +54,7 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.zookeeper.data.Stat; /** - * Cache the cluster registry data in memory and use zk watcher to update. The only exception is - * {@link #getClusterId()}, it will fetch the data from zk directly. + * Fetch the registry data from zookeeper. */ @InterfaceAudience.Private class ZKAsyncRegistry implements AsyncRegistry { @@ -79,26 +79,6 @@ class ZKAsyncRegistry implements AsyncRegistry { this.zk.start(); } - @Override - public String getClusterId() { - try { - byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode); - if (data == null || data.length == 0) { - return null; - } - data = removeMetaData(data); - return ClusterId.parseFrom(data).toString(); - } catch (Exception e) { - LOG.warn("failed to get cluster id", e); - return null; - } - } - - @Override - public void close() { - zk.close(); - } - private interface CuratorEventProcessor { T process(CuratorEvent event) throws Exception; } @@ -120,6 +100,20 @@ class ZKAsyncRegistry implements AsyncRegistry { return future; } + private static String getClusterId(CuratorEvent event) throws DeserializationException { + byte[] data = event.getData(); + if (data == null || data.length == 0) { + return null; + } + data = removeMetaData(data); + return ClusterId.parseFrom(data).toString(); + } + + @Override + public CompletableFuture getClusterId() { + return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId); + } + private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event) throws IOException { byte[] data = event.getData(); @@ -249,4 +243,9 @@ class ZKAsyncRegistry implements AsyncRegistry { return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) .thenApply(proto -> proto != null ? proto.getInfoPort() : 0); } + + @Override + public void close() { + zk.close(); + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java index b20e616..d53cadf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java @@ -86,7 +86,7 @@ public class TestAsyncGetMultiThread { } TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); AsyncTable table = CONN.getTable(TABLE_NAME); List> futures = new ArrayList<>(); IntStream.range(0, COUNT) @@ -114,8 +114,8 @@ public class TestAsyncGetMultiThread { public void test() throws IOException, InterruptedException, ExecutionException { int numThreads = 20; AtomicBoolean stop = new AtomicBoolean(false); - ExecutorService executor = - Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-")); + ExecutorService executor = Executors.newFixedThreadPool(numThreads, + Threads.newDaemonThreadFactory("TestAsyncGet-")); List> futures = new ArrayList<>(); IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> { run(stop); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java index 2e46d8a..e17c3c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java @@ -68,7 +68,9 @@ public class TestAsyncRegionLocator { public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(3); TEST_UTIL.getAdmin().setBalancerRunning(false, true); - CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()); + AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, + registry.getClusterId().get(), User.getCurrent()); LOCATOR = CONN.getLocator(); SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 67d2661..563048c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -83,12 +83,15 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } } - private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) throws IOException { + private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) + throws IOException, InterruptedException, ExecutionException { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt); conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires); - asyncConn = new AsyncConnectionImpl(conf, User.getCurrent()); + AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); + asyncConn = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(), + User.getCurrent()); } @Test @@ -116,7 +119,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } @Test - public void testMaxRetries() throws IOException, InterruptedException { + public void testMaxRetries() throws IOException, InterruptedException, ExecutionException { initConn(0, 10, 2); try { asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS) @@ -128,7 +131,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } @Test - public void testOperationTimeout() throws IOException, InterruptedException { + public void testOperationTimeout() throws IOException, InterruptedException, ExecutionException { initConn(0, 100, Integer.MAX_VALUE); long startNs = System.nanoTime(); try { @@ -177,14 +180,14 @@ public class TestAsyncSingleRequestRpcRetryingCaller { void updateCachedLocation(HRegionLocation loc, Throwable exception) { } }; - try (AsyncConnectionImpl mockedConn = - new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) { + try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(), + asyncConn.registry, asyncConn.registry.getClusterId().get(), User.getCurrent()) { - @Override - AsyncRegionLocator getLocator() { - return mockedLocator; - } - }) { + @Override + AsyncRegionLocator getLocator() { + return mockedLocator; + } + }) { AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertTrue(errorTriggered.get()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 8ba3414..e4e1033 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -73,7 +73,7 @@ public class TestAsyncTable { TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index 840f844..3601485 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -76,13 +76,14 @@ public class TestAsyncTableNoncedRetry { TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()) { + AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, + registry.getClusterId().get(), User.getCurrent()) { @Override public NonceGenerator getNonceGenerator() { return NONCE_GENERATOR; } - }; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java index 972780e..ff5c086 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java @@ -59,7 +59,7 @@ public class TestAsyncTableSmallScan { } TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); TEST_UTIL.waitTableAvailable(TABLE_NAME); - ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); List> futures = new ArrayList<>(); IntStream.range(0, COUNT) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSingletonAsyncConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSingletonAsyncConnection.java new file mode 100644 index 0000000..d24f65f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSingletonAsyncConnection.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; + +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestSingletonAsyncConnection { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + private static SingletonAsyncConnection CONN; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + CONN = new SingletonAsyncConnection(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close().get(); + TEST_UTIL.shutdownMiniCluster(); + } + + private CountDownLatch latch; + + private void test(AsyncConnection conn) { + conn.getTable(TABLE_NAME).put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> { + conn.getTable(TABLE_NAME).get(new Get(ROW)).thenAccept(r -> { + assertArrayEquals(VALUE, r.getValue(FAMILY, QUALIFIER)); + latch.countDown(); + }); + }); + } + + @Test + public void test() throws InterruptedException { + latch = new CountDownLatch(1); + CONN.get().thenAccept(this::test); + latch.await(); + } +} -- 1.9.1