From d28450cfe0fc51805a9a1a0233dbb190e710284f Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 21 Dec 2017 18:49:59 +0800 Subject: [PATCH] HBASE-19543 Abstract a replication storage interface to extract the zk specific code --- .../apache/hadoop/hbase/util/CollectionUtils.java | 3 + hbase-replication/pom.xml | 12 + .../hbase/replication/ReplicationPeerStorage.java | 53 +++ .../hbase/replication/ReplicationQueueStorage.java | 174 +++++++++ .../hbase/replication/ReplicationStateZKBase.java | 1 - .../replication/ReplicationStorageFactory.java | 40 ++ .../replication/ZKReplicationPeerStorage.java | 165 ++++++++ .../replication/ZKReplicationQueueStorage.java | 419 +++++++++++++++++++++ .../replication/ZKReplicationStorageBase.java | 75 ++++ .../replication/TestZKReplicationPeerStorage.java | 171 +++++++++ .../replication/TestZKReplicationQueueStorage.java | 119 ++++++ .../org/apache/hadoop/hbase/master/HMaster.java | 36 +- .../apache/hadoop/hbase/master/MasterServices.java | 6 +- .../hbase/master/procedure/MasterProcedureEnv.java | 20 +- .../hbase/master/replication/AddPeerProcedure.java | 6 +- .../master/replication/DisablePeerProcedure.java | 7 +- .../master/replication/EnablePeerProcedure.java | 6 +- .../master/replication/ModifyPeerProcedure.java | 41 +- .../master/replication/RemovePeerProcedure.java | 6 +- .../master/replication/ReplicationManager.java | 199 ---------- .../master/replication/ReplicationPeerManager.java | 331 ++++++++++++++++ .../replication/UpdatePeerConfigProcedure.java | 7 +- .../client/replication/TestReplicationAdmin.java | 60 +-- .../hbase/master/MockNoopMasterServices.java | 10 +- .../hadoop/hbase/master/TestMasterNoCluster.java | 4 +- 25 files changed, 1676 insertions(+), 295 deletions(-) create mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java create mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java create mode 100644 
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java create mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java create mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java create mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java create mode 100644 hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java create mode 100644 hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index 875b124..8bbb6f1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -107,6 +107,9 @@ public class CollectionUtils { return list.get(list.size() - 1); } + public static List nullToEmpty(List list) { + return list != null ? list : Collections.emptyList(); + } /** * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the * value already exists. 
Notice that the implementation does not guarantee that the supplier will diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml index 0236601..f914509 100644 --- a/hbase-replication/pom.xml +++ b/hbase-replication/pom.xml @@ -121,6 +121,18 @@ org.apache.hbase hbase-zookeeper + + org.apache.hbase + hbase-common + test-jar + test + + + org.apache.hbase + hbase-zookeeper + test-jar + test + org.apache.commons diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java new file mode 100644 index 0000000..5082f42 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Optional; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Perform read/write to the replication peer storage. + */ +@InterfaceAudience.Private +public interface ReplicationPeerStorage { + + /** + * Add a replication peer. 
+ * @param peerId + * @param peerConfig + * @param enabled + * @throws ReplicationException if there are errors accessing the storage service. + */ + void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException; + + void removePeer(String peerId) throws ReplicationException; + + void setPeerState(String peerId, boolean enabled) throws ReplicationException; + + void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException; + + List listPeerIds() throws ReplicationException; + + boolean isPeerEnabled(String peerId) throws ReplicationException; + + Optional getPeerConfig(String peerId) throws ReplicationException; +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java new file mode 100644 index 0000000..9e05769 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +/** + * Perform read/write to the replication queue storage. + */ +@InterfaceAudience.Private +public interface ReplicationQueueStorage { + + /** + * Remove a replication queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + */ + void removeQueue(ServerName serverName, String queueId) throws ReplicationException; + + /** + * Add a new WAL file to the given queue for a given regionserver. If the queue does not exist it + * is created. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + * @param fileName name of the WAL + */ + void addLog(ServerName serverName, String queueId, String fileName) throws ReplicationException; + + /** + * Remove a WAL file from the given queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + * @param fileName name of the WAL + */ + void removeLog(ServerName serverName, String queueId, String fileName) throws ReplicationException; + + /** + * Set the current position for a specific WAL in a given queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue + * @param fileName name of the WAL + * @param position the current position in the file + */ + void setLogPosition(ServerName serverName, String queueId, String fileName, long position) + throws ReplicationException; + + /** + * Get the current position for a specific WAL in a given queue for a given regionserver. 
+ * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue + * @param fileName name of the WAL + * @return the current position in the file + */ + long getLogPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException; + + /** + * Get queueIds from a dead region server, whose queues have not been claimed by other region + * servers. + * @return the unclaimed queue ids, empty if the region server has no queue or does not exist. + */ + List getUnclaimedQueueIds(ServerName serverName) throws ReplicationException; + + /** + * Change ownership for the queue identified by queueId and belongs to a dead region server. + * @param sourceServerName the name of the dead region server + * @param queueId the id of the queue + * @param destServerName the name of the target region server + * @return the new queue id and a SortedSet of WALs in its queue, and null if no unclaimed queue. + */ + Pair> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException; + + /** + * Remove the record of region server if the queue is empty. + * @param serverName the name of the region server + */ + void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException; + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + */ + List getListOfReplicators() throws ReplicationException; + + /** + * Get a list of all queues for the specified region server. + * @param serverName the server name of the region server that owns the set of queues + * @return a list of queueIds, empty if this region server is not a replicator. + */ + List getAllQueues(ServerName serverName) throws ReplicationException; + + /** + * Load all wals in all replication queues. 
This method guarantees to return a snapshot which + * contains all WALs in zookeeper at the start of this call even if there is concurrent queue + * failover. However, some newly created WALs during the call may not be included. + */ + Set getAllWALs() throws ReplicationException; + + /** + * Add a peer to hfile reference queue if peer does not exist. + * @param peerId peer cluster id to be added + * @throws ReplicationException if fails to add a peer id to hfile reference queue + */ + void addPeerToHFileRefs(String peerId) throws ReplicationException; + + /** + * Remove a peer from hfile reference queue. + * @param peerId peer cluster id to be removed + */ + void removePeerFromHFileRefs(String peerId) throws ReplicationException; + + /** + * Add new hfile references to the queue. + * @param peerId peer cluster id to which the hfiles need to be replicated + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue } + * @throws ReplicationException if fails to add a hfile reference + */ + void addHFileRefs(String peerId, List> pairs) throws ReplicationException; + + /** + * Remove hfile references from the queue. + * @param peerId peer cluster id from which these hfile references need to be removed + * @param files list of hfile references to be removed + */ + void removeHFileRefs(String peerId, List files) throws ReplicationException; + + /** + * Get the change version number of replication hfile references node. This can be used as + * optimistic locking to get a consistent snapshot of the replication queues of hfile references. + * @return change version number of hfile references node + */ + int getHFileRefsNodeChangeVersion() throws ReplicationException; + + /** + * Get list of all peers from hfile reference queue. 
+ * @return a list of peer ids + * @throws KeeperException zookeeper exception + */ + List getAllPeersFromHFileRefsQueue() throws ReplicationException; + + /** + * Get a list of all hfile references in the given peer. + * @param peerId a String that identifies the peer + * @return a list of hfile references, null if not found any + * @throws KeeperException zookeeper exception + */ + List getReplicableHFiles(String peerId) throws ReplicationException; +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index ad970c6..edbbd4b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -63,7 +63,6 @@ public abstract class ReplicationStateZKBase { protected final Configuration conf; protected final Abortable abortable; - // Public for testing public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); public static final byte[] DISABLED_ZNODE_BYTES = diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java new file mode 100644 index 0000000..304d6ae --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used to create replication storage(peer, queue) classes. + *

+ * For now we only have zk based implementation. + */ +@InterfaceAudience.Private +public class ReplicationStorageFactory { + + public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) { + return new ZKReplicationPeerStorage(zk, conf); + } + + public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk, + Configuration conf) { + return new ZKReplicationQueueStorage(zk, conf); + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java new file mode 100644 index 0000000..e650133 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * ZK based replication peer storage. + */ +@InterfaceAudience.Private +class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { + + private static final Log LOG = LogFactory.getLog(ZKReplicationPeerStorage.class); + + public static final byte[] ENABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); + public static final byte[] DISABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + + /** + * The name of the znode that contains the replication status of a remote slave (i.e. peer) + * cluster. + */ + private final String peerStateNodeName; + + /** + * The name of the znode that contains a list of all remote slave (i.e. peer) clusters. 
+ */ + private final String peersZNode; + + public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); + } + + private String getPeerStateNode(String peerId) { + return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName); + } + + private String getPeerNode(String peerId) { + return ZNodePaths.joinZNode(peersZNode, peerId); + } + + @Override + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, peersZNode); + ZKUtil.multiOrSequential(zookeeper, + Arrays.asList( + ZKUtilOp.createAndFailSilent(getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)), + ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), + enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), + false); + } catch (KeeperException e) { + throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); + } + } + + @Override + public void removePeer(String peerId) throws ReplicationException { + try { + ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException("Could not remove peer with id=" + peerId, e); + } + } + + @Override + public void setPeerState(String peerId, boolean enabled) throws ReplicationException { + byte[] stateBytes = enabled ? 
ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; + try { + ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes); + } catch (KeeperException e) { + throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e); + } + } + + @Override + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + try { + ZKUtil.setData(this.zookeeper, getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + } catch (KeeperException e) { + throw new ReplicationException( + "There was a problem trying to save changes to the " + "replication peer " + peerId, e); + } + } + + @Override + public List listPeerIds() throws ReplicationException { + try { + return CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode)); + } catch (KeeperException e) { + throw new ReplicationException("Cannot get the list of peers", e); + } + } + + @Override + public boolean isPeerEnabled(String peerId) throws ReplicationException { + try { + return Arrays.equals(ENABLED_ZNODE_BYTES, + ZKUtil.getData(zookeeper, getPeerStateNode(peerId))); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); + } + } + + @Override + public Optional getPeerConfig(String peerId) + throws ReplicationException { + byte[] data; + try { + data = ZKUtil.getData(zookeeper, getPeerNode(peerId)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); + } + if (data == null || data.length == 0) { + return Optional.empty(); + } + try { + return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data)); + } catch (DeserializationException e) { + LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e); + return Optional.empty(); + } + } +} diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java new file mode 100644 index 0000000..115f17d --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.KeeperException.NotEmptyException; +import org.apache.zookeeper.data.Stat; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; + +/** + * ZK based replication queue storage. 
+ */ +@InterfaceAudience.Private +class ZKReplicationQueueStorage extends ZKReplicationStorageBase + implements ReplicationQueueStorage { + + private static final Log LOG = LogFactory.getLog(ZKReplicationQueueStorage.class); + + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = + "zookeeper.znode.replication.hfile.refs"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; + + /** + * The name of the znode that contains all replication queues + */ + private final String queuesZNode; + + /** + * The name of the znode that contains queues of hfile references to be replicated + */ + private final String hfileRefsZNode; + + public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + + String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); + this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); + this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); + } + + private String getRsNode(ServerName serverName) { + return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); + } + + private String getQueueNode(ServerName serverName, String queueId) { + return ZNodePaths.joinZNode(getRsNode(serverName), queueId); + } + + private String getFileNode(ServerName serverName, String queueId, String fileName) { + return ZNodePaths.joinZNode(getQueueNode(serverName, queueId), fileName); + } + + @Override + public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { + try { + ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); + } + } + + @Override + public void 
addLog(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + } + + @Override + public void removeLog(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + String fileNode = getFileNode(serverName, queueId, fileName); + try { + ZKUtil.deleteNode(zookeeper, fileNode); + } catch (NoNodeException e) { + LOG.warn(fileNode + " has already been deleted when removing log"); + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + } + + @Override + public void setLogPosition(ServerName serverName, String queueId, String fileName, long position) + throws ReplicationException { + try { + ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName), + ZKUtil.positionToByteArray(position)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to set log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + } + } + + @Override + public long getLogPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + byte[] bytes; + try { + bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Failed to get log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + try { + return ZKUtil.parseWALPositionFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed to parse log position 
(serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName + ")"); + } + // if we can not parse the position, start at the beginning of the wal file again + return 0; + } + + @Override + public List getUnclaimedQueueIds(ServerName serverName) throws ReplicationException { + try { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName))); + } catch (KeeperException e) { + throw new ReplicationException("Failed to getUnClaimedQueueIds for RS" + serverName, e); + } + } + + @Override + public Pair> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException { + LOG.info( + "Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + try { + String oldQueueNode = getQueueNode(sourceServerName, queueId); + List wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode); + String newQueueId = queueId + "-" + sourceServerName; + if (wals == null || wals.isEmpty()) { + ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode); + LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty"); + return new Pair<>(newQueueId, Collections.emptySortedSet()); + } + String newQueueNode = getQueueNode(destServerName, newQueueId); + List listOfOps = new ArrayList<>(); + SortedSet logQueue = new TreeSet<>(); + // create the new cluster znode + listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY)); + // get the offset of the logs and set it to new znodes + for (String wal : wals) { + String oldWalZnode = ZNodePaths.joinZNode(oldQueueNode, wal); + byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode); + LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset)); + String newLogZnode = ZNodePaths.joinZNode(newQueueNode, wal); + listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); + logQueue.add(wal); + } + // add 
delete op for peer + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode)); + + if (LOG.isTraceEnabled()) { + LOG.trace("The multi list size is: " + listOfOps.size()); + } + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); + + LOG.info( + "Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + return new Pair<>(newQueueId, logQueue); + } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) { + // Multi call failed; it looks like some other regionserver took away the logs. + // These exceptions mean that zk tells us the request can not be execute so it is safe to just + // return a null. For other types of exception should be thrown out to notify the upper layer. + LOG.info( + "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + + " failed with " + e.toString() + ", maybe someone else has already took away the logs"); + return null; + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Claim queue queueId=" + queueId + " from " + + sourceServerName + " to " + destServerName + " failed", e); + } + } + + @Override + public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException { + try { + ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName)); + } catch (NotEmptyException e) { + // keep silence to avoid logging too much. 
+ } catch (KeeperException e) { + throw new ReplicationException("Failed to remove replicator for " + serverName, e); + } + } + + private List getListOfReplicators0() throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode)).stream() + .map(ServerName::parseServerName).collect(toList()); + } + + @Override + public List getListOfReplicators() throws ReplicationException { + try { + return getListOfReplicators0(); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of replicators", e); + } + } + + private List getLogsInQueue0(ServerName serverName, String queueId) + throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId))); + } + + private List getAllQueues0(ServerName serverName) throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName))); + } + + @Override + public List getAllQueues(ServerName serverName) throws ReplicationException { + try { + return getAllQueues0(serverName); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e); + } + } + + private int getQueuesZNodeCversion() throws KeeperException { + Stat stat = new Stat(); + ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); + return stat.getCversion(); + } + + @Override + public Set getAllWALs() throws ReplicationException { + try { + for (int retry = 0;; retry++) { + int v0 = getQueuesZNodeCversion(); + List rss = getListOfReplicators0(); + if (rss.isEmpty()) { + LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); + return Collections.emptySet(); + } + Set wals = Sets.newHashSet(); + for (ServerName rs : rss) { + for (String queueId : getAllQueues0(rs)) { + wals.addAll(getLogsInQueue0(rs, queueId)); + } + } + int v1 = getQueuesZNodeCversion(); + if (v0 == v1) { + return wals; + } + 
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", + v0, v1, retry)); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to get all wals", e); + } + } + + private String getHFileRefsPeerNode(String peerId) { + return ZNodePaths.joinZNode(hfileRefsZNode, peerId); + } + + private String getHFileNode(String peerNode, String fileName) { + return ZNodePaths.joinZNode(peerNode, fileName); + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + try { + if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { + LOG.info("Adding peer " + peerId + " to hfile reference queue."); + ZKUtil.createWithParents(zookeeper, peerNode); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", + e); + } + } + + @Override + public void removePeerFromHFileRefs(String peerId) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + try { + if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Peer " + peerNode + " not found in hfile reference queue."); + } + } else { + LOG.info("Removing peer " + peerNode + " from hfile reference queue."); + ZKUtil.deleteNodeRecursively(zookeeper, peerNode); + } + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to remove peer " + peerId + " from hfile reference queue.", e); + } + } + + @Override + public void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode); + } + List listOfOps = + pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n)) + .map(f -> ZKUtilOp.createAndFailSilent(f, 
HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); + if (debugEnabled) { + LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e); + } + } + + @Override + public void removeHFileRefs(String peerId, List files) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Removing hfile references " + files + " from queue " + peerNode); + } + + List listOfOps = files.stream().map(n -> getHFileNode(peerNode, n)) + .map(ZKUtilOp::deleteNodeFailSilent).collect(toList()); + if (debugEnabled) { + LOG.debug("The multi list size for removing hfile references in zk for node " + peerNode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e); + } + } + + @Override + public int getHFileRefsNodeChangeVersion() throws ReplicationException { + Stat stat = new Stat(); + try { + ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get stat of replication hfile references node.", e); + } + return stat.getCversion(); + } + + @Override + public List getAllPeersFromHFileRefsQueue() throws ReplicationException { + try { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of all peers in hfile references node.", + e); + } + } + + @Override + public List getReplicableHFiles(String peerId) throws ReplicationException { + try { + return 
nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId))); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, + e); + } + } + +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java new file mode 100644 index 0000000..b8a2044 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * This is a base class for maintaining replication related data,for example, peer, queue, etc, in + * zookeeper. + */ +@InterfaceAudience.Private +class ZKReplicationStorageBase { + + /** The name of the base znode that contains all replication state. */ + protected final String replicationZNode; + + protected final ZKWatcher zookeeper; + protected final Configuration conf; + + protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) { + this.zookeeper = zookeeper; + this.conf = conf; + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + + this.replicationZNode = + ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); + + } + + /** + * Serialized protobuf of state with pb magic prefix prepended suitable for use as + * content of a peer-state znode under a peer cluster id as in + * /hbase/replication/peers/PEER_ID/peer-state. + */ + protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { + ReplicationProtos.ReplicationState msg = + ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); + // There is no toByteArray on this pb Message? + // 32 bytes is default which seems fair enough here. 
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16); + msg.writeTo(cos); + cos.flush(); + baos.flush(); + return ProtobufUtil.prependPBMagic(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java new file mode 100644 index 0000000..a3be1e6 --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Stream; + +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestZKReplicationPeerStorage { + + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static ZKReplicationPeerStorage STORAGE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniZKCluster(); + STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + + private Set randNamespaces(Random rand) { + return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) + .collect(toSet()); + } + + private Map> randTableCFs(Random rand) { + int size = rand.nextInt(5); + Map> map = new HashMap<>(); + for (int i = 0; i < size; i++) { + TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); + List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) + .limit(rand.nextInt(5)).collect(toList()); + map.put(tn, cfs); + } + return map; + } + + private 
ReplicationPeerConfig getConfig(int seed) { + Random rand = new Random(seed); + ReplicationPeerConfig config = new ReplicationPeerConfig(); + config.setClusterKey(Long.toHexString(rand.nextLong())); + config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong())); + config.setNamespaces(randNamespaces(rand)); + config.setExcludeNamespaces(randNamespaces(rand)); + config.setTableCFsMap(randTableCFs(rand)); + config.setReplicateAllUserTables(rand.nextBoolean()); + config.setBandwidth(rand.nextInt(1000)); + return config; + } + + private void assertSetEquals(Set expected, Set actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach(s -> assertTrue(actual.contains(s))); + } + + private void assertMapEquals(Map> expected, + Map> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach((expectedTn, expectedCFs) -> { + List actualCFs = actual.get(expectedTn); + if (expectedCFs == null || expectedCFs.size() == 0) { + assertTrue(actual.containsKey(expectedTn)); + assertTrue(actualCFs == null || actualCFs.size() == 0); + } else { + assertNotNull(actualCFs); + assertEquals(expectedCFs.size(), actualCFs.size()); + for (Iterator expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); + expectedIt.hasNext();) { + assertEquals(expectedIt.next(), actualIt.next()); + } + } + }); + } + + private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { + assertEquals(expected.getClusterKey(), actual.getClusterKey()); + assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); + assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); + assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); + 
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); + assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); + assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); + assertEquals(expected.getBandwidth(), actual.getBandwidth()); + } + + @Test + public void test() throws ReplicationException { + int peerCount = 10; + for (int i = 0; i < peerCount; i++) { + STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + } + List peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount, peerIds.size()); + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get()); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); + } + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get()); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + String toRemove = Integer.toString(peerCount / 2); + STORAGE.removePeer(toRemove); + peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount - 1, peerIds.size()); + assertFalse(peerIds.contains(toRemove)); + assertFalse(STORAGE.getPeerConfig(toRemove).isPresent()); + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java new file mode 100644 index 0000000..fe70baf --- /dev/null +++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestZKReplicationQueueStorage { + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static ZKReplicationQueueStorage STORAGE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniZKCluster(); + STORAGE = new 
ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + + @After + public void tearDownAfterTest() throws ReplicationException { + for (ServerName serverName : STORAGE.getListOfReplicators()) { + for (String queue : STORAGE.getAllQueues(serverName)) { + STORAGE.removeQueue(serverName, queue); + } + STORAGE.removeReplicatorIfQueueIsEmpty(serverName); + } + for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) { + STORAGE.removePeerFromHFileRefs(peerId); + } + } + + private ServerName getServerName(int i) { + return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); + } + + @Test + public void testReplicator() throws ReplicationException { + assertTrue(STORAGE.getListOfReplicators().isEmpty()); + String queueId = "1"; + for (int i = 0; i < 10; i++) { + STORAGE.addLog(getServerName(i), queueId, "file" + i); + } + List replicators = STORAGE.getListOfReplicators(); + assertEquals(10, replicators.size()); + for (int i = 0; i < 10; i++) { + assertThat(replicators, hasItems(getServerName(i))); + } + for (int i = 0; i < 5; i++) { + STORAGE.removeQueue(getServerName(i), queueId); + } + for (int i = 0; i < 10; i++) { + STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i)); + } + replicators = STORAGE.getListOfReplicators(); + assertEquals(5, replicators.size()); + for (int i = 5; i < 10; i++) { + assertThat(replicators, hasItems(getServerName(i))); + } + } + + private String getFileName(String base, int i) { + return String.format(base + "-%04d", i); + } + + @Test + public void testAddRemoveLog() throws ReplicationException { + ServerName serverName = ServerName.valueOf("127.0.0.1", 8000, 10000); + assertTrue(STORAGE.getAllQueues(serverName).isEmpty()); + String queue1 = "1"; + String queue2 = "2"; + for (int i = 0; i < 10; i++) { + STORAGE.addLog(serverName, queue1, getFileName("file1", i)); + STORAGE.addLog(serverName, queue2, 
getFileName("file2", i)); + } + List queueIds = STORAGE.getAllQueues(serverName); + assertEquals(2, queueIds.size()); + assertThat(queueIds, hasItems("1", "2")); + + assertEquals(0, STORAGE.getLogPosition(serverName, queue1, getFileName("file1", 0))); + STORAGE.setLogPosition(serverName, queue1, getFileName("file1", 0), 100); + assertEquals(100, STORAGE.getLogPosition(serverName, queue1, getFileName("file1", 0))); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f5b8feb..4855ada 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -37,6 +37,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -131,7 +132,7 @@ import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.mob.MobConstants; @@ -323,7 +324,7 @@ public class HMaster extends HRegionServer implements MasterServices { private AssignmentManager assignmentManager; // manager of replication - private ReplicationManager replicationManager; + private ReplicationPeerManager replicationPeerManager; // buffer for "fatal error" notices from region servers // in 
the cluster. This is only used for assisting @@ -693,8 +694,8 @@ public class HMaster extends HRegionServer implements MasterServices { /** * Initialize all ZK based system trackers. */ - void initializeZKBasedSystemTrackers() throws IOException, - InterruptedException, KeeperException, CoordinatedStateException { + void initializeZKBasedSystemTrackers() throws IOException, InterruptedException, KeeperException, + CoordinatedStateException, ReplicationException { this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf); this.normalizer.setMasterServices(this); @@ -712,7 +713,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.assignmentManager = new AssignmentManager(this); this.assignmentManager.start(); - this.replicationManager = new ReplicationManager(conf, zooKeeper, this); + this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); this.regionServerTracker.start(); @@ -757,9 +758,8 @@ public class HMaster extends HRegionServer implements MasterServices { *

  • Handle either fresh cluster start or master failover
  • * */ - private void finishActiveMasterInitialization(MonitoredTask status) - throws IOException, InterruptedException, KeeperException, CoordinatedStateException { - + private void finishActiveMasterInitialization(MonitoredTask status) throws IOException, + InterruptedException, KeeperException, CoordinatedStateException, ReplicationException { activeMaster = true; Thread zombieDetector = new Thread(new InitializationMonitor(this), "ActiveMasterInitializationMonitor-" + System.currentTimeMillis()); @@ -3346,18 +3346,19 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException, - IOException { + public ReplicationPeerConfig getReplicationPeerConfig(String peerId) + throws ReplicationException, IOException { if (cpHost != null) { cpHost.preGetReplicationPeerConfig(peerId); } - final ReplicationPeerConfig peerConfig = this.replicationManager.getPeerConfig(peerId); - LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId + ", config=" - + peerConfig); + LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId); + Optional peerConfig = + this.replicationPeerManager.getPeerConfig(peerId); + if (cpHost != null) { cpHost.postGetReplicationPeerConfig(peerId); } - return peerConfig; + return peerConfig.orElse(null); } @Override @@ -3376,7 +3377,8 @@ public class HMaster extends HRegionServer implements MasterServices { } LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex); Pattern pattern = regex == null ? 
null : Pattern.compile(regex); - List peers = this.replicationManager.listReplicationPeers(pattern); + List peers = + this.replicationPeerManager.listPeers(pattern); if (cpHost != null) { cpHost.postListReplicationPeers(regex); } @@ -3526,7 +3528,7 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplicationManager getReplicationManager() { - return replicationManager; + public ReplicationPeerManager getReplicationPeerManager() { + return replicationPeerManager; } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 43df8b1..67baf14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -459,9 +459,9 @@ public interface MasterServices extends Server { IOException; /** - * Returns the {@link ReplicationManager}. + * Returns the {@link ReplicationPeerManager}. 
*/ - ReplicationManager getReplicationManager(); + ReplicationPeerManager getReplicationPeerManager(); /** * Update the peerConfig for the specified peer diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 3596f82..222d4b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -26,22 +26,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving @@ -138,8 +138,8 @@ public class 
MasterProcedureEnv implements ConfigurationObserver { return remoteDispatcher; } - public ReplicationManager getReplicationManager() { - return master.getReplicationManager(); + public ReplicationPeerManager getReplicationPeerManager() { + return master.getReplicationPeerManager(); } public boolean isRunning() { @@ -151,22 +151,22 @@ public class MasterProcedureEnv implements ConfigurationObserver { return master.isInitialized(); } - public boolean waitInitialized(Procedure proc) { + public boolean waitInitialized(Procedure proc) { return master.getInitializedEvent().suspendIfNotReady(proc); } - public boolean waitServerCrashProcessingEnabled(Procedure proc) { + public boolean waitServerCrashProcessingEnabled(Procedure proc) { if (master instanceof HMaster) { return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc); } return false; } - public boolean waitFailoverCleanup(Procedure proc) { + public boolean waitFailoverCleanup(Procedure proc) { return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc); } - public void setEventReady(ProcedureEvent event, boolean isReady) { + public void setEventReady(ProcedureEvent event, boolean isReady) { if (isReady) { event.wake(procSched); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index c3862d8..df63096 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -58,16 +58,18 @@ public class AddPeerProcedure extends ModifyPeerProcedure { } @Override - protected void prePeerModification(MasterProcedureEnv env) throws IOException { + protected void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { MasterCoprocessorHost cpHost = 
env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preAddReplicationPeer(peerId, peerConfig); } + env.getReplicationPeerManager().preAddPeer(peerId, peerConfig); } @Override protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { - env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled); + env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java index 0b32db9..fec2ab6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -51,12 +52,12 @@ public class DisablePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preDisableReplicationPeer(peerId); } + env.getReplicationPeerManager().preDisablePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) - throws IllegalArgumentException, Exception { - env.getReplicationManager().disableReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().disablePeer(peerId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java index 
92ba000..ef9af0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -51,11 +52,12 @@ public class EnablePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preEnableReplicationPeer(peerId); } + env.getReplicationPeerManager().preEnablePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { - env.getReplicationManager().enableReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().enablePeer(peerId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 23f6f87..69d9ca6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -29,6 +28,7 @@ import 
org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyPeerStateData; @@ -67,17 +67,16 @@ public abstract class ModifyPeerProcedure } /** - * Called before we start the actual processing. If an exception is thrown then we will give up - * and mark the procedure as failed directly. + * Called before we start the actual processing. The implementation should call the pre CP hook, + * and also the pre-check for the peer modification. + *

    + * If an IOException is thrown then we will give up and mark the procedure as failed directly. If + * all checks passes then the procedure can not be rolled back any more. */ - protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException; + protected abstract void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException; - /** - * We will give up and mark the procedure as failure if {@link IllegalArgumentException} is - * thrown, for other type of Exception we will retry. - */ - protected abstract void updatePeerStorage(MasterProcedureEnv env) - throws IllegalArgumentException, Exception; + protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException; /** * Called before we finish the procedure. The implementation can do some logging work, and also @@ -100,23 +99,24 @@ public abstract class ModifyPeerProcedure try { prePeerModification(env); } catch (IOException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", mark the procedure as failure and give up", e); - setFailure("prePeerModification", e); + LOG.warn( + getClass().getName() + " failed to call CP hook or the pre check is failed for peer " + + peerId + ", mark the procedure as failure and give up", + e); + setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); releaseLatch(); return Flow.NO_MORE_STATE; + } catch (ReplicationException e) { + LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + + ", retry", e); + throw new ProcedureYieldException(); } setNextState(PeerModificationState.UPDATE_PEER_STORAGE); return Flow.HAS_MORE_STATE; case UPDATE_PEER_STORAGE: try { updatePeerStorage(env); - } catch (IllegalArgumentException e) { - setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", - new DoNotRetryIOException(e)); - releaseLatch(); - return Flow.NO_MORE_STATE; - } catch 
(Exception e) { + } catch (ReplicationException e) { LOG.warn( getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e); throw new ProcedureYieldException(); @@ -158,8 +158,7 @@ public abstract class ModifyPeerProcedure @Override protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) throws IOException, InterruptedException { - if (state == PeerModificationState.PRE_PEER_MODIFICATION || - state == PeerModificationState.UPDATE_PEER_STORAGE) { + if (state == PeerModificationState.PRE_PEER_MODIFICATION) { // actually the peer related operations has no rollback, but if we haven't done any // modifications on the peer storage, we can just return. return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 3daad6d..6b683de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -51,11 +52,12 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preRemoveReplicationPeer(peerId); } + env.getReplicationPeerManager().preRemovePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { - env.getReplicationManager().removeReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + 
env.getReplicationPeerManager().removePeer(peerId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java deleted file mode 100644 index b6f8784..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.master.replication; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Manages and performs all replication admin operations. - *

    - * Used to add/remove a replication peer. - */ -@InterfaceAudience.Private -public class ReplicationManager { - private final ReplicationQueuesClient replicationQueuesClient; - private final ReplicationPeers replicationPeers; - - public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable) - throws IOException { - try { - this.replicationQueuesClient = ReplicationFactory - .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw)); - this.replicationQueuesClient.init(); - this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, - this.replicationQueuesClient, abortable); - this.replicationPeers.init(); - } catch (Exception e) { - throw new IOException("Failed to construct ReplicationManager", e); - } - } - - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException { - checkPeerConfig(peerConfig); - replicationPeers.registerPeer(peerId, peerConfig, enabled); - replicationPeers.peerConnected(peerId); - } - - public void removeReplicationPeer(String peerId) throws ReplicationException { - replicationPeers.peerDisconnected(peerId); - replicationPeers.unregisterPeer(peerId); - } - - public void enableReplicationPeer(String peerId) throws ReplicationException { - this.replicationPeers.enablePeer(peerId); - } - - public void disableReplicationPeer(String peerId) throws ReplicationException { - this.replicationPeers.disablePeer(peerId); - } - - public ReplicationPeerConfig getPeerConfig(String peerId) - throws ReplicationException, ReplicationPeerNotFoundException { - ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId); - if (peerConfig == null) { - throw new ReplicationPeerNotFoundException(peerId); - } - return peerConfig; - } - - public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) - throws ReplicationException, IOException { - checkPeerConfig(peerConfig); - 
this.replicationPeers.updatePeerConfig(peerId, peerConfig); - } - - public List listReplicationPeers(Pattern pattern) - throws ReplicationException { - List peers = new ArrayList<>(); - List peerIds = replicationPeers.getAllPeerIds(); - for (String peerId : peerIds) { - if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) { - peers.add(new ReplicationPeerDescription(peerId, - replicationPeers.getStatusOfPeerFromBackingStore(peerId), - replicationPeers.getReplicationPeerConfig(peerId))); - } - } - return peers; - } - - /** - * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. - * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to - * peer cluster. - * - * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. - * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. - */ - private void checkPeerConfig(ReplicationPeerConfig peerConfig) { - if (peerConfig.replicateAllUserTables()) { - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || - (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " + - "when you want replicate all cluster"); - } - checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), - peerConfig.getExcludeTableCFsMap()); - } else { - if ((peerConfig.getExcludeNamespaces() != null - && !peerConfig.getExcludeNamespaces().isEmpty()) - || (peerConfig.getExcludeTableCFsMap() != null - && !peerConfig.getExcludeTableCFsMap().isEmpty())) { - throw new IllegalArgumentException( - "Need clean exclude-namespaces or exclude-table-cfs config firstly" - + " when replicate_all flag is false"); - } - checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), - peerConfig.getTableCFsMap()); - } - 
checkConfiguredWALEntryFilters(peerConfig); - } - - /** - * Set a namespace in the peer config means that all tables in this namespace will be replicated - * to the peer cluster. - *

      - *
    1. If peer config already has a namespace, then not allow set any table of this namespace to - * the peer config.
    2. - *
    3. If peer config already has a table, then not allow set this table's namespace to the peer - * config.
    4. - *
    - *

    - * Set a exclude namespace in the peer config means that all tables in this namespace can't be - * replicated to the peer cluster. - *

      - *
    1. If peer config already has a exclude namespace, then not allow set any exclude table of - * this namespace to the peer config.
    2. - *
    3. If peer config already has a exclude table, then not allow set this table's namespace as a - * exclude namespace.
    4. - *
    - */ - private void checkNamespacesAndTableCfsConfigConflict(Set namespaces, - Map> tableCfs) { - if (namespaces == null || namespaces.isEmpty()) { - return; - } - if (tableCfs == null || tableCfs.isEmpty()) { - return; - } - for (Map.Entry> entry : tableCfs.entrySet()) { - TableName table = entry.getKey(); - if (namespaces.contains(table.getNamespaceAsString())) { - throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces " - + table.getNamespaceAsString() + " in peer config"); - } - } - } - - private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) { - String filterCSV = peerConfig.getConfiguration() - .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); - if (filterCSV != null && !filterCSV.isEmpty()) { - String[] filters = filterCSV.split(","); - for (String filter : filters) { - try { - Class.forName(filter).newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("Configured WALEntryFilter " + filter + - " could not be created. Failing add/update " + "peer operation.", e); - } - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java new file mode 100644 index 0000000..5b8b61e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Manages and performs all replication admin operations. + *

    + * Used to add/remove a replication peer. + */ +@InterfaceAudience.Private +public class ReplicationPeerManager { + + private final ReplicationPeerStorage peerStorage; + + private final ReplicationQueueStorage queueStorage; + + private final ConcurrentMap peers; + + private ReplicationPeerManager(ReplicationPeerStorage peerStorage, + ReplicationQueueStorage queueStorage, + ConcurrentMap peers) { + this.peerStorage = peerStorage; + this.queueStorage = queueStorage; + this.peers = peers; + } + + private void checkQueuesDeleted(String peerId) + throws ReplicationException, DoNotRetryIOException { + for (ServerName replicator : queueStorage.getListOfReplicators()) { + List queueIds = queueStorage.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + + ", replicator: " + replicator + ", queueId: " + queueId); + } + } + } + if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { + throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); + } + } + + public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException, ReplicationException { + if (peerId.contains("-")) { + throw new DoNotRetryIOException("Found invalid peer name: " + peerId); + } + checkPeerConfig(peerConfig); + if (peers.containsKey(peerId)) { + throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); + } + // make sure that there is no queues with the same peer id. This may happen when we create a + // peer with the same id with a old deleted peer. If the replication queues for the old peer + // have not been cleaned up yet then we should not create the new peer, otherwise the old wal + // file may also be replicated. 
+ checkQueuesDeleted(peerId); + } + + private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { + ReplicationPeerDescription desc = peers.get(peerId); + if (desc == null) { + throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist"); + } + return desc; + } + + public void preRemovePeer(String peerId) throws DoNotRetryIOException { + checkPeerExists(peerId); + } + + public void preEnablePeer(String peerId) throws DoNotRetryIOException { + ReplicationPeerDescription desc = checkPeerExists(peerId); + if (desc.isEnabled()) { + throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); + } + } + + public void preDisablePeer(String peerId) throws DoNotRetryIOException { + ReplicationPeerDescription desc = checkPeerExists(peerId); + if (!desc.isEnabled()) { + throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); + } + } + + public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + checkPeerConfig(peerConfig); + ReplicationPeerDescription desc = checkPeerExists(peerId); + ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); + if (!StringUtils.isBlank(peerConfig.getClusterKey()) && + !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { + throw new DoNotRetryIOException( + "Changing the cluster key on an existing peer is not allowed. Existing key '" + + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + + peerConfig.getClusterKey() + "'"); + } + + if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && + !peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { + throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. 
Existing class '" + + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + } + } + + private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) { + ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig(); + copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration()); + copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData()); + copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap()); + copiedPeerConfig.setNamespaces(peerConfig.getNamespaces()); + copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()); + copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces()); + copiedPeerConfig.setBandwidth(peerConfig.getBandwidth()); + copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables()); + copiedPeerConfig.setClusterKey(peerConfig.getClusterKey()); + copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); + return copiedPeerConfig; + } + + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + if (peers.containsKey(peerId)) { + // this should be a retry, just return + return; + } + ReplicationPeerConfig copiedPeerConfig = copy(peerConfig); + peerStorage.addPeer(peerId, copiedPeerConfig, enabled); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); + } + + public void removePeer(String peerId) throws ReplicationException { + if (!peers.containsKey(peerId)) { + // this should be a retry, just return + return; + } + peerStorage.removePeer(peerId); + peers.remove(peerId); + } + + private void setPeerState(String peerId, boolean enabled) throws ReplicationException { + ReplicationPeerDescription desc = peers.get(peerId); + if (desc.isEnabled() == enabled) { + // this should be a retry, just return + return; + } + peerStorage.setPeerState(peerId, 
enabled); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); + } + + public void enablePeer(String peerId) throws ReplicationException { + setPeerState(peerId, true); + } + + public void disablePeer(String peerId) throws ReplicationException { + setPeerState(peerId, false); + } + + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + // the checking rules are too complicated here so we give up checking whether this is a retry. + ReplicationPeerDescription desc = peers.get(peerId); + ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); + ReplicationPeerConfig newPeerConfig = copy(peerConfig); + // we need to use the new conf to overwrite the old one. + newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration()); + newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration()); + newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData()); + newPeerConfig.getPeerData().putAll(peerConfig.getPeerData()); + + peerStorage.updatePeerConfig(peerId, newPeerConfig); + peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); + } + + public List listPeers(Pattern pattern) { + if (pattern == null) { + return new ArrayList<>(peers.values()); + } + return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) + .collect(Collectors.toList()); + } + + public Optional getPeerConfig(String peerId) { + ReplicationPeerDescription desc = peers.get(peerId); + return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); + } + + /** + * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. + * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer + * cluster. + *

    + * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. + * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. + */ + private static void checkPeerConfig(ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + if (peerConfig.replicateAllUserTables()) { + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || + (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + + "when you want replicate all cluster"); + } + checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), + peerConfig.getExcludeTableCFsMap()); + } else { + if ((peerConfig.getExcludeNamespaces() != null && + !peerConfig.getExcludeNamespaces().isEmpty()) || + (peerConfig.getExcludeTableCFsMap() != null && + !peerConfig.getExcludeTableCFsMap().isEmpty())) { + throw new DoNotRetryIOException( + "Need clean exclude-namespaces or exclude-table-cfs config firstly" + + " when replicate_all flag is false"); + } + checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), + peerConfig.getTableCFsMap()); + } + checkConfiguredWALEntryFilters(peerConfig); + } + + /** + * Set a namespace in the peer config means that all tables in this namespace will be replicated + * to the peer cluster. + *

      + *
    1. If peer config already has a namespace, then not allow set any table of this namespace to + * the peer config.
    2. + *
    3. If peer config already has a table, then not allow set this table's namespace to the peer + * config.
    4. + *
    + *

    + * Set a exclude namespace in the peer config means that all tables in this namespace can't be + * replicated to the peer cluster. + *

      + *
    1. If peer config already has a exclude namespace, then not allow set any exclude table of + * this namespace to the peer config.
    2. + *
    3. If peer config already has a exclude table, then not allow set this table's namespace as a + * exclude namespace.
    4. + *
    + */ + private static void checkNamespacesAndTableCfsConfigConflict(Set namespaces, + Map> tableCfs) throws DoNotRetryIOException { + if (namespaces == null || namespaces.isEmpty()) { + return; + } + if (tableCfs == null || tableCfs.isEmpty()) { + return; + } + for (Map.Entry> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + if (namespaces.contains(table.getNamespaceAsString())) { + throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " + + table.getNamespaceAsString() + " in peer config"); + } + } + } + + private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + String filterCSV = peerConfig.getConfiguration() + .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterCSV != null && !filterCSV.isEmpty()) { + String[] filters = filterCSV.split(","); + for (String filter : filters) { + try { + Class.forName(filter).newInstance(); + } catch (Exception e) { + throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + + " could not be created. 
Failing add/update " + "peer operation.", e); + } + } + } + } + + public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) + throws ReplicationException { + ReplicationPeerStorage peerStorage = + ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); + ConcurrentMap peers = new ConcurrentHashMap<>(); + for (String peerId : peerStorage.listPeerIds()) { + Optional peerConfig = peerStorage.getPeerConfig(peerId); + boolean enabled = peerStorage.isPeerEnabled(peerId); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get())); + } + return new ReplicationPeerManager(peerStorage, + ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index 435eefc..d16d8de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.yetus.audience.InterfaceAudience; @@ -59,12 +60,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); } + env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) - throws 
IllegalArgumentException, Exception { - env.getReplicationManager().updatePeerConfig(peerId, peerConfig); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index b42ba31..735c2b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -18,6 +18,13 @@ package org.apache.hadoop.hbase.client.replication; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -27,23 +34,21 @@ import java.util.Map; import java.util.Set; import java.util.regex.Pattern; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import 
org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -51,21 +56,12 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - /** * Unit testing of ReplicationAdmin */ @Category({MediumTests.class, ClientTests.class}) public class TestReplicationAdmin { - private static final Log LOG = - LogFactory.getLog(TestReplicationAdmin.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -99,6 +95,21 @@ public class TestReplicationAdmin { TEST_UTIL.shutdownMiniCluster(); } + @After + public void tearDown() throws Exception { + for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) { + hbaseAdmin.removeReplicationPeer(desc.getPeerId()); + } + ReplicationQueueStorage queueStorage = ReplicationStorageFactory + .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); + for (ServerName serverName : queueStorage.getListOfReplicators()) { + for (String queue : queueStorage.getAllQueues(serverName)) { + queueStorage.removeQueue(serverName, queue); + } + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); + } + } + /** * Simple testing of adding and removing peers, basically shows that * all interactions with ZK work @@ -185,32 +196,29 @@ public 
class TestReplicationAdmin { ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); rpc2.setClusterKey(KEY_SECOND); Configuration conf = TEST_UTIL.getConfiguration(); - ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null); - ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw)); - repQueues.init("server1"); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf); + ServerName serverName = ServerName.valueOf("server1", 8000, 1234); // add queue for ID_ONE - repQueues.addLog(ID_ONE, "file1"); + queueStorage.addLog(serverName, ID_ONE, "file1"); try { admin.addPeer(ID_ONE, rpc1, null); fail(); } catch (Exception e) { // OK! } - repQueues.removeQueue(ID_ONE); - assertEquals(0, repQueues.getAllQueues().size()); + queueStorage.removeQueue(serverName, ID_ONE); + assertEquals(0, queueStorage.getAllQueues(serverName).size()); // add recovered queue for ID_ONE - repQueues.addLog(ID_ONE + "-server2", "file1"); + queueStorage.addLog(serverName, ID_ONE + "-server2", "file1"); try { admin.addPeer(ID_ONE, rpc2, null); fail(); } catch (Exception e) { // OK! 
} - repQueues.removeAllQueues(); - zkw.close(); } /** @@ -406,7 +414,7 @@ public class TestReplicationAdmin { tableCFs.clear(); tableCFs.put(tableName2, null); admin.removePeerTableCFs(ID_ONE, tableCFs); - assertTrue(false); + fail(); } catch (ReplicationException e) { } tableCFs.clear(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 540a67c..7196b7c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.master; import static org.mockito.Mockito.mock; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.List; @@ -42,7 +44,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -56,8 +58,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import com.google.protobuf.Service; - public class MockNoopMasterServices implements MasterServices, Server { private final Configuration conf; private final MetricsMaster metricsMaster; @@ -462,7 +462,7 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override - public 
ProcedureEvent getInitializedEvent() { + public ProcedureEvent getInitializedEvent() { return null; } @@ -477,7 +477,7 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override - public ReplicationManager getReplicationManager() { + public ReplicationPeerManager getReplicationPeerManager() { return null; } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index 20c9fe1..f09b1c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.replication.ReplicationException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -269,7 +271,7 @@ public class TestMasterNoCluster { @Override void initializeZKBasedSystemTrackers() throws IOException, InterruptedException, - KeeperException, CoordinatedStateException { + KeeperException, CoordinatedStateException, ReplicationException { super.initializeZKBasedSystemTrackers(); // Record a newer server in server manager at first getServerManager().recordNewServerWithLock(newServer, ServerLoad.EMPTY_SERVERLOAD); -- 2.7.4