From d9a15a78ddeaea9067e11e39b38b76f16c5733fe Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 26 Dec 2017 15:10:59 +0800 Subject: [PATCH] HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly --- .../hbase/replication/ReplicationFactory.java | 15 +- .../hbase/replication/ReplicationQueues.java | 160 -------- .../replication/ReplicationQueuesArguments.java | 70 ---- .../hbase/replication/ReplicationQueuesZKImpl.java | 407 ------------------- .../hbase/replication/ReplicationTableBase.java | 442 -------------------- .../replication/ReplicationTrackerZKImpl.java | 21 +- .../TableBasedReplicationQueuesImpl.java | 448 --------------------- .../replication/ZKReplicationQueueStorage.java | 22 + .../replication/TestReplicationStateBasic.java | 131 +++--- .../replication/TestReplicationStateZKImpl.java | 41 +- .../org/apache/hadoop/hbase/master/HMaster.java | 17 +- .../regionserver/DumpReplicationQueues.java | 15 +- .../regionserver/RecoveredReplicationSource.java | 17 +- .../RecoveredReplicationSourceShipper.java | 22 +- .../replication/regionserver/Replication.java | 38 +- .../regionserver/ReplicationSource.java | 22 +- .../regionserver/ReplicationSourceInterface.java | 11 +- .../regionserver/ReplicationSourceManager.java | 212 +++++----- .../hbase/master/cleaner/TestLogsCleaner.java | 12 +- .../cleaner/TestReplicationHFileCleaner.java | 26 +- .../cleaner/TestReplicationZKNodeCleaner.java | 22 +- .../hbase/replication/ReplicationSourceDummy.java | 6 +- .../replication/TestMultiSlaveReplication.java | 2 - .../regionserver/TestReplicationSourceManager.java | 98 ++--- .../TestReplicationSourceManagerZkImpl.java | 58 ++- 25 files changed, 351 insertions(+), 1984 deletions(-) delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 07111c6..5e70e57 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,12 +17,11 @@ */ package org.apache.hadoop.hbase.replication; -import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; /** * A factory class for instantiating replication objects that deal with replication state. 
@@ -31,15 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @InterfaceAudience.Private public class ReplicationFactory { - public static final Class<? extends ReplicationQueues> defaultReplicationQueueClass = ReplicationQueuesZKImpl.class; - - public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args) - throws Exception { - Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." + - "replication.replicationQueues.class", defaultReplicationQueueClass); - return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args); - } - public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, Abortable abortable) { return getReplicationPeers(zk, conf, null, abortable); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java deleted file mode 100644 index 7f440b1..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.List; -import java.util.SortedSet; - -import org.apache.hadoop.fs.Path; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.util.Pair; - -/** - * This provides an interface for maintaining a region server's replication queues. These queues - * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled) - * that still need to be replicated to remote clusters. - */ -@InterfaceAudience.Private -public interface ReplicationQueues { - - /** - * Initialize the region server replication queue interface. - * @param serverName The server name of the region server that owns the replication queues this - * interface manages. - */ - void init(String serverName) throws ReplicationException; - - /** - * Remove a replication queue. - * @param queueId a String that identifies the queue. - */ - void removeQueue(String queueId); - - /** - * Add a new WAL file to the given queue. If the queue does not exist it is created. - * @param queueId a String that identifies the queue. - * @param filename name of the WAL - */ - void addLog(String queueId, String filename) throws ReplicationException; - - /** - * Remove a WAL file from the given queue. - * @param queueId a String that identifies the queue. - * @param filename name of the WAL - */ - void removeLog(String queueId, String filename); - - /** - * Set the current position for a specific WAL in a given queue.
* @param queueId a String that identifies the queue - * @param filename name of the WAL - * @param position the current position in the file - */ - void setLogPosition(String queueId, String filename, long position); - - /** - * Get the current position for a specific WAL in a given queue. - * @param queueId a String that identifies the queue - * @param filename name of the WAL - * @return the current position in the file - */ - long getLogPosition(String queueId, String filename) throws ReplicationException; - - /** - * Remove all replication queues for this region server. - */ - void removeAllQueues(); - - /** - * Get a list of all WALs in the given queue. - * @param queueId a String that identifies the queue - * @return a list of WALs, null if no such queue exists for this server - */ - List<String> getLogsInQueue(String queueId); - - /** - * Get a list of all queues for this region server. - * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues - */ - List<String> getAllQueues(); - - /** - * Get queueIds from a dead region server, whose queues have not been claimed by other region - * servers. - * @return empty if the queue exists but has no children, null if the queue does not exist. - */ - List<String> getUnClaimedQueueIds(String regionserver); - - /** - * Take ownership of the queue identified by queueId that belongs to a dead region server. - * @param regionserver the id of the dead region server - * @param queueId the id of the queue - * @return the new peerId and a SortedSet of WALs in its queue, or null if there is no unclaimed queue. - */ - Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId); - - /** - * Remove the znode of the region server if the queue is empty. - * @param regionserver the id of the region server - */ - void removeReplicatorIfQueueIsEmpty(String regionserver); - - /** - * Get a list of all region servers that have outstanding replication queues. These servers could - * be alive, dead or from a previous run of the cluster. - * @return a list of server names - */ - List<String> getListOfReplicators(); - - /** - * Checks if the provided znode is the same as this region server's. - * @param regionserver the id of the region server - * @return true if this is this region server's znode - */ - boolean isThisOurRegionServer(String regionserver); - - /** - * Add a peer to the hfile reference queue if the peer does not exist. - * @param peerId peer cluster id to be added - * @throws ReplicationException if it fails to add a peer id to the hfile reference queue - */ - void addPeerToHFileRefs(String peerId) throws ReplicationException; - - /** - * Remove a peer from the hfile reference queue. - * @param peerId peer cluster id to be removed - */ - void removePeerFromHFileRefs(String peerId); - - /** - * Add new hfile references to the queue. - * @param peerId peer cluster id to which the hfiles need to be replicated - * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which - * will be added in the queue } - * @throws ReplicationException if it fails to add a hfile reference - */ - void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException; - - /** - * Remove hfile references from the queue.
* @param peerId peer cluster id from which these hfile references need to be removed - * @param files list of hfile references to be removed - */ - void removeHFileRefs(String peerId, List<String> files); -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java deleted file mode 100644 index c2a5df3..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various - * ReplicationQueues implementations with different constructor arguments by reflection. - */ -@InterfaceAudience.Private -public class ReplicationQueuesArguments { - - private ZKWatcher zk; - private Configuration conf; - private Abortable abort; - - public ReplicationQueuesArguments(Configuration conf, Abortable abort) { - this.conf = conf; - this.abort = abort; - } - - public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZKWatcher zk) { - this(conf, abort); - setZk(zk); - } - - public ZKWatcher getZk() { - return zk; - } - - public void setZk(ZKWatcher zk) { - this.zk = zk; - } - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - } - - public Abortable getAbortable() { - return abort; - } - - public void setAbortable(Abortable abort) { - this.abort = abort; - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java deleted file mode 100644 index 7551cb7..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.ArrayList; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class provides an implementation of the ReplicationQueues - * interface using ZooKeeper. The - * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of - * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is - * the regionserver name (a concatenation of the region server’s hostname, client port and start - * code). For example: - * - * /hbase/replication/rs/hostname.example.org,6020,1234 - * - * Within this znode, the region server maintains a set of WAL replication queues. These queues are - * represented by child znodes named using their given queue ids. For example: - * - * /hbase/replication/rs/hostname.example.org,6020,1234/1 - * /hbase/replication/rs/hostname.example.org,6020,1234/2 - * - * Each queue has one child znode for every WAL that still needs to be replicated. The value of - * these WAL child znodes is the latest position that has been replicated. This position is updated - * every time a WAL entry is replicated. For example: - * - * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254] - */ -@InterfaceAudience.Private -public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues { - - /** Znode containing all replication queues for this region server.
*/ - private String myQueuesZnode; - - private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class); - - public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) { - this(args.getZk(), args.getConf(), args.getAbortable()); - } - - public ReplicationQueuesZKImpl(final ZKWatcher zk, Configuration conf, - Abortable abortable) { - super(zk, conf, abortable); - } - - @Override - public void init(String serverName) throws ReplicationException { - this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName); - try { - if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) { - ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode); - } - } catch (KeeperException e) { - throw new ReplicationException("Could not initialize replication queues.", e); - } - if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { - try { - if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) { - ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); - } - } catch (KeeperException e) { - throw new ReplicationException("Could not initialize hfile references replication queue.", - e); - } - } - } - - @Override - public void removeQueue(String queueId) { - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, - ZNodePaths.joinZNode(this.myQueuesZnode, queueId)); - } catch (KeeperException e) { - this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e); - } - } - - @Override - public void addLog(String queueId, String filename) throws ReplicationException { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - znode = ZNodePaths.joinZNode(znode, filename); - try { - ZKUtil.createWithParents(this.zookeeper, znode); - } catch (KeeperException e) { - throw new ReplicationException( - "Could not add log because znode could not be created. queueId=" + queueId - + ", filename=" + filename); - } - } - - @Override - public void removeLog(String queueId, String filename) { - try { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - znode = ZNodePaths.joinZNode(znode, filename); - ZKUtil.deleteNode(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename=" - + filename + ")", e); - } - } - - @Override - public void setLogPosition(String queueId, String filename, long position) { - try { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - znode = ZNodePaths.joinZNode(znode, filename); - // Why serialize String of Long and not Long as bytes? 
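- // positionToByteArray serializes the offset as a ReplicationHLogPosition protobuf message prefixed with the PB magic; ZKUtil.parseWALPositionFrom in getLogPosition below reads it back.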
- ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position)); - } catch (KeeperException e) { - this.abortable.abort("Failed to write replication wal position (filename=" + filename - + ", position=" + position + ")", e); - } - } - - @Override - public long getLogPosition(String queueId, String filename) throws ReplicationException { - String clusterZnode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - String znode = ZNodePaths.joinZNode(clusterZnode, filename); - byte[] bytes = null; - try { - bytes = ZKUtil.getData(this.zookeeper, znode); - } catch (KeeperException e) { - throw new ReplicationException("Internal Error: could not get position in log for queueId=" - + queueId + ", filename=" + filename, e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return 0; - } - try { - return ZKUtil.parseWALPositionFrom(bytes); - } catch (DeserializationException de) { - LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename - + " znode content, continuing."); - } - // if we can not parse the position, start at the beginning of the wal file - // again - return 0; - } - - @Override - public boolean isThisOurRegionServer(String regionserver) { - return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode); - } - - @Override - public List<String> getUnClaimedQueueIds(String regionserver) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver); - List<String> queues = null; - try { - queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath); - } catch (KeeperException e) { - this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e); - } - return queues; - } - - @Override - public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) { - LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue"); - return moveQueueUsingMulti(regionserver, queueId); - } - - @Override - public void removeReplicatorIfQueueIsEmpty(String regionserver) { - String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver); - try { - List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath); - if (list != null && list.isEmpty()){ - ZKUtil.deleteNode(this.zookeeper, rsPath); - } - } catch (KeeperException e) { - LOG.warn("Got error while removing replicator", e); - } - } - - @Override - public void removeAllQueues() { - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode); - } catch (KeeperException e) { - // if the znode is already expired, don't bother going further - if (e instanceof KeeperException.SessionExpiredException) { - return; - } - this.abortable.abort("Failed to delete replication queues for region server: " - + this.myQueuesZnode, e); - } - } - - @Override - public List<String> getLogsInQueue(String queueId) { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - List<String> result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e); - } - return result; - } - - @Override - public List<String> getAllQueues() { - List<String> listOfQueues = null; - try { - listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get a list of queues for region server: " - + this.myQueuesZnode, e); - } - return listOfQueues == null ?
new ArrayList<>() : listOfQueues; - } - - /** - * It "atomically" copies one peer's wals queue from a dead region server and returns them - * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode. - * @param znode the znode pertaining to the region server to copy the queues from - * @param peerId peerId pertaining to the queue that needs to be copied - */ - private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) { - try { - // hbase/replication/rs/deadrs - String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode); - List<ZKUtilOp> listOfOps = new ArrayList<>(); - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); - - String newPeerId = peerId + "-" + znode; - String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId); - // check the logs queue for the old peer cluster - String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId); - List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); - - if (!peerExists(replicationQueueInfo.getPeerId())) { - LOG.warn("Peer " + replicationQueueInfo.getPeerId() + - " didn't exist, will move its queue to avoid the failure of multi op"); - for (String wal : wals) { - String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal); - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); - } - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - return null; - } - - SortedSet<String> logQueue = new TreeSet<>(); - if (wals == null || wals.isEmpty()) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - } else { - // create the new cluster znode - ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); - listOfOps.add(op); - // get the offset of the logs and set it to new znodes - for (String wal : wals) { - String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal); - byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode); - LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset)); - String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal); - listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); - logQueue.add(wal); - } - // add delete op for peer - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - - if (LOG.isTraceEnabled()) - LOG.trace(" The multi list size is: " + listOfOps.size()); - } - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - - LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue"); - return new Pair<>(newPeerId, logQueue); - } catch (KeeperException e) { - // Multi call failed; it looks like some other regionserver took away the logs.
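- // In that case the claim is simply abandoned: we log the exception, fall through, and return null below.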
- LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - } catch (InterruptedException e) { - LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - Thread.currentThread().interrupt(); - } - return null; - } - - @Override - public void addHFileRefs(String peerId, List> pairs) - throws ReplicationException { - String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - boolean debugEnabled = LOG.isDebugEnabled(); - if (debugEnabled) { - LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode); - } - - int size = pairs.size(); - List listOfOps = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.createAndFailSilent( - ZNodePaths.joinZNode(peerZnode, pairs.get(i).getSecond().getName()), - HConstants.EMPTY_BYTE_ARRAY)); - } - if (debugEnabled) { - LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode - + " is " + listOfOps.size()); - } - try { - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); - } catch (KeeperException e) { - throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e); - } - } - - @Override - public void removeHFileRefs(String peerId, List files) { - String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - boolean debugEnabled = LOG.isDebugEnabled(); - if (debugEnabled) { - LOG.debug("Removing hfile references " + files + " from queue " + peerZnode); - } - - int size = files.size(); - List listOfOps = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZNodePaths.joinZNode(peerZnode, files.get(i)))); - } - if (debugEnabled) { - LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode - + " is " + listOfOps.size()); - } - try { - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); - } catch (KeeperException e) { - LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e); - } - } - - @Override - public void addPeerToHFileRefs(String peerId) throws ReplicationException { - String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - try { - if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { - LOG.info("Adding peer " + peerId + " to hfile reference queue."); - ZKUtil.createWithParents(this.zookeeper, peerZnode); - } - } catch (KeeperException e) { - throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", - e); - } - } - - @Override - public void removePeerFromHFileRefs(String peerId) { - final String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - try { - if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { - if (LOG.isDebugEnabled()) { - LOG.debug("Peer " + peerZnode + " not found in hfile reference queue."); - } - return; - } else { - LOG.info("Removing peer " + peerZnode + " from hfile reference queue."); - ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode); - } - } catch (KeeperException e) { - LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.", - e); - } - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java deleted file mode 100644 index 86124bc..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java +++ /dev/null @@ -1,442 +0,0 @@ -/* -* -* Licensed to the Apache 
Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/* - * Abstract class that provides an interface to the Replication Table, which is currently - * being used for WAL offset tracking. - * The basic schema of this table will store each individual queue as a - * separate row. The row key will be a unique identifier of the creating server's name and the - * queueId. Each queue must have the following two columns: - * COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue - * COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this - * queue. The most recent previous owner is the leftmost entry. - * They will also have columns mapping [WAL filename : offset] - * The most flexible method of interacting with the Replication Table is by calling - * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up - * to the caller to close the returned table.
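- * A hypothetical example row (names for illustration only): a queue with queueId 1 owned by - * server hostname.example.org,6020,1234 would live in row "1-hostname.example.org,6020,1234", - * with cells q:o = "hostname.example.org,6020,1234", q:h = "" (no previous owners yet), and one - * cell per tracked WAL such as q:23522342.23422 = 254.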
- */ -@InterfaceAudience.Private -abstract class ReplicationTableBase { - - /** Name of the HBase Table used for tracking replication */ - public static final TableName REPLICATION_TABLE_NAME = - TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); - - // Column family and column names for Queues in the Replication Table - public static final byte[] CF_QUEUE = Bytes.toBytes("q"); - public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o"); - public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h"); - - // Column Descriptor for the Replication Table - private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = - new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) - .setInMemory(true) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // TODO: Figure out which bloom filter to use - .setBloomFilterType(BloomType.NONE); - - // The value used to delimit the queueId and server name inside of a queue's row key. Currently a - // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. - // See HBASE-11394. - public static final String ROW_KEY_DELIMITER = "-"; - - // The value used to delimit server names in the queue history list - public static final String QUEUE_HISTORY_DELIMITER = "|"; - - /* - * Make sure that HBase table operations for replication have a high number of retries. This is - * because the server is aborted if any HBase table operation fails. Each RPC will be attempted - * 3600 times before exiting. This provides each operation with 2 hours of retries - * before the server is aborted. - */ - private static final int CLIENT_RETRIES = 3600; - private static final int RPC_TIMEOUT = 2000; - private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; - - // We only need a single thread to initialize the Replication Table - private static final int NUM_INITIALIZE_WORKERS = 1; - - protected final Configuration conf; - protected final Abortable abortable; - private final Connection connection; - private final Executor executor; - private volatile CountDownLatch replicationTableInitialized; - - public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { - this.conf = new Configuration(conf); - this.abortable = abort; - decorateConf(); - this.connection = ConnectionFactory.createConnection(this.conf); - this.executor = setUpExecutor(); - this.replicationTableInitialized = new CountDownLatch(1); - createReplicationTableInBackground(); - } - - /** - * Modify the connection's config so that operations run on the Replication Table have - * a larger number of retries - */ - private void decorateConf() { - this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); - } - - /** - * Sets up the thread pool executor used to build the Replication Table in the background - * @return the configured executor - */ - private Executor setUpExecutor() { - ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS, - NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); - tfb.setNameFormat("ReplicationTableExecutor-%d"); - tfb.setDaemon(true); - tempExecutor.setThreadFactory(tfb.build()); - return tempExecutor; - } - - /** - * Get whether the Replication Table has been successfully initialized yet - * @return whether the Replication Table is initialized - */ - public boolean getInitializationStatus() { - return replicationTableInitialized.getCount()
== 0; - } - - /** - * Increases the RPC and operations timeouts for the Replication Table - */ - private Table setReplicationTableTimeOuts(Table replicationTable) { - replicationTable.setRpcTimeout(RPC_TIMEOUT); - replicationTable.setOperationTimeout(OPERATION_TIMEOUT); - return replicationTable; - } - - /** - * Build the row key for the given queueId. This will uniquely identify it from all other queues - * in the cluster. - * @param serverName The owner of the queue - * @param queueId String identifier of the queue - * @return String representation of the queue's row key - */ - protected String buildQueueRowKey(String serverName, String queueId) { - return queueId + ROW_KEY_DELIMITER + serverName; - } - - /** - * Parse the original queueId from a row key - * @param rowKey String representation of a queue's row key - * @return the original queueId - */ - protected String getRawQueueIdFromRowKey(String rowKey) { - return rowKey.split(ROW_KEY_DELIMITER)[0]; - } - - /** - * Returns a queue's row key given either its raw or reclaimed queueId - * - * @param serverName name of the server that owns the queue - * @param queueId queueId of the queue - * @return byte representation of the queue's row key - */ - protected byte[] queueIdToRowKey(String serverName, String queueId) { - // Cluster ids are guaranteed to have no hyphens, so if the passed in queueId has no hyphen - // then this is not a reclaimed queue. - if (!queueId.contains(ROW_KEY_DELIMITER)) { - return Bytes.toBytes(buildQueueRowKey(serverName, queueId)); - // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the - // queue's row key - } else { - return Bytes.toBytes(queueId); - } - } - - /** - * Creates a "|" delimited record of the queue's past region server owners. - * - * @param originalHistory the queue's original owner history - * @param oldServer the name of the server that used to own the queue - * @return the queue's new owner history - */ - protected String buildClaimedQueueHistory(String originalHistory, String oldServer) { - return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory; - } - - /** - * Get a list of all region servers that have outstanding replication queues. These servers could - * be alive, dead or from a previous run of the cluster. - * @return a list of server names - */ - protected List<String> getListOfReplicators() { - // scan all of the queues and return a list of all unique OWNER values - Set<String> peerServers = new HashSet<>(); - ResultScanner allQueuesInCluster = null; - try (Table replicationTable = getOrBlockOnReplicationTable()){ - Scan scan = new Scan(); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - allQueuesInCluster = replicationTable.getScanner(scan); - for (Result queue : allQueuesInCluster) { - peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); - } - } catch (IOException e) { - String errMsg = "Failed getting list of replicators"; - abortable.abort(errMsg, e); - } finally { - if (allQueuesInCluster != null) { - allQueuesInCluster.close(); - } - } - return new ArrayList<>(peerServers); - } - - protected List<String> getAllQueues(String serverName) { - List<String> allQueues = new ArrayList<>(); - ResultScanner queueScanner = null; - try { - queueScanner = getQueuesBelongingToServer(serverName); - for (Result queue : queueScanner) { - String rowKey = Bytes.toString(queue.getRow()); - // If the queue does not have an Owner History, then we must be its original owner.
So we - // want to return its queueId in raw form - if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) { - allQueues.add(getRawQueueIdFromRowKey(rowKey)); - } else { - allQueues.add(rowKey); - } - } - return allQueues; - } catch (IOException e) { - String errMsg = "Failed getting list of all replication queues for serverName=" + serverName; - abortable.abort(errMsg, e); - return null; - } finally { - if (queueScanner != null) { - queueScanner.close(); - } - } - } - - protected List<String> getLogsInQueue(String serverName, String queueId) { - String rowKey = queueId; - if (!queueId.contains(ROW_KEY_DELIMITER)) { - rowKey = buildQueueRowKey(serverName, queueId); - } - return getLogsInQueue(Bytes.toBytes(rowKey)); - } - - protected List<String> getLogsInQueue(byte[] rowKey) { - String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); - try (Table replicationTable = getOrBlockOnReplicationTable()) { - Get getQueue = new Get(rowKey); - Result queue = replicationTable.get(getQueue); - if (queue == null || queue.isEmpty()) { - abortable.abort(errMsg, new ReplicationException(errMsg)); - return null; - } - return readWALsFromResult(queue); - } catch (IOException e) { - abortable.abort(errMsg, e); - return null; - } - } - - /** - * Read all of the WALs from a queue into a list - * - * @param queue HBase query result containing the queue - * @return a list of all the WAL filenames - */ - protected List<String> readWALsFromResult(Result queue) { - List<String> wals = new ArrayList<>(); - Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); - for (byte[] cQualifier : familyMap.keySet()) { - // Ignore the meta data fields of the queue - if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, - COL_QUEUE_OWNER_HISTORY)) { - continue; - } - wals.add(Bytes.toString(cQualifier)); - } - return wals; - } - - /** - * Get the queue ids and metadata (Owner and History) for the queues belonging to the named - * server - * - * @param server name of the server - * @return a ResultScanner over the QueueIds belonging to the server - * @throws IOException - */ - protected ResultScanner getQueuesBelongingToServer(String server) throws IOException { - Scan scan = new Scan(); - SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, - CompareOperator.EQUAL, Bytes.toBytes(server)); - scan.setFilter(filterMyQueues); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); - try (Table replicationTable = getOrBlockOnReplicationTable()) { - ResultScanner results = replicationTable.getScanner(scan); - return results; - } - } - - /** - * Attempts to acquire the Replication Table. This operation will block until it is assigned by - * the CreateReplicationWorker thread.
It is up to the caller of this method to close the - * returned Table - * @return the Replication Table when it is created - * @throws IOException - */ - protected Table getOrBlockOnReplicationTable() throws IOException { - // Sleep until the Replication Table becomes available - try { - replicationTableInitialized.await(); - } catch (InterruptedException e) { - String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + - e.getMessage(); - throw new InterruptedIOException(errMsg); - } - return getAndSetUpReplicationTable(); - } - - /** - * Creates a new copy of the Replication Table and sets up the proper Table timeouts for it - * - * @return the Replication Table - * @throws IOException - */ - private Table getAndSetUpReplicationTable() throws IOException { - Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); - setReplicationTableTimeOuts(replicationTable); - return replicationTable; - } - - /** - * Builds the Replication Table in a background thread. Any method accessing the Replication Table - * should do so through getOrBlockOnReplicationTable() - * - * @throws IOException if the Replication Table takes too long to build - */ - private void createReplicationTableInBackground() throws IOException { - executor.execute(new CreateReplicationTableWorker()); - } - - /** - * Attempts to build the Replication Table. Will continue blocking until we have a valid - * Table for the Replication Table. - */ - private class CreateReplicationTableWorker implements Runnable { - - private Admin admin; - - @Override - public void run() { - try { - admin = connection.getAdmin(); - if (!replicationTableExists()) { - createReplicationTable(); - } - int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", - CLIENT_RETRIES); - RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); - RetryCounter retryCounter = counterFactory.create(); - while (!replicationTableExists()) { - retryCounter.sleepUntilNextRetry(); - if (!retryCounter.shouldRetry()) { - throw new IOException("Unable to acquire the Replication Table"); - } - } - replicationTableInitialized.countDown(); - } catch (IOException | InterruptedException e) { - abortable.abort("Failed building Replication Table", e); - } - } - - /** - * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR - * - * @throws IOException - */ - private void createReplicationTable() throws IOException { - HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); - replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); - try { - admin.createTable(replicationTableDescriptor); - } catch (TableExistsException e) { - // In this case we can just continue as normal - } - } - - /** - * Checks whether the Replication Table exists yet - * - * @return whether the Replication Table exists - */ - private boolean replicationTableExists() { - try { - return admin.tableExists(REPLICATION_TABLE_NAME); - } catch (IOException e) { - return false; - } - } - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java index 2c522f6..5659e4b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java +++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -54,6 +53,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements super(zookeeper, conf, abortable); this.stopper = stopper; this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); + // watch the changes + refreshOtherRegionServersList(true); } @Override @@ -71,7 +72,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements */ @Override public List<String> getListOfRegionServers() { - refreshOtherRegionServersList(); + refreshOtherRegionServersList(false); List<String> list = null; synchronized (otherRegionServers) { @@ -137,7 +138,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements if (!path.startsWith(this.watcher.znodePaths.rsZNode)) { return false; } - return refreshOtherRegionServersList(); + return refreshOtherRegionServersList(true); } } @@ -157,8 +158,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements * @return true if the local list of the other region servers was updated with the ZK data (even * if it was empty), false if the data was missing in ZK */ - private boolean refreshOtherRegionServersList() { - List<String> newRsList = getRegisteredRegionServers(); + private boolean refreshOtherRegionServersList(boolean watch) { + List<String> newRsList = getRegisteredRegionServers(watch); if (newRsList == null) { return false; } else { @@ -174,10 +175,14 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements * Get a list of all the other region servers in this cluster and set a watch * @return a list of server names */ - private List<String> getRegisteredRegionServers() { + private List<String> getRegisteredRegionServers(boolean watch) { List<String> result = null; try { - result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode); + if (watch) { + result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode); + } else { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.znodePaths.rsZNode); + } } catch (KeeperException e) { this.abortable.abort("Get list of registered region servers", e); } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java deleted file mode 100644 index b6c849c..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ /dev/null @@ -1,448 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.hbase.replication; - -import org.apache.commons.lang3.NotImplementedException; -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; - -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -/** - * This class provides an implementation of the ReplicationQueues interface using an HBase table - * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table. - */ -@InterfaceAudience.Private -public class TableBasedReplicationQueuesImpl extends ReplicationTableBase - implements ReplicationQueues { - - private static final Logger LOG = LoggerFactory.getLogger(TableBasedReplicationQueuesImpl.class); - - // Common byte values used in replication offset tracking - private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L); - private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes(""); - - private String serverName = null; - private byte[] serverNameBytes = null; - - // TODO: Only use this variable temporarily. 
Eventually we want to use HBase to store all - // TODO: replication information - private ReplicationStateZKBase replicationState; - - public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException { - this(args.getConf(), args.getAbortable(), args.getZk()); - } - - public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZKWatcher zkw) - throws IOException { - super(conf, abort); - replicationState = new ReplicationStateZKBase(zkw, conf, abort) {}; - } - - @Override - public void init(String serverName) throws ReplicationException { - this.serverName = serverName; - this.serverNameBytes = Bytes.toBytes(serverName); - } - - @Override - public List<String> getListOfReplicators() { - return super.getListOfReplicators(); - } - - @Override - public void removeQueue(String queueId) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - if (checkQueueExists(queueId)) { - Delete deleteQueue = new Delete(rowKey); - safeQueueUpdate(deleteQueue); - } else { - LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed " + - "from the replication table while removing the queue"); - } - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing queue queueId=" + queueId; - abortable.abort(errMsg, e); - } - } - - @Override - public void addLog(String queueId, String filename) throws ReplicationException { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - if (!checkQueueExists(queueId)) { - // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values - Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); - putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - replicationTable.put(putNewQueue); - } else { - // Otherwise simply add the new log and offset as a new column - Put putNewLog = new Put(queueIdToRowKey(queueId)); - putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - safeQueueUpdate(putNewLog); - } - } catch (IOException | ReplicationException e) { - String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void removeLog(String queueId, String filename) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Delete delete = new Delete(rowKey); - delete.addColumns(CF_QUEUE, Bytes.toBytes(filename)); - safeQueueUpdate(delete); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void setLogPosition(String queueId, String filename, long position) { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - byte[] rowKey = queueIdToRowKey(queueId); - // Check that the log exists. addLog() must have been called before setLogPosition().
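- // The Get below is a point check on the WAL's column: if the cell is missing, the WAL was never registered through addLog() and we abort instead of writing an orphan offset.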
- Get checkLogExists = new Get(rowKey); - checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename)); - if (!replicationTable.exists(checkLogExists)) { - String errMsg = "Could not set position of non-existent log from queueId=" + queueId + - ", filename=" + filename; - abortable.abort(errMsg, new ReplicationException(errMsg)); - return; - } - // Update the log offset if it exists - Put walAndOffset = new Put(rowKey); - walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position)); - safeQueueUpdate(walAndOffset); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed writing log position queueId=" + queueId + " filename=" + - filename + " position=" + position; - abortable.abort(errMsg, e); - } - } - - @Override - public long getLogPosition(String queueId, String filename) throws ReplicationException { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Get getOffset = new Get(rowKey); - getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename)); - Result result = getResultIfOwner(getOffset); - if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) { - throw new ReplicationException("Could not read empty result while getting log position " + - "queueId=" + queueId + ", filename=" + filename); - } - return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename))); - } catch (IOException e) { - throw new ReplicationException("Could not get position in log for queueId=" + queueId + - ", filename=" + filename); - } - } - - @Override - public void removeAllQueues() { - List<String> myQueueIds = getAllQueues(); - for (String queueId : myQueueIds) { - removeQueue(queueId); - } - } - - @Override - public List<String> getLogsInQueue(String queueId) { - String errMsg = "Failed getting logs in queue queueId=" + queueId; - byte[] rowKey = queueIdToRowKey(queueId); - List<String> logs = new ArrayList<>(); - try { - Get getQueue = new Get(rowKey); - Result queue = getResultIfOwner(getQueue); - if (queue == null || queue.isEmpty()) { - String errMsgLostOwnership = "Failed getting logs for queue queueId=" + - Bytes.toString(rowKey) + " because the queue was missing or we lost ownership"; - abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership)); - return null; - } - Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE); - for(byte[] cQualifier : familyMap.keySet()) { - if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, - COL_QUEUE_OWNER_HISTORY)) { - continue; - } - logs.add(Bytes.toString(cQualifier)); - } - } catch (IOException e) { - abortable.abort(errMsg, e); - return null; - } - return logs; - } - - @Override - public List<String> getAllQueues() { - return getAllQueues(serverName); - } - - @Override public List<String> getUnClaimedQueueIds(String regionserver) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) { - List<String> res = new ArrayList<>(); - for (Result queue : queuesToClaim) { - String rowKey = Bytes.toString(queue.getRow()); - res.add(rowKey); - } - return res.isEmpty() ?
null : res; - } catch (IOException e) { - String errMsg = "Failed getUnClaimedQueueIds"; - abortable.abort(errMsg, e); - } - return null; - } - - @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) { - // Do nothing here - } - - @Override - public Pair> claimQueue(String regionserver, String queueId) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - - try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){ - for (Result queue : queuesToClaim) { - String rowKey = Bytes.toString(queue.getRow()); - if (!rowKey.equals(queueId)){ - continue; - } - if (attemptToClaimQueue(queue, regionserver)) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey); - if (replicationState.peerExists(replicationQueueInfo.getPeerId())) { - SortedSet sortedLogs = new TreeSet<>(); - List logs = getLogsInQueue(queue.getRow()); - for (String log : logs) { - sortedLogs.add(log); - } - LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver); - return new Pair<>(rowKey, sortedLogs); - } else { - // Delete orphaned queues - removeQueue(Bytes.toString(queue.getRow())); - LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " + - regionserver); - } - } - } - } catch (IOException | KeeperException e) { - String errMsg = "Failed claiming queues for regionserver=" + regionserver; - abortable.abort(errMsg, e); - } - return null; - } - - @Override - public boolean isThisOurRegionServer(String regionserver) { - return this.serverName.equals(regionserver); - } - - @Override - public void addPeerToHFileRefs(String peerId) throws ReplicationException { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public void removePeerFromHFileRefs(String peerId) { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public void addHFileRefs(String peerId, List> pairs) - throws ReplicationException { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public void removeHFileRefs(String peerId, List files) { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - private String buildQueueRowKey(String queueId) { - return buildQueueRowKey(serverName, queueId); - } - - /** - * Convenience method that gets the row key of the queue specified by queueId - * @param queueId queueId of a queue in this server - * @return the row key of the queue in the Replication Table - */ - private byte[] queueIdToRowKey(String queueId) { - return queueIdToRowKey(serverName, queueId); - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param put Row mutation to perform on the queue - */ - private void safeQueueUpdate(Put put) throws ReplicationException, IOException { - RowMutations mutations = new RowMutations(put.getRow()); - mutations.add(put); - safeQueueUpdate(mutations); - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param delete Row mutation to perform on the queue - */ - private void safeQueueUpdate(Delete delete) throws ReplicationException, - IOException{ - RowMutations mutations = new RowMutations(delete.getRow()); - mutations.add(delete); - safeQueueUpdate(mutations); - } - - /** - * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column - * of the queue. 
Abort the server if this checkAndPut fails: which means we have somehow lost - * ownership of the column or an IO Exception has occurred during the transaction. - * - * @param mutate Mutation to perform on a given queue - */ - private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{ - try (Table replicationTable = getOrBlockOnReplicationTable()) { - boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE) - .qualifier(COL_QUEUE_OWNER).ifEquals(serverNameBytes).thenMutate(mutate); - if (!updateSuccess) { - throw new ReplicationException("Failed to update Replication Table because we lost queue " + - " ownership"); - } - } - } - - /** - * Check if the queue specified by queueId is stored in HBase - * - * @param queueId Either raw or reclaimed format of the queueId - * @return Whether the queue is stored in HBase - * @throws IOException - */ - private boolean checkQueueExists(String queueId) throws IOException { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - byte[] rowKey = queueIdToRowKey(queueId); - return replicationTable.exists(new Get(rowKey)); - } - } - - /** - * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the - * recently killed server is still the OWNER before we claim it. - * - * @param queue The queue that we are trying to claim - * @param originalServer The server that originally owned the queue - * @return Whether we successfully claimed the queue - * @throws IOException - */ - private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{ - Put putQueueNameAndHistory = new Put(queue.getRow()); - putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName)); - String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE, - COL_QUEUE_OWNER_HISTORY)), originalServer); - putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, - Bytes.toBytes(newOwnerHistory)); - RowMutations claimAndRenameQueue = new RowMutations(queue.getRow()); - claimAndRenameQueue.add(putQueueNameAndHistory); - // Attempt to claim ownership for this queue by checking if the current OWNER is the original - // server. If it is not then another RS has already claimed it. If it is we set ourselves as the - // new owner and update the queue's history - try (Table replicationTable = getOrBlockOnReplicationTable()) { - boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE) - .qualifier(COL_QUEUE_OWNER).ifEquals(Bytes.toBytes(originalServer)) - .thenMutate(claimAndRenameQueue); - return success; - } - } - - /** - * Attempts to run a Get on some queue. Will only return a non-null result if we currently own - * the queue. - * - * @param get The Get that we want to query - * @return The result of the Get if this server is the owner of the queue. Else it returns null. 
- * @throws IOException - */ - private Result getResultIfOwner(Get get) throws IOException { - Scan scan = new Scan(get); - // Check if the Get currently contains all columns or only specific columns - if (scan.getFamilyMap().size() > 0) { - // Add the OWNER column if the scan is already only over specific columns - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - } - scan.setMaxResultSize(1); - SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, - CompareOperator.EQUAL, serverNameBytes); - scan.setFilter(checkOwner); - ResultScanner scanner = null; - try (Table replicationTable = getOrBlockOnReplicationTable()) { - scanner = replicationTable.getScanner(scan); - Result result = scanner.next(); - return (result == null || result.isEmpty()) ? null : result; - } finally { - if (scanner != null) { - scanner.close(); - } - } - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 0275d52..41f50d8 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -54,6 +54,28 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe /** * ZK based replication queue storage. + *

+ * The base znode for each regionserver is the regionserver name. For example:
+ *
+ * <pre>
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ * </pre>
+ *
+ * Within this znode, the region server maintains a set of WAL replication queues. These queues are
+ * represented by child znodes named using their given queue id. For example:
+ *
+ * <pre>
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1
+ * /hbase/replication/rs/hostname.example.org,6020,1234/2
+ * </pre>
+ *
+ * Each queue has one child znode for every WAL that still needs to be replicated. The value of
+ * these WAL child znodes is the latest position that has been replicated. This position is updated
+ * every time a WAL entry is replicated. For example:
+ *
+ * <pre>
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
+ * </pre>
*/ @InterfaceAudience.Private class ZKReplicationQueueStorage extends ZKReplicationStorageBase diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 6fe869c..6bcbb5f 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -42,9 +42,8 @@ import org.slf4j.LoggerFactory; */ public abstract class TestReplicationStateBasic { - protected ReplicationQueues rq1; - protected ReplicationQueues rq2; - protected ReplicationQueues rq3; + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); + protected ReplicationQueueStorage rqs; protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); @@ -63,8 +62,6 @@ public abstract class TestReplicationStateBasic { protected static final int ZK_MAX_COUNT = 300; protected static final int ZK_SLEEP_INTERVAL = 100; // millis - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); - @Test public void testReplicationQueueStorage() throws ReplicationException { // Test methods with empty state @@ -76,15 +73,13 @@ public abstract class TestReplicationStateBasic { * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- * server2: zero queues */ - rq1.init(server1.getServerName()); - rq2.init(server2.getServerName()); - rq1.addLog("qId1", "trash"); - rq1.removeLog("qId1", "trash"); - rq1.addLog("qId2", "filename1"); - rq1.addLog("qId3", "filename2"); - rq1.addLog("qId3", "filename3"); - rq2.addLog("trash", "trash"); - rq2.removeQueue("trash"); + rqs.addWAL(server1, "qId1", "trash"); + rqs.removeWAL(server1, "qId1", "trash"); + rqs.addWAL(server1,"qId2", "filename1"); + rqs.addWAL(server1,"qId3", "filename2"); + rqs.addWAL(server1,"qId3", "filename3"); + rqs.addWAL(server2,"trash", "trash"); + rqs.removeQueue(server2,"trash"); List reps = rqs.getListOfReplicators(); assertEquals(2, reps.size()); @@ -105,62 +100,55 @@ public abstract class TestReplicationStateBasic { assertTrue(list.contains("qId3")); } + private void removeAllQueues(ServerName serverName) throws ReplicationException { + for (String queue: rqs.getAllQueues(serverName)) { + rqs.removeQueue(serverName, queue); + } + } @Test public void testReplicationQueues() throws ReplicationException { - rq1.init(server1.getServerName()); - rq2.init(server2.getServerName()); - rq3.init(server3.getServerName()); // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) rp.init(); - // 3 replicators should exist - assertEquals(3, rq1.getListOfReplicators().size()); - rq1.removeQueue("bogus"); - rq1.removeLog("bogus", "bogus"); - rq1.removeAllQueues(); - assertEquals(0, rq1.getAllQueues().size()); - assertEquals(0, rq1.getLogPosition("bogus", "bogus")); - assertNull(rq1.getLogsInQueue("bogus")); - assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString())); - - rq1.setLogPosition("bogus", "bogus", 5L); + rqs.removeQueue(server1, "bogus"); + rqs.removeWAL(server1, "bogus", "bogus"); + removeAllQueues(server1); + assertEquals(0, rqs.getAllQueues(server1).size()); + assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus")); 
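The rewritten tests exercise the storage contract implied by the znode layout documented above for ZKReplicationQueueStorage. A condensed, self-contained sketch of that contract, using only methods that appear in this patch (the ZKWatcher and Configuration would come from a test harness; every call may throw ReplicationException):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.replication.ReplicationException;
    import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
    import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
    import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

    static void queueStorageContract(ZKWatcher zkw, Configuration conf)
        throws ReplicationException {
      ReplicationQueueStorage rqs =
          ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
      ServerName rs = ServerName.valueOf("hostname.example.org", 6020, 1234);
      // Creates /hbase/replication/rs/<rs>/1/23522342.23422 with the initial offset.
      rqs.addWAL(rs, "1", "23522342.23422");
      // The WAL znode's value becomes the replicated position, here 254.
      rqs.setWALPosition(rs, "1", "23522342.23422", 254L);
      assert rqs.getWALPosition(rs, "1", "23522342.23422") == 254L;
      assert rqs.getAllQueues(rs).contains("1");        // queue ids are child znodes
      assert rqs.getWALsInQueue(rs, "1").contains("23522342.23422");
      rqs.removeWAL(rs, "1", "23522342.23422");
      rqs.removeQueue(rs, "1");
    }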
+ assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); + assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty()); populateQueues(); - assertEquals(3, rq1.getListOfReplicators().size()); - assertEquals(0, rq2.getLogsInQueue("qId1").size()); - assertEquals(5, rq3.getLogsInQueue("qId5").size()); - assertEquals(0, rq3.getLogPosition("qId1", "filename0")); - rq3.setLogPosition("qId5", "filename4", 354L); - assertEquals(354L, rq3.getLogPosition("qId5", "filename4")); + assertEquals(3, rqs.getListOfReplicators().size()); + assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); + assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); + assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); + rqs.setWALPosition(server3, "qId5", "filename4", 354L); + assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); - assertEquals(5, rq3.getLogsInQueue("qId5").size()); - assertEquals(0, rq2.getLogsInQueue("qId1").size()); - assertEquals(0, rq1.getAllQueues().size()); - assertEquals(1, rq2.getAllQueues().size()); - assertEquals(5, rq3.getAllQueues().size()); + assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); + assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); + assertEquals(0, rqs.getAllQueues(server1).size()); + assertEquals(1, rqs.getAllQueues(server2).size()); + assertEquals(5, rqs.getAllQueues(server3).size()); - assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size()); - rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName()); - assertEquals(2, rq3.getListOfReplicators().size()); + assertEquals(0, rqs.getAllQueues(server1).size()); + rqs.removeReplicatorIfQueueIsEmpty(server1); + assertEquals(2, rqs.getListOfReplicators().size()); - List queues = rq2.getUnClaimedQueueIds(server3.getServerName()); + List queues = rqs.getAllQueues(server3); assertEquals(5, queues.size()); for (String queue : queues) { - rq2.claimQueue(server3.getServerName(), queue); + rqs.claimQueue(server3, queue, server2); } - rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName()); - assertEquals(1, rq2.getListOfReplicators().size()); - - // Try to claim our own queues - assertNull(rq2.getUnClaimedQueueIds(server2.getServerName())); - rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName()); - - assertEquals(6, rq2.getAllQueues().size()); + rqs.removeReplicatorIfQueueIsEmpty(server3); + assertEquals(1, rqs.getListOfReplicators().size()); - rq2.removeAllQueues(); - - assertEquals(0, rq2.getListOfReplicators().size()); + assertEquals(6, rqs.getAllQueues(server2).size()); + removeAllQueues(server2); + rqs.removeReplicatorIfQueueIsEmpty(server2); + assertEquals(0, rqs.getListOfReplicators().size()); } @Test @@ -197,7 +185,6 @@ public abstract class TestReplicationStateBasic { @Test public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { rp.init(); - rq1.init(server1.getServerName()); List> files1 = new ArrayList<>(3); files1.add(new Pair<>(null, new Path("file_1"))); @@ -206,8 +193,8 @@ public abstract class TestReplicationStateBasic { assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rq1.addPeerToHFileRefs(ID_ONE); - rq1.addHFileRefs(ID_ONE, files1); + rqs.addPeerToHFileRefs(ID_ONE); + rqs.addHFileRefs(ID_ONE, files1); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); List 
hfiles2 = new ArrayList<>(files1.size()); @@ -215,43 +202,41 @@ public abstract class TestReplicationStateBasic { hfiles2.add(p.getSecond().getName()); } String removedString = hfiles2.remove(0); - rq1.removeHFileRefs(ID_ONE, hfiles2); + rqs.removeHFileRefs(ID_ONE, hfiles2); assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); hfiles2 = new ArrayList<>(1); hfiles2.add(removedString); - rq1.removeHFileRefs(ID_ONE, hfiles2); + rqs.removeHFileRefs(ID_ONE, hfiles2); assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); rp.unregisterPeer(ID_ONE); } @Test public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { - rq1.init(server1.getServerName()); - rp.init(); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rq1.addPeerToHFileRefs(ID_ONE); + rqs.addPeerToHFileRefs(ID_ONE); rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); - rq1.addPeerToHFileRefs(ID_TWO); + rqs.addPeerToHFileRefs(ID_TWO); List> files1 = new ArrayList<>(3); files1.add(new Pair<>(null, new Path("file_1"))); files1.add(new Pair<>(null, new Path("file_2"))); files1.add(new Pair<>(null, new Path("file_3"))); - rq1.addHFileRefs(ID_ONE, files1); - rq1.addHFileRefs(ID_TWO, files1); + rqs.addHFileRefs(ID_ONE, files1); + rqs.addHFileRefs(ID_TWO, files1); assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); rp.unregisterPeer(ID_ONE); - rq1.removePeerFromHFileRefs(ID_ONE); + rqs.removePeerFromHFileRefs(ID_ONE); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); rp.unregisterPeer(ID_TWO); - rq1.removePeerFromHFileRefs(ID_TWO); + rqs.removePeerFromHFileRefs(ID_TWO); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); } @@ -360,15 +345,15 @@ public abstract class TestReplicationStateBasic { * 3, 4, 5 log files respectively */ protected void populateQueues() throws ReplicationException { - rq1.addLog("trash", "trash"); - rq1.removeQueue("trash"); + rqs.addWAL(server1, "trash", "trash"); + rqs.removeQueue(server1, "trash"); - rq2.addLog("qId1", "trash"); - rq2.removeLog("qId1", "trash"); + rqs.addWAL(server2, "qId1", "trash"); + rqs.removeWAL(server2, "qId1", "trash"); for (int i = 1; i < 6; i++) { for (int j = 0; j < i; j++) { - rq3.addLog("qId" + i, "filename" + j); + rqs.addWAL(server3, "qId" + i, "filename" + j); } // Add peers for the corresponding queues so they are not orphans rp.registerPeer("qId" + i, diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 5fe7c55..ac869d9 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hbase.replication; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -40,7 +36,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; 
-import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +49,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { private static HBaseZKTestingUtility utility; private static ZKWatcher zkw; private static String replicationZNode; - private ReplicationQueuesZKImpl rqZK; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -84,23 +78,9 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { @Before public void setUp() { zkTimeoutCount = 0; - WarnOnlyAbortable abortable = new WarnOnlyAbortable(); - try { - rq1 = ReplicationFactory - .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); - rq2 = ReplicationFactory - .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); - rq3 = ReplicationFactory - .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); - rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - } catch (Exception e) { - // This should not occur, because getReplicationQueues() only throws for - // TableBasedReplicationQueuesImpl - fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); - } - rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); + rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable()); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); - rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable); } @After @@ -113,23 +93,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { utility.shutdownMiniZKCluster(); } - @Test - public void testIsPeerPath_PathToParentOfPeerNode() { - assertFalse(rqZK.isPeerPath(rqZK.peersZNode)); - } - - @Test - public void testIsPeerPath_PathToChildOfPeerNode() { - String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child"); - assertFalse(rqZK.isPeerPath(peerChild)); - } - - @Test - public void testIsPeerPath_ActualPeerPath() { - String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1"); - assertTrue(rqZK.isPeerPath(peerPath)); - } - private static class WarnOnlyAbortable implements Abortable { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b72255a..468ff89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -168,10 +168,8 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.AccessDeniedException; @@ -1156,15 +1154,12 @@ public class HMaster extends HRegionServer implements MasterServices { } // Start 
replication zk node cleaner - if (conf.getClass("hbase.region.replica.replication.replicationQueues.class", - ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) { - try { - replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval, - new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this)); - getChoreService().scheduleChore(replicationZKNodeCleanerChore); - } catch (Exception e) { - LOG.error("start replicationZKNodeCleanerChore failed", e); - } + try { + replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval, + new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this)); + getChoreService().scheduleChore(replicationZKNodeCleanerChore); + } catch (Exception e) { + LOG.error("start replicationZKNodeCleanerChore failed", e); } replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval); getChoreService().scheduleChore(replicationMetaCleaner); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 1faaae3..ce03f1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -50,8 +50,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -308,14 +306,10 @@ public class DumpReplicationQueues extends Configured implements Tool { boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; ReplicationPeers replicationPeers; - ReplicationQueues replicationQueues; ReplicationTracker replicationTracker; - ReplicationQueuesArguments replicationArgs = - new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw); StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs); replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), @@ -329,7 +323,6 @@ public class DumpReplicationQueues extends Configured implements Tool { } for (ServerName regionserver : regionservers) { List queueIds = queueStorage.getAllQueues(regionserver); - replicationQueues.init(regionserver.getServerName()); if (!liveRegionServers.contains(regionserver.getServerName())) { deadRegionServers.add(regionserver.getServerName()); } @@ -339,17 +332,17 @@ public class DumpReplicationQueues extends Configured implements Tool { if (!peerIds.contains(queueInfo.getPeerId())) { deletedQueues.add(regionserver + "/" + queueId); sb.append( - formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs)); + formatQueue(regionserver, 
queueStorage, queueInfo, queueId, wals, true, hdfs)); } else { sb.append( - formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs)); + formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); } } } return sb.toString(); } - private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues, + private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage, ReplicationQueueInfo queueInfo, String queueId, List wals, boolean isDeleted, boolean hdfs) throws Exception { StringBuilder sb = new StringBuilder(); @@ -371,7 +364,7 @@ public class DumpReplicationQueues extends Configured implements Tool { peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); for (String wal : wals) { - long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal); + long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index bd191e3..e0c45d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,15 +28,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class that handles the recovered source of a replication stream, which is transfered from @@ -52,10 +51,10 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, + ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode, + super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode, clusterId, replicationEndpoint, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @@ -64,7 
+63,7 @@ public class RecoveredReplicationSource extends ReplicationSource { protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { final RecoveredReplicationSourceShipper worker = new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, - this.replicationQueues); + this.queueStorage); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 630b90b..fb365bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -23,13 +23,13 @@ import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Used by a {@link RecoveredReplicationSource}. 
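In the hunks below, the recovered shipper asks the queue storage for its starting offset, keyed by the region server the recovered WALs originally belonged to. Condensed, with error handling elided (queueStorage stands for the injected ReplicationQueueStorage; the accessors are the ones used in this patch):

    long startPosition = queueStorage.getWALPosition(
        source.getServerWALsBelongTo(),   // the dead RS whose queue was adopted
        source.getPeerClusterZnode(),     // the recovered queue id
        queue.peek().getName());          // oldest WAL still queued for shipping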
@@ -40,14 +40,14 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class); protected final RecoveredReplicationSource source; - private final ReplicationQueues replicationQueues; + private final ReplicationQueueStorage replicationQueues; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue queue, RecoveredReplicationSource source, - ReplicationQueues replicationQueues) { + ReplicationQueueStorage queueStorage) { super(conf, walGroupId, queue, source); this.source = source; - this.replicationQueues = replicationQueues; + this.replicationQueues = queueStorage; } @Override @@ -116,11 +116,11 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper long startPosition = 0; String peerClusterZnode = source.getPeerClusterZnode(); try { - startPosition = this.replicationQueues.getLogPosition(peerClusterZnode, - this.queue.peek().getName()); + startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(), + peerClusterZnode, this.queue.peek().getName()); if (LOG.isTraceEnabled()) { - LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " - + startPosition); + LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + + startPosition); } } catch (ReplicationException e) { terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 571ee75..6d871cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -29,11 +28,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,26 +37,32 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; /** * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. 
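The hunks below drop the two-step create-then-init ReplicationQueues setup in favor of a stateless storage handle. Condensed, initialization in Replication now reads (the surrounding try/catch and the tracker and cluster id plumbing are elided):

    this.queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
    this.replicationPeers =
        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
    this.replicationPeers.init();
    // The source manager receives the storage handle directly; no init(serverName)
    // call is needed any more.
    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
        replicationTracker, conf, server, fs, logDir, oldLogDir, clusterId,
        walFileLengthProvider);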
@@ -74,7 +74,7 @@ public class Replication implements LoggerFactory.getLogger(Replication.class); private boolean replicationForBulkLoadData; private ReplicationSourceManager replicationManager; - private ReplicationQueues replicationQueues; + private ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; private ReplicationTracker replicationTracker; private Configuration conf; @@ -127,10 +127,8 @@ public class Replication implements } try { - this.replicationQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server, - server.getZooKeeper())); - this.replicationQueues.init(this.server.getServerName().toString()); + this.queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); this.replicationPeers.init(); @@ -147,7 +145,7 @@ public class Replication implements throw new IOException("Could not read cluster id", ke); } this.replicationManager = - new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf, + new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 19ea240..fef9054 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -41,9 +40,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; @@ -52,7 +48,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; @@ -60,6 +56,10 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; @@ -83,7 +83,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private Map> queues = new HashMap<>(); // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; - protected ReplicationQueues replicationQueues; + protected ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; protected Configuration conf; @@ -148,7 +148,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, + ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; @@ -161,7 +161,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); - this.replicationQueues = replicationQueues; + this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; this.manager = manager; this.fs = fs; @@ -229,7 +229,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf List tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { - this.replicationQueues.addHFileRefs(peerId, pairs); + this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); } else { LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " @@ -238,7 +238,7 @@ public class ReplicationSource extends 
Thread implements ReplicationSourceInterf } else { // user has explicitly not defined any table cfs for replication, means replicate all the // data - this.replicationQueues.addHFileRefs(peerId, pairs); + this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index b6cf54d..4b9ed74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; import java.util.UUID; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,9 +31,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; /** * Interface that defines a replication source @@ -47,15 +47,10 @@ public interface ReplicationSourceInterface { * @param conf the configuration to use * @param fs the file system to use * @param manager the manager to use - * @param replicationQueues - * @param replicationPeers * @param server the server for this region server - * @param peerClusterZnode - * @param clusterId - * @throws IOException */ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, + ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a263fc3..c4cff89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -47,6 +48,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.Connection; @@ -60,7 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -68,6 +70,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -95,7 +98,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final List sources; // List of all the sources we got from died RSs private final List oldsources; - private final ReplicationQueues replicationQueues; + private final ReplicationQueueStorage queueStorage; private final ReplicationTracker replicationTracker; private final ReplicationPeers replicationPeers; // UUID for this cluster @@ -130,7 +133,7 @@ public class ReplicationSourceManager implements ReplicationListener { /** * Creates a replication manager and sets the watch on all the other registered region servers - * @param replicationQueues the interface for manipulating replication queues + * @param queueStorage the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use @@ -140,14 +143,14 @@ public class ReplicationSourceManager implements ReplicationListener { * @param oldLogDir the directory where old logs are archived * @param clusterId */ - public ReplicationSourceManager(ReplicationQueues replicationQueues, + public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider) throws IOException { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. 
this.sources = new CopyOnWriteArrayList<>(); - this.replicationQueues = replicationQueues; + this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; @@ -184,6 +187,19 @@ public class ReplicationSourceManager implements ReplicationListener { connection = ConnectionFactory.createConnection(conf); } + @FunctionalInterface + private interface ReplicationQueueOperation { + void exec() throws ReplicationException; + } + + private void abortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + LOG.warn("Operation replication queue failed", e); + } + } + /** * Provide the id of the peer and a log key and this method will figure which * wal it belongs to and will log, for this region server, the current @@ -195,12 +211,13 @@ public class ReplicationSourceManager implements ReplicationListener { * @param queueRecovered indicates if this queue comes from another region server * @param holdLogInZK if true then the log is retained in ZK */ - public void logPositionAndCleanOldLogs(Path log, String id, long position, - boolean queueRecovered, boolean holdLogInZK) { + public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, + boolean holdLogInZK) { String fileName = log.getName(); - this.replicationQueues.setLogPosition(id, fileName, position); + abortWhenFail( + () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position)); if (holdLogInZK) { - return; + return; } cleanOldLogs(fileName, id, queueRecovered); } @@ -227,13 +244,15 @@ public class ReplicationSourceManager implements ReplicationListener { } } } - } + } private void cleanOldLogs(SortedSet wals, String key, String id) { SortedSet walSet = wals.headSet(key); - LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); + } for (String wal : walSet) { - this.replicationQueues.removeLog(id, wal); + abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); } walSet.clear(); } @@ -248,7 +267,7 @@ public class ReplicationSourceManager implements ReplicationListener { if (replicationForBulkLoadDataEnabled) { // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case // when a peer was added before replication for bulk loaded data was enabled. 
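The ReplicationQueueOperation functional interface introduced above gives every queue mutation a single error-handling path; call sites wrap the storage call in a lambda, as the later hunks in this file do:

    abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), id,
        fileName, position));
    abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));

Note that despite its name, the helper as added here only logs a warning on ReplicationException; the failed mutation is otherwise swallowed.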
- this.replicationQueues.addPeerToHFileRefs(id); + this.queueStorage.addPeerToHFileRefs(id); } } AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker(); @@ -264,15 +283,12 @@ public class ReplicationSourceManager implements ReplicationListener { * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster * @return the source that was created - * @throws IOException */ @VisibleForTesting ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getConnectedPeer(id); - ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, - this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer, - walFileLengthProvider); + ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); Map> walsByGroup = new HashMap<>(); @@ -287,11 +303,10 @@ public class ReplicationSourceManager implements ReplicationListener { logs.add(name); walsByGroup.put(walPrefix, logs); try { - this.replicationQueues.addLog(id, name); + this.queueStorage.addWAL(server.getServerName(), id, name); } catch (ReplicationException e) { - String message = - "Cannot add log to queue when creating a new source, queueId=" + id - + ", filename=" + name; + String message = "Cannot add log to queue when creating a new source, queueId=" + id + + ", filename=" + name; server.stop(message); throw e; } @@ -316,7 +331,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param peerId Id of the peer cluster queue of wals to delete */ public void deleteSource(String peerId, boolean closeConnection) { - this.replicationQueues.removeQueue(peerId); + abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); if (closeConnection) { this.replicationPeers.peerDisconnected(peerId); } @@ -376,8 +391,8 @@ public class ReplicationSourceManager implements ReplicationListener { } @VisibleForTesting - List getAllQueues() { - return replicationQueues.getAllQueues(); + List getAllQueues() throws ReplicationException { + return queueStorage.getAllQueues(server.getServerName()); } void preLogRoll(Path newLog) throws IOException { @@ -411,10 +426,10 @@ public class ReplicationSourceManager implements ReplicationListener { synchronized (replicationPeers) { for (String id : replicationPeers.getConnectedPeerIds()) { try { - this.replicationQueues.addLog(id, logName); + this.queueStorage.addWAL(server.getServerName(), id, logName); } catch (ReplicationException e) { - throw new IOException("Cannot add log to replication queue" - + " when creating a new source, queueId=" + id + ", filename=" + logName, e); + throw new IOException("Cannot add log to replication queue" + + " when creating a new source, queueId=" + id + ", filename=" + logName, e); } } } @@ -461,19 +476,11 @@ public class ReplicationSourceManager implements ReplicationListener { /** * Factory method to create a replication source - * @param conf the configuration to use - * @param fs the file system to use - * @param manager the manager to use - * @param server the server object for this region server * @param peerId the id of the peer cluster * @return the created source - * @throws IOException */ - private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs, - 
ReplicationSourceManager manager, ReplicationQueues replicationQueues, - ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId, - ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer, - WALFileLengthProvider walFileLengthProvider) throws IOException { + private ReplicationSourceInterface getReplicationSource(String peerId, + ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException { RegionServerCoprocessorHost rsServerHost = null; TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { @@ -490,9 +497,8 @@ public class ReplicationSourceManager implements ReplicationListener { // Default to HBase inter-cluster replication endpoint replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); } - @SuppressWarnings("rawtypes") - Class c = Class.forName(replicationEndpointImpl); - replicationEndpoint = (ReplicationEndpoint) c.newInstance(); + replicationEndpoint = Class.forName(replicationEndpointImpl) + .asSubclass(ReplicationEndpoint.class).newInstance(); if(rsServerHost != null) { ReplicationEndpoint newReplicationEndPoint = rsServerHost .postCreateReplicationEndPoint(replicationEndpoint); @@ -509,7 +515,7 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(peerId); // init replication source - src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, + src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId, replicationEndpoint, walFileLengthProvider, metrics); // init replication endpoint @@ -520,21 +526,21 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Transfer all the queues of the specified to this region server. - * First it tries to grab a lock and if it works it will move the - * znodes and finally will delete the old znodes. - * + * Transfer all the queues of the specified to this region server. First it tries to grab a lock + * and if it works it will move the znodes and finally will delete the old znodes. + *

* It creates one old source for any type of source of the old rs. - * @param rsZnode */ - private void transferQueues(String rsZnode) { - NodeFailoverWorker transfer = - new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers, - this.clusterId); + private void transferQueues(ServerName deadRS) { + if (server.getServerName().equals(deadRS)) { + // it's just us, give up + return; + } + NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS); try { this.executor.execute(transfer); } catch (RejectedExecutionException ex) { - LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage()); + LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage()); } } @@ -571,7 +577,7 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); addSource(id); if (replicationForBulkLoadDataEnabled) { - this.replicationQueues.addPeerToHFileRefs(id); + this.queueStorage.addPeerToHFileRefs(id); } } } @@ -624,12 +630,12 @@ public class ReplicationSourceManager implements ReplicationListener { deleteSource(id, true); } // Remove HFile Refs znode from zookeeper - this.replicationQueues.removePeerFromHFileRefs(id); + abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id)); } @Override public void regionServerRemoved(String regionserver) { - transferQueues(regionserver); + transferQueues(ServerName.valueOf(regionserver)); } /** @@ -638,37 +644,21 @@ public class ReplicationSourceManager implements ReplicationListener { */ class NodeFailoverWorker extends Thread { - private String rsZnode; - private final ReplicationQueues rq; - private final ReplicationPeers rp; - private final UUID clusterId; + private final ServerName deadRS; - /** - * @param rsZnode - */ - public NodeFailoverWorker(String rsZnode) { - this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId); - } - - public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues, - final ReplicationPeers replicationPeers, final UUID clusterId) { - super("Failover-for-"+rsZnode); - this.rsZnode = rsZnode; - this.rq = replicationQueues; - this.rp = replicationPeers; - this.clusterId = clusterId; + @VisibleForTesting + public NodeFailoverWorker(ServerName deadRS) { + super("Failover-for-" + deadRS); + this.deadRS = deadRS; } @Override public void run() { - if (this.rq.isThisOurRegionServer(rsZnode)) { - return; - } // Wait a bit before transferring the queues, we may be shutting down. // This sleep may not be enough in some cases. 
try { Thread.sleep(sleepBeforeFailover + - (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting before transferring a queue."); Thread.currentThread().interrupt(); @@ -679,25 +669,30 @@ public class ReplicationSourceManager implements ReplicationListener { return; } Map> newQueues = new HashMap<>(); - List peers = rq.getUnClaimedQueueIds(rsZnode); - while (peers != null && !peers.isEmpty()) { - Pair> peer = this.rq.claimQueue(rsZnode, - peers.get(ThreadLocalRandom.current().nextInt(peers.size()))); - long sleep = sleepBeforeFailover/2; - if (peer != null) { - newQueues.put(peer.getFirst(), peer.getSecond()); - sleep = sleepBeforeFailover; + try { + List peers = queueStorage.getAllQueues(deadRS); + while (!peers.isEmpty()) { + Pair> peer = queueStorage.claimQueue(deadRS, + peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName()); + long sleep = sleepBeforeFailover / 2; + if (!peer.getSecond().isEmpty()) { + newQueues.put(peer.getFirst(), peer.getSecond()); + sleep = sleepBeforeFailover; + } + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting before transferring a queue."); + Thread.currentThread().interrupt(); + } + peers = queueStorage.getAllQueues(deadRS); } - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting before transferring a queue."); - Thread.currentThread().interrupt(); + if (!peers.isEmpty()) { + queueStorage.removeReplicatorIfQueueIsEmpty(deadRS); } - peers = rq.getUnClaimedQueueIds(rsZnode); - } - if (peers != null) { - rq.removeReplicatorIfQueueIsEmpty(rsZnode); + } catch (ReplicationException e) { + server.abort("Failed to claim queue from dead regionserver", e); + return; } // Copying over the failed queue is completed. 
if (newQueues.isEmpty()) { @@ -722,8 +717,8 @@ public class ReplicationSourceManager implements ReplicationListener { + ex); } if (peer == null || peerConfig == null) { - LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode); - replicationQueues.removeQueue(peerId); + LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS); + abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); continue; } // track sources in walsByIdRecoveredQueues @@ -740,13 +735,11 @@ public class ReplicationSourceManager implements ReplicationListener { } // enqueue sources - ReplicationSourceInterface src = - getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider); + ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // see removePeer synchronized (oldsources) { - if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) { + if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) { src.terminate("Recovered queue doesn't belong to any current peer"); closeRecoveredQueue(src); continue; @@ -765,22 +758,29 @@ public class ReplicationSourceManager implements ReplicationListener { } } - class AdoptAbandonedQueuesWorker extends Thread{ + class AdoptAbandonedQueuesWorker extends Thread { public AdoptAbandonedQueuesWorker() {} @Override public void run() { - List currentReplicators = replicationQueues.getListOfReplicators(); + List currentReplicators = null; + try { + currentReplicators = queueStorage.getListOfReplicators(); + } catch (ReplicationException e) { + server.abort("Failed to get all replicators", e); + return; + } if (currentReplicators == null || currentReplicators.isEmpty()) { return; } - List otherRegionServers = replicationTracker.getListOfRegionServers(); - LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " - + otherRegionServers); + List otherRegionServers = replicationTracker.getListOfRegionServers().stream() + .map(ServerName::valueOf).collect(Collectors.toList()); + LOG.info( + "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); // Look if there's anything to process after a restart - for (String rs : currentReplicators) { + for (ServerName rs : currentReplicators) { if (!otherRegionServers.contains(rs)) { transferQueues(rs); } @@ -846,7 +846,7 @@ public class ReplicationSourceManager implements ReplicationListener { } public void cleanUpHFileRefs(String peerId, List files) { - this.replicationQueues.removeHFileRefs(peerId, files); + abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index e678e1b..8e7c83f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -45,9 +45,8 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import 
org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -112,9 +111,8 @@ public class TestLogsCleaner { Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); - ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues( - new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); - repQueues.init(server.getServerName().toString()); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs"); String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); @@ -145,7 +143,7 @@ public class TestLogsCleaner { // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner if (i % (30 / 3) == 1) { - repQueues.addLog(fakeMachineName, fileName.getName()); + queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); LOG.info("Replication log file: " + fileName); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index cfad645..6a34493 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -47,9 +47,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -73,19 +72,16 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; @Category({ MasterTests.class, SmallTests.class }) public class TestReplicationHFileCleaner { - private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Server server; - private static ReplicationQueues rq; + private static ReplicationQueueStorage rq; private static ReplicationPeers rp; private static final String peerId = 
"TestReplicationHFileCleaner"; private static Configuration conf = TEST_UTIL.getConfiguration(); static FileSystem fs = null; Path root; - /** - * @throws java.lang.Exception - */ @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); @@ -94,20 +90,10 @@ public class TestReplicationHFileCleaner { Replication.decorateMasterConfiguration(conf); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); rp.init(); - rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); - rq.init(server.getServerName().toString()); - try { - fs = FileSystem.get(conf); - } finally { - if (fs != null) { - fs.close(); - } - } + rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); + fs = FileSystem.get(conf); } - /** - * @throws java.lang.Exception - */ @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniZKCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java index 8178266..2ad8bd7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java @@ -26,10 +26,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -49,14 +47,12 @@ public class TestReplicationZKNodeCleaner { private final Configuration conf; private final ZKWatcher zkw; - private final ReplicationQueues repQueues; + private final ReplicationQueueStorage repQueues; public TestReplicationZKNodeCleaner() throws Exception { conf = TEST_UTIL.getConfiguration(); zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null); - repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, - zkw)); - assertTrue(repQueues instanceof ReplicationQueuesZKImpl); + repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); } @BeforeClass @@ -72,9 +68,8 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleaner() throws Exception { - repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist - repQueues.addLog(ID_ONE, "file1"); + repQueues.addWAL(SERVER_ONE, ID_ONE, "file1"); ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null); Map> undeletedQueues = cleaner.getUnDeletedQueues(); @@ -84,7 +79,7 @@ public class TestReplicationZKNodeCleaner { assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE)); // add a recovery queue for ID_TWO which isn't exist - repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2"); + 
repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2"); undeletedQueues = cleaner.getUnDeletedQueues(); assertEquals(1, undeletedQueues.size()); @@ -100,11 +95,10 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleanerChore() throws Exception { - repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist - repQueues.addLog(ID_ONE, "file1"); + repQueues.addWAL(SERVER_ONE, ID_ONE, "file1"); // add a recovery queue for ID_TWO which isn't exist - repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2"); + repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2"); // Wait the cleaner chore to run Thread.sleep(20000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 7ea79f9..14c5e56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -46,9 +45,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { MetricsSource metrics; WALFileLengthProvider walFileLengthProvider; AtomicBoolean startup = new AtomicBoolean(false); + @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId, + ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.manager = manager; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 9da0745..c57d9bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -94,8 +94,6 @@ public class TestMultiSlaveReplication { conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); conf1.setInt("hbase.master.cleaner.interval", 5 * 1000); - conf1.setClass("hbase.region.replica.replication.replicationQueues.class", - ReplicationQueuesZKImpl.class, ReplicationQueues.class); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 9f234a8..ed0814d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -67,10 +67,10 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -95,6 +95,7 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -328,18 +329,14 @@ public abstract class TestReplicationSourceManager { @Test public void testClaimQueues() throws Exception { - final Server server = new DummyServer("hostname0.example.org"); - - - ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, - server.getZooKeeper())); - rq.init(server.getServerName().toString()); + Server server = new DummyServer("hostname0.example.org"); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { - rq.addLog("1", file); + rq.addWAL(server.getServerName(), "1", file); } // create 3 DummyServers Server s1 = new DummyServer("dummyserver1.example.org"); @@ -347,12 +344,9 @@ public abstract class TestReplicationSourceManager { Server s3 = new DummyServer("dummyserver3.example.org"); // create 3 DummyNodeFailoverWorkers - DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker( - server.getServerName().getServerName(), s1); - DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker( - server.getServerName().getServerName(), s2); - DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker( - server.getServerName().getServerName(), s3); + DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1); + DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2); + DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3); latch = new CountDownLatch(3); // start the threads @@ -371,11 +365,9 @@ public abstract class TestReplicationSourceManager { @Test public void testCleanupFailoverQueues() throws Exception { - final Server server = new DummyServer("hostname1.example.org"); - ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, - server.getZooKeeper())); - rq.init(server.getServerName().toString()); + Server server = new DummyServer("hostname1.example.org"); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode SortedSet files = new TreeSet<>(); String group = "testgroup"; @@ -384,19 +376,14 @@ public 
abstract class TestReplicationSourceManager { files.add(file1); files.add(file2); for (String file : files) { - rq.addLog("1", file); + rq.addWAL(server.getServerName(), "1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); - ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, - s1.getZooKeeper())); - rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = - manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( - new Long(1), new Long(2))); + manager.new NodeFailoverWorker(server.getServerName()); w1.run(); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); @@ -408,17 +395,16 @@ public abstract class TestReplicationSourceManager { @Test public void testCleanupUnknownPeerZNode() throws Exception { - final Server server = new DummyServer("hostname2.example.org"); - ReplicationQueues rq = ReplicationFactory.getReplicationQueues( - new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); - rq.init(server.getServerName().toString()); + Server server = new DummyServer("hostname2.example.org"); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode // add log to an unknown peer String group = "testgroup"; - rq.addLog("2", group + ".log1"); - rq.addLog("2", group + ".log2"); + rq.addWAL(server.getServerName(), "2", group + ".log1"); + rq.addWAL(server.getServerName(), "2", group + ".log2"); - NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName()); + NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); w1.run(); // The log of the unknown peer should be removed from zk @@ -481,10 +467,8 @@ public abstract class TestReplicationSourceManager { .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase"); try { DummyServer server = new DummyServer(); - final ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( - server.getConfiguration(), server, server.getZooKeeper())); - rq.init(server.getServerName().toString()); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface // initialization to throw an exception. conf.set("replication.replicationsource.implementation", @@ -498,11 +482,11 @@ public abstract class TestReplicationSourceManager { assertNull(manager.getSource(peerId)); // Create a replication queue for the fake peer - rq.addLog(peerId, "FakeFile"); + rq.addWAL(server.getServerName(), peerId, "FakeFile"); // Unregister peer, this should remove the peer and clear all queues associated with it // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. 
removePeerAndWait(peerId); - assertFalse(rq.getAllQueues().contains(peerId)); + assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId)); } finally { conf.set("replication.replicationsource.implementation", replicationSourceImplName); removePeerAndWait(peerId); @@ -625,11 +609,12 @@ public abstract class TestReplicationSourceManager { } } Waiter.waitFor(conf, 20000, new Waiter.Predicate() { - @Override public boolean evaluate() throws Exception { + @Override + public boolean evaluate() throws Exception { List peers = rp.getAllPeerIds(); - return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null) - && (!peers.contains(peerId)) - && manager.getSource(peerId) == null; + return (!manager.getAllQueues().contains(peerId)) && + (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) && + manager.getSource(peerId) == null; } }); } @@ -672,25 +657,24 @@ public abstract class TestReplicationSourceManager { static class DummyNodeFailoverWorker extends Thread { private Map> logZnodesMap; Server server; - private String deadRsZnode; - ReplicationQueues rq; + private ServerName deadRS; + ReplicationQueueStorage rq; - public DummyNodeFailoverWorker(String znode, Server s) throws Exception { - this.deadRsZnode = znode; + public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception { + this.deadRS = deadRS; this.server = s; - this.rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, - server.getZooKeeper())); - this.rq.init(this.server.getServerName().toString()); + this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), + server.getConfiguration()); } @Override public void run() { try { logZnodesMap = new HashMap<>(); - List queues = rq.getUnClaimedQueueIds(deadRsZnode); - for(String queue:queues){ - Pair> pair = rq.claimQueue(deadRsZnode, queue); + List queues = rq.getAllQueues(deadRS); + for (String queue : queues) { + Pair> pair = + rq.claimQueue(deadRS, queue, server.getServerName()); if (pair != null) { logZnodesMap.put(pair.getFirst(), pair.getSecond()); } @@ -729,7 +713,7 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId, + ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { throw new IOException("Failing deliberately"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index aeab8b0..c6d9eef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -25,11 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import 
org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.junit.BeforeClass; @@ -41,8 +40,9 @@ import org.junit.experimental.categories.Category; * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors. */ -@Category({ReplicationTests.class, MediumTests.class}) +@Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager { + @BeforeClass public static void setUpBeforeClass() throws Exception { conf = HBaseConfiguration.create(); @@ -58,16 +58,14 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl @Test public void testNodeFailoverDeadServerParsing() throws Exception { - final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); - ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, - server.getZooKeeper())); - repQueues.init(server.getServerName().toString()); + Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { - repQueues.addLog("1", file); + queueStorage.addWAL(server.getServerName(), "1", file); } // create 3 DummyServers @@ -76,30 +74,22 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially - ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, - s1.getZooKeeper())); - rq1.init(s1.getServerName().toString()); - String serverName = server.getServerName().getServerName(); - List unclaimed = rq1.getUnClaimedQueueIds(serverName); - rq1.claimQueue(serverName, unclaimed.get(0)).getSecond(); - rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); - ReplicationQueues rq2 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2, - s2.getZooKeeper())); - rq2.init(s2.getServerName().toString()); - serverName = s1.getServerName().getServerName(); - unclaimed = rq2.getUnClaimedQueueIds(serverName); - rq2.claimQueue(serverName, unclaimed.get(0)).getSecond(); - rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); - ReplicationQueues rq3 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3, - s3.getZooKeeper())); - rq3.init(s3.getServerName().toString()); - serverName = s2.getServerName().getServerName(); - unclaimed = rq3.getUnClaimedQueueIds(serverName); - String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst(); - rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); + ServerName serverName = server.getServerName(); + List 
unclaimed = queueStorage.getAllQueues(serverName); + queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName()); + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); + + serverName = s1.getServerName(); + unclaimed = queueStorage.getAllQueues(serverName); + queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName()); + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); + + serverName = s2.getServerName(); + unclaimed = queueStorage.getAllQueues(serverName); + String queue3 = + queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst(); + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3); List result = replicationQueueInfo.getDeadRegionServers(); // verify -- 2.7.4