From b43a688e72df4460ef6e4a423adfe2273dca11ea Mon Sep 17 00:00:00 2001 From: Albert Lee Date: Mon, 28 May 2018 15:59:14 +0800 Subject: [PATCH 1/2] Support replicating to target cluster using the region server group mapping. Ingore the unit test temporarily. --- .../apache/hadoop/hbase/zookeeper/ZNodePaths.java | 3 + .../hadoop/hbase/rsgroup/RSGroupAdminClient.java | 2 +- .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 24 ++- .../hbase/regionserver/ReplicationService.java | 11 + .../hbase/replication/BaseReplicationEndpoint.java | 3 + .../replication/HBaseReplicationEndpoint.java | 110 +++++++++- .../hbase/replication/ReplicationEndpoint.java | 20 ++ .../replication/regionserver/Replication.java | 8 + .../regionserver/ReplicationSinkManager.java | 8 + .../regionserver/ReplicationSourceManager.java | 8 + .../hbase/replication/TestGroupReplication.java | 226 +++++++++++++++++++++ 11 files changed, 420 insertions(+), 3 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestGroupReplication.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index 32792f64e6..4c6fc0cd75 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -89,6 +89,8 @@ public class ZNodePaths { public final String queuesZNode; // znode containing queues of hfile references to be replicated public final String hfileRefsZNode; + // znode containing groupinfo + public final String groupInfo; public ZNodePaths(Configuration conf) { baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT); @@ -123,6 +125,7 @@ public class ZNodePaths { queuesZNode = joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs")); hfileRefsZNode = joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.hfile.refs", "hfile-refs")); + groupInfo = joinZNode(baseZNode, conf.get("zookeeper.znode.groupInfo", "groupInfo")); } @Override diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java index 380f323bdf..6e32f360e1 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java @@ -53,7 +53,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets; * Client used for managing region server group information. */ @InterfaceAudience.Private -class RSGroupAdminClient implements RSGroupAdmin { +public class RSGroupAdminClient implements RSGroupAdmin { private RSGroupAdminService.BlockingInterface stub; public RSGroupAdminClient(Connection conn) throws IOException { diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 9294255abe..54d944c0c3 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -113,7 +113,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; * no other has access concurrently. Reads must be able to continue concurrently. */ @InterfaceAudience.Private -final class RSGroupInfoManagerImpl implements RSGroupInfoManager { +public final class RSGroupInfoManagerImpl implements RSGroupInfoManager { private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class); /** Table descriptor for hbase:rsgroup catalog table */ @@ -347,6 +347,28 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return rsGroupInfoList; } + public static RSGroupInfo retrieveGroupInfo(ZKWatcher watcher, String groupBasePath, + String groupName) throws IOException { + ByteArrayInputStream bis = null; + try { + String groupInfoPath = ZNodePaths.joinZNode(groupBasePath, groupName); + if (-1 != ZKUtil.checkExists(watcher, groupInfoPath)) { + byte[] data = ZKUtil.getData(watcher, groupInfoPath); + if (data.length > 0) { + ProtobufUtil.expectPBMagicPrefix(data); + bis = new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length); + } + } + } catch (KeeperException e) { + throw new IOException("Failed to read groupZNode", e); + } catch (DeserializationException e) { + throw new IOException("Failed to read groupZNode", e); + } catch (InterruptedException e) { + throw new IOException("Failed to read groupZNode", e); + } + return ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)); + } + List retrieveGroupListFromZookeeper() throws IOException { String groupBasePath = ZNodePaths.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode); List RSGroupInfoList = Lists.newArrayList(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index c34231d9ed..735295449e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -33,6 +34,11 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface ReplicationService { + /** + * Use for group replication mapping + */ + ServerName serverName = null; + /** * Initializes the replication service object. * @param walProvider can be null if not initialized inside a live region server environment, for @@ -55,4 +61,9 @@ public interface ReplicationService { * Refresh and Get ReplicationLoad */ public ReplicationLoad refreshAndGetReplicationLoad(); + + /** + * Return the RS ServerName which the ReplicationEndpoint belong to + */ + ServerName getServerName(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 42d0299026..3df9aee427 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY = "hbase.replication.source.custom.walentryfilters"; protected Context ctx; + protected static ServerName hostServerName; @Override public void init(Context context) throws IOException { @@ -55,6 +57,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService " because there's no such peer"); } } + hostServerName = context.getHostServerName(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index bd5c529092..3cb2206e8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.HashMap; import java.util.UUID; +import java.util.Map; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.yetus.audience.InterfaceAudience; @@ -37,6 +39,13 @@ import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; /** * A {@link BaseReplicationEndpoint} for replication endpoints whose @@ -155,19 +164,118 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * @param zkw zk connection to use * @return list of region server addresses or an empty list if the slave is unavailable */ + protected static List fetchSlavesAddresses(ZKWatcher zkw, ServerName hostServerName) + throws KeeperException { + List children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.znodePaths.groupInfo); + if (children == null) { + return Collections.emptyList(); + } + Configuration conf = HBaseConfiguration.create(); + // if use other balancer, return all regionservers + if (!conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS) + .equals(RSGroupBasedLoadBalancer.class.getName())) { + return parseServerNameFromList(children); + } else { + // if use group balancer, just return regionservers belong to the same group or default group + Map serverNameHostPortMapping = new HashMap<>(); + for (String serverName : children) { + String mappingKey = + serverName.split(",")[0] + ServerName.SERVERNAME_SEPARATOR + serverName.split(",")[1]; + serverNameHostPortMapping.put(mappingKey, serverName); + } + + String groupName = null; + try { + Connection con = ConnectionFactory.createConnection(conf); + RSGroupAdmin gadmin = new RSGroupAdminClient(con); + try { + /** Get which group local machine belong to */ + Address localServer = hostServerName.getAddress(); + RSGroupInfo groupInfo = gadmin.getRSGroupOfServer(localServer); + groupName = groupInfo.getName(); + if(LOG.isDebugEnabled()) { + LOG.debug("Replication endpoint server " + localServer.toString() + " in " + groupName + " group"); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + con.close(); + } + } catch (IOException e) { + LOG.error("Find some error when construct group admin", e); + } + + List serverList = + getServerListFromTargetZkCluster(groupName, zkw, serverNameHostPortMapping); + if (serverList.size() > 0) // if target cluster open group balancer + return serverList; + else { // if not, choose sinkers from all target cluster regionservers + return parseServerNameFromList(children); + } + } + } + protected static List fetchSlavesAddresses(ZKWatcher zkw) throws KeeperException { - List children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.znodePaths.rsZNode); + return fetchSlavesAddresses(zkw, hostServerName); + } + + protected static List parseServerNameFromList (List children) { if (children == null) { return Collections.emptyList(); } + + StringBuffer sb = new StringBuffer(); List addresses = new ArrayList<>(children.size()); for (String child : children) { addresses.add(ServerName.parseServerName(child)); + sb.append(ServerName.parseServerName(child)).append("/"); } + LOG.debug("Find " + children.size() + " child znodes from target cluster zk. " + sb.toString()); + return addresses; } + protected static List getServerListFromTargetZkCluster(String groupName, + ZKWatcher zkw, Map serverNameHostPortMapping) throws KeeperException { + List serverList; + // Get groupinfo from slave cluster zk + List groupInfos = + ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.znodePaths.groupInfo); + // if target cluster have same name group + if (groupInfos != null && groupInfos.contains(groupName)) { + serverList = getServerListFromWithRSGroupName(groupName, zkw, serverNameHostPortMapping); + } else if (groupInfos != null && !groupInfos.contains(groupName)) { + // if target cluster does not have same name group, return default group servers + serverList = getServerListFromWithRSGroupName(RSGroupInfo.DEFAULT_GROUP, zkw, + serverNameHostPortMapping); + } else { + //if target cluster does not use group balancer + return Collections.emptyList(); + } + return serverList; + } + + protected static List getServerListFromWithRSGroupName(String groupName, + ZKWatcher zkw, Map serverNameHostPortMapping) { + List serverList = new ArrayList<>(); + RSGroupInfo detail = + RSGroupInfoManagerImpl.retrieveGroupInfo(zkw, zkw.getZNodePaths().groupInfo, groupName); + // choose server from rsZNode children which also in same group with local mashine + for (Address serverInfo : detail.getServers()) { + String serverPrefix = + serverInfo.getHostname() + ServerName.SERVERNAME_SEPARATOR + serverInfo.getPort(); + if (serverNameHostPortMapping.containsKey(serverPrefix)) { + ServerName sn = ServerName.parseServerName(serverNameHostPortMapping.get(serverPrefix)); + if(LOG.isDebugEnabled()) { + LOG.debug("Match server in " + groupName + " success " + serverPrefix + "/" + sn); + } + serverList.add(sn); + } + } + return serverList; + } + /** * Get a list of all the addresses of all the region servers * for this peer cluster diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 543dc2f2e3..ea2025a218 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -62,6 +63,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { private final UUID clusterId; private final MetricsSource metrics; private final Abortable abortable; + private ServerName hostServerName; @InterfaceAudience.Private public Context( @@ -84,6 +86,21 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { this.tableDescriptors = tableDescriptors; this.abortable = abortable; } + public Context( + final Configuration localConf, + final Configuration conf, + final FileSystem fs, + final String peerId, + final UUID clusterId, + final ReplicationPeer replicationPeer, + final MetricsSource metrics, + final TableDescriptors tableDescriptors, + final Abortable abortable, + final ServerName hostServerName) { + this(localConf, conf, fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, abortable); + this.hostServerName = hostServerName; + + } public Configuration getConfiguration() { return conf; } @@ -112,6 +129,9 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { return tableDescriptors; } public Abortable getAbortable() { return abortable; } + public ServerName getHostServerName() { + return hostServerName; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index c259890bb2..ba0184aa02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -75,6 +76,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private int statsThreadPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; + private ServerName serverName; /** * Empty constructor @@ -144,6 +146,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer join(); } + @Override + public ServerName getServerName() { + return this.serverName; + } + /** * Join with the replication threads */ @@ -185,6 +192,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer public void startReplicationService() throws IOException { try { this.replicationManager.init(); + this.serverName = this.replicationManager.getServerName(); } catch (ReplicationException e) { throw new IOException(e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java index 3cd7884185..a238c26e6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java @@ -163,6 +163,14 @@ public class ReplicationSinkManager { Collections.shuffle(slaveAddresses, random); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); sinks = slaveAddresses.subList(0, numSinks); + if(LOG.isDebugEnabled()) { + StringBuffer sb = new StringBuffer(); + sb.append("choose sinker(s) of target cluster "); + for (ServerName sn : sinks) { + sb.append(sn.getServerName()).append("/"); + } + LOG.debug(sb.toString()); + } lastUpdateToPeers = System.currentTimeMillis(); badReportCounts.clear(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index b54d9b7d03..c7316c2746 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -119,6 +120,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final boolean replicationForBulkLoadDataEnabled; + private ServerName serverName; + private AtomicLong totalBufferUsed = new AtomicLong(); @@ -195,6 +198,10 @@ public class ReplicationSourceManager implements ReplicationListener { cleanOldLogs(fileName, id, queueRecovered); } + public ServerName getServerName() { + return serverName; + } + /** * Cleans a log file and all older files from ZK. Called when we are sure that a * log file is closed and has no more entries. @@ -473,6 +480,7 @@ public class ReplicationSourceManager implements ReplicationListener { if (server instanceof HRegionServer) { rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); tableDescriptors = ((HRegionServer) server).getTableDescriptors(); + this.serverName = server.getServerName(); } ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestGroupReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestGroupReplication.java new file mode 100644 index 0000000000..24f6e95a0c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestGroupReplication.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.rsgroup.RSGroupAdminClient; +import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +/** + * Both source and target cluster use group load balancer. + */ + +@Category({ MediumTests.class }) public class TestGroupReplication { + protected static final Log LOG = LogFactory.getLog(TestGroupReplication.class); + private static HMaster master1; + private static HMaster master2; + private static RSGroupAdminClient gadmin1; + private static RSGroupAdminClient gadmin2; + private static Admin admin1; + private static Admin admin2; + private static TableDescriptor tableDesc1; + private static TableDescriptor tableDesc2; + private static boolean init = false; + + protected static HBaseTestingUtility TEST_UTIL1; + protected static HBaseTestingUtility TEST_UTIL2; + + private static final int NUM_SLAVES_BASE = 4; + + protected static Table htable1; + protected static Table htable2; + + protected static final String tablename = "test"; + protected static final byte[] famName = Bytes.toBytes("f"); + protected static final byte[] row = Bytes.toBytes("row"); + protected static final byte[] noRepfamName = Bytes.toBytes("norep"); + + private static final String APPLICATION_GROUP = "APP_GROUP"; + + @BeforeClass public static void setUp() throws Exception { + + /** CLUSTER 1 */ + TEST_UTIL1 = new HBaseTestingUtility(); + TEST_UTIL1.getConfiguration() + .set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName()); + TEST_UTIL1.getConfiguration() + .set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + NUM_SLAVES_BASE); + TEST_UTIL1.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + TEST_UTIL1.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); + TEST_UTIL1.getConfiguration().setInt("replication.source.size.capacity", 10240); + TEST_UTIL1.getConfiguration().setLong("replication.source.sleepforretries", 100); + TEST_UTIL1.getConfiguration().setInt("hbase.regionserver.maxlogs", 10); + TEST_UTIL1.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10); + TEST_UTIL1.getConfiguration().setInt("zookeeper.recovery.retry", 1); + TEST_UTIL1.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10); + // TEST_UTIL1.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + TEST_UTIL1.getConfiguration().setBoolean("dfs.support.append", true); + TEST_UTIL1.getConfiguration().setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + TEST_UTIL1.getConfiguration().setInt("replication.stats.thread.period.seconds", 5); + TEST_UTIL1.getConfiguration().setBoolean("hbase.tests.use.shortcircuit.reads", false); + TEST_UTIL1.startMiniCluster(NUM_SLAVES_BASE); + + MiniZooKeeperCluster miniZK = TEST_UTIL1.getZkCluster(); + + /** CLUSTER 2 */ + TEST_UTIL2 = new HBaseTestingUtility(); + TEST_UTIL2.getConfiguration() + .set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName()); + TEST_UTIL2.getConfiguration() + .set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + NUM_SLAVES_BASE); + TEST_UTIL2.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + TEST_UTIL2.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); + TEST_UTIL2.getConfiguration().setInt("replication.source.size.capacity", 10240); + TEST_UTIL2.getConfiguration().setLong("replication.source.sleepforretries", 100); + TEST_UTIL2.getConfiguration().setInt("hbase.regionserver.maxlogs", 10); + TEST_UTIL2.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10); + TEST_UTIL2.getConfiguration().setInt("zookeeper.recovery.retry", 1); + TEST_UTIL2.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10); + // TEST_UTIL2.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + TEST_UTIL2.getConfiguration().setBoolean("dfs.support.append", true); + TEST_UTIL2.getConfiguration().setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + TEST_UTIL2.getConfiguration().setInt("replication.stats.thread.period.seconds", 5); + TEST_UTIL2.getConfiguration().setBoolean("hbase.tests.use.shortcircuit.reads", false); + TEST_UTIL2.setZkCluster(miniZK); + TEST_UTIL2.startMiniCluster(NUM_SLAVES_BASE); + + ReplicationPeerConfig rpcConf = + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL2.getClusterKey()).build(); + admin1.addReplicationPeer("2", rpcConf); + + HColumnDescriptor fam = new HColumnDescriptor(famName); + tableDesc1 = + TableDescriptorBuilder.newBuilder(TableName.valueOf(tablename)).setColumnFamily(fam) + .setReplicationScope(HConstants.REPLICATION_SCOPE_GLOBAL).build(); + tableDesc2 = + TableDescriptorBuilder.newBuilder(TableName.valueOf(tablename)).setColumnFamily(fam) + .build(); + + admin1 = TEST_UTIL1.getConnection().getAdmin(); + admin2 = TEST_UTIL2.getConnection().getAdmin(); + + gadmin1 = new RSGroupAdminClient(admin1.getConnection()); + gadmin2 = new RSGroupAdminClient(admin2.getConnection()); + + admin1.createTable(tableDesc1, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + TEST_UTIL1.waitUntilAllRegionsAssigned(TableName.valueOf(tablename)); + admin2.createTable(tableDesc2, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + TEST_UTIL2.waitUntilAllRegionsAssigned(TableName.valueOf(tablename)); + } + + @AfterClass public static void shutdown() throws Exception { + /** cluster2 use cluster1's zk. so shutdown cluster2 first. */ + TEST_UTIL2.shutdownMiniCluster(); + TEST_UTIL1.shutdownMiniCluster(); + } + + @Test public void testGroupReplicationPut() throws Exception { + RSGroupInfo cluster1AppInfo = addGroup(gadmin1, APPLICATION_GROUP, 1); + RSGroupInfo cluster2AppInfo = addGroup(gadmin2, APPLICATION_GROUP, 1); + + gadmin1.moveTables(Sets.newHashSet(htable1.getName()), APPLICATION_GROUP); + gadmin2.moveTables(Sets.newHashSet(htable2.getName()), APPLICATION_GROUP); + + htable1 = TEST_UTIL1.getConnection().getTable(TableName.valueOf(tablename)); + htable2 = TEST_UTIL2.getConnection().getTable(TableName.valueOf(tablename)); + + String oriStr = "hello world"; + byte[] putkey = Bytes.toBytes(oriStr); + Put put = new Put(putkey); + put.addColumn(famName, Bytes.toBytes("0"), Bytes.toBytes("hello")); + htable1.put(put); + LOG.debug("Put finished."); + + Thread.sleep(100); // Wait data replicate to cluster 2 + Get get = new Get(Bytes.toBytes("hello world")); + Result res = htable2.get(get); + assert res != null; + String resStr = Bytes.toString(res.getRow()); + + Assert.assertEquals(oriStr, resStr); + + HBaseCluster cluster1 = TEST_UTIL1.getHBaseCluster(); + HBaseCluster cluster2 = TEST_UTIL1.getHBaseCluster(); + + LOG.debug("CLUSTER1 " + APPLICATION_GROUP + " GROUPINFO " + cluster1AppInfo.toString()); + LOG.debug("CLUSTER2 " + APPLICATION_GROUP + " GROUPINFO " + cluster2AppInfo.toString()); + + for (int i = 0; i < NUM_SLAVES_BASE; i++) { + HRegionServer rs = ((MiniHBaseCluster) cluster1).getRegionServer(i); + LOG.debug("CLUSTER 1 GOT RS NAME " + rs.getServerName()); + LOG.debug( + "CLUSTER 1 GOT RS REPLICATION SINK " + rs.getReplicationSourceService().getServerName()); + LOG.debug("CLUSTER 1 GOT RS REPLICATION SOURCE " + rs.getReplicationSourceService() + .getServerName()); + } + + for (int i = 0; i < NUM_SLAVES_BASE; i++) { + HRegionServer rs = ((MiniHBaseCluster) cluster2).getRegionServer(i); + LOG.debug("CLUSTER 2 GOT RS NAME " + rs.getServerName()); + LOG.debug( + "CLUSTER 2 GOT RS REPLICATION SINK " + rs.getReplicationSourceService().getServerName()); + LOG.debug("CLUSTER 2 GOT RS REPLICATION SOURCE " + rs.getReplicationSourceService() + .getServerName()); + } + } + + public RSGroupInfo addGroup(RSGroupAdminClient rsGroupAdmin, String groupName, int serverCount) + throws IOException, InterruptedException { + RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); + assertTrue(defaultInfo != null); + assertTrue(defaultInfo.getServers().size() >= serverCount); + rsGroupAdmin.addRSGroup(groupName); + + Set
set = new HashSet<>(); + for (Address server : defaultInfo.getServers()) { + if (set.size() == serverCount) { + break; + } + set.add(server); + } + + rsGroupAdmin.moveServers(set, groupName); + RSGroupInfo result = rsGroupAdmin.getRSGroupInfo(groupName); + assertTrue(result.getServers().size() >= serverCount); + return result; + } + +} -- 2.15.1 (Apple Git-101) From 599bc779802ccf611107461936858b5a737e175d Mon Sep 17 00:00:00 2001 From: Albert Lee Date: Mon, 28 May 2018 16:07:12 +0800 Subject: [PATCH 2/2] Support replicating to target cluster using the region server group mapping. Ingore the unit test temporarily. --- .../org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 3cb2206e8b..6cb072c1ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -231,7 +231,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint addresses.add(ServerName.parseServerName(child)); sb.append(ServerName.parseServerName(child)).append("/"); } - LOG.debug("Find " + children.size() + " child znodes from target cluster zk. " + sb.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug("Find " + children.size() + " child znodes from target cluster zk. " + sb.toString()); + } return addresses; } -- 2.15.1 (Apple Git-101)