From 35d6954c1273e74fc81aa35732f87f6ceb37cad1 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 9 Nov 2017 11:20:18 +0800 Subject: [PATCH] HBASE-19200 Remove Registry and use AsyncRegistry directly --- .../hbase/client/ConnectionImplementation.java | 143 ++++++--------------- .../hadoop/hbase/client/ConnectionUtils.java | 17 +++ .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 61 ++------- .../org/apache/hadoop/hbase/client/Registry.java | 53 -------- .../hadoop/hbase/client/RegistryFactory.java | 50 ------- .../hadoop/hbase/client/ZKAsyncRegistry.java | 11 +- .../hadoop/hbase/client/ZooKeeperRegistry.java | 129 ------------------- .../hbase/client/DoNothingAsyncRegistry.java | 64 +++++++++ .../hadoop/hbase/client/TestAsyncProcess.java | 23 ++-- .../hadoop/hbase/client/TestBufferedMutator.java | 33 +---- .../hadoop/hbase/client/TestClientNoCluster.java | 66 +++++----- .../org/apache/hadoop/hbase/GenericTestUtils.java | 1 + 12 files changed, 184 insertions(+), 467 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 56a2e84..056fb46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -72,8 +73,6 @@ import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -206,7 +205,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { /** * Cluster registry of basic info such as clusterid and meta region location. */ - Registry registry; + private final AsyncRegistry registry; private final ClientBackoffPolicy backoffPolicy; @@ -284,7 +283,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.conf.get(BufferedMutator.CLASSNAME_KEY); try { - this.registry = setupRegistry(); + this.registry = AsyncRegistryFactory.getRegistry(conf); retrieveClusterId(); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); @@ -495,13 +494,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } /** - * @return The cluster registry implementation to use. 
- */ - private Registry setupRegistry() throws IOException { - return RegistryFactory.getRegistry(this); - } - - /** * For tests only. */ @VisibleForTesting @@ -523,7 +515,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (clusterId != null) { return; } - this.clusterId = this.registry.getClusterId(); + try { + this.clusterId = this.registry.getClusterId().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("retrivev cluster id failed", e); + } if (clusterId == null) { clusterId = HConstants.CLUSTER_ID_DEFAULT; LOG.debug("clusterid came back null, using default " + clusterId); @@ -535,25 +531,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return this.conf; } - private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw) - throws MasterNotRunningException { - String errorMsg; - try { - if (ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode) == -1) { - errorMsg = "The node " + zkw.znodePaths.baseZNode+" is not in ZooKeeper. " - + "It should have been written by the master. " - + "Check the value configured in 'zookeeper.znode.parent'. " - + "There could be a mismatch with the one configured in the master."; - LOG.error(errorMsg); - throw new MasterNotRunningException(errorMsg); - } - } catch (KeeperException e) { - errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage(); - LOG.error(errorMsg); - throw new MasterNotRunningException(errorMsg, e); - } - } - /** * @return true if the master is running, throws an exception otherwise * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running @@ -772,7 +749,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } // Look up from zookeeper - locations = this.registry.getMetaRegionLocation(); + locations = ConnectionUtils.get(this.registry.getMetaRegionLocation()); if (locations != null) { cacheLocation(tableName, locations); } @@ -1124,37 +1101,25 @@ class ConnectionImplementation implements ClusterConnection, Closeable { */ private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() throws IOException, KeeperException { - ZooKeeperKeepAliveConnection zkw; - try { - zkw = getKeepAliveZooKeeperWatcher(); - } catch (IOException e) { - ExceptionUtil.rethrowIfInterrupt(e); - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } - try { - checkIfBaseNodeAvailable(zkw); - ServerName sn = MasterAddressTracker.getMasterAddress(zkw); - if (sn == null) { - String msg = "ZooKeeper available but no active master location found"; - LOG.info(msg); - throw new MasterNotRunningException(msg); - } - if (isDeadServer(sn)) { - throw new MasterNotRunningException(sn + " is dead."); - } - // Use the security info interface name as our stub key - String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, - hostnamesCanChange); - MasterProtos.MasterService.BlockingInterface stub = - (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { - BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); - return MasterProtos.MasterService.newBlockingStub(channel); - }); - isMasterRunning(stub); - return stub; - } finally { - zkw.close(); - } + ServerName sn = ConnectionUtils.get(registry.getMasterAddress()); + if (sn == null) { + String msg = "ZooKeeper available but no active master location found"; + LOG.info(msg); + throw new MasterNotRunningException(msg); + } + if (isDeadServer(sn)) { + throw new 
MasterNotRunningException(sn + " is dead."); + } + // Use the security info interface name as our stub key + String key = + getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange); + MasterProtos.MasterService.BlockingInterface stub = + (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); + return MasterProtos.MasterService.newBlockingStub(channel); + }); + isMasterRunning(stub); + return stub; } /** @@ -1215,26 +1180,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private ZooKeeperKeepAliveConnection keepAliveZookeeper; private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0); - /** - * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have finished with it. - * @return The shared instance. Never returns null. - */ - ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher() - throws IOException { - synchronized (masterAndZKLock) { - if (keepAliveZookeeper == null) { - if (this.closed) { - throw new IOException(toString() + " closed"); - } - // We don't check that our link to ZooKeeper is still valid - // But there is a retry mechanism in the ZooKeeperWatcher itself - keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this); - } - keepAliveZookeeperUserCount.addAndGet(1); - return keepAliveZookeeper; - } - } - void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) { if (zkw == null){ return; @@ -1947,26 +1892,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public void abort(final String msg, Throwable t) { - if (t instanceof KeeperException.SessionExpiredException - && keepAliveZookeeper != null) { - synchronized (masterAndZKLock) { - if (keepAliveZookeeper != null) { - LOG.warn("This client just lost it's session with ZooKeeper," + - " closing it." 
+ - " It will be recreated next time someone needs it", t); - closeZooKeeperWatcher(); - } - } + if (t != null) { + LOG.fatal(msg, t); } else { - if (t != null) { - LOG.fatal(msg, t); - } else { - LOG.fatal(msg); - } - this.aborted = true; - close(); - this.closed = true; + LOG.fatal(msg); } + this.aborted = true; + close(); + this.closed = true; } @Override @@ -1981,7 +1914,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { @Override public int getCurrentNrHRS() throws IOException { - return this.registry.getCurrentNrHRS(); + return ConnectionUtils.get(this.registry.getCurrentNrHRS()); } @Override @@ -1995,7 +1928,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.metrics.shutdown(); } this.closed = true; - closeZooKeeperWatcher(); + registry.close(); this.stubs.clear(); if (clusterStatusListener != null) { clusterStatusListener.close(); @@ -2061,4 +1994,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { public RpcControllerFactory getRpcControllerFactory() { return this.rpcControllerFactory; } + + AsyncRegistry getRegistry() { + return registry; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index bc0ade2..e2ebc15 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -21,16 +21,20 @@ import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import com.google.common.base.Throwables; + import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -489,4 +493,17 @@ public final class ConnectionUtils { } scanMetrics.countOfRegions.incrementAndGet(); } + + static T get(CompletableFuture future) throws IOException { + try { + return future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, IOException.class); + throw new IOException(cause); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 0c66faf..cd4e566 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -18,6 +18,10 @@ */ package org.apache.hadoop.hbase.client; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ 
-97,14 +101,12 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.apache.zookeeper.KeeperException; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; @@ -206,10 +208,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; - /** * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that * this is an HBase-internal class as defined in @@ -2346,32 +2344,14 @@ public class HBaseAdmin implements Admin { } /** - * Check to see if HBase is running. Throw an exception if not. - * @param conf system configuration - * @throws MasterNotRunningException if the master is not running - * @throws ZooKeeperConnectionException if unable to connect to zookeeper - * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't want to have - * protobuf as part of our public API. Use {@link #available(Configuration)} - */ - // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not. - // MOB uses it too. - // NOTE: hbase-2.0.0 removes ServiceException from the throw. - @Deprecated - public static void checkHBaseAvailable(Configuration conf) - throws MasterNotRunningException, ZooKeeperConnectionException, IOException, - com.google.protobuf.ServiceException { - available(conf); - } - - /** * Is HBase available? Throw an exception if not. * @param conf system configuration * @throws MasterNotRunningException if the master is not running. - * @throws ZooKeeperConnectionException if unable to connect to zookeeper. - * // TODO do not expose ZKConnectionException. + * @throws ZooKeeperConnectionException if unable to connect to zookeeper. // TODO do not expose + * ZKConnectionException. */ public static void available(final Configuration conf) - throws MasterNotRunningException, ZooKeeperConnectionException, IOException { + throws MasterNotRunningException, ZooKeeperConnectionException, IOException { Configuration copyOfConf = HBaseConfiguration.create(conf); // We set it to make it fail as soon as possible if HBase is not available copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); @@ -2381,26 +2361,6 @@ public class HBaseAdmin implements Admin { // If the connection exists, we may have a connection to ZK that does not work anymore try (ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(copyOfConf)) { - // Check ZK first. - // If the connection exists, we may have a connection to ZK that does not work anymore - ZooKeeperKeepAliveConnection zkw = null; - try { - // This is NASTY. FIX!!!! Dependent on internal implementation! 
TODO - zkw = ((ConnectionImplementation) connection) - .getKeepAliveZooKeeperWatcher(); - zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.znodePaths.baseZNode, false); - } catch (IOException e) { - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } catch (InterruptedException e) { - throw (InterruptedIOException) - new InterruptedIOException("Can't connect to ZooKeeper").initCause(e); - } catch (KeeperException e){ - throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); - } finally { - if (zkw != null) { - zkw.close(); - } - } // can throw MasterNotRunningException connection.isMasterRunning(); } @@ -3235,12 +3195,7 @@ public class HBaseAdmin implements Admin { private ServerName getMasterAddress() throws IOException { // TODO: Fix! Reaching into internal implementation!!!! ConnectionImplementation connection = (ConnectionImplementation)this.connection; - ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); - try { - return MasterAddressTracker.getMasterAddress(zkw); - } catch (KeeperException e) { - throw new IOException("Failed to get master server name from MasterAddressTracker", e); - } + return ConnectionUtils.get(connection.getRegistry().getMasterAddress()); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java deleted file mode 100644 index 4d0527a..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Registry.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; - -import org.apache.hadoop.hbase.RegionLocations; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Cluster registry. - * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc. - * Internal use only. - */ -@InterfaceAudience.Private -interface Registry { - /** - * @param connection - */ - void init(Connection connection); - - /** - * @return Meta region location - * @throws IOException - */ - RegionLocations getMetaRegionLocation() throws IOException; - - /** - * @return Cluster id. 
- */ - String getClusterId(); - - /** - * @return Count of 'running' regionservers - * @throws IOException - */ - int getCurrentNrHRS() throws IOException; -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java deleted file mode 100644 index 7b2ac0b..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; - -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Get instance of configured Registry. - */ -@InterfaceAudience.Private -final class RegistryFactory { - static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; - - private RegistryFactory() {} - - /** - * @return The cluster registry implementation to use. - * @throws IOException - */ - static Registry getRegistry(final Connection connection) - throws IOException { - String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY, - ZooKeeperRegistry.class.getName()); - Registry registry = null; - try { - registry = (Registry)Class.forName(registryClass).newInstance(); - } catch (Throwable t) { - throw new IOException(t); - } - registry.init(connection); - return registry; - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java index 259b665..91ca28b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; -import static org.apache.hadoop.hbase.HRegionInfo.DEFAULT_REPLICA_ID; -import static org.apache.hadoop.hbase.HRegionInfo.FIRST_META_REGIONINFO; +import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID; +import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; @@ -42,17 +42,18 @@ import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; -import 
org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.data.Stat; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; + /** * Fetch the registry data from zookeeper. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java deleted file mode 100644 index 746382f..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperRegistry.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -/** - * A cluster registry that stores to zookeeper. - */ -@InterfaceAudience.Private -class ZooKeeperRegistry implements Registry { - private static final Log LOG = LogFactory.getLog(ZooKeeperRegistry.class); - // Needs an instance of hci to function. Set after construct this instance. 
- ConnectionImplementation hci; - - @Override - public void init(Connection connection) { - if (!(connection instanceof ConnectionImplementation)) { - throw new RuntimeException("This registry depends on ConnectionImplementation"); - } - this.hci = (ConnectionImplementation)connection; - } - - @Override - public RegionLocations getMetaRegionLocation() throws IOException { - ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher(); - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Looking up meta region location in ZK," + " connection=" + this); - } - List servers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout, - hci.getConfiguration()); - if (LOG.isTraceEnabled()) { - if (servers == null) { - LOG.trace("Looked up meta region location, connection=" + this + - "; servers = null"); - } else { - StringBuilder str = new StringBuilder(); - for (ServerName s : servers) { - str.append(s.toString()); - str.append(" "); - } - LOG.trace("Looked up meta region location, connection=" + this + - "; servers = " + str.toString()); - } - } - if (servers == null) return null; - HRegionLocation[] locs = new HRegionLocation[servers.size()]; - int i = 0; - for (ServerName server : servers) { - RegionInfo h = RegionReplicaUtil.getRegionInfoForReplica( - RegionInfoBuilder.FIRST_META_REGIONINFO, i); - if (server == null) locs[i++] = null; - else locs[i++] = new HRegionLocation(h, server, 0); - } - return new RegionLocations(locs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } finally { - zkw.close(); - } - } - - private String clusterId = null; - - @Override - public String getClusterId() { - if (this.clusterId != null) return this.clusterId; - // No synchronized here, worse case we will retrieve it twice, that's - // not an issue. - ZooKeeperKeepAliveConnection zkw = null; - try { - zkw = hci.getKeepAliveZooKeeperWatcher(); - this.clusterId = ZKClusterId.readClusterIdZNode(zkw); - if (this.clusterId == null) { - LOG.info("ClusterId read in ZooKeeper is null"); - } - } catch (KeeperException e) { - LOG.warn("Can't retrieve clusterId from ZooKeeper", e); - } catch (IOException e) { - LOG.warn("Can't retrieve clusterId from ZooKeeper", e); - } finally { - if (zkw != null) zkw.close(); - } - return this.clusterId; - } - - @Override - public int getCurrentNrHRS() throws IOException { - ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher(); - try { - // We go to zk rather than to master to get count of regions to avoid - // HTable having a Master dependency. See HBase-2828 - return ZKUtil.getNumberOfChildren(zkw, zkw.znodePaths.rsZNode); - } catch (KeeperException ke) { - throw new IOException("Unexpected ZooKeeper exception", ke); - } finally { - zkw.close(); - } - } -} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java new file mode 100644 index 0000000..6633068 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Registry that does nothing. Otherwise, default Registry wants zookeeper up and running. + */ +@InterfaceAudience.Private +class DoNothingAsyncRegistry implements AsyncRegistry { + + public DoNothingAsyncRegistry(Configuration conf) { + } + + @Override + public CompletableFuture getMetaRegionLocation() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture getClusterId() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture getCurrentNrHRS() { + return CompletableFuture.completedFuture(0); + } + + @Override + public CompletableFuture getMasterAddress() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture getMasterInfoPort() { + return CompletableFuture.completedFuture(0); + } + + @Override + public void close() { + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index a0f18f4..9003a9b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -37,6 +37,7 @@ import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -456,23 +457,24 @@ public class TestAsyncProcess { * Returns our async process. 
*/ static class MyConnectionImpl extends ConnectionImplementation { - public static class TestRegistry implements Registry { - @Override - public void init(Connection connection) {} + public static class TestRegistry extends DoNothingAsyncRegistry { + + public TestRegistry(Configuration conf) { + super(conf); + } @Override - public RegionLocations getMetaRegionLocation() throws IOException { - return null; + public CompletableFuture getClusterId() { + return CompletableFuture.completedFuture("testClusterId"); } @Override - public String getClusterId() { - return "testClusterId"; + public CompletableFuture getCurrentNrHRS() { + return CompletableFuture.completedFuture(1); } @Override - public int getCurrentNrHRS() throws IOException { - return 1; + public void close() { } } @@ -483,7 +485,8 @@ public class TestAsyncProcess { } private static Configuration setupConf(Configuration conf) { - conf.setClass(RegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, Registry.class); + conf.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, + AsyncRegistry.class); return conf; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java index d1a3eb9..50befce 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java @@ -17,14 +17,12 @@ */ package org.apache.hadoop.hbase.client; - import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -40,35 +38,6 @@ public class TestBufferedMutator { public TestName name = new TestName(); /** - * Registry that does nothing. - * Otherwise, default Registry wants zookeeper up and running. - */ - public static class DoNothingRegistry implements Registry { - @Override - public void init(Connection connection) { - // TODO Auto-generated method stub - } - - @Override - public RegionLocations getMetaRegionLocation() throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getClusterId() { - // TODO Auto-generated method stub - return null; - } - - @Override - public int getCurrentNrHRS() throws IOException { - // TODO Auto-generated method stub - return 0; - } - } - - /** * My BufferedMutator. * Just to prove that I can insert a BM other than default. */ @@ -83,7 +52,7 @@ public class TestBufferedMutator { public void testAlternateBufferedMutatorImpl() throws IOException { BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(name.getMethodName())); Configuration conf = HBaseConfiguration.create(); - conf.set(RegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingRegistry.class.getName()); + conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingAsyncRegistry.class.getName()); try (Connection connection = ConnectionFactory.createConnection(conf)) { BufferedMutator bm = connection.getBufferedMutator(params); // Assert we get default BM if nothing specified. 
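For reference, a minimal sketch of the test-side pattern relied on just above: plugging a stub registry into a client Configuration so no ZooKeeper quorum is needed. The class name FixedIdRegistry is hypothetical, and the literal key "hbase.client.registry.impl" is assumed from the deleted RegistryFactory constant; the patch itself only references it through AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY.

    package org.apache.hadoop.hbase.client;

    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Hypothetical test registry: extends the patch's DoNothingAsyncRegistry and
    // answers a fixed cluster id, so client code under test never touches ZooKeeper.
    class FixedIdRegistry extends DoNothingAsyncRegistry {

      public FixedIdRegistry(Configuration conf) {
        super(conf); // registries are constructed reflectively with a Configuration argument
      }

      @Override
      public CompletableFuture<String> getClusterId() {
        return CompletableFuture.completedFuture("test-cluster-id");
      }

      // Builds a Configuration that selects this registry; the key string is an
      // assumption based on the deleted RegistryFactory.REGISTRY_IMPL_CONF_KEY value.
      static Configuration testConf() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.client.registry.impl", FixedIdRegistry.class.getName());
        return conf;
      }
    }

This mirrors what TestAsyncProcess.TestRegistry and the TestBufferedMutator change do: a Configuration-arg constructor plus overrides of only the futures the test cares about.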
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index 0f11156..9f32976 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -27,18 +27,20 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.SortedMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.CellComparatorImpl; -import org.apache.hadoop.hbase.DoNotRetryIOException; + import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.CellComparatorImpl; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -49,6 +51,26 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; @@ -73,26 +95,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrEx import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import 
org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; /** * Test client behavior w/o setting up a cluster. @@ -117,27 +119,27 @@ public class TestClientNoCluster extends Configured implements Tool { /** * Simple cluster registry inserted in place of our usual zookeeper based one. */ - static class SimpleRegistry implements Registry { + static class SimpleRegistry extends DoNothingAsyncRegistry { final ServerName META_HOST = META_SERVERNAME; - @Override - public void init(Connection connection) { + public SimpleRegistry(Configuration conf) { + super(conf); } @Override - public RegionLocations getMetaRegionLocation() throws IOException { - return new RegionLocations( - new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, META_HOST)); + public CompletableFuture getMetaRegionLocation() { + return CompletableFuture.completedFuture(new RegionLocations( + new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST))); } @Override - public String getClusterId() { - return HConstants.CLUSTER_ID_DEFAULT; + public CompletableFuture getClusterId() { + return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT); } @Override - public int getCurrentNrHRS() throws IOException { - return 1; + public CompletableFuture getCurrentNrHRS() { + return CompletableFuture.completedFuture(1); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/GenericTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/GenericTestUtils.java index 08565e0..56be657 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/GenericTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/GenericTestUtils.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; /** * Test provides some very generic helpers which might be used across the tests + * *** for running UTs in hbase-server */ public abstract class GenericTestUtils { -- 2.7.4
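As a standalone illustration of the blocking bridge this patch adds in ConnectionUtils.get (used by ConnectionImplementation and HBaseAdmin to consume the AsyncRegistry from synchronous code), here is a minimal, self-contained sketch. It swaps Guava's Throwables.propagateIfPossible for a plain instanceof check so it compiles with the JDK alone; the class name BlockingBridgeSketch is hypothetical, and everything else follows the pattern shown in the diff above.

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class BlockingBridgeSketch {

      // Block on a CompletableFuture and translate failures into the IOException
      // contract expected by the synchronous client: interrupts are restored and
      // surfaced as InterruptedIOException, IOExceptions are rethrown as-is, and
      // anything else is wrapped.
      static <T> T get(CompletableFuture<T> future) throws IOException {
        try {
          return future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw (IOException) new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw (IOException) cause;
          }
          throw new IOException(cause);
        }
      }

      public static void main(String[] args) throws IOException {
        // Successful future: the value comes straight back.
        System.out.println(get(CompletableFuture.completedFuture("cluster-id")));

        // Failed future: the ExecutionException is unwrapped to the original IOException.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("registry unavailable"));
        try {
          get(failed);
        } catch (IOException expected) {
          System.out.println("translated: " + expected.getMessage());
        }
      }
    }

The design choice behind the patch is visible here: ConnectionImplementation and HBaseAdmin keep their existing blocking APIs while the registry itself stays fully asynchronous, and only this one helper performs the wait and exception translation.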