From c7f946fe24ca3e9a75f0572c09476eef4180887e Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Mon, 28 Oct 2019 23:26:13 -0700
Subject: [PATCH] HBASE-18095: Zookeeper-less client connection implementation

When an HBase client creates a cluster Connection, it first fetches a
bunch of metadata from ZooKeeper (active master address, cluster ID,
meta region locations, etc.) before it creates the underlying
transport. However, exposing ZK to all clients is a DDoS risk, and ZK
connections have in the past caused issues by not timing out on
blocking RPCs (more context in the JIRA). This patch removes that ZK
dependency by making the client fetch all of this metadata directly
from a list of client-configured HMaster endpoints.

The patch adds a new AsyncRegistry implementation that encapsulates
the logic of fetching this metadata from the configured master
endpoints. New RPCs are added to the HMasters to serve this
information.

Meta HRegionLocation caching:
-----------------------------
One critical piece of metadata clients need in order to query tables is
the set of meta HRegionLocations. These are fetched from ZK by default.
Since this patch moves away from ZK, it adds an in-memory cache of
these locations on both active and standby HMasters. ZK listeners are
registered to keep the cache up to date.

New client configs:
------------------
- 'hbase.client.asyncregistry.masteraddrs': a comma-separated list of
  HMaster host:port addresses.
- Must be used in conjunction with 'hbase.client.registry.impl' set to
  the HMasterAsyncRegistry class.
(Illustrative configuration and usage sketches are appended after the
patch.)

* Testing is still a WIP. Error paths are not yet covered (e.g. a
  master that is not accessible).
---
 .../MetaRegionsNotAvailableException.java     |  35 +++
 .../hbase/client/AsyncConnectionImpl.java     |   3 +
 .../hadoop/hbase/client/AsyncRegistry.java    |  11 -
 .../hbase/client/AsyncRegistryFactory.java    |   2 +-
 .../hbase/client/HMasterAsyncRegistry.java    | 230 ++++++++++++++++++
 .../hadoop/hbase/client/ZKAsyncRegistry.java  |  11 -
 .../hbase/shaded/protobuf/ProtobufUtil.java   |   1 +
 .../hadoop/hbase/zookeeper/ZNodePaths.java    |  11 +
 .../hbase/client/DoNothingAsyncRegistry.java  |  10 -
 .../src/main/protobuf/Master.proto            |  43 ++++
 .../apache/hadoop/hbase/master/HMaster.java   |   7 +
 .../hbase/master/MasterRpcServices.java       |  53 ++++
 .../hbase/master/MetaRegionLocationCache.java | 167 +++++++++++++
 .../hadoop/hbase/HBaseTestingUtility.java     |  24 ++
 .../hbase/client/DummyAsyncRegistry.java      |  10 -
 .../client/TestHMasterAsyncRegistry.java      | 120 +++++++++
 .../hbase/client/TestZKAsyncRegistry.java     |   3 -
 .../master/TestHMasterAsyncRegistryRPCs.java  | 201 +++++++++++++++
 18 files changed, 896 insertions(+), 46 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java
new file mode 100644
index 0000000000..add2edb7e8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java
@@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; + +/** + * Thrown by master when meta region locations are not cached whatever reason. + * Client is expected to retry when running into this. + */ +@InterfaceAudience.Private +public class MetaRegionsNotAvailableException extends IOException { + private static final long serialVersionUID = (1L << 14) - 1L; + + public MetaRegionsNotAvailableException(String msg) { + super(msg); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 78fad9ea23..7df16fddb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -394,4 +394,7 @@ class AsyncConnectionImpl implements AsyncConnection { Optional getConnectionMetrics() { return metrics; } + + @VisibleForTesting + AsyncRegistry getRegistry() { return registry; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java index 96329dcfb8..e50a9f5b4c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java @@ -26,7 +26,6 @@ import org.apache.yetus.audience.InterfaceAudience; /** * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.. - * All stuffs that may be related to zookeeper at client side are placed here. *

* Internal use only. */ @@ -45,21 +44,11 @@ interface AsyncRegistry extends Closeable { */ CompletableFuture getClusterId(); - /** - * Get the number of 'running' regionservers. - */ - CompletableFuture getCurrentNrHRS(); - /** * Get the address of HMaster. */ CompletableFuture getMasterAddress(); - /** - * Get the info port of HMaster. - */ - CompletableFuture getMasterInfoPort(); - /** * Closes this instance and releases any system resources associated with it */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java index 28726ae5dd..aebb1de5a4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceAudience.Private final class AsyncRegistryFactory { - static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + public static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; private AsyncRegistryFactory() { } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java new file mode 100644 index 0000000000..c24a889354 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaRegionsNotAvailableException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
+
+/**
+ * Fetches the meta information directly from the HMaster by making the relevant RPCs. HMaster RPC end points are
+ * looked up via the client configuration 'hbase.client.asyncregistry.masteraddrs', a comma separated list of
+ * host:port values. Must be used in conjunction with 'hbase.client.registry.impl' set to this class.
+ *
+ * The class does not cache anything. It is the responsibility of the callers to cache results and avoid repeated
+ * requests.
+ */
+@InterfaceAudience.Private
+public class HMasterAsyncRegistry implements AsyncRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(HMasterAsyncRegistry.class);
+  public static final String CONF_KEY = "hbase.client.asyncregistry.masteraddrs";
+  public static final String DEFAULT_HOST_PORT = "localhost:" + HConstants.DEFAULT_MASTER_PORT;
+
+  // Parsed list of master host:port pairs from the client configuration (hbase-site.xml).
+  private final List<ServerName> masterServers;
+  final Configuration conf;
+  // RPC client used to talk to the master servers. This uses a stand alone RpcClient instance because the
+  // AsyncRegistry is created prior to creating a cluster Connection. The client is torn down in close().
+  final RpcClient rpcClient;
+  final int rpcTimeout;
+
+  public HMasterAsyncRegistry(Configuration config) {
+    masterServers = new ArrayList<>();
+    conf = config;
+    parseHostPorts();
+    // Passing the default cluster ID means that token based authentication does not work for certain client
+    // implementations.
+    // TODO(bharathv): Figure out a way to fetch the cluster ID in a non-authenticated way.
+    rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    rpcTimeout = (int) Math.min(Integer.MAX_VALUE,
+        conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+  }
+
+  private void parseHostPorts() {
+    String hostPorts = conf.get(CONF_KEY, DEFAULT_HOST_PORT);
+    for (String hostPort: hostPorts.split(",")) {
+      masterServers.add(ServerName.valueOf(hostPort, ServerName.NON_STARTCODE));
+    }
+    Preconditions.checkArgument(!masterServers.isEmpty(), String.format("%s is empty", CONF_KEY));
+    // Randomize so that not every client sends requests to the masters in the same order.
+    Collections.shuffle(masterServers);
+  }
+
+  /**
+   * Util that generates a master stub for a given ServerName.
+   */
+  private MasterService.BlockingInterface getMasterStub(ServerName server) throws IOException {
+    return MasterService.newBlockingStub(
+        rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout));
+  }
+
+  /**
+   * Blocking RPC to fetch the meta region locations using one of the masters from the parsed list.
+   */
+  private RegionLocations getMetaRegionLocationsHelper() throws MetaRegionsNotAvailableException {
+    List<HBaseProtos.RegionLocation> result = null;
+    for (ServerName sname: masterServers) {
+      try {
+        MasterService.BlockingInterface stub = getMasterStub(sname);
+        HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController();
+        GetMetaLocationsResponse resp = stub.getMetaLocations(rpcController,
+            GetMetaLocationsRequest.getDefaultInstance());
+        result = resp.getLocationsList();
+        if (result != null && !result.isEmpty()) {
+          break;
+        }
+      } catch (Exception e) {
+        LOG.warn("Error fetching meta locations from master {}. Trying others.", sname, e);
+      }
+    }
+    if (result == null || result.isEmpty()) {
+      throw new MetaRegionsNotAvailableException(
+          String.format("Meta locations not found. Probed masters: %s", conf.get(CONF_KEY, DEFAULT_HOST_PORT)));
+    }
+    List<HRegionLocation> deserializedResult = new ArrayList<>();
+    result.stream().forEach(location -> deserializedResult.add(ProtobufUtil.toRegionLocation(location)));
+    return new RegionLocations(deserializedResult);
+  }
+
+  /**
+   * Fetches the meta region locations by probing the configured masters until one of them returns a
+   * non-empty location list.
+   */
+  @Override
+  public CompletableFuture<RegionLocations> getMetaRegionLocation() {
+    CompletableFuture<RegionLocations> result = new CompletableFuture<>();
+    CompletableFuture.runAsync(() -> {
+      try {
+        result.complete(this.getMetaRegionLocationsHelper());
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+    });
+    return result;
+  }
+
+  /**
+   * Blocking RPC to get the cluster ID from the parsed master list. Throws MasterNotRunningException if no
+   * active master could be reached.
+   */
+  private String getClusterIdHelper() throws MasterNotRunningException {
+    // Loop through all the masters serially. We could be hitting some standby masters which cannot process this
+    // RPC, so we just skip them.
+    for (ServerName sname: masterServers) {
+      try {
+        MasterService.BlockingInterface stub = getMasterStub(sname);
+        HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController();
+        GetClusterIdResponse resp = stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance());
+        return resp.getClusterId();
+      } catch (IOException e) {
+        LOG.warn("Error fetching cluster ID from master: {}", sname, e);
+      } catch (ServiceException e) {
+        // This is probably a standby master, can be ignored.
+        LOG.debug("Error fetching cluster ID from server: {}", sname, e);
+      }
+    }
+    // If it comes to this point, no active master could be found.
+    throw new MasterNotRunningException(
+        String.format("No active master found. Probed masters: %s", conf.get(CONF_KEY, DEFAULT_HOST_PORT)));
+  }
+
+  @Override
+  public CompletableFuture<String> getClusterId() {
+    CompletableFuture<String> result = new CompletableFuture<>();
+    CompletableFuture.runAsync(() -> {
+      try {
+        result.complete(this.getClusterIdHelper());
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+    });
+    return result;
+  }
+
+  /**
+   * Blocking RPC to get the active master address from the parsed list of master servers.
+   */
+  private ServerName getMasterAddressHelper() throws MasterNotRunningException {
+    for (ServerName sname: masterServers) {
+      try {
+        MasterService.BlockingInterface stub = getMasterStub(sname);
+        HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController();
+        IsActiveResponse resp = stub.isActive(rpcController, IsActiveRequest.getDefaultInstance());
+        if (resp.getIsMasterActive()) {
+          return ServerName.valueOf(sname.getHostname(), sname.getPort(), resp.getStartCode());
+        }
+      } catch (Exception e) {
+        // The master is unreachable or cannot answer; move on to the next one.
+        LOG.debug("Error probing master {} for active status. Trying others.", sname, e);
+      }
+    }
+    throw new MasterNotRunningException(String.format("No active master found. Probed masters: %s",
+        conf.get(CONF_KEY, DEFAULT_HOST_PORT)));
+  }
+
+  /**
+   * @return the active master among the configured master addresses.
+   */
+  @Override
+  public CompletableFuture<ServerName> getMasterAddress() {
+    CompletableFuture<ServerName> result = new CompletableFuture<>();
+    CompletableFuture.runAsync(() -> {
+      try {
+        result.complete(this.getMasterAddressHelper());
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+    });
+    return result;
+  }
+
+  @Override
+  public void close() {
+    if (rpcClient != null) {
+      rpcClient.close();
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index 36fa6bba75..08cff2157e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -210,11 +210,6 @@ class ZKAsyncRegistry implements AsyncRegistry {
     return future;
   }
 
-  @Override
-  public CompletableFuture<Integer> getCurrentNrHRS() {
-    return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0);
-  }
-
   private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
     if (data == null || data.length == 0) {
       return null;
@@ -237,12 +232,6 @@ class ZKAsyncRegistry implements AsyncRegistry {
     });
   }
 
-  @Override
-  public CompletableFuture<Integer> getMasterInfoPort() {
-    return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
-        .thenApply(proto -> proto != null ?
proto.getInfoPort() : 0); - } - @Override public void close() { zk.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 81082174bc..6fe749bd08 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -3188,6 +3188,7 @@ public final class ProtobufUtil { } public static HBaseProtos.RegionLocation toRegionLocation(HRegionLocation loc) { + if (loc == null) return null; HBaseProtos.RegionLocation.Builder builder = HBaseProtos.RegionLocation.newBuilder(); builder.setRegionInfo(toRegionInfo(loc.getRegion())); if (loc.getServerName() != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index c5e510fe4b..81326c9353 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -182,6 +182,17 @@ public class ZNodePaths { .orElseGet(() -> metaReplicaZNodes.get(DEFAULT_REPLICA_ID) + "-" + replicaId); } + /** + * Parses the meta replicaId from the passed path. + * @param path the name of the full path which includes baseZNode. + * @return replicaId + */ + public int getMetaReplicaIdFromPath(String path) { + // Extract the znode from path. The prefix is of the following format. + // baseZNode + PATH_SEPARATOR. + int prefixLen = baseZNode.length() + 1; + return getMetaReplicaIdFromZnode(path.substring(prefixLen)); + } /** * Parse the meta replicaId from the passed znode * @param znode the name of the znode, does not include baseZNode diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java index 66330687a2..f3234b4c80 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java @@ -43,21 +43,11 @@ class DoNothingAsyncRegistry implements AsyncRegistry { return CompletableFuture.completedFuture(null); } - @Override - public CompletableFuture getCurrentNrHRS() { - return CompletableFuture.completedFuture(0); - } - @Override public CompletableFuture getMasterAddress() { return CompletableFuture.completedFuture(null); } - @Override - public CompletableFuture getMasterInfoPort() { - return CompletableFuture.completedFuture(0); - } - @Override public void close() { } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index fee9ab8419..b8187f6c94 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -585,6 +585,36 @@ message GetProceduresResponse { repeated Procedure procedure = 1; } +message IsActiveRequest { +} + +message IsActiveResponse { + required bool is_master_active = 1; + required uint64 start_code = 2; +} + +message GetMetaLocationsRequest { +} + +message GetMetaLocationsResponse { + repeated RegionLocation locations = 1; +} + +message GetPortInfoRequest { +} + +message GetPortInfoResponse { + required uint32 info_port = 1; + required uint32 master_port = 2; +} + +message GetClusterIdRequest { 
+} + +message GetClusterIdResponse { + required string cluster_id = 1; +} + message GetLocksRequest { } @@ -707,6 +737,19 @@ service MasterService { rpc GetClusterStatus(GetClusterStatusRequest) returns(GetClusterStatusResponse); + /** Returns whether this master is active or not. Served on both active/standby masters.*/ + rpc IsActive(IsActiveRequest) returns(IsActiveResponse); + + /** Used by client to get the location of meta replica(s). Served on both active/standby masters.*/ + rpc GetMetaLocations(GetMetaLocationsRequest) + returns(GetMetaLocationsResponse); + + /** Returns the ClusterId UUID for this cluster */ + rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse); + + /** Returns the port numbers used by HMaster instances. Served on both active/standby masters.*/ + rpc GetPortInfo(GetPortInfoRequest) returns(GetPortInfoResponse); + /** return true if master is available */ rpc IsMasterRunning(IsMasterRunningRequest) returns(IsMasterRunningResponse); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bb3c12c0b8..11fe3030d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -355,6 +355,9 @@ public class HMaster extends HRegionServer implements MasterServices { // manager of assignment nodes in zookeeper private AssignmentManager assignmentManager; + // Cache of meta locations indexed by replicas + private MetaRegionLocationCache metaRegionLocationCache; + // manager of replication private ReplicationPeerManager replicationPeerManager; @@ -518,6 +521,8 @@ public class HMaster extends HRegionServer implements MasterServices { maintenanceMode = false; } + metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper); + this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); LOG.info("hbase.rootdir=" + getRootDir() + @@ -3851,4 +3856,6 @@ public class HMaster extends HRegionServer implements MasterServices { public HbckChore getHbckChore() { return this.hbckChore; } + + public MetaRegionLocationCache getMetaRegionLocationCache() { return this.metaRegionLocationCache; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 06a99fa543..63df0829f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterMetricsBuilder; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaRegionsNotAvailableException; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.Server; @@ -184,14 +185,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; @@ -205,6 +212,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNa import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.HbckService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; @@ -977,6 +986,50 @@ public class MasterRpcServices extends RSRpcServices return response.build(); } + @Override + public GetMetaLocationsResponse getMetaLocations(RpcController controller, GetMetaLocationsRequest req) + throws ServiceException { + GetMetaLocationsResponse.Builder response = GetMetaLocationsResponse.newBuilder(); + try { + // We skip the master init check since this RPC is served on both the active and standby masters as long as + // their cache is populated. 
+ response.addAllLocations(master.getMetaRegionLocationCache().getCachedMetaRegionLocations()); + } catch (MetaRegionsNotAvailableException e) { + throw new ServiceException(e); + } + return response.build(); + } + + @Override + public IsActiveResponse isActive(RpcController controller, IsActiveRequest req) throws ServiceException { + IsActiveResponse.Builder response = IsActiveResponse.newBuilder(); + boolean isActiveMaster = master.isActiveMaster(); + response.setIsMasterActive(master.isActiveMaster()); + response.setStartCode(master.getStartcode()); + return response.build(); + } + + @Override + public GetPortInfoResponse getPortInfo(RpcController controller, GetPortInfoRequest req) throws ServiceException { + GetPortInfoResponse.Builder response = GetPortInfoResponse.newBuilder(); + response.setMasterPort(master.getConfiguration().getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)); + response.setInfoPort(master.getConfiguration().getInt(HConstants.MASTER_INFO_PORT, + HConstants.DEFAULT_MASTER_INFOPORT)); + return response.build(); + } + + @Override + public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest req) throws ServiceException { + GetClusterIdResponse.Builder response = GetClusterIdResponse.newBuilder(); + try { + master.checkInitialized(); + response.setClusterId(master.getClusterId()); + } catch (IOException e) { + throw new ServiceException(e); + } + return response.build(); + } + /** * List the currently available/stored snapshots. Any in-progress snapshots are ignored */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java new file mode 100644 index 0000000000..62e8fb7071 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.MetaRegionsNotAvailableException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentNavigableMap; + +/** + * A cache of meta region location metadata. This cache is used to serve 'GetMetaLocations' RPCs from clients. Registers + * a listener on ZK to track changes to the meta table znodes. Clients are expected to retry if the meta + * information is stale. This class is thread-safe. + */ +@InterfaceAudience.Private +public class MetaRegionLocationCache extends ZKListener { + + private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class); + + // Maximum number of times we retry when ZK operation times out. Should this be configurable? + private static final int MAX_ZK_META_FETCH_RETRIES = 10; + + private ZKWatcher watcher; + // Cached meta region locations indexed by replica ID. + // CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during client requests. + // Even though CopyOnWriteArrayMap copies the data structure for every write, that should be OK since the + // size of the list is often small and mutations are not too often and we do not need to block client + // requests while mutations are in progress. + CopyOnWriteArrayMap cachedMetaLocations; + + private enum ZNodeOpType { + INIT, + CREATED, + CHANGED, + DELETED + }; + + public MetaRegionLocationCache(ZKWatcher zkWatcher) { + super(zkWatcher); + watcher = zkWatcher; + cachedMetaLocations = new CopyOnWriteArrayMap<>(); + watcher.registerListener(this); + // Populate the initial snapshot of data from meta znodes. + // This is needed because stand-by masters can potentially start after the initial znode creation. 
+    populateInitialMetaLocations();
+  }
+
+  private void populateInitialMetaLocations() {
+    int retries = 0;
+    while (retries++ < MAX_ZK_META_FETCH_RETRIES) {
+      try {
+        List<String> znodes = watcher.getMetaReplicaNodes();
+        for (String znode: znodes) {
+          String path = watcher.getZNodePaths().joinZNode(watcher.getZNodePaths().baseZNode, znode);
+          updateMetaLocation(path, ZNodeOpType.INIT);
+        }
+        break;
+      } catch (KeeperException.OperationTimeoutException e) {
+        // Transient ZK issue; retry up to MAX_ZK_META_FETCH_RETRIES times.
+        LOG.warn("Timed out connecting to ZK cluster", e);
+      } catch (KeeperException e) {
+        LOG.warn("Error populating initial meta locations", e);
+        break;
+      }
+    }
+  }
+
+  private void updateMetaLocation(String path, ZNodeOpType opType) {
+    if (!isValidMetaZNode(path)) {
+      return;
+    }
+    LOG.info("Meta znode for path {}: {}", path, opType.name());
+    int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
+    if (opType == ZNodeOpType.DELETED) {
+      cachedMetaLocations.remove(replicaId);
+      return;
+    }
+    RegionState state = null;
+    int retries = 0;
+    while (retries++ < MAX_ZK_META_FETCH_RETRIES) {
+      try {
+        state = MetaTableLocator.getMetaRegionState(watcher, replicaId);
+        break;
+      } catch (KeeperException.OperationTimeoutException oe) {
+        // Log and retry.
+        LOG.warn("Timed out fetching meta location information for path {}", path, oe);
+      } catch (KeeperException e) {
+        LOG.error("Error getting meta location for path {}", path, e);
+        break;
+      }
+    }
+    if (state == null) {
+      cachedMetaLocations.put(replicaId, null);
+      return;
+    }
+    cachedMetaLocations.put(replicaId, new HRegionLocation(state.getRegion(), state.getServerName()));
+  }
+
+  /**
+   * Converts the current cache snapshot into a GetMetaLocations() RPC return payload.
+   * @return Protobuf serialized list of cached meta HRegionLocations
+   * @throws MetaRegionsNotAvailableException if no meta locations are cached yet
+   */
+  public List<HBaseProtos.RegionLocation> getCachedMetaRegionLocations()
+      throws MetaRegionsNotAvailableException {
+    if (cachedMetaLocations.isEmpty()) {
+      // This can happen if the master has not successfully initialized yet or the meta region is stuck in
+      // some weird state.
+      throw new MetaRegionsNotAvailableException("Meta cache is empty");
+    }
+    ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
+        cachedMetaLocations.tailMap(cachedMetaLocations.firstKey(), true);
+    List<HBaseProtos.RegionLocation> result = new ArrayList<>();
+    // Skip replicas whose location could not be resolved (cached as null).
+    snapshot.values().stream().forEach(location -> {
+      if (location != null) {
+        result.add(ProtobufUtil.toRegionLocation(location));
+      }
+    });
+    return result;
+  }
+
+  /**
+   * Helper to check if the given 'path' corresponds to a meta znode. This listener is only interested in changes
+   * to meta znodes.
+   */
+  private boolean isValidMetaZNode(String path) {
+    return watcher.getZNodePaths().isAnyMetaReplicaZNode(path);
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    updateMetaLocation(path, ZNodeOpType.CREATED);
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    updateMetaLocation(path, ZNodeOpType.DELETED);
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    updateMetaLocation(path, ZNodeOpType.CHANGED);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index a50ac11db6..034c615bbf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -213,6 +213,10 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
 
   private volatile AsyncClusterConnection asyncConnection;
 
+  // Tracks any other connections created with a custom client config. Used for testing clients with custom
+  // configurations. Tracked here so that they can be cleaned up on close() / restart.
+  private List<AsyncClusterConnection> customConnections = Collections.synchronizedList(new ArrayList<>());
+
   /** Filesystem URI used for map-reduce mini-cluster setup */
   private static String FS_URI;
 
@@ -1252,6 +1256,10 @@
       this.asyncConnection.close();
       this.asyncConnection = null;
     }
+    for (AsyncClusterConnection connection: customConnections) {
+      Closeables.close(connection, true);
+    }
+    customConnections.clear();
     this.hbaseCluster =
         new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
             option.getRsPorts(), option.getMasterClass(), option.getRsClass());
@@ -3067,6 +3075,19 @@
     return this.asyncConnection.toConnection();
   }
 
+  /**
+   * Creates a new Connection instance for this mini cluster with the given input config. Use it sparingly since
+   * connection creation is expensive. For all practical purposes {@link #getConnection()} should be good enough.
+   * This helper should only be used to test a custom client side configuration that differs from the conf used
+   * to spawn the mini-cluster.
+ */ + public AsyncClusterConnection getCustomConnection(Configuration conf) throws IOException { + User user = UserProvider.instantiate(conf).getCurrent(); + AsyncClusterConnection connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); + customConnections.add(connection); + return connection; + } + public AsyncClusterConnection getAsyncConnection() throws IOException { if (this.asyncConnection == null) { initConnection(); @@ -3077,6 +3098,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { public void closeConnection() throws IOException { Closeables.close(hbaseAdmin, true); Closeables.close(asyncConnection, true); + for (AsyncClusterConnection connection: customConnections) { + Closeables.close(connection, true); + } this.hbaseAdmin = null; this.asyncConnection = null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java index e9ae25d2ea..eccb8ef423 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java @@ -39,21 +39,11 @@ public class DummyAsyncRegistry implements AsyncRegistry { return null; } - @Override - public CompletableFuture getCurrentNrHRS() { - return null; - } - @Override public CompletableFuture getMasterAddress() { return null; } - @Override - public CompletableFuture getMasterInfoPort() { - return null; - } - @Override public void close() { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java new file mode 100644 index 0000000000..d269cd733b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM; +import static org.junit.Assert.assertEquals; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestHMasterAsyncRegistry { + private static final Logger LOG = LoggerFactory.getLogger(TestHMasterAsyncRegistry.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final TableName TEST_TABLE = TableName.valueOf("foo"); + private static final byte[] COL_FAM = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("col"); + + + // Automatically cleaned up on cluster shutdown. + private static AsyncClusterConnection customConnection; + private static HMasterAsyncRegistry REGISTRY; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3); + TEST_UTIL.startMiniCluster(3); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + // Set the master host:port in the client config used to create a custom connection. + String masterHostName = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getHostname(); + int masterPort = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getPort(); + conf.set(HMasterAsyncRegistry.CONF_KEY, HostAndPort.fromParts(masterHostName, masterPort).toString()); + conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, HMasterAsyncRegistry.class.getName()); + // make sure that we do not depend on this config when getting locations for meta replicas, see + // HBASE-21658. + conf.setInt(META_REPLICAS_NUM, 1); + REGISTRY = new HMasterAsyncRegistry(conf); + customConnection = TEST_UTIL.getCustomConnection(conf); + } + + @AfterClass + public static void tearDown() throws Exception { + IOUtils.closeQuietly(REGISTRY); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRegistryImpl() throws Exception { + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + assertEquals(REGISTRY.getClusterId().get(), master.getClusterId()); + assertEquals(REGISTRY.getMasterAddress().get(), master.getServerName()); + } + + /** + * Tests basic create, put, scan operations using the connection. + */ + @Test + public void testCustomConnectionBasicOps() throws Exception { + // Verify that the right registry is in use. + assertTrue(customConnection instanceof AsyncClusterConnectionImpl); + assertTrue(((AsyncClusterConnectionImpl) customConnection).getRegistry() instanceof HMasterAsyncRegistry); + Connection connection = customConnection.toConnection(); + // Create a test table. 
+ Admin admin = connection.getAdmin(); + TableDescriptorBuilder builder = + TableDescriptorBuilder.newBuilder(TEST_TABLE).setColumnFamily(ColumnFamilyDescriptorBuilder.of(COL_FAM)); + admin.createTable(builder.build()); + try (Table table = connection.getTable(TEST_TABLE)){ + // Insert one row each region + int insertNum = 10; + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("row" + String.format("%03d", i))); + put.addColumn(COL_FAM, QUALIFIER, QUALIFIER); + table.put(put); + } + // Verify the row count. + try (ResultScanner scanner = table.getScanner(new Scan())) { + int count = 0; + for (Result r : scanner) { + Assert.assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java index 5a72daea20..16f43bd9e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java @@ -84,11 +84,8 @@ public class TestZKAsyncRegistry { String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId(); assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId, clusterId); - assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(), - REGISTRY.getCurrentNrHRS().get().intValue()); assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), REGISTRY.getMasterAddress().get()); - assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue()); RegionReplicaTestHelper .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java new file mode 100644 index 0000000000..c1b8a1e3df --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({ MasterTests.class}) +public class TestHMasterAsyncRegistryRPCs { + private static final Logger LOG = LoggerFactory.getLogger(TestHMasterAsyncRegistryRPCs.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static RpcClient rpcClient; + private static HMaster activeMaster; + private static List standByMasters; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HConstants.MASTER_PORT, "0"); + conf.setStrings(HConstants.ZOOKEEPER_ZNODE_PARENT, "/metacachetest"); + TEST_UTIL.startMiniCluster(2); + activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); + standByMasters = new ArrayList<>(); + // Create a few standby masters. + for (int i = 0; i < 2; i++) { + standByMasters.add(TEST_UTIL.getHBaseCluster().startMaster().getMaster()); + } + rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); + } + + @AfterClass + public static void tearDown() throws Exception { + if (rpcClient != null) { + rpcClient.close(); + } + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Verifies that getMetaReplicaIdFromPath() parses the full meta znode paths correctly. 
+   * @throws IOException
+   */
+  @Test
+  public void TestGetMetaReplicaIdFromPath() throws IOException {
+    ZKWatcher zk = TEST_UTIL.getZooKeeperWatcher();
+    String metaZnode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, zk.getZNodePaths().metaZNodePrefix);
+    assertEquals(0, zk.getZNodePaths().getMetaReplicaIdFromPath(metaZnode));
+    for (int i = 1; i < 10; i++) {
+      assertEquals(i, zk.getZNodePaths().getMetaReplicaIdFromPath(metaZnode + "-" + i));
+    }
+    for (String suffix : new String[]{"foo", "1234", "foo-123", "123-foo-234"}) {
+      try {
+        final String input = metaZnode + suffix;
+        zk.getZNodePaths().getMetaReplicaIdFromPath(input);
+        fail("Exception not hit for getMetaReplicaIdFromPath(): " + input);
+      } catch (NumberFormatException e) {
+        // Expected
+      }
+    }
+  }
+
+  private void verifyGetCachedMetadataLocations(HMaster master) throws IOException {
+    try {
+      ServerName sm = master.getServerName();
+      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0);
+      MasterProtos.MasterService.BlockingInterface stub =
+          MasterProtos.MasterService.newBlockingStub(channel);
+      GetMetaLocationsResponse response = stub.getMetaLocations(null,
+          GetMetaLocationsRequest.getDefaultInstance());
+      assertEquals(1, response.getLocationsCount());
+      HBaseProtos.RegionLocation location = response.getLocations(0);
+      assertEquals(sm.getHostname(), location.getServerName().getHostName());
+    } catch (ServiceException e) {
+      LOG.error("Error in GetCachedMetadataLocations. Active master: {}", master.isActiveMaster(), e);
+      fail("Error calling GetCachedMetadataLocations()");
+    }
+  }
+
+  /**
+   * Verifies that both active and standby masters serve the cached meta region locations.
+   * @throws IOException
+   */
+  @Test
+  public void TestGetCachedMetadataLocations() throws IOException {
+    // Sanity check the active/standby state of the masters before probing them.
+ assertTrue(activeMaster.serviceStarted); + assertTrue(activeMaster.isActiveMaster()); + verifyGetCachedMetadataLocations(activeMaster); + for (HMaster standByMaster: standByMasters) { + assertTrue(!standByMaster.serviceStarted); + assertTrue(!standByMaster.isActiveMaster()); + verifyGetCachedMetadataLocations(standByMaster); + } + } + + private void verifyIsMasterActive(HMaster master, boolean expectedResult) throws Exception { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = MasterProtos.MasterService.newBlockingStub(channel); + IsActiveResponse response = stub.isActive(null, IsActiveRequest.getDefaultInstance()); + assertEquals(expectedResult, response.getIsMasterActive()); + } + + @Test + public void TestIsActiveRPC() throws Exception { + verifyIsMasterActive(activeMaster, true); + for (HMaster master: standByMasters) verifyIsMasterActive(master, false); + } + + private void verifyMasterPorts(HMaster master) throws Exception { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = MasterProtos.MasterService.newBlockingStub(channel); + GetPortInfoResponse response = stub.getPortInfo(null, GetPortInfoRequest.getDefaultInstance()); + Configuration conf = master.getConfiguration(); + assertEquals(response.getMasterPort(), conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)); + assertEquals(response.getInfoPort(), conf.getInt(HConstants.MASTER_INFO_PORT, + HConstants.DEFAULT_MASTER_INFOPORT)); + } + + @Test + public void TestGetPortInfoRPC() throws Exception { + verifyMasterPorts(activeMaster); + for (HMaster master: standByMasters) verifyMasterPorts(master); + } + + public String getClusterId(HMaster master) throws IOException, ServiceException { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = MasterProtos.MasterService.newBlockingStub(channel); + GetClusterIdResponse response = stub.getClusterId(null, GetClusterIdRequest.getDefaultInstance()); + return response.getClusterId(); + } + + @Test + public void TestClusterIdRPC() throws Exception { + assertEquals(activeMaster.getClusterId(), getClusterId(activeMaster)); + try { + assertTrue(standByMasters.size() > 0); + getClusterId(standByMasters.get(0)); + fail("No exception thrown while fetching ClusterId for standby master: " + + standByMasters.get(0).getServerName().toString()); + } catch (ServiceException e) { + // Expected. + } + } +} -- 2.20.1 (Apple Git-117)
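
Appendix: illustrative usage sketches (not part of the patch)
-------------------------------------------------------------
The sketch below shows how a client would opt into the master-backed registry
introduced by this patch. The configuration keys are the ones added by the
patch; the master host names/ports and the table name are placeholders and
must be replaced with real values for the target cluster.

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class ZKLessClientExample {
    public static void main(String[] args) throws IOException {
      Configuration conf = HBaseConfiguration.create();
      // Comma separated HMaster end points; the hosts and ports here are placeholders.
      conf.set("hbase.client.asyncregistry.masteraddrs",
          "master1.example.com:16000,master2.example.com:16000");
      // Swap the default ZK-backed registry for the master-backed one added by this patch.
      conf.set("hbase.client.registry.impl",
          "org.apache.hadoop.hbase.client.HMasterAsyncRegistry");
      // Normal table operations now resolve the cluster ID, active master address and
      // meta locations through the new master RPCs instead of ZooKeeper.
      try (Connection connection = ConnectionFactory.createConnection(conf);
           Table table = connection.getTable(TableName.valueOf("foo"))) {
        Result result = table.get(new Get(Bytes.toBytes("row001")));
        System.out.println("Fetched row, empty=" + result.isEmpty());
      }
    }
  }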
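
The registry can also be probed directly, mirroring what TestHMasterAsyncRegistry
does. HMasterAsyncRegistry is @InterfaceAudience.Private, so this is only a sketch
for understanding the API rather than a supported public usage; the master address
below is again a placeholder.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.RegionLocations;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.client.HMasterAsyncRegistry;

  public class HMasterAsyncRegistryProbe {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Placeholder master end point.
      conf.set("hbase.client.asyncregistry.masteraddrs", "master1.example.com:16000");
      try (HMasterAsyncRegistry registry = new HMasterAsyncRegistry(conf)) {
        // Each getter issues a blocking RPC against the configured masters on a
        // background thread and completes the returned CompletableFuture.
        String clusterId = registry.getClusterId().get();
        ServerName activeMaster = registry.getMasterAddress().get();
        RegionLocations metaLocations = registry.getMetaRegionLocation().get();
        System.out.println(clusterId + " / " + activeMaster + " / " + metaLocations.size());
      }
    }
  }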
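
For completeness, a sketch of calling one of the new MasterService RPCs directly over
a blocking stub, along the lines of TestHMasterAsyncRegistryRPCs. Per the patch,
GetClusterId is only answered by an initialized active master, while IsActive,
GetMetaLocations and GetPortInfo are served by standby masters as well. The end point
and timeout below are placeholders.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.ipc.RpcClient;
  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
  import org.apache.hadoop.hbase.security.User;
  import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
  import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;

  public class GetClusterIdRpcExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Placeholder end point of one HMaster; the start code is unknown to the client.
      ServerName master = ServerName.valueOf("master1.example.com:16000", ServerName.NON_STARTCODE);
      try (RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) {
        BlockingRpcChannel channel =
            rpcClient.createBlockingRpcChannel(master, User.getCurrent(), 10000);
        MasterProtos.MasterService.BlockingInterface stub =
            MasterProtos.MasterService.newBlockingStub(channel);
        MasterProtos.GetClusterIdResponse response =
            stub.getClusterId(null, MasterProtos.GetClusterIdRequest.getDefaultInstance());
        System.out.println("cluster id = " + response.getClusterId());
      }
    }
  }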