From 57e79469300a66fc62f84b40fe91c4b1e0f135c4 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Thu, 19 May 2016 17:14:33 -0700 Subject: [PATCH] HBASE-16342 Moved replication WAL offset tracking out of ZooKeeper and into an HBase table. Provided new implementations of ReplicationQueues and ReplicationQueuesClient backed by this table. Peer tracking still remains in ZooKeeper. Includes HBase issues: HBASE-15883, HBASE-15958, HBASE-15974, HBASE-16036, HBASE-16083, HBASE-16080, HBASE-1620, and HBASE-15937 --- .../hbase/client/replication/ReplicationAdmin.java | 4 +- .../hbase/replication/ReplicationFactory.java | 20 +- .../hbase/replication/ReplicationQueues.java | 18 +- .../replication/ReplicationQueuesArguments.java | 70 +++ .../hbase/replication/ReplicationQueuesClient.java | 12 +- .../ReplicationQueuesClientArguments.java | 40 ++ .../replication/ReplicationQueuesClientZKImpl.java | 51 ++- .../hbase/replication/ReplicationQueuesZKImpl.java | 34 +- .../hbase/replication/ReplicationTableBase.java | 502 +++++++++++++++++++++ .../TableBasedReplicationQueuesClientImpl.java | 112 +++++ .../TableBasedReplicationQueuesImpl.java | 427 ++++++++++++++++++ .../master/ReplicationHFileCleaner.java | 8 +- .../replication/master/ReplicationLogCleaner.java | 46 +- .../replication/regionserver/Replication.java | 6 +- .../regionserver/ReplicationSourceManager.java | 78 +++- .../hadoop/hbase/util/hbck/ReplicationChecker.java | 12 +- .../client/replication/TestReplicationAdmin.java | 3 +- .../hbase/master/cleaner/TestLogsCleaner.java | 6 +- .../cleaner/TestReplicationHFileCleaner.java | 4 +- .../replication/TestReplicationStateBasic.java | 8 +- .../replication/TestReplicationStateHBaseImpl.java | 473 +++++++++++++++++++ .../replication/TestReplicationStateZKImpl.java | 19 +- .../replication/TestReplicationTableBase.java | 115 +++++ .../regionserver/TestReplicationSourceManager.java | 207 +++------ .../TestReplicationSourceManagerZkImpl.java | 174 +++++++ ...TestTableBasedReplicationSourceManagerImpl.java | 63 +++ .../apache/hadoop/hbase/util/TestHBaseFsck.java | 4 +- 27 files changed, 2235 insertions(+), 281 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 1305002..1824d65 100644 ---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -123,7 +124,8 @@ public class ReplicationAdmin implements Closeable { zkw = createZooKeeperWatcher(); try { this.replicationQueuesClient = - ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); + ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, + this.connection, zkw)); this.replicationQueuesClient.init(); this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.replicationQueuesClient, this.connection); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 91e77ca..b84641c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication; +import org.apache.commons.lang.reflect.ConstructorUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -30,21 +31,26 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @InterfaceAudience.Private public class ReplicationFactory { - public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher zk, - Configuration conf, Abortable abortable) { - return new ReplicationQueuesZKImpl(zk, conf, abortable); + public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args) + throws Exception { + Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." + + "replication.replicationQueues.class", ReplicationQueuesZKImpl.class); + return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args); } - public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk, - Configuration conf, Abortable abortable) { - return new ReplicationQueuesClientZKImpl(zk, conf, abortable); + public static ReplicationQueuesClient getReplicationQueuesClient( + ReplicationQueuesClientArguments args) + throws Exception { + Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." + + "replication.replicationQueuesClient.class", ReplicationQueuesClientZKImpl.class); + return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args); } public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { return getReplicationPeers(zk, conf, null, abortable); } - + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, final ReplicationQueuesClient queuesClient, Abortable abortable) { return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 507367b..d31ac34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.replication; import java.util.List; -import java.util.SortedMap; -import java.util.SortedSet; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -83,37 +83,37 @@ public interface ReplicationQueues { /** * Get a list of all WALs in the given queue. * @param queueId a String that identifies the queue - * @return a list of WALs, null if this region server is dead and has no outstanding queues + * @return a list of WALs, null if no such queue exists for this server */ List<String> getLogsInQueue(String queueId); /** * Get a list of all queues for this region server. - * @return a list of queueIds, null if this region server is dead and has no outstanding queues + * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues */ List<String> getAllQueues(); /** * Take ownership for the set of queues belonging to a dead region server. * @param regionserver the id of the dead region server - * @return A SortedMap of the queues that have been claimed, including a SortedSet of WALs in + * @return A Map of the queues that have been claimed, including a Set of WALs in * each queue. Returns an empty map if no queues were failed-over. */ - SortedMap<String, SortedSet<String>> claimQueues(String regionserver); + Map<String, Set<String>> claimQueues(String regionserver); /** * Get a list of all region servers that have outstanding replication queues. These servers could * be alive, dead or from a previous run of the cluster. * @return a list of server names */ - List<String> getListOfReplicators(); + List<String> getListOfReplicators() throws ReplicationException; /** * Checks if the provided znode is the same as this region server's - * @param znode to check + * @param regionserver the id of the region server * @return if this is this rs's znode */ - boolean isThisOurZnode(String znode); + boolean isThisOurRegionServer(String regionserver); /** * Add a peer to hfile reference queue if peer does not exist. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java new file mode 100644 index 0000000..12fc6a1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various + * ReplicationQueues implementations with different constructor arguments by reflection. + */ +@InterfaceAudience.Private +public class ReplicationQueuesArguments { + + private ZooKeeperWatcher zk; + private Configuration conf; + private Abortable abort; + + public ReplicationQueuesArguments(Configuration conf, Abortable abort) { + this.conf = conf; + this.abort = abort; + } + + public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) { + this(conf, abort); + setZk(zk); + } + + public ZooKeeperWatcher getZk() { + return zk; + } + + public void setZk(ZooKeeperWatcher zk) { + this.zk = zk; + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Abortable getAbortable() { + return abort; + } + + public void setAbortable(Abortable abort) { + this.abort = abort; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 7fa3bbb..160a9d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -42,7 +43,7 @@ public interface ReplicationQueuesClient { * @return a list of server names * @throws KeeperException zookeeper exception */ - List<String> getListOfReplicators() throws KeeperException; + List<String> getListOfReplicators() throws KeeperException, ReplicationException; /** * Get a list of all WALs in the given queue on the given region server. @@ -61,11 +62,12 @@ List<String> getAllQueues(String serverName) throws KeeperException; /** - * Get the cversion of replication rs node. This can be used as optimistic locking to get a - * consistent snapshot of the replication queues. - * @return cversion of replication rs node + * Load all WALs in all replication queues from ZK. This method guarantees to return a + * snapshot which contains all WALs in ZooKeeper at the start of this call even when there + * is concurrent queue failover. However, some newly created WALs during the call may + * not be included. */ - int getQueuesZNodeCversion() throws KeeperException; + Set<String> getAllWALs() throws KeeperException; /** * Get the change version number of replication hfile references node. This can be used as diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java new file mode 100644 index 0000000..834f831 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct + * various ReplicationQueuesClient implementations with different constructor arguments by + * reflection.
+ */ +@InterfaceAudience.Private +public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments { + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort, + ZooKeeperWatcher zk) { + super(conf, abort, zk); + } + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) { + super(conf, abort); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index cc407e3..b0ded7d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -19,7 +19,12 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import java.util.Set; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -32,6 +37,12 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements ReplicationQueuesClient { + Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class); + + public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) { + this(args.getZk(), args.getConf(), args.getAbortable()); + } + + public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { super(zk, conf, abortable); @@ -74,7 +85,45 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem return result; } - @Override public int getQueuesZNodeCversion() throws KeeperException { + @Override + public Set<String> getAllWALs() throws KeeperException { + /** + * Load all WALs in all replication queues from ZK. This method guarantees to return a + * snapshot which contains all WALs in ZooKeeper at the start of this call even when there + * is concurrent queue failover. However, some newly created WALs during the call may + * not be included. + */ + for (int retry = 0; ; retry++) { + int v0 = getQueuesZNodeCversion(); + List<String> rss = getListOfReplicators(); + if (rss == null) { + LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); + return ImmutableSet.of(); + } + Set<String> wals = Sets.newHashSet(); + for (String rs : rss) { + List<String> listOfPeers = getAllQueues(rs); + // if rs just died, this will be null + if (listOfPeers == null) { + continue; + } + for (String id : listOfPeers) { + List<String> peersWals = getLogsInQueue(rs, id); + if (peersWals != null) { + wals.addAll(peersWals); + } + } + } + int v1 = getQueuesZNodeCversion(); + if (v0 == v1) { + return wals; + } + LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", + v0, v1, retry)); + } + } + + public int getQueuesZNodeCversion() throws KeeperException { try { Stat stat = new Stat(); ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 0833bca..169891a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.replication; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.Map; +import java.util.Set; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -72,6 +72,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); + public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) { + this(args.getZk(), args.getConf(), args.getAbortable()); + } + + public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { super(zk, conf, abortable); @@ -169,13 +173,13 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public boolean isThisOurZnode(String znode) { - return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode); + public boolean isThisOurRegionServer(String regionserver) { + return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode); } @Override - public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) { - SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>(); + public Map<String, Set<String>> claimQueues(String regionserverZnode) { + Map<String, Set<String>> newQueues = new HashMap<>(); // check whether there is multi support. If yes, use it. if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue"); @@ -226,7 +230,7 @@ this.abortable.abort("Failed to get a list of queues for region server: " + this.myQueuesZnode, e); } - return listOfQueues; + return listOfQueues == null ?
new ArrayList<String>() : listOfQueues; } /** @@ -310,8 +314,8 @@ * @param znode pertaining to the region server to copy the queues from * @return WAL queues sorted per peer cluster */ - private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) { - SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>(); + private Map<String, Set<String>> copyQueuesFromRSUsingMulti(String znode) { + Map<String, Set<String>> queues = new HashMap<>(); // hbase/replication/rs/deadrs String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode); List<String> peerIdsToProcess = null; @@ -338,7 +342,7 @@ continue; // empty log queue. } // create the new cluster znode - SortedSet<String> logQueue = new TreeSet<String>(); + Set<String> logQueue = new HashSet<String>(); queues.put(newPeerId, logQueue); ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); listOfOps.add(op); @@ -381,10 +385,10 @@ * @param znode server names to copy * @return all wals for all peers of that cluster, null if an error occurred */ - private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) { + private Map<String, Set<String>> copyQueuesFromRS(String znode) { // TODO this method isn't atomic enough, we could start copying and then // TODO fail for some reason and we would end up with znodes we don't want. - SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>(); + Map<String, Set<String>> queues = new HashMap<>(); try { String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode); List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); @@ -414,7 +418,7 @@ } ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, HConstants.EMPTY_BYTE_ARRAY); - SortedSet<String> logQueue = new TreeSet<String>(); + Set<String> logQueue = new HashSet<String>(); queues.put(newCluster, logQueue); for (String wal : wals) { String z = ZKUtil.joinZNode(clusterPath, wal); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java new file mode 100644 index 0000000..9c26489 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java @@ -0,0 +1,502 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License.
+*/ +package org.apache.hadoop.hbase.replication; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/* + * Abstract class that provides an interface to the Replication Table, which is currently + * being used for WAL offset tracking. + * The basic schema of this table will store each individual queue as a + * separate row. The row key is a unique identifier built from the creating server's name and the + * queueId. Each queue must have the following two columns: + * COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue + * COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this + * queue. The most recent previous owner is the leftmost entry. + * They will also have columns mapping [WAL filename : offset] + * The most flexible method of interacting with the Replication Table is by calling + * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up + * to the caller to close the returned table.
+ */ +@InterfaceAudience.Private +abstract class ReplicationTableBase { + + /** Name of the HBase Table used for tracking replication */ + public static final TableName REPLICATION_TABLE_NAME = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + + // Column family and column names for Queues in the Replication Table + public static final byte[] CF_QUEUE = Bytes.toBytes("q"); + public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o"); + public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h"); + + // Column Descriptor for the Replication Table + private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = + new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + .setBloomFilterType(BloomType.NONE); + + // The value used to delimit the queueId and server name inside a queue's row key. Currently a + // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. + // See HBASE-11394. + public static final String ROW_KEY_DELIMITER = "-"; + + // The value used to delimit server names in the queue history list + public static final String QUEUE_HISTORY_DELIMITER = "|"; + + /* + * Because many Replication Table operations abort the server on failure, we have to make sure + * that Replication Table operations can handle longer periods of Replication Table region + * unavailability. Because RPC requests fail almost immediately on region unavailability, the + * amount of region unavailability we can tolerate is largely determined by (retries * pause). + * With the default setup we can tolerate at least 10 minutes of region unavailability before + * aborting. + */ + private static final int DEFAULT_CLIENT_RETRIES = 100; + private static final int DEFAULT_CLIENT_PAUSE = 6000; + private static final int DEFAULT_RPC_TIMEOUT = 60000; + private static final int DEFAULT_SCANNER_TIMEOUT = DEFAULT_CLIENT_RETRIES * DEFAULT_CLIENT_PAUSE; + + /* + * Make sure that the HBase Replication Table initialization has the proper timeouts. Because + * HBase servers can come up a lot sooner than the cluster is ready to create tables and this + * is a one-time operation, we can accept longer pauses than normal.
+ */ + private static final int DEFAULT_INIT_RETRIES = 60; + private static final int DEFAULT_INIT_PAUSE = 60000; + private static final int DEFAULT_INIT_RPC_TIMEOUT = 60000; + + // We only need a single thread to initialize the Replication Table + private static final int NUM_INITIALIZE_WORKERS = 1; + + protected final Configuration conf; + protected final Abortable abortable; + private final Connection connection; + private final Executor executor; + private volatile CountDownLatch replicationTableInitialized; + private int clientRetries; + private int clientPause; + private int rpcTimeout; + private int scannerTimeout; + private int operationTimeout; + private int initRetries; + private int initPause; + private int initRpcTimeout; + private int initOperationTimeout; + + public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { + this.conf = new Configuration(conf); + this.abortable = abort; + readTimeoutConf(); + decorateTimeoutConf(); + this.connection = ConnectionFactory.createConnection(this.conf); + this.executor = setUpExecutor(); + this.replicationTableInitialized = new CountDownLatch(1); + createReplicationTableInBackground(); + } + + /** + * Read in the configuration values that set up the Replication Table operation timeouts and + * retries + */ + private void readTimeoutConf() { + clientRetries = conf.getInt("hbase.replication.table.client.retries", DEFAULT_CLIENT_RETRIES); + clientPause = conf.getInt("hbase.replication.table.client.pause", DEFAULT_CLIENT_PAUSE); + rpcTimeout = conf.getInt("hbase.replication.table.rpc.timeout", DEFAULT_RPC_TIMEOUT); + scannerTimeout = conf.getInt("hbase.replication.table.scanner.timeout", + DEFAULT_SCANNER_TIMEOUT); + initRetries = conf.getInt("hbase.replication.table.init.retries", DEFAULT_INIT_RETRIES); + initPause = conf.getInt("hbase.replication.table.init.pause", DEFAULT_INIT_PAUSE); + initRpcTimeout = conf.getInt("hbase.replication.table.init.timeout", DEFAULT_INIT_RPC_TIMEOUT); + operationTimeout = calculateTimeout(clientRetries, clientPause, rpcTimeout); + initOperationTimeout = calculateTimeout(initRetries, initPause, initRpcTimeout); + } + + private int calculateTimeout(int retries, int pause, int rpcTimeout) { + return retries * (pause + rpcTimeout); + } + + /** + * Set up the configuration values for normal Replication Table operations + */ + private void decorateTimeoutConf() { + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, clientPause); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, clientRetries); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scannerTimeout); + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, operationTimeout); + conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeout); + } + + /** + * Sets up the thread pool executor used to build the Replication Table in the background + * @return the configured executor + */ + private Executor setUpExecutor() { + ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS, + NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); + tfb.setNameFormat("ReplicationTableExecutor-%d"); + tfb.setDaemon(true); + tempExecutor.setThreadFactory(tfb.build()); + return tempExecutor; + } + + /** + * Get whether the Replication Table has been successfully initialized yet + * @return whether the Replication Table is initialized + */ + public boolean 
getInitializationStatus() { + return replicationTableInitialized.getCount() == 0; + } + + /** + * Build the row key for the given queueId. This will uniquely identify it from all other queues + * in the cluster. + * @param serverName The owner of the queue + * @param queueId String identifier of the queue + * @return String representation of the queue's row key + */ + protected String buildQueueRowKey(String serverName, String queueId) { + return queueId + ROW_KEY_DELIMITER + serverName; + } + + /** + * Parse the original queueId from a row key + * @param rowKey String representation of a queue's row key + * @return the original queueId + */ + protected String getRawQueueIdFromRowKey(String rowKey) { + return rowKey.split(ROW_KEY_DELIMITER)[0]; + } + + /** + * Returns a queue's row key given either its raw or reclaimed queueId + * + * @param serverName name of the server that owns the queue + * @param queueId queueId of the queue + * @return byte representation of the queue's row key + */ + protected byte[] queueIdToRowKey(String serverName, String queueId) { + // Cluster ids are guaranteed to have no hyphens, so if the passed in queueId has no hyphen + // then this is not a reclaimed queue. + if (!queueId.contains(ROW_KEY_DELIMITER)) { + return Bytes.toBytes(buildQueueRowKey(serverName, queueId)); + // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the + // queue's row key + } else { + return Bytes.toBytes(queueId); + } + } + + /** + * Creates a "|" delimited record of the queue's past region server owners. + * + * @param originalHistory the queue's original owner history + * @param oldServer the name of the server that used to own the queue + * @return the queue's new owner history + */ + protected String buildClaimedQueueHistory(String originalHistory, String oldServer) { + return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory; + } + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + */ + protected List<String> getListOfReplicators() throws ReplicationException { + // scan all of the queues and return a list of all unique OWNER values + Set<String> peerServers = new HashSet<String>(); + ResultScanner allQueuesInCluster = null; + try (Table replicationTable = getOrBlockOnReplicationTable()) { + Scan scan = new Scan(); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + allQueuesInCluster = replicationTable.getScanner(scan); + for (Result queue : allQueuesInCluster) { + peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); + } + } catch (IOException e) { + String errMsg = "Failed getting list of replicators"; + throw new ReplicationException(errMsg, e); + } finally { + if (allQueuesInCluster != null) { + allQueuesInCluster.close(); + } + } + return new ArrayList<String>(peerServers); + } + + protected List<String> getAllQueues(String serverName) { + List<String> allQueues = new ArrayList<String>(); + ResultScanner queueScanner = null; + try { + queueScanner = getQueuesBelongingToServer(serverName); + for (Result queue : queueScanner) { + String rowKey = Bytes.toString(queue.getRow()); + // If the queue does not have an Owner History, then we must be its original owner.
So we + // want to return its queueId in raw form + if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) { + allQueues.add(getRawQueueIdFromRowKey(rowKey)); + } else { + allQueues.add(rowKey); + } + } + return allQueues; + } catch (IOException e) { + String errMsg = "Failed getting list of all replication queues for serverName=" + serverName; + abortable.abort(errMsg, e); + return null; + } finally { + if (queueScanner != null) { + queueScanner.close(); + } + } + } + + protected List<String> getLogsInQueue(String serverName, String queueId) { + String rowKey = queueId; + if (!queueId.contains(ROW_KEY_DELIMITER)) { + rowKey = buildQueueRowKey(serverName, queueId); + } + return getLogsInQueue(Bytes.toBytes(rowKey)); + } + + protected List<String> getLogsInQueue(byte[] rowKey) { + String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); + try (Table replicationTable = getOrBlockOnReplicationTable()) { + Get getQueue = new Get(rowKey); + Result queue = replicationTable.get(getQueue); + if (queue == null || queue.isEmpty()) { + abortable.abort(errMsg, new ReplicationException(errMsg)); + return null; + } + return readWALsFromResult(queue); + } catch (IOException e) { + abortable.abort(errMsg, e); + return null; + } + } + + /** + * Read all of the WALs from a queue into a list + * + * @param queue HBase query result containing the queue + * @return a list of all the WAL filenames + */ + protected List<String> readWALsFromResult(Result queue) { + List<String> wals = new ArrayList<>(); + Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); + for (byte[] cQualifier : familyMap.keySet()) { + // Ignore the metadata fields of the queue + if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, + COL_QUEUE_OWNER_HISTORY)) { + continue; + } + wals.add(Bytes.toString(cQualifier)); + } + return wals; + } + + /** + * Get the queue ids and metadata (Owner and History) for the queues belonging to the named + * server + * + * @param server name of the server + * @return a ResultScanner over the QueueIds belonging to the server + * @throws IOException + */ + protected ResultScanner getQueuesBelongingToServer(String server) throws IOException { + Scan scan = new Scan(); + SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); + scan.setFilter(filterMyQueues); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); + try (Table replicationTable = getOrBlockOnReplicationTable()) { + ResultScanner results = replicationTable.getScanner(scan); + return results; + } + } + + /** + * Attempts to acquire the Replication Table. This operation will block until it is assigned by + * the CreateReplicationTableWorker thread.
It is up to the caller of this method to close the + * returned Table + * @return the Replication Table when it is created + * @throws IOException + */ + protected Table getOrBlockOnReplicationTable() throws IOException { + // Sleep until the Replication Table becomes available + try { + replicationTableInitialized.await(); + } catch (InterruptedException e) { + String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + + e.getMessage(); + throw new InterruptedIOException(errMsg); + } + return getAndSetUpReplicationTable(); + } + + /** + * Creates a new copy of the Replication Table and sets up the proper Table timeouts for it + * + * @return the Replication Table + * @throws IOException + */ + private Table getAndSetUpReplicationTable() throws IOException { + Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); + setReplicationTableTimeOuts(replicationTable); + return replicationTable; + } + + /** + * Increases the RPC and operations timeouts for the Replication Table + */ + private Table setReplicationTableTimeOuts(Table replicationTable) { + replicationTable.setRpcTimeout(rpcTimeout); + replicationTable.setOperationTimeout(operationTimeout); + return replicationTable; + } + + /** + * Builds the Replication Table in a background thread. Any method accessing the Replication Table + * should do so through getOrBlockOnReplicationTable(). + * + * @throws IOException if the Replication Table takes too long to build + */ + private void createReplicationTableInBackground() throws IOException { + executor.execute(new CreateReplicationTableWorker()); + } + + /** + * Attempts to build the Replication Table. Will continue blocking until we have a valid + * Table for the Replication Table.
+ */ + private class CreateReplicationTableWorker implements Runnable { + + private Configuration initConf; + private Connection initConnection; + private Admin initAdmin; + + @Override + public void run() { + try { + initConf = buildTableInitConf(); + initConnection = ConnectionFactory.createConnection(initConf); + initAdmin = initConnection.getAdmin(); + createReplicationTable(); + int maxRetries = conf.getInt("hbase.replication.createtable.retries.number", + DEFAULT_CLIENT_RETRIES); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, DEFAULT_RPC_TIMEOUT); + RetryCounter retryCounter = counterFactory.create(); + while (!replicationTableAvailable()) { + retryCounter.sleepUntilNextRetry(); + if (!retryCounter.shouldRetry()) { + throw new IOException("Unable to acquire the Replication Table"); + } + } + replicationTableInitialized.countDown(); + initAdmin.close(); + } catch (IOException | InterruptedException e) { + abortable.abort("Failed building Replication Table", e); + } + } + + private Configuration buildTableInitConf() { + Configuration tempConf = new Configuration(conf); + tempConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, initRetries); + tempConf.setInt(HConstants.HBASE_CLIENT_PAUSE, initPause); + tempConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, initRpcTimeout); + tempConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, initOperationTimeout); + return tempConf; + } + + /** + * Create the replication table with the column descriptor REPLICATION_COL_DESCRIPTOR + * defined in ReplicationTableBase + * + * @throws IOException + */ + private void createReplicationTable() throws IOException { + HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); + replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); + try { + initAdmin.createTable(replicationTableDescriptor); + } catch (TableExistsException e) { + // In this case we can just continue as normal + } + } + + /** + * Checks whether the Replication Table exists yet + * + * @return whether the Replication Table exists + */ + private boolean replicationTableAvailable() { + try { + return initAdmin.tableExists(REPLICATION_TABLE_NAME) && + initAdmin.isTableAvailable(REPLICATION_TABLE_NAME); + } catch (Exception e) { + // Catches a null pointer exception that is thrown when admin.tableExists() is called too + // early. In this case assume the Table is not available yet. + return false; + } + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java new file mode 100644 index 0000000..130827b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes + * the ReplicationTableBase to access the Replication Table. + */ +@InterfaceAudience.Private +public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase + implements ReplicationQueuesClient { + + public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args) + throws IOException { + super(args.getConf(), args.getAbortable()); + } + public TableBasedReplicationQueuesClientImpl(Configuration conf, + Abortable abortable) throws IOException { + super(conf, abortable); + } + + @Override + public void init() throws ReplicationException { + // no-op + } + + @Override + public List<String> getListOfReplicators() throws ReplicationException { + return super.getListOfReplicators(); + } + + @Override + public List<String> getLogsInQueue(String serverName, String queueId) { + return super.getLogsInQueue(serverName, queueId); + } + + @Override + public List<String> getAllQueues(String serverName) { + return super.getAllQueues(serverName); + } + + @Override + public Set<String> getAllWALs() { + Set<String> allWals = new HashSet<String>(); + ResultScanner allQueues = null; + try (Table replicationTable = getOrBlockOnReplicationTable()) { + allQueues = replicationTable.getScanner(new Scan()); + for (Result queue : allQueues) { + for (String wal : readWALsFromResult(queue)) { + allWals.add(wal); + } + } + } catch (IOException e) { + String errMsg = "Failed getting all WALs in Replication Table"; + abortable.abort(errMsg, e); + } finally { + if (allQueues != null) { + allQueues.close(); + } + } + return allWals; + } + + @Override + public int getHFileRefsNodeChangeVersion() throws KeeperException { + // TODO + throw new NotImplementedException(); + } + + @Override + public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException { + // TODO + throw new NotImplementedException(); + } + + @Override + public List<String> getReplicableHFiles(String peerId) throws KeeperException { + // TODO + throw new NotImplementedException(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java new file mode 100644 index 0000000..1e5d11b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -0,0 +1,427 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more
contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * This class provides an implementation of the ReplicationQueues interface using an HBase table + * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table. + */ +@InterfaceAudience.Private +public class TableBasedReplicationQueuesImpl extends ReplicationTableBase + implements ReplicationQueues { + + private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class); + + // Common byte values used in replication offset tracking + private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L); + private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes(""); + + private String serverName = null; + private byte[] serverNameBytes = null; + + // TODO: Only use this variable temporarily. 
Eventually we want to use HBase to store all + // TODO: replication information + private ReplicationStateZKBase replicationState; + + public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException { + this(args.getConf(), args.getAbortable(), args.getZk()); + } + + public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw) + throws IOException { + super(conf, abort); + replicationState = new ReplicationStateZKBase(zkw, conf, abort) {}; + } + + @Override + public void init(String serverName) throws ReplicationException { + this.serverName = serverName; + this.serverNameBytes = Bytes.toBytes(serverName); + } + + @Override + public List<String> getListOfReplicators() throws ReplicationException { + return super.getListOfReplicators(); + } + + @Override + public void removeQueue(String queueId) { + try { + if (checkQueueExists(queueId)) { + byte[] rowKey = queueIdToRowKey(queueId); + Delete deleteQueue = new Delete(rowKey); + safeQueueUpdate(deleteQueue); + } else { + LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed " + + "from the replication table while removing the queue"); + } + } catch (IOException | ReplicationException e) { + String errMsg = "Failed removing queue queueId=" + queueId; + abortable.abort(errMsg, e); + } + } + + @Override + public void addLog(String queueId, String filename) throws ReplicationException { + try (Table replicationTable = getOrBlockOnReplicationTable()) { + if (!checkQueueExists(queueId)) { + // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values + Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); + putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + replicationTable.put(putNewQueue); + } else { + // Otherwise simply add the new log and offset as a new column + Put putNewLog = new Put(queueIdToRowKey(queueId)); + putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + safeQueueUpdate(putNewLog); + } + } catch (IOException | ReplicationException e) { + String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; + throw new ReplicationException(errMsg, e); + } + } + + @Override + public void removeLog(String queueId, String filename) { + try { + byte[] rowKey = queueIdToRowKey(queueId); + Delete delete = new Delete(rowKey); + delete.addColumns(CF_QUEUE, Bytes.toBytes(filename)); + safeQueueUpdate(delete); + } catch (IOException | ReplicationException e) { + String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename; + abortable.abort(errMsg, e); + } + } + + @Override + public void setLogPosition(String queueId, String filename, long position) { + try (Table replicationTable = getOrBlockOnReplicationTable()) { + byte[] rowKey = queueIdToRowKey(queueId); + // Check that the log exists. addLog() must have been called before setLogPosition().
Get checkLogExists = new Get(rowKey); + checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename)); + if (!replicationTable.exists(checkLogExists)) { + String errMsg = "Could not set position of non-existent log from queueId=" + queueId + + ", filename=" + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + // Update the log offset if it exists + Put walAndOffset = new Put(rowKey); + walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position)); + safeQueueUpdate(walAndOffset); + } catch (IOException | ReplicationException e) { + String errMsg = "Failed writing log position queueId=" + queueId + " filename=" + + filename + " position=" + position; + abortable.abort(errMsg, e); + } + } + + @Override + public long getLogPosition(String queueId, String filename) throws ReplicationException { + try { + byte[] rowKey = queueIdToRowKey(queueId); + Get getOffset = new Get(rowKey); + getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename)); + Result result = getResultIfOwner(getOffset); + if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) { + throw new ReplicationException("Could not read log position: got an empty result for " + + "queueId=" + queueId + ", filename=" + filename); + } + return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename))); + } catch (IOException e) { + throw new ReplicationException("Could not get position in log for queueId=" + queueId + + ", filename=" + filename, e); + } + } + + @Override + public void removeAllQueues() { + List<String> myQueueIds = getAllQueues(); + for (String queueId : myQueueIds) { + removeQueue(queueId); + } + } + + @Override + public List<String> getLogsInQueue(String queueId) { + String errMsg = "Failed getting logs in queue queueId=" + queueId; + byte[] rowKey = queueIdToRowKey(queueId); + List<String> logs = new ArrayList<String>(); + try { + Get getQueue = new Get(rowKey); + Result queue = getResultIfOwner(getQueue); + if (queue == null || queue.isEmpty()) { + String errMsgLostOwnership = "Failed getting logs for queue queueId=" + + Bytes.toString(rowKey) + " because the queue was missing or we lost ownership"; + abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership)); + return null; + } + Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); + for (byte[] cQualifier : familyMap.keySet()) { + if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, + COL_QUEUE_OWNER_HISTORY)) { + continue; + } + logs.add(Bytes.toString(cQualifier)); + } + } catch (IOException e) { + abortable.abort(errMsg, e); + return null; + } + return logs; + } + + @Override + public List<String> getAllQueues() { + return getAllQueues(serverName); + } + + @Override + public Map<String, Set<String>> claimQueues(String regionserver) { + Map<String, Set<String>> queues = new HashMap<>(); + if (isThisOurRegionServer(regionserver)) { + return queues; + } + ResultScanner queuesToClaim = null; + try { + queuesToClaim = getQueuesBelongingToServer(regionserver); + for (Result queue : queuesToClaim) { + if (attemptToClaimQueue(queue, regionserver)) { + String rowKey = Bytes.toString(queue.getRow()); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey); + if (replicationState.peerExists(replicationQueueInfo.getPeerId())) { + Set<String> sortedLogs = new HashSet<String>(); + List<String> logs = getLogsInQueue(queue.getRow()); + for (String log : logs) { + sortedLogs.add(log); + } + queues.put(rowKey, sortedLogs); + LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver); + } else { + // Delete orphaned queues +
removeQueue(Bytes.toString(queue.getRow())); + LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " + + regionserver); + } + } + } + } catch (IOException | KeeperException e) { + String errMsg = "Failed claiming queues for regionserver=" + regionserver; + abortable.abort(errMsg, e); + queues.clear(); + } finally { + if (queuesToClaim != null) { + queuesToClaim.close(); + } + } + return queues; + } + + @Override + public boolean isThisOurRegionServer(String regionserver) { + return this.serverName.equals(regionserver); + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void removePeerFromHFileRefs(String peerId) { + // TODO + throw new NotImplementedException(); + } + + @Override + public void addHFileRefs(String peerId, List files) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void removeHFileRefs(String peerId, List files) { + // TODO + throw new NotImplementedException(); + } + + private String buildQueueRowKey(String queueId) { + return buildQueueRowKey(serverName, queueId); + } + + /** + * Convenience method that gets the row key of the queue specified by queueId + * @param queueId queueId of a queue in this server + * @return the row key of the queue in the Replication Table + */ + private byte[] queueIdToRowKey(String queueId) { + return queueIdToRowKey(serverName, queueId); + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * + * @param put Row mutation to perform on the queue + */ + private void safeQueueUpdate(Put put) throws ReplicationException, IOException { + RowMutations mutations = new RowMutations(put.getRow()); + mutations.add(put); + safeQueueUpdate(mutations); + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * + * @param delete Row mutation to perform on the queue + */ + private void safeQueueUpdate(Delete delete) throws ReplicationException, + IOException{ + RowMutations mutations = new RowMutations(delete.getRow()); + mutations.add(delete); + safeQueueUpdate(mutations); + } + + /** + * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column + * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost + * ownership of the column or an IO Exception has occurred during the transaction. + * + * @param mutate Mutation to perform on a given queue + */ + private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{ + try (Table replicationTable = getOrBlockOnReplicationTable()) { + boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), + CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate); + if (!updateSuccess) { + throw new ReplicationException("Failed to update Replication Table because we lost queue " + + " ownership"); + } + } + } + + /** + * Check if the queue specified by queueId is stored in HBase + * + * @param queueId Either raw or reclaimed format of the queueId + * @return Whether the queue is stored in HBase + * @throws IOException + */ + private boolean checkQueueExists(String queueId) throws IOException { + try (Table replicationTable = getOrBlockOnReplicationTable()) { + byte[] rowKey = queueIdToRowKey(queueId); + return replicationTable.exists(new Get(rowKey)); + } + } + + /** + * Attempt to claim the given queue with a checkAndPut on the OWNER column. 
We check that the + * recently killed server is still the OWNER before we claim it. + * + * @param queue The queue that we are trying to claim + * @param originalServer The server that originally owned the queue + * @return Whether we successfully claimed the queue + * @throws IOException + */ + private boolean attemptToClaimQueue(Result queue, String originalServer) throws IOException { + Put putQueueNameAndHistory = new Put(queue.getRow()); + putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName)); + String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE, + COL_QUEUE_OWNER_HISTORY)), originalServer); + putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, + Bytes.toBytes(newOwnerHistory)); + RowMutations claimAndRenameQueue = new RowMutations(queue.getRow()); + claimAndRenameQueue.add(putQueueNameAndHistory); + // Attempt to claim ownership for this queue by checking if the current OWNER is the original + // server. If it is not then another RS has already claimed it. If it is we set ourselves as the + // new owner and update the queue's history + try (Table replicationTable = getOrBlockOnReplicationTable()) { + boolean success = replicationTable.checkAndMutate(queue.getRow(), + CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), + claimAndRenameQueue); + return success; + } + } + + /** + * Attempts to run a Get on some queue. Will only return a non-null result if we currently own + * the queue. + * + * @param get The Get that we want to query + * @return The result of the Get if this server is the owner of the queue. Else it returns null. + * @throws IOException + */ + private Result getResultIfOwner(Get get) throws IOException { + Scan scan = new Scan(get); + // Check if the Get currently contains all columns or only specific columns + if (scan.getFamilyMap().size() > 0) { + // Add the OWNER column if the scan is already only over specific columns + scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); + } + scan.setMaxResultSize(1); + SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, + CompareFilter.CompareOp.EQUAL, serverNameBytes); + scan.setFilter(checkOwner); + ResultScanner scanner = null; + try (Table replicationTable = getOrBlockOnReplicationTable()) { + scanner = replicationTable.getScanner(scan); + Result result = scanner.next(); + return (result == null || result.isEmpty()) ?
null : result; + } finally { + if (scanner != null) { + scanner.close(); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java index 5df9379..a7b2f26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -141,15 +142,16 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { super.setConf(conf); try { initReplicationQueuesClient(conf, zk); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } } private void initReplicationQueuesClient(Configuration conf, ZooKeeperWatcher zk) - throws ZooKeeperConnectionException, IOException { + throws Exception { this.zkw = zk; - this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, new WarnOnlyAbortable()); + this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( + conf, new WarnOnlyAbortable(), zkw)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 9ecba11..9e724db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import java.io.IOException; import java.util.Collections; @@ -67,7 +68,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { try { // The concurrently created new WALs may not be included in the return list, // but they won't be deleted because they're not in the checking set. - wals = loadWALsFromQueues(); + wals = replicationQueues.getAllWALs(); } catch (KeeperException e) { LOG.warn("Failed to read zookeeper, skipping checking deletable files"); return Collections.emptyList(); @@ -88,43 +89,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { }}); } - /** - * Load all wals in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all WALs in the zookeeper at the start of this call even there - * is concurrent queue failover. However, some newly created WALs during the call may - * not be included. 
- */ - private Set loadWALsFromQueues() throws KeeperException { - for (int retry = 0; ; retry++) { - int v0 = replicationQueues.getQueuesZNodeCversion(); - List rss = replicationQueues.getListOfReplicators(); - if (rss == null) { - LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); - return ImmutableSet.of(); - } - Set wals = Sets.newHashSet(); - for (String rs : rss) { - List listOfPeers = replicationQueues.getAllQueues(rs); - // if rs just died, this will be null - if (listOfPeers == null) { - continue; - } - for (String id : listOfPeers) { - List peersWals = replicationQueues.getLogsInQueue(rs, id); - if (peersWals != null) { - wals.addAll(peersWals); - } - } - } - int v1 = replicationQueues.getQueuesZNodeCversion(); - if (v0 == v1) { - return wals; - } - LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", - v0, v1, retry)); - } - } - @Override public void setConf(Configuration config) { // If replication is disabled, keep all members null @@ -148,10 +112,10 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { super.setConf(conf); try { this.zkw = zk; - this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, - new WarnOnlyAbortable()); + this.replicationQueues = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw)); this.replicationQueues.init(); - } catch (ReplicationException e) { + } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 06138a5..42e1e2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; @@ -131,7 +132,8 @@ public class Replication extends WALActionsListener.Base implements if (replication) { try { this.replicationQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server, + server.getZooKeeper())); this.replicationQueues.init(this.server.getServerName().toString()); this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); @@ -139,7 +141,7 @@ public class Replication extends WALActionsListener.Base implements this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, this.conf, this.server, this.server); - } catch (ReplicationException e) { + } catch (Exception e) { throw new IOException("Failed replication handler create", e); } UUID clusterId = null; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ab19223..4e866ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; @@ -65,6 +64,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; /** * This class is responsible to manage all the replication @@ -240,19 +241,11 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationQueues.addPeerToHFileRefs(id); } } - List<String> currentReplicators = this.replicationQueues.getListOfReplicators(); - if (currentReplicators == null || currentReplicators.size() == 0) { - return; - } - List<String> otherRegionServers = replicationTracker.getListOfRegionServers(); - LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " - + otherRegionServers); - - // Look if there's anything to process after a restart - for (String rs : currentReplicators) { - if (!otherRegionServers.contains(rs)) { - transferQueues(rs); - } + AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker(); + try { + this.executor.execute(adoptionWorker); + } catch (RejectedExecutionException ex) { + LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage()); } } @@ -317,9 +310,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void join() { this.executor.shutdown(); - if (this.sources.size() == 0) { - this.replicationQueues.removeAllQueues(); - } for (ReplicationSourceInterface source : this.sources) { source.terminate("Region server is closing"); } @@ -643,7 +633,7 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void run() { - if (this.rq.isThisOurZnode(rsZnode)) { + if (this.rq.isThisOurRegionServer(rsZnode)) { return; } // Wait a bit before transferring the queues, we may be shutting down. @@ -659,7 +649,7 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Not transferring queue since we are shutting down"); return; } - SortedMap<String, SortedSet<String>> newQueues = null; + Map<String, Set<String>> newQueues = null; newQueues = this.rq.claimQueues(rsZnode); @@ -670,9 +660,9 @@ public class ReplicationSourceManager implements ReplicationListener { return; } - for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) { + for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) { String peerId = entry.getKey(); - SortedSet<String> walsSet = entry.getValue(); + Set<String> walsSet = entry.getValue(); try { // there is not an actual peer defined corresponding to peerId for the failover.
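// (the map key here is really a queueId; for a recovered queue it embeds the names of the dead region servers, so the actual peerId is parsed out of it below)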
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); @@ -729,6 +719,52 @@ } } + class AdoptAbandonedQueuesWorker extends Thread { + + public AdoptAbandonedQueuesWorker() {} + + @Override + public void run() { + List<String> currentReplicators = null; + boolean success = false; + // Retry for as long as we can with 1 second sleeps + RetryCounterFactory retryCounterFactory = new RetryCounterFactory(Integer.MAX_VALUE, 1000); + RetryCounter retries = retryCounterFactory.create(); + while (!success && retries.shouldRetry()) { + try { + // getListOfReplicators() can throw an exception if Table based replication is being used + // and Replication Table is not up yet. It can also return null if no un-adopted queues + // exist. + currentReplicators = replicationQueues.getListOfReplicators(); + success = true; + } catch (ReplicationException e) { + try { + retries.sleepUntilNextRetry(); + LOG.warn("AdoptAbandonedQueuesWorker has failed getting the list of replicators, attempt=" + + retries.getAttemptTimes()); + } catch (InterruptedException ie) { + return; + } + } + } + if (currentReplicators == null || currentReplicators.size() == 0) { + return; + } + List<String> otherRegionServers = replicationTracker.getListOfRegionServers(); + LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + + otherRegionServers); + + // Look if there's anything to process after a restart + for (String rs : currentReplicators) { + if (!otherRegionServers.contains(rs)) { + transferQueues(rs); + } + } + } + } + + + /** + * Get the directory where wals are archived + * @return the directory where wals are archived diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 64212c9..ef5bd83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -38,7 +38,9 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -67,13 +69,14 @@ public class ReplicationChecker { try { this.zkw = zkw; this.errorReporter = errorReporter; - this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection); + this.queuesClient = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, connection, zkw)); this.queuesClient.init(); this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, connection); this.replicationPeers.init(); this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection); - } catch (ReplicationException e) { + } catch (Exception e) { throw new IOException("failed to construct ReplicationChecker", e); } @@ -103,7 +106,6 @@ public class ReplicationChecker { undeletedQueueIds.put(replicator, new
ArrayList()); } undeletedQueueIds.get(replicator).add(queueId); - String msg = "Undeleted replication queue for removed peer found: " + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(), replicator, queueId); @@ -112,8 +114,8 @@ public class ReplicationChecker { } } } - } catch (KeeperException ke) { - throw new IOException(ke); + } catch (KeeperException | ReplicationException e) { + throw new IOException(e); } checkUnDeletedHFileRefsQueues(peerIds); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 03d7aee..128fc99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; @@ -128,7 +129,7 @@ public class TestReplicationAdmin { Configuration conf = TEST_UTIL.getConfiguration(); ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(zkw, conf, null); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw)); repQueues.init("server1"); // add queue for ID_ONE diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 8efa754..162f782 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -48,7 +48,9 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -93,7 +95,7 @@ public class TestLogsCleaner { Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); repQueues.init(server.getServerName().toString()); final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); @@ -165,7 +167,7 @@ public class TestLogsCleaner { ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 
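// The test below swaps a mocked queues client into the cleaner via reflection so it can simulate the replication znode's cversion changing between scans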
cleaner.setConf(conf); - ReplicationQueuesClient rqcMock = Mockito.mock(ReplicationQueuesClient.class); + ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class); Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4); Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index eb793dc..65dbb5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; @@ -87,8 +88,7 @@ public class TestReplicationHFileCleaner { Replication.decorateMasterConfiguration(conf); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); rp.init(); - - rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); rq.init(server.getServerName().toString()); try { fs = FileSystem.get(conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 2d5277b..0ac518c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -22,8 +22,8 @@ import static org.junit.Assert.*; import java.util.ArrayList; import java.util.List; -import java.util.SortedMap; -import java.util.SortedSet; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -121,7 +121,7 @@ public abstract class TestReplicationStateBasic { rq1.removeQueue("bogus"); rq1.removeLog("bogus", "bogus"); rq1.removeAllQueues(); - assertNull(rq1.getAllQueues()); + assertEquals(0, rq1.getAllQueues().size()); assertEquals(0, rq1.getLogPosition("bogus", "bogus")); assertNull(rq1.getLogsInQueue("bogus")); assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size()); @@ -146,7 +146,7 @@ public abstract class TestReplicationStateBasic { assertEquals(0, rq3.claimQueues(server1).size()); assertEquals(2, rq3.getListOfReplicators().size()); - SortedMap> queues = rq2.claimQueues(server3); + Map> queues = rq2.claimQueues(server3); assertEquals(5, queues.size()); assertEquals(1, rq2.getListOfReplicators().size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java new file mode 
100644 index 0000000..2c1533c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -0,0 +1,473 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static junit.framework.TestCase.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({ReplicationTests.class, MediumTests.class}) +public class TestReplicationStateHBaseImpl { + + private static Configuration conf; + private static HBaseTestingUtility utility; + private static ZooKeeperWatcher zkw; + private static String replicationZNode; + + private static ReplicationQueues rq1; + private static ReplicationQueues rq2; + private static ReplicationQueues rq3; + private static ReplicationQueuesClient rqc; + private static ReplicationPeers rp; + + + private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L) + .toString(); + private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L) + .toString(); + private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L) + .toString(); + private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L) + .toString(); + + private static DummyServer ds0; + private static DummyServer ds1; + private static DummyServer ds2; + private static DummyServer ds3; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + utility = new HBaseTestingUtility(); + conf = utility.getConfiguration(); + conf.setClass("hbase.region.replica.replication.replicationQueues.class", + TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); + conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", + TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); + utility.startMiniCluster(); + // To make the unit tests run faster we set a lower pause value for table initialization + conf.setInt("hbase.replication.table.init.pause", 1000); + zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + replicationZNode =
ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); + } + + @Before + public void setUp() { + try { + ds0 = new DummyServer(server0); + rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( + conf, ds0)); + ds1 = new DummyServer(server1); + rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); + rq1.init(server1); + ds2 = new DummyServer(server2); + rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); + rq2.init(server2); + ds3 = new DummyServer(server3); + rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); + rq3.init(server3); + rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); + rp.init(); + } catch (Exception e) { + fail("setUp received an exception: " + e.getMessage()); + } + } + + @Test + public void checkNamingSchema() throws Exception { + assertTrue(rq1.isThisOurRegionServer(server1)); + assertTrue(!rq1.isThisOurRegionServer(server1 + "a")); + assertTrue(!rq1.isThisOurRegionServer(null)); + } + + @Test + public void testSingleReplicationQueuesHBaseImpl() { + try { + // Test adding in WAL files + assertEquals(0, rq1.getAllQueues().size()); + rq1.addLog("Queue1", "WALLogFile1.1"); + assertEquals(1, rq1.getAllQueues().size()); + rq1.addLog("Queue1", "WALLogFile1.2"); + rq1.addLog("Queue1", "WALLogFile1.3"); + rq1.addLog("Queue1", "WALLogFile1.4"); + rq1.addLog("Queue2", "WALLogFile2.1"); + rq1.addLog("Queue3", "WALLogFile3.1"); + assertEquals(3, rq1.getAllQueues().size()); + assertEquals(4, rq1.getLogsInQueue("Queue1").size()); + assertEquals(1, rq1.getLogsInQueue("Queue2").size()); + assertEquals(1, rq1.getLogsInQueue("Queue3").size()); + // Make sure that abortCount is still 0 + assertEquals(0, ds1.getAbortCount()); + // Make sure that getting a log from a non-existent queue triggers an abort + assertNull(rq1.getLogsInQueue("Queue4")); + assertEquals(1, ds1.getAbortCount()); + } catch (ReplicationException e) { + e.printStackTrace(); + fail("testSingleReplicationQueuesHBaseImpl received a ReplicationException while adding logs"); + } + try { + + // Test updating the log positions + assertEquals(0L, rq1.getLogPosition("Queue1", "WALLogFile1.1")); + rq1.setLogPosition("Queue1", "WALLogFile1.1", 123L); + assertEquals(123L, rq1.getLogPosition("Queue1", "WALLogFile1.1")); + rq1.setLogPosition("Queue1", "WALLogFile1.1", 123456789L); + assertEquals(123456789L, rq1.getLogPosition("Queue1", "WALLogFile1.1")); + rq1.setLogPosition("Queue2", "WALLogFile2.1", 242L); + assertEquals(242L, rq1.getLogPosition("Queue2", "WALLogFile2.1")); + rq1.setLogPosition("Queue3", "WALLogFile3.1", 243L); + assertEquals(243L, rq1.getLogPosition("Queue3", "WALLogFile3.1")); + + // Test that setting log positions in non-existing logs will cause an abort + assertEquals(1, ds1.getAbortCount()); + rq1.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L); + assertEquals(2, ds1.getAbortCount()); + rq1.setLogPosition("NotHereQueue", "NotHereFile", 243L); + assertEquals(3, ds1.getAbortCount()); + rq1.setLogPosition("Queue1", "NotHereFile", 243L); + assertEquals(4, ds1.getAbortCount()); + + // Test reading log positions for non-existent queues and WALs + try { + rq1.getLogPosition("Queue1", "NotHereWAL"); + fail("Replication queue should have thrown a ReplicationException for reading from a " + + "non-existent WAL"); + } catch (ReplicationException e) { + } + try { + rq1.getLogPosition("NotHereQueue", "NotHereWAL"); + fail("Replication queue should have thrown a ReplicationException for reading from a " + + "non-existent queue"); + } catch (ReplicationException e) { + } + // Test removing logs + rq1.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(3, rq1.getLogsInQueue("Queue1").size()); + // Test removing queues + rq1.removeQueue("Queue2"); + assertEquals(2, rq1.getAllQueues().size()); + assertNull(rq1.getLogsInQueue("Queue2")); + // Test that getting logs from a non-existent queue aborts + assertEquals(5, ds1.getAbortCount()); + // Test removing all queues for a Region Server + rq1.removeAllQueues(); + assertEquals(0, rq1.getAllQueues().size()); + assertNull(rq1.getLogsInQueue("Queue1")); + // Test that getting logs from a non-existent queue aborts + assertEquals(6, ds1.getAbortCount()); + // Test removing a non-existent queue does not cause an abort. This is because we can + // attempt to remove a queue that has no corresponding Replication Table row (if we never + // registered a WAL for it) + rq1.removeQueue("NotHereQueue"); + assertEquals(6, ds1.getAbortCount()); + } catch (ReplicationException e) { + e.printStackTrace(); + fail("testSingleReplicationQueuesHBaseImpl received a ReplicationException while updating positions"); + } + } + + @Test + public void testMultipleReplicationQueuesHBaseImpl() { + try { + rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/1"), null); + rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/2"), null); + rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/3"), null); + } catch (ReplicationException e) { + fail("Failed to add peers to ReplicationPeers"); + } + try { + // Test adding in WAL files + rq1.addLog("Queue1", "WALLogFile1.1"); + rq1.addLog("Queue1", "WALLogFile1.2"); + rq1.addLog("Queue1", "WALLogFile1.3"); + rq1.addLog("Queue1", "WALLogFile1.4"); + rq1.addLog("Queue2", "WALLogFile2.1"); + rq1.addLog("Queue3", "WALLogFile3.1"); + rq2.addLog("Queue1", "WALLogFile1.1"); + rq2.addLog("Queue1", "WALLogFile1.2"); + rq2.addLog("Queue2", "WALLogFile2.1"); + rq3.addLog("Queue1", "WALLogFile1.1"); + // Test adding logs to replication queues + assertEquals(3, rq1.getAllQueues().size()); + assertEquals(2, rq2.getAllQueues().size()); + assertEquals(1, rq3.getAllQueues().size()); + assertEquals(4, rq1.getLogsInQueue("Queue1").size()); + assertEquals(1, rq1.getLogsInQueue("Queue2").size()); + assertEquals(1, rq1.getLogsInQueue("Queue3").size()); + assertEquals(2, rq2.getLogsInQueue("Queue1").size()); + assertEquals(1, rq2.getLogsInQueue("Queue2").size()); + assertEquals(1, rq3.getLogsInQueue("Queue1").size()); + } catch (ReplicationException e) { + e.printStackTrace(); + fail("testMultipleReplicationQueuesHBaseImpl received a ReplicationException while adding logs"); + } + try { + // Test setting and reading offset in queues + rq1.setLogPosition("Queue1", "WALLogFile1.1", 1L); + rq1.setLogPosition("Queue1", "WALLogFile1.2", 2L); + rq1.setLogPosition("Queue1", "WALLogFile1.3", 3L); + rq1.setLogPosition("Queue2", "WALLogFile2.1", 4L); + rq1.setLogPosition("Queue2", "WALLogFile2.2", 5L); + rq1.setLogPosition("Queue3", "WALLogFile3.1", 6L); + rq2.setLogPosition("Queue1", "WALLogFile1.1", 7L); + rq2.setLogPosition("Queue2", "WALLogFile2.1", 8L); + rq3.setLogPosition("Queue1", "WALLogFile1.1", 9L); + assertEquals(1L, rq1.getLogPosition("Queue1", "WALLogFile1.1")); + assertEquals(2L, rq1.getLogPosition("Queue1", "WALLogFile1.2")); + assertEquals(4L, rq1.getLogPosition("Queue2", "WALLogFile2.1")); + assertEquals(6L, rq1.getLogPosition("Queue3", "WALLogFile3.1")); + assertEquals(7L, rq2.getLogPosition("Queue1", "WALLogFile1.1")); + assertEquals(8L, rq2.getLogPosition("Queue2", "WALLogFile2.1")); + assertEquals(9L, rq3.getLogPosition("Queue1", "WALLogFile1.1")); + assertEquals(3, rq1.getListOfReplicators().size()); + assertEquals(3, rq2.getListOfReplicators().size()); + assertEquals(3, rq3.getListOfReplicators().size()); + } catch (ReplicationException e) { + fail("testMultipleReplicationQueuesHBaseImpl received a ReplicationException while setting positions"); + } + try { + // Test claiming queues + Map<String, Set<String>> claimedQueuesFromRq2 = rq1.claimQueues(server2); + // Check to make sure that list of peers with outstanding queues is decremented by one + // after claimQueues + assertEquals(2, rq1.getListOfReplicators().size()); + assertEquals(2, rq2.getListOfReplicators().size()); + assertEquals(2, rq3.getListOfReplicators().size()); + // Check to make sure that we claimed the proper number of queues + assertEquals(2, claimedQueuesFromRq2.size()); + assertTrue(claimedQueuesFromRq2.containsKey("Queue1-" + server2)); + assertTrue(claimedQueuesFromRq2.containsKey("Queue2-" + server2)); + assertEquals(2, claimedQueuesFromRq2.get("Queue1-" + server2).size()); + assertEquals(1, claimedQueuesFromRq2.get("Queue2-" + server2).size()); + assertEquals(5, rq1.getAllQueues().size()); + // Check that all the logs in the other queue were claimed + assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size()); + assertEquals(1, rq1.getLogsInQueue("Queue2-" + server2).size()); + // Check that the offsets of the claimed queues are the same + assertEquals(7L, rq1.getLogPosition("Queue1-" + server2, "WALLogFile1.1")); + assertEquals(8L, rq1.getLogPosition("Queue2-" + server2, "WALLogFile2.1")); + // Check that the queues were properly removed from rq2 + assertEquals(0, rq2.getAllQueues().size()); + assertNull(rq2.getLogsInQueue("Queue1")); + assertNull(rq2.getLogsInQueue("Queue2")); + // Check that non-existent peer queues are not claimed + rq1.addLog("UnclaimableQueue", "WALLogFile1.1"); + rq1.addLog("UnclaimableQueue", "WALLogFile1.2"); + assertEquals(6, rq1.getAllQueues().size()); + Map<String, Set<String>> claimedQueuesFromRq1 = rq3.claimQueues(server1); + assertEquals(1, rq1.getListOfReplicators().size()); + assertEquals(1, rq2.getListOfReplicators().size()); + assertEquals(1, rq3.getListOfReplicators().size()); + // Note that we do not pick up the UnclaimableQueue, which was not registered with + // Replication Peers + assertEquals(6, rq3.getAllQueues().size()); + // Test claiming non-existing queues + Map<String, Set<String>> noQueues = rq3.claimQueues("NotARealServer"); + assertEquals(0, noQueues.size()); + assertEquals(6, rq3.getAllQueues().size()); + // Test claiming own queues + noQueues = rq3.claimQueues(server3); + assertEquals(0, noQueues.size()); + assertEquals(6, rq3.getAllQueues().size()); + // Check that rq3 still remains on the list of replicators + assertEquals(1, rq3.getListOfReplicators().size()); + } catch (ReplicationException e) { + fail("testMultipleReplicationQueuesHBaseImpl received a ReplicationException while claiming queues"); + } + } + + @Test + public void testReplicationQueuesClient() throws Exception { + + // Test ReplicationQueuesClient log tracking + rq1.addLog("Queue1", "WALLogFile1.1"); + assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size()); + rq1.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size()); + rq2.addLog("Queue2", "WALLogFile2.1"); + rq2.addLog("Queue2", "WALLogFile2.2"); + assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size()); + rq3.addLog("Queue1", "WALLogFile1.1"); + rq3.addLog("Queue3", "WALLogFile3.1"); + rq3.addLog("Queue3", "WALLogFile3.2"); + + // 
Test ReplicationQueueClient log tracking for faulty cases + assertEquals(0, ds0.getAbortCount()); + assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue")); + assertNull(rqc.getLogsInQueue(server1, "NotHereQueue")); + assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1")); + assertEquals(3, ds0.getAbortCount()); + // Test ReplicationQueueClient replicators + List replicators = rqc.getListOfReplicators(); + assertEquals(3, replicators.size()); + assertTrue(replicators.contains(server1)); + assertTrue(replicators.contains(server2)); + rq1.removeQueue("Queue1"); + assertEquals(2, rqc.getListOfReplicators().size()); + + // Test ReplicationQueuesClient queue tracking + assertEquals(0, rqc.getAllQueues(server1).size()); + rq1.addLog("Queue2", "WALLogFile2.1"); + rq1.addLog("Queue3", "WALLogFile3.1"); + assertEquals(2, rqc.getAllQueues(server1).size()); + rq1.removeAllQueues(); + assertEquals(0, rqc.getAllQueues(server1).size()); + + // Test ReplicationQueuesClient queue tracking for faulty cases + assertEquals(0, rqc.getAllQueues("NotHereServer").size()); + + // Test ReplicationQueuesClient get all WAL's + assertEquals(5 , rqc.getAllWALs().size()); + rq3.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(4, rqc.getAllWALs().size()); + rq3.removeAllQueues(); + assertEquals(2, rqc.getAllWALs().size()); + rq2.removeAllQueues(); + assertEquals(0, rqc.getAllWALs().size()); + } + + @After + public void clearQueues() throws Exception{ + rq1.removeAllQueues(); + rq2.removeAllQueues(); + rq3.removeAllQueues(); + assertEquals(0, rq1.getAllQueues().size()); + assertEquals(0, rq2.getAllQueues().size()); + assertEquals(0, rq3.getAllQueues().size()); + ds0.resetAbortCount(); + ds1.resetAbortCount(); + ds2.resetAbortCount(); + ds3.resetAbortCount(); + } + + @After + public void tearDown() throws KeeperException, IOException { + ZKUtil.deleteNodeRecursively(zkw, replicationZNode); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + utility.shutdownMiniCluster(); + utility.shutdownMiniZKCluster(); + } + + static class DummyServer implements Server { + private String serverName; + private boolean isAborted = false; + private boolean isStopped = false; + private int abortCount = 0; + + public DummyServer(String serverName) { + this.serverName = serverName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return null; + } + + @Override + public ClusterConnection getConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + @Override + public ServerName getServerName() { + return ServerName.valueOf(this.serverName); + } + + @Override + public void abort(String why, Throwable e) { + abortCount++; + this.isAborted = true; + } + + @Override + public boolean isAborted() { + return this.isAborted; + } + + @Override + public void stop(String why) { + this.isStopped = true; + } + + @Override + public boolean isStopped() { + return this.isStopped; + } + + @Override + public ChoreService getChoreService() { + return null; + } + + public int getAbortCount() { + return abortCount; + } + + public void resetAbortCount() { + abortCount = 0; + } + + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index d6bf4ea..45984f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKConfig; @@ -48,7 +50,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(MediumTests.class) +@Category({ReplicationTests.class, MediumTests.class}) public class TestReplicationStateZKImpl extends TestReplicationStateBasic { private static final Log LOG = LogFactory.getLog(TestReplicationStateZKImpl.class); @@ -91,10 +93,17 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { DummyServer ds1 = new DummyServer(server1); DummyServer ds2 = new DummyServer(server2); DummyServer ds3 = new DummyServer(server3); - rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1); - rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2); - rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3); - rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); + try { + rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); + rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); + rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); + rqc = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, ds1, zkw)); + } catch (Exception e) { + // This should not occur, because getReplicationQueues() only throws for + // TableBasedReplicationQueuesImpl + fail("ReplicationFactory.getReplicationQueues() threw an unexpected Exception"); + } rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java new file mode 100644 index 0000000..cc7f3e7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java @@ -0,0 +1,115 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License.
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization + * should be non-blocking, but any method calls that access the table should block until it is + * available. + */ +@Category({ReplicationTests.class, MediumTests.class}) +public class TestReplicationTableBase { + + private static long SLEEP_MILLIS = 5000; + private static long TIME_OUT_MILLIS = 3000; + private static Configuration conf; + private static HBaseTestingUtility utility; + private static ZooKeeperWatcher zkw; + private static ReplicationTableBase rb; + private static ReplicationQueues rq; + private static ReplicationQueuesClient rqc; + private volatile boolean asyncRequestSuccess = false; + + @Test + public void testSlowStartup() throws Exception { + utility = new HBaseTestingUtility(); + utility.startMiniZKCluster(); + conf = utility.getConfiguration(); + conf.setClass("hbase.region.replica.replication.replicationQueues.class", + TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); + conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", + TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // To make the unit tests run faster we set a lower pause value for table initialization + conf.setInt("hbase.replication.table.init.pause", 1000); + zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); + utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + rb = new ReplicationTableBase(conf, zkw) {}; + rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( + conf, zkw, zkw)); + rqc = ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, zkw, zkw)); + return true; + } + @Override + public String explainFailure() throws Exception { + return "Failed to initialize ReplicationTableBase, TableBasedReplicationQueuesClient and " + + "TableBasedReplicationQueues after a timeout=" + TIME_OUT_MILLIS + + " ms. Their initialization should be non-blocking"; + } + }); + final RequestReplicationQueueData async = new RequestReplicationQueueData(); + async.start(); + Thread.sleep(SLEEP_MILLIS); + // Test that the Replication Table has not been assigned and the methods are blocking + assertFalse(rb.getInitializationStatus()); + assertFalse(asyncRequestSuccess); + utility.startMiniCluster(); + // Test that the methods do return the correct results after getting the table + utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + async.join(); + return true; + } + @Override + public String explainFailure() throws Exception { + return "ReplicationQueue failed to return list of replicators even after Replication Table " + + "was initialized, timeout=" + TIME_OUT_MILLIS + " ms"; + } + }); + assertTrue(asyncRequestSuccess); + } + + public class RequestReplicationQueueData extends Thread { + @Override + public void run() { + try { + assertEquals(0, rq.getListOfReplicators().size()); + asyncRequestSuccess = true; + } catch (ReplicationException e) { + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 268d173..2e9328e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -33,7 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.SortedMap; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; @@ -66,13 +67,12 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -86,68 +86,63 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName;
-@Category(MediumTests.class) -public class TestReplicationSourceManager { +/** + * An abstract class that tests ReplicationSourceManager. Classes that extend this class should + * set up the proper config for this class and initialize the proper cluster using + * HBaseTestingUtility. + */ +@Category({ReplicationTests.class, MediumTests.class}) +public abstract class TestReplicationSourceManager { - private static final Log LOG = + protected static final Log LOG = LogFactory.getLog(TestReplicationSourceManager.class); - private static Configuration conf; + protected static Configuration conf; - private static HBaseTestingUtility utility; + protected static HBaseTestingUtility utility; - private static Replication replication; + protected static Replication replication; - private static ReplicationSourceManager manager; + protected static ReplicationSourceManager manager; - private static ZooKeeperWatcher zkw; + protected static ZooKeeperWatcher zkw; - private static HTableDescriptor htd; + protected static HTableDescriptor htd; - private static HRegionInfo hri; + protected static HRegionInfo hri; - private static final byte[] r1 = Bytes.toBytes("r1"); + protected static final byte[] r1 = Bytes.toBytes("r1"); - private static final byte[] r2 = Bytes.toBytes("r2"); + protected static final byte[] r2 = Bytes.toBytes("r2"); - private static final byte[] f1 = Bytes.toBytes("f1"); + protected static final byte[] f1 = Bytes.toBytes("f1"); - private static final byte[] f2 = Bytes.toBytes("f2"); + protected static final byte[] f2 = Bytes.toBytes("f2"); - private static final TableName test = + protected static final TableName test = TableName.valueOf("test"); - private static final String slaveId = "1"; - - private static FileSystem fs; + protected static final String slaveId = "1"; - private static Path oldLogDir; + protected static FileSystem fs; - private static Path logDir; + protected static Path oldLogDir; - private static CountDownLatch latch; + protected static Path logDir; - private static List files = new ArrayList(); + protected static CountDownLatch latch; - @BeforeClass - public static void setUpBeforeClass() throws Exception { - - conf = HBaseConfiguration.create(); - conf.set("replication.replicationsource.implementation", - ReplicationSourceDummy.class.getCanonicalName()); - conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, - HConstants.REPLICATION_ENABLE_DEFAULT); - conf.setLong("replication.sleep.before.failover", 2000); - conf.setInt("replication.source.maxretriesmultiplier", 10); - utility = new HBaseTestingUtility(conf); - utility.startMiniZKCluster(); + protected static List files = new ArrayList(); + protected static NavigableMap scopes; + protected static void setupZkAndReplication() throws Exception { + // The implementing class should set up the conf + assertNotNull(conf); zkw = new ZooKeeperWatcher(conf, "test", null); ZKUtil.createWithParents(zkw, "/hbase/replication"); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); @@ -282,9 +277,11 @@ public class TestReplicationSourceManager { public void testClaimQueues() throws Exception { conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); + + ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); rq.init(server.getServerName().toString()); // 
populate some znodes in the peer znode files.add("log1"); @@ -324,8 +321,8 @@ public class TestReplicationSourceManager { public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet files = new TreeSet(); @@ -339,7 +336,8 @@ public class TestReplicationSourceManager { } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, + s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); @@ -347,8 +345,7 @@ public class TestReplicationSourceManager { NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); - w1.start(); - w1.join(5000); + w1.run(); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); @@ -357,108 +354,6 @@ public class TestReplicationSourceManager { assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); } - @Test - public void testNodeFailoverDeadServerParsing() throws Exception { - LOG.debug("testNodeFailoverDeadServerParsing"); - conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); - final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); - ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); - repQueues.init(server.getServerName().toString()); - // populate some znodes in the peer znode - files.add("log1"); - files.add("log2"); - for (String file : files) { - repQueues.addLog("1", file); - } - - // create 3 DummyServers - Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); - Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); - Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); - - // simulate three servers fail sequentially - ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); - rq1.init(s1.getServerName().toString()); - SortedMap> testMap = - rq1.claimQueues(server.getServerName().getServerName()); - ReplicationQueues rq2 = - ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2); - rq2.init(s2.getServerName().toString()); - testMap = rq2.claimQueues(s1.getServerName().getServerName()); - ReplicationQueues rq3 = - ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3); - rq3.init(s3.getServerName().toString()); - testMap = rq3.claimQueues(s2.getServerName().getServerName()); - - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); - List result = replicationQueueInfo.getDeadRegionServers(); - - // verify - assertTrue(result.contains(server.getServerName().getServerName())); - 
-    assertTrue(result.contains(s1.getServerName().getServerName()));
-    assertTrue(result.contains(s2.getServerName().getServerName()));
-
-    server.abort("", null);
-  }
-
-  @Test
-  public void testFailoverDeadServerCversionChange() throws Exception {
-    LOG.debug("testFailoverDeadServerCversionChange");
-
-    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server s0 = new DummyServer("cversion-change0.example.org");
-    ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
-    repQueues.init(s0.getServerName().toString());
-    // populate some znodes in the peer znode
-    files.add("log1");
-    files.add("log2");
-    for (String file : files) {
-      repQueues.addLog("1", file);
-    }
-    // simulate queue transfer
-    Server s1 = new DummyServer("cversion-change1.example.org");
-    ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
-    rq1.init(s1.getServerName().toString());
-
-    ReplicationQueuesClient client =
-        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
-
-    int v0 = client.getQueuesZNodeCversion();
-    rq1.claimQueues(s0.getServerName().getServerName());
-    int v1 = client.getQueuesZNodeCversion();
-    // cversion should increased by 1 since a child node is deleted
-    assertEquals(v0 + 1, v1);
-
-    s0.abort("", null);
-  }
-
-  @Test
-  public void testCleanupUnknownPeerZNode() throws Exception {
-    final Server server = new DummyServer("hostname2.example.org");
-    ReplicationQueues rq =
-        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
-          server);
-    rq.init(server.getServerName().toString());
-    // populate some znodes in the peer znode
-    // add log to an unknown peer
-    String group = "testgroup";
-    rq.addLog("2", group + ".log1");
-    rq.addLog("2", group + ".log2");
-
-    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
-    w1.run();
-
-    // The log of the unknown peer should be removed from zk
-    for (String peer : manager.getAllQueues()) {
-      assertTrue(peer.startsWith("1"));
-    }
-  }
-
   @Test
   public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
     // 1. Create wal key
     WALKey logKey = new WALKey();
@@ -470,7 +365,7 @@ public class TestReplicationSourceManager {
 
     // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
     assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
-      logKey.getScopes());
+        logKey.getScopes());
   }
 
   @Test
@@ -508,8 +403,8 @@ public class TestReplicationSourceManager {
     try {
       DummyServer server = new DummyServer();
      ReplicationQueues rq =
-          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
-            server);
+          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(),
+            server, server.getZooKeeper()));
       rq.init(server.getServerName().toString());
       // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
       // initialization to throw an exception.
@@ -571,7 +466,7 @@ public class TestReplicationSourceManager {
   }
 
   static class DummyNodeFailoverWorker extends Thread {
-    private SortedMap<String, SortedSet<String>> logZnodesMap;
+    private Map<String, Set<String>> logZnodesMap;
     Server server;
     private String deadRsZnode;
     ReplicationQueues rq;
@@ -580,8 +475,8 @@ public class TestReplicationSourceManager {
       this.deadRsZnode = znode;
      this.server = s;
      this.rq =
-          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
-            server);
+          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
+            server.getZooKeeper()));
      this.rq.init(this.server.getServerName().toString());
     }
@@ -601,12 +496,12 @@ public class TestReplicationSourceManager {
    * @return 1 when the map is not empty.
    */
   private int isLogZnodesMapPopulated() {
-    Collection<SortedSet<String>> sets = logZnodesMap.values();
+    Collection<Set<String>> sets = logZnodesMap.values();
     if (sets.size() > 1) {
       throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
     }
     if (sets.size() == 1) {
-      SortedSet<String> s = sets.iterator().next();
+      Set<String> s = sets.iterator().next();
       for (String file : files) {
         // at least one file was missing
         if (!s.contains(file)) {
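Every call site above migrates from the old three-argument factory signature to the new ReplicationQueuesArguments bundle. A minimal sketch of the resulting call pattern, assuming an org.apache.hadoop.hbase.Server is at hand; QueuesBootstrapSketch and bootstrap are illustrative names, not part of this patch:

```java
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;

public class QueuesBootstrapSketch {
  // Builds and initializes the queue tracker for one region server. The arguments
  // bundle carries (Configuration, Abortable, ZooKeeperWatcher), in that order,
  // matching the call sites in the hunks above.
  static ReplicationQueues bootstrap(Server server) throws Exception {
    ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
        new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
    rq.init(server.getServerName().toString()); // queues are keyed by region server name
    return rq;
  }
}
```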
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
new file mode 100644
index 0000000..8a460a0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -0,0 +1,174 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the ReplicationSourceManager with ReplicationQueuesZKImpl and
+ * ReplicationQueuesClientZKImpl. Also includes extra tests, beyond those in
+ * TestReplicationSourceManager, that exercise ReplicationQueuesZKImpl-specific behavior.
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+    setupZkAndReplication();
+  }
+
+  // Tests the naming convention of adopted queues for ReplicationQueuesZKImpl
+  @Test
+  public void testNodeFailoverDeadServerParsing() throws Exception {
+    LOG.debug("testNodeFailoverDeadServerParsing");
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+    ReplicationQueues repQueues =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
+        server.getZooKeeper()));
+    repQueues.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+
+    // create 3 DummyServers
+    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
+    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
+    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
+
+    // simulate three servers fail sequentially
+    ReplicationQueues rq1 =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
+        s1.getZooKeeper()));
+    rq1.init(s1.getServerName().toString());
+    Map<String, Set<String>> testMap =
+      rq1.claimQueues(server.getServerName().getServerName());
+    ReplicationQueues rq2 =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
+        s2.getZooKeeper()));
+    rq2.init(s2.getServerName().toString());
+    testMap = rq2.claimQueues(s1.getServerName().getServerName());
+    ReplicationQueues rq3 =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
+        s3.getZooKeeper()));
+    rq3.init(s3.getServerName().toString());
+    testMap = rq3.claimQueues(s2.getServerName().getServerName());
+
+    ReplicationQueueInfo replicationQueueInfo =
+      new ReplicationQueueInfo(testMap.keySet().iterator().next());
+    List<String> result = replicationQueueInfo.getDeadRegionServers();
+
+    // verify
+    assertTrue(result.contains(server.getServerName().getServerName()));
+    assertTrue(result.contains(s1.getServerName().getServerName()));
+    assertTrue(result.contains(s2.getServerName().getServerName()));
+
+    server.stop("");
+  }
+
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
+        s0.getZooKeeper()));
+    repQueues.init(s0.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
+        s1.getZooKeeper()));
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClientZKImpl client =
+      (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
+        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should increase by 1 since a child node is deleted
+    assertEquals(v0 + 1, v1);
+
+    s0.stop("");
+  }
+
+  @Test
+  public void testCleanupUnknownPeerZNode() throws Exception {
+    final Server server = new DummyServer("hostname2.example.org");
+    ReplicationQueues rq =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(),
+        server, server.getZooKeeper()));
+    rq.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    // add log to an unknown peer
+    String group = "testgroup";
+    rq.addLog("2", group + ".log1");
+    rq.addLog("2", group + ".log2");
+
+    ReplicationSourceManager.NodeFailoverWorker w1 =
+      manager.new NodeFailoverWorker(server.getServerName().getServerName());
+    w1.run();
+
+    // The log of the unknown peer should be removed from zk
+    for (String peer : manager.getAllQueues()) {
+      assertTrue(peer.startsWith("1"));
+    }
+  }
+}
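testFailoverDeadServerCversionChange leans on ZooKeeper's per-znode child version (cversion), which ZooKeeper increments once for every create or delete of a direct child. A small illustration against the raw ZooKeeper client rather than any HBase API; the znode path and the names CversionSketch/queuesCversion are placeholders:

```java
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class CversionSketch {
  // Reads the child version of the replication queues znode. Claiming a dead region
  // server's queue deletes exactly one child of this znode, so the value read after
  // claimQueues() should be the value read before it, plus one.
  static int queuesCversion(ZooKeeper zk, String queuesZnode) throws Exception {
    Stat stat = zk.exists(queuesZnode, false); // no watch; we only want the Stat
    return stat.getCversion();
  }
}
```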
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
new file mode 100644
index 0000000..09e2939
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
@@ -0,0 +1,61 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
+import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the ReplicationSourceManager with TableBasedReplicationQueuesImpl and
+ * TableBasedReplicationQueuesClientImpl
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationSourceManager {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    // Set the table initialization pause lower to speed up the test
+    conf.setInt("hbase.replication.table.init.pause", 100);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniCluster();
+    conf.setClass("hbase.region.replica.replication.replicationQueues.class",
+      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
+    conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
+      TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
+    setupZkAndReplication();
+  }
+
+}
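The subclass above swaps in the table-based queue implementations purely through configuration; the factory reflects on the two class keys and, when they are unset, falls back to the ZooKeeper-based implementations. A standalone sketch of that wiring (TableBasedQueuesConfigSketch is an illustrative name; the key strings are taken verbatim from the setup above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;

public class TableBasedQueuesConfigSketch {
  // Returns a Configuration that routes replication offset tracking through the
  // new HBase-table-backed implementations instead of ZooKeeper.
  static Configuration tableBasedConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.setClass("hbase.region.replica.replication.replicationQueues.class",
        TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
    conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
        TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
    return conf;
  }
}
```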
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index d8087f5..59ce4c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
@@ -2267,7 +2268,8 @@ public class TestHBaseFsck {
     // create replicator
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection);
     ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(zkw, conf, connection);
+        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, connection,
+          zkw));
     repQueues.init("server1");
     // queues for current peer, no errors
     repQueues.addLog("1", "file1");
-- 
2.8.0-rc2
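As a usage note, the abstract test refactored above leaves room for further queue implementations: a subclass owns all configuration and cluster startup, then delegates to the shared setup. A hypothetical sketch of a third subclass (TestMyQueuesImpl is an invented name; the pattern mirrors the two subclasses added in this patch):

```java
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.junit.BeforeClass;

public class TestMyQueuesImpl extends TestReplicationSourceManager {
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = HBaseConfiguration.create(); // the base class asserts conf is non-null
    conf.set("replication.replicationsource.implementation",
        ReplicationSourceDummy.class.getCanonicalName());
    // ... implementation-specific keys would be set here ...
    utility = new HBaseTestingUtility(conf);
    utility.startMiniZKCluster();
    setupZkAndReplication(); // shared ZK and ReplicationSourceManager wiring
  }
}
```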